NeighbourAnt.java
| Index Score | ||
|---|---|---|
![]() |
![]() |
ants.p2p |
![]() |
![]() |
ANtsP2P |
View: Reasons, Metrics, Source Code
These are the metrics that contribute to the Enerjy Score for this file, ranked by impact. So the metrics listed at the top influence the score to a greater extent than the metrics listed at the bottom.
//******************************************************************
//******************************************************************
//********** ANts Peer To Peer Sources *************
//
// ANts P2P realizes a third generation P2P net. It protects your
// privacy while you are connected and makes you not trackable, hiding
// your identity (ip) and crypting everything you are sending/receiving
// from others.
// Copyright (C) 2004 Roberto Rossi
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
package ants.p2p;
import java.net.*;
import java.io.*;
import java.util.*;
import javax.crypto.*;
import ants.p2p.messages.*;
import ants.p2p.query.*;
import ants.p2p.http.*;
import ants.p2p.utils.net.*;
import org.apache.log4j.*;
import com.jcraft.jzlib.*;
/**
 * One connection to a neighbouring peer.
 *
 * The thread body ({@link #run()}) reads serialized objects from the socket,
 * unpacks compressed messages and hands them to the local {@link Ant};
 * {@link #send(Message)} compresses and writes outgoing messages to the same
 * socket. A {@link Prober} is started alongside to send periodic probe messages.
 *
 * Two neighbours are considered equal when their identifying string matches
 * (see {@link #equals(Object)}); {@link #hashCode()} is derived from the same
 * string.
 */
public class NeighbourAnt extends Thread implements Neighbour{
  /** Local node this connection belongs to. */
  Ant local;
  /** Remote address; a leading "/" is stripped the first time an identifier is built. */
  String ip;
  int port;
  int remoteServerPort;
  // The ciphers are stored but not used by the code in this class: the
  // SealedObject wrapping of messages is disabled in send()/run().
  Cipher enc;
  Cipher dec;
  Socket s;
  BandwidthFilterIn bfIn;
  BandwidthFilterOut bfOut;
  /** Counter maintained through incQueuedMessages()/decQueuedMessages(). */
  long queuedMessages = 0;
  // Both flags are written by one thread (e.g. the Prober calling terminate())
  // and read by another (the reader loop, send()), hence volatile.
  volatile boolean failure = false;
  volatile boolean terminate = false;
  boolean isRequirer;
  /** Duration in milliseconds measured by the most recent probe. */
  long timeElapsed;
  /** Wall-clock time (ms) at which the most recent probe completed. */
  long lastProbedAt;
  long lastActiveUploadTime = System.currentTimeMillis();
  long lastActiveDownloadTime = System.currentTimeMillis();
  /** Size in bytes of the payload written after each probe message. */
  final int pingSize = 32;
  /** Start (ms) of the current one-minute window used for query rate limiting. */
  long queryInCheckpoint = 0;
  long floodInCheckpoint = 0;
  long floodOutCheckpoint = 0;
  /** Queries counted in the current rate-limiting window. */
  int receivedQueries = 0;
  int receivedFloods = 0;
  int routedQueries = 0;
  int routedFloods = 0;
  public static int bandwidthLimit = 30 * 1024;
  public static int queryBandwidthLimit = 10 * 1024;
  /** Queries accepted from one neighbour per minute before further ones are discarded. */
  public static final int maxQueryRatePerMinute = 60;
  Prober prober;
  // Global traffic counters, in bytes. They are updated without
  // synchronization from several threads, so treat them as approximate.
  public static double totalCompressedSizeOut;
  public static double totalUncompressionSizeOut;
  public static double totalCompressedSizeIn;
  public static double totalUncompressionSizeIn;
  public static double totalQueryCompressedSizeOut;
  public static double totalQueryCompressedSizeIn;
  public static double totalDataCompressedSizeOut;
  public static double totalDataCompressedSizeIn;
  public static double totalControlCompressedSizeOut;
  public static double totalControlCompressedSizeIn;
  public static double totalDownloaded = 0;
  public static double totalUploaded = 0;
  static Logger _logger = Logger.getLogger(NeighbourAnt.class.getName());
  /** Guards writes to the socket output so objects from different senders do not interleave. */
  private Object socketLock = new Object();

  /**
   * Wraps an already established socket.
   *
   * @param local            local node owning the connection
   * @param ip               remote address (a leading "/" is tolerated)
   * @param port             port used in {@link #getIdent()}
   * @param remoteServerPort port used in {@link #getRemoteId()}
   * @param enc              cipher stored for outgoing data (currently unused here)
   * @param dec              cipher stored for incoming data (currently unused here)
   * @param s                connected socket
   * @param isRequirer       selects which identifier {@link #equals(Object)} compares
   * @param timeElapsed      initial probe duration in milliseconds
   * @throws IOException if the socket streams cannot be obtained
   */
  public NeighbourAnt(Ant local, String ip, int port, int remoteServerPort, Cipher enc,
                      Cipher dec, Socket s, boolean isRequirer, long timeElapsed) throws IOException {
    this.s = s;
    this.ip = ip;
    this.port = port;
    this.remoteServerPort = remoteServerPort;
    this.enc = enc;
    this.dec = dec;
    this.local = local;
    this.isRequirer=isRequirer;
    this.timeElapsed = timeElapsed;
    this.lastProbedAt = System.currentTimeMillis();
    _logger.info(this.getIdent() +": Net probe done at "+new java.util.Date(this.lastProbedAt)+"["+this.timeElapsed+"]");
    this.bfIn = new BandwidthFilterIn(s.getInputStream());
    this.bfOut = new BandwidthFilterOut(s.getOutputStream());
    this.setPriority(10);
  }

  /** Increments the queued-message counter by one. */
  public void incQueuedMessages(){
    this.modifyQueuedMessages(1);
  }

  /** Decrements the queued-message counter by one. */
  public void decQueuedMessages(){
    this.modifyQueuedMessages(-1);
  }

  /** @return current value of the queued-message counter */
  public long getQueuedMessages(){
    return this.queuedMessages;
  }

  /** Applies a delta to the queued-message counter under the instance lock. */
  private synchronized void modifyQueuedMessages(int modifier){
    this.queuedMessages += modifier;
  }

  /** @return duration in milliseconds measured by the most recent probe */
  public long getTimeElapsed(){
    return this.timeElapsed;
  }

  /** @return time (ms since epoch) at which the most recent probe completed */
  public long getLastProbedAt(){
    return this.lastProbedAt;
  }

  public long getLastActiveDownloadTime(){
    return this.lastActiveDownloadTime;
  }

  public long getLastActiveUploadTime(){
    return this.lastActiveUploadTime;
  }

  /** Stamps the last download activity with the current time. */
  public void setLastActiveDownloadTime(){
    this.lastActiveDownloadTime = System.currentTimeMillis();
  }

  /** Stamps the last upload activity with the current time. */
  public void setLastActiveUploadTime(){
    this.lastActiveUploadTime = System.currentTimeMillis();
  }

  /** @return true while termination was not requested and the socket exists and is open */
  public boolean isConnected(){
    return !this.terminate && this.s != null && !this.s.isClosed();
  }

  public boolean isRequirer(){
    return this.isRequirer;
  }

  /**
   * Requests termination and closes the socket. Closing the socket is what
   * unblocks the reader loop in {@link #run()}. Errors are logged, never thrown.
   */
  public void terminate(){
    try{
      this.terminate = true;
      _logger.info("Terminating neighbour "+this.getIdent()+" socket: "+s);
      if (s != null) {
        _logger.info("Closing "+this.getIdent()+" socket: "+s);
        this.s.close();
        _logger.info("Neighbour terminated ["+ this.s.isClosed() +"] " + this.getIdent());
      }else{
        _logger.info("Neighbour terminated (null socket) " + this.getIdent());
      }
    }catch(Exception ex){_logger.error("Neighbour terminate error",ex);}
  }

  /**
   * Strips a leading "/" from the stored address and returns it. The stripped
   * value is written back to the field, as the identifier getters always did.
   */
  private String normalizedIp(){
    if(ip.startsWith("/"))
      ip = ip.substring(1);
    return ip;
  }

  /** @return "ip port" built from the remote address and {@code port} */
  public String getIdent(){
    return normalizedIp()+" "+port;
  }

  /** @return the local node this connection belongs to */
  public Ant getAnt(){
    return this.local;
  }

  /** @return "ip remoteServerPort" built from the remote address and {@code remoteServerPort} */
  public String getRemoteId() {
    return normalizedIp()+" "+this.remoteServerPort;
  }

  /**
   * String that identifies this neighbour for equality: {@link #getIdent()}
   * when this side is the requirer, {@link #getRemoteId()} otherwise.
   */
  private String identityKey(){
    if(this.isRequirer())
      return this.getIdent();
    return this.getRemoteId();
  }

  /**
   * Two neighbours are equal when their identity strings match. Any object
   * that is not a NeighbourAnt is never equal to this one.
   */
  public boolean equals(Object o){
    if(o instanceof NeighbourAnt){
      return this.identityKey().equals(((NeighbourAnt)o).identityKey());
    }
    // A non-NeighbourAnt can never be this instance.
    return false;
  }

  /** Hash of the identity string, so that equal neighbours hash alike. */
  public int hashCode(){
    return this.identityKey().hashCode();
  }

  /**
   * Starts a {@link SenderThread} that delivers the message through this neighbour.
   *
   * @return the started sender thread
   */
  public SenderThread route(Message m){
    SenderThread st = new SenderThread(m, this);
    st.start();
    return st;
  }

  /**
   * Writes a message to this neighbour.
   *
   * Probe messages are written uncompressed, followed by a {@code pingSize}
   * byte payload. Every other message is serialized, compressed and wrapped in
   * a {@link CompressedByteArray}. If the neighbour is already terminated the
   * message is handed back to the local node for routing elsewhere (except for
   * the message types excluded below). Any failure is logged and closes the
   * socket; it is not propagated to the caller.
   *
   * @param m message to send
   * @throws Exception declared for interface compatibility
   */
  public void send(Message m) throws Exception {
    try {
      if(this.terminate || this.s.isClosed()) {
        if(!(m instanceof NetProbeMessage) &&
           !(m instanceof NetModificationAlert) &&
           !(m instanceof QueryMessage)){
          _logger.info(this.getIdent() +
                       ": Neighbour terminated, routing message to a different direction. " +
                       m);
          local.activateNewRouterProcess(m, this.getIdent());
        }
        return;
      }
      if(m instanceof NetProbeMessage){
        synchronized (socketLock) {
          ObjectOutputStream soos = new ObjectOutputStream(bfOut);
          _logger.info(this.getIdent() + ": Sending probe message at " +
                       new java.util.Date(System.currentTimeMillis()));
          soos.writeObject(m);
          soos.writeObject(new byte[this.pingSize]);
          _logger.info(this.getIdent() + ": Sent probe message at " +
                       new java.util.Date(System.currentTimeMillis()));
        }
        return;
      }
      _logger.debug(this.getIdent() + ": Routing message: ID " + m.getAck_Id() +
                    " Type: " + m.getType() + " From: " + " To: " +
                    m.getDest());
      _logger.debug(this.getIdent() + ": Message sending ["+this.getQueuedMessages()+"]"+m.getType()+" "+m);
      // The message is serialized as-is; wrapping it in a SealedObject with
      // the enc cipher is disabled in this version.
      ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
      ZOutputStream zOut = new ZOutputStream(byteArrayOut, JZlib.Z_BEST_COMPRESSION);
      ObjectOutputStream zOOs = new ObjectOutputStream(zOut);
      zOOs.writeObject(m);
      NeighbourAnt.totalUncompressionSizeOut += zOut.getTotalIn();
      zOut.close();
      // Take the compressed bytes once; toByteArray() copies the buffer on every call.
      byte[] compressed = byteArrayOut.toByteArray();
      NeighbourAnt.totalCompressedSizeOut += compressed.length;
      updateOutStats(m, compressed.length);
      CompressedByteArray cba = new CompressedByteArray(compressed);
      if(m instanceof QueryMessage){
        QueryBandwidthFilterOut.getInstance().writeQuery(compressed.length);
      }else{
        DataBandwidthFilterOut.getInstance().writeData(compressed.length);
      }
      if (Thread.currentThread() instanceof SenderThread) {
        SenderThread curThread = (SenderThread) Thread.currentThread();
        if (!curThread.mustSend()) {
          return;
        }
      }
      // Re-check the connection state right before touching the socket.
      if (this.terminate || this.s.isClosed()) {
        if (! (m instanceof NetProbeMessage)) {
          _logger.info(this.getIdent() +
                       ": Neighbour terminated, routing message to a different direction. " +
                       m);
          local.activateNewRouterProcess(m, this.getIdent());
        }
        return;
      }
      synchronized (socketLock) {
        ObjectOutputStream oos = new ObjectOutputStream(bfOut);
        oos.writeObject(cba);
        _logger.debug(this.getIdent() + ": Message sent ["+this.getQueuedMessages()+"]"+m.getType()+" "+m);
      }
    }
    catch (Exception e) {
      _logger.info(this.getIdent() +
                   ": Socket is closed with neighbour: " +
                   this.getIdent(),e);
      try{
        if (s != null)
          s.close();
      }catch(Exception ex){
        _logger.debug("Error closing neighbour socket after send failure",ex);
      }
    }
  }

  /**
   * Tells whether a message is accounted as control traffic by
   * {@link #updateOutStats(Message, long)} and {@link #updateInStats(Message, long)}.
   */
  private static boolean isControlTraffic(Message m){
    return m instanceof ControlMessage ||
        m instanceof FileTransferEndControlMessage ||
        m instanceof FileTransferErrorControlMessage ||
        m instanceof FileSizePullErrorControlMessage ||
        m instanceof FileInfosPullErrorControlMessage ||
        m instanceof HttpTransferEndControlMessage ||
        m instanceof HttpInterruptTransferMessage ||
        m instanceof SecureConnectionErrorControlMessage ||
        m instanceof NetModificationAlert ||
        m instanceof SecurityRequestMessage ||
        m instanceof SecurityResponseMessage ||
        m instanceof FilePullMessage ||
        m instanceof FilePushMessage ||
        m instanceof FileInfosPullMessage ||
        m instanceof FileInfosPushMessage ||
        m instanceof HttpRequestMessage ||
        m instanceof FileSizePullMessage ||
        m instanceof FileSizePushMessage ||
        m instanceof PrivateChatMessage;
  }

  /**
   * Adds the compressed size of an outgoing message to the control, query or
   * data counter. Control types are checked first, then queries; every other
   * non-null message counts as data. A null message changes nothing.
   */
  public static void updateOutStats(Message m, long size){
    if (isControlTraffic(m)) {
      totalControlCompressedSizeOut += size;
    }
    else if (m instanceof QueryMessage) {
      totalQueryCompressedSizeOut += size;
    }
    else if (m != null) {
      totalDataCompressedSizeOut += size;
    }
  }

  /**
   * Adds the compressed size of an incoming message to the control, query or
   * data counter, using the same classification as the outgoing counters.
   */
  public static void updateInStats(Message m, long size){
    if (isControlTraffic(m)) {
      totalControlCompressedSizeIn += size;
    }
    else if (m instanceof QueryMessage) {
      totalQueryCompressedSizeIn += size;
    }
    else if (m != null) {
      totalDataCompressedSizeIn += size;
    }
  }

  /** Shortens a string to its first ten characters for log output. */
  private static String abbreviate(String value){
    return (value.length() > 10) ? value.substring(0,10) : value;
  }

  /**
   * Closes the socket, removes this neighbour from the local node and fires
   * the "removedNeighbour" property change. Does nothing if the failure flag
   * is already set.
   *
   * @param label text placed between the local short id and this neighbour's ident
   * @param cause exception that ended the reader loop, or null on a regular exit
   */
  private void releaseConnection(String label, Exception cause){
    if (failure) {
      return;
    }
    failure = true;
    String text = local.getShortId() + label + this.getIdent();
    if (cause == null) {
      _logger.info(text);
    }else{
      _logger.info(text,cause);
      _logger.debug("NeighbourAnt connection failed",cause);
    }
    try{
      if (s != null)
        s.close();
    }catch(IOException ex){
      _logger.debug("Error closing neighbour socket",ex);
    }
    local.removeNeighbour(this);
    local.propertyChangeSupport.firePropertyChange("removedNeighbour", null, this);
  }

  /**
   * Reader loop. Starts the prober, then reads objects from the socket until
   * the local node disconnects, termination is requested or an error occurs.
   * On exit the connection is closed and deregistered from the local node.
   */
  public void run() {
    try {
      this.prober = new Prober(this);
      this.prober.start();
      while (!local.isDisconnected() && !terminate) {
        // NOTE(review): this is native Java deserialization of data sent by a
        // remote peer; consider restricting accepted classes (ObjectInputFilter).
        ObjectInputStream ois = new ObjectInputStream(this.bfIn);
        Object obj = ois.readObject();
        long size = 0;
        if (obj instanceof CompressedByteArray) {
          CompressedByteArray cba = (CompressedByteArray)obj;
          ByteArrayInputStream in = new ByteArrayInputStream(cba.getArray());
          ZInputStream zIn = new ZInputStream(in);
          ObjectInputStream objIn = new ObjectInputStream(zIn);
          obj = objIn.readObject();
          NeighbourAnt.totalCompressedSizeIn += zIn.getTotalIn();
          NeighbourAnt.totalUncompressionSizeIn += zIn.getTotalOut();
          size += zIn.getTotalIn();
        }else if (obj instanceof NetProbeMessage) {
          // The probe is followed by a payload object; the time needed to
          // read it is recorded as the probe duration.
          long startProbe = System.currentTimeMillis();
          ois.readObject();
          this.timeElapsed = System.currentTimeMillis() - startProbe;
          this.lastProbedAt = System.currentTimeMillis();
          _logger.info(this.getIdent() +": Net probe done at "+new java.util.Date(this.lastProbedAt)+"["+this.timeElapsed+"]");
          continue;
        }else{
          _logger.debug(obj.toString());
          continue;
        }
        // Only objects carrying the current protocol version are processed.
        if (obj instanceof NetModificationAlert && ((NetModificationAlert) obj).getVersion().equals(Ant.getProtocolVersion())){
          this.local.processNetModificationAlert( (NetModificationAlert) obj, this);
        }
        else if (obj instanceof Message && ((Message) obj).getVersion().equals(Ant.getProtocolVersion())) {
          Message m = (Message) obj;
          updateInStats(m, size);
          // Every id is length-guarded: an ack id shorter than ten characters
          // must not abort the connection just because of a log line.
          _logger.debug(this.getIdent() + ": Message received ID "+abbreviate(m.getAck_Id())+" From: "+
                        abbreviate(m.getSource()) +" To: "+
                        abbreviate(m.getDest()));
          // Type code 2 is rate limited per neighbour - presumably the query
          // message type; confirm against Message.
          if(m.getType() == 2){
            long now = System.currentTimeMillis();
            if(now - queryInCheckpoint > 60000){
              // A new one-minute window starts.
              queryInCheckpoint = now;
              receivedQueries = 0;
            }else{
              receivedQueries++;
              if(receivedQueries > maxQueryRatePerMinute){
                _logger.info(this +" Query flood detected. Discarding... " +
                             m);
                continue;
              }
            }
          }
          local.activateNewRouterProcess(m, this.getIdent());
        }
        else
          _logger.debug(obj.toString());
      }
      releaseConnection(": 1Closed connection with neighbour: ", null);
    }
    catch (Exception e) {
      releaseConnection(": 2Closed connection with neighbour: ", e);
    }
  }

  /** Marks the connection as failed, which suppresses the cleanup done when the reader loop ends. */
  public void setFailure(){
    this.failure = true;
  }

  public String toString(){
    return this.getIdent()+" ["+this.getTimeElapsed()+"ms]["+this.queuedMessages+"]";
  }

  /**
   * Percentage saved by compression, rounded up. Returns 0 while nothing has
   * been counted yet, instead of dividing by zero.
   */
  private static int compressionGainPercent(double compressed, double uncompressed){
    if (uncompressed <= 0) {
      return 0;
    }
    return (int)Math.ceil((1-(compressed / uncompressed))*100);
  }

  /** @return compression gain in percent over all outgoing messages */
  public static int getTotalCompressionGainOut(){
    return compressionGainPercent(NeighbourAnt.totalCompressedSizeOut, NeighbourAnt.totalUncompressionSizeOut);
  }

  /** @return compression gain in percent over all incoming messages */
  public static int getTotalCompressionGainIn(){
    return compressionGainPercent(NeighbourAnt.totalCompressedSizeIn, NeighbourAnt.totalUncompressionSizeIn);
  }
}
/**
 * Serializable envelope around a compressed byte array. NeighbourAnt.send()
 * writes instances of this class to the neighbour's socket and
 * NeighbourAnt.run() reads them back and decompresses the content.
 *
 * NOTE(review): no serialVersionUID is declared, so Java serialization
 * derives one from the shape of the class. Changing fields, modifiers or
 * method signatures would likely make it incompatible with peers running
 * other builds - confirm before altering this class.
 */
class CompressedByteArray implements Serializable{
// Payload; the reference is stored and returned as-is, without copying.
byte[] array;
/**
 * @param array bytes to carry (kept by reference)
 */
public CompressedByteArray(byte[] array){
this.array = array;
}
/**
 * @return the carried bytes (the same array instance that was stored)
 */
public byte[] getArray(){
return this.array;
}
}
/**
 * Background thread that sends a NetProbeMessage to one neighbour every
 * {@link #interval} milliseconds for as long as the neighbour is connected.
 * The neighbour is terminated when it is no longer registered with its local
 * node, or when sleeping or sending fails.
 */
class Prober extends Thread{
  /** Neighbour being probed. */
  NeighbourAnt n;
  /** Pause between two probes, in milliseconds (three minutes by default). */
  public static long interval = 3*60*1000;
  static Logger _logger = Logger.getLogger(Prober.class.getName());

  /**
   * @param n neighbour to probe
   */
  public Prober(NeighbourAnt n){
    this.n = n;
    this.setPriority(10);
  }

  /**
   * Probe loop: sleep, verify the neighbour is still registered, send a probe.
   */
  public void run(){
    while(this.n != null && this.n.isConnected()){
      try {
        // sleep is static; call it on the class rather than through an instance.
        Thread.sleep(interval);
        if(!this.n.getAnt().neighbours.contains(this.n)){
          // The local node dropped this neighbour: close it and stop probing.
          n.terminate();
          return;
        }
        _logger.info(this.n.getIdent() +": Scheduled probe message at "+new java.util.Date(System.currentTimeMillis()));
        this.n.send(new NetProbeMessage());
      }catch(InterruptedException e){
        n.terminate();
        _logger.error("Neighbour Prober error",e);
        // Keep the interrupt visible to whoever observes this thread, then stop.
        Thread.currentThread().interrupt();
        return;
      }catch(Exception e){
        // After terminate() the loop condition fails and the thread ends.
        n.terminate();
        _logger.error("Neighbour Prober error",e);
      }
    }
  }
}
The table below shows all metrics for NeighbourAnt.java.




