Ant.java
| Index Score | ||
|---|---|---|
![]() |
![]() |
ants.p2p |
![]() |
![]() |
ANtsP2P |
View: Reasons, Metrics, Source Code
These are the metrics that contribute to the Enerjy Score for this file, ranked by impact. So the metrics listed at the top influence the score to a greater extent than the metrics listed at the bottom.
//******************************************************************
//******************************************************************
//********** ANts Peer To Peer Sources *************
//
// ANts P2P realizes a third generation P2P net. It protects your
// privacy while you are connected and makes you not trackable, hiding
// your identity (ip) and crypting everything you are sending/receiving
// from others.
// Copyright (C) 2004 Roberto Rossi
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
package ants.p2p;
import java.util.*;
import java.net.*;
import java.beans.*;
import java.security.*;
import ants.p2p.security.sockets.*;
import ants.p2p.messages.*;
import ants.p2p.messages.security.*;
import ants.p2p.utils.indexer.*;
import ants.p2p.utils.encoding.*;
import ants.p2p.query.*;
import ants.p2p.exceptions.*;
import ants.p2p.http.*;
import org.apache.log4j.*;
import ants.p2p.utils.net.*;
public class Ant
extends Thread {
// Protocol version string returned by getProtocolVersion().
private static final String protocolVersion = "1.0.0";
// Application name returned by getApplicationName().
private static final String applicationName = "ANtsP2P";
// Application version string returned by getVersion().
private static final String applicationVersion = "beta1.6.0";
// Network name returned by getNetName().
private static final String netName = "ANtsNet";
// log4j logger named after this class.
private static Logger _logger = Logger.getLogger(Ant.class.getName());
// Server port; holds 443 until the constructor overwrites it with its serverPort argument.
int serverPort = 443;
// Connection class label; getRateThresold() maps it to a numeric threshold.
public static String ConnectionType = "56K";
// NOTE(review): presumably the intended capacity of myMessages; not enforced in the visible code - confirm.
public static int myMessageSizeMax = 10000;
// Capacity of inTransitMessages: traceMessage() removes the entry at index 0 once this size is reached.
public static int inTransitMessageSizeMax = 10000;
// NOTE(review): not read in the visible code; presumably milliseconds (10 minutes) - confirm.
public static long routeInactiveTimeout = 10*60*1000;
// Upper bound on the size of the neighbours list. Static, so shared by every Ant instance.
public static int maxNeighbours = 15;
// Connected NeighbourAnt instances. addNeighbour/removeNeighbour/addP2PNeighbour mutate it while
// synchronized on the list itself; several readers iterate it without taking that lock.
java.util.List neighbours = new ArrayList();//Collections.synchronizedList(new ArrayList());
// Maps a NetModificationAlert ack id to the Long timestamp (System.currentTimeMillis()) at which it was recorded.
java.util.Hashtable netModifications = new java.util.Hashtable();
// MessageWrapper entries recorded by traceMyMessage().
public java.util.List myMessages = new ArrayList()/*Collections.synchronizedList(new ArrayList())*/;
// Returned by getFailedMessages(). NOTE(review): populated outside the visible code.
public java.util.List failedMessages = new ArrayList()/*Collections.synchronizedList(new ArrayList())*/;
// Maps a node id (String) to its RoutingTableElement.
public Hashtable routingTable = new Hashtable();
// MessageWrapper entries recorded by traceMessage() for messages passing through this node.
public java.util.List inTransitMessages = new ArrayList()/*Collections.synchronizedList(new ArrayList())*/;
// NOTE(review): presumably the listening server socket; not used in the visible code - confirm.
protected SecureServer ss;
// The tuning values below are not read in the visible part of this file unless stated otherwise.
// NOTE(review): durations are presumably in milliseconds - confirm against their users.
public static long messageTimeout = 180000;
public static int maxRetransmissions = 3;
public static int maxRetransmissionsForceDirection = 2;
public static int maxFailedMessageToTrace = 100;
// Number of routing slots currently taken; incremented by verifyAndIncBeingRoutedMessages()
// and decremented by decBeingRoutedMessages(). Plain int, updated without synchronization.
public static int beingRoutedMessages = 0;
// Limit checked by verifyAndIncBeingRoutedMessages() before a new slot is granted.
public static int maxMessagesToRouteToghether = 50;
public static int netModificationsTimeout = 60*1000*5;
public static int probeCheckInterval = 60*1000*6;
// Only referenced from commented-out code in addP2PNeighbour().
public static double underRateConnections = 2.0/3.0;
// NOTE(review): presumably the shutdown flag for the thread's run loop; not used in the visible code.
boolean terminate = false;
// Whether this node accepts direct TCP connections; set by the constructor.
protected boolean acceptTCPDirectConnections = false;
// Local address as text; set by the constructor and setLocalInetAddress().
String localInetAddress = null;
// Event source for "routeLost", "newNeighbour" and "newANtsVersionDetected" notifications.
public PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
// Identifier of this node; set by the constructor and returned by getIdent().
String id = null;
// NOTE(review): presumably the externally mapped (NAT) port and address; not used in the visible code.
public int localNattedPort = 0;
public String localNattedInetAddress = null;
// True after setProxied(), false after setDirectConnection().
public static boolean proxied = false;
// Copy of the constructor's UPnP argument.
boolean upnp;
/**
 * Creates the local node, stores its configuration and immediately starts
 * the node's own thread at priority 6.
 *
 * @param id               identifier of this node
 * @param maxNeighbours    neighbour limit; ignored unless it is at least 1
 * @param serverPort       port stored as this node's server port
 * @param acceptDC         whether direct TCP connections are accepted
 * @param localInetAddress local address as text
 * @param UPnP             value stored in the upnp flag
 */
public Ant(String id, int maxNeighbours, int serverPort, boolean acceptDC, String localInetAddress, boolean UPnP) {
    this.id = id;
    this.serverPort = serverPort;
    this.localInetAddress = localInetAddress;
    this.acceptTCPDirectConnections = acceptDC;
    this.upnp = UPnP;
    // Non-positive values leave the current (static) limit untouched.
    if (maxNeighbours > 0) {
        setMaxNeighbours(maxNeighbours);
    }
    setPriority(6);
    start();
}
/**
 * Points the JVM-wide HTTP and HTTPS proxy system properties at the given
 * proxy and records that this node is proxied.
 *
 * @param proxyAddress address of the proxy
 * @param proxyPort    port of the proxy
 */
public static void setProxied(InetAddress proxyAddress, int proxyPort){
    String portText = proxyPort + "";
    System.setProperty("http.proxySet", "true");
    System.setProperty("http.proxyPort", portText);
    String hostText = proxyAddress.getHostAddress();
    System.setProperty("http.proxyHost", hostText);
    System.setProperty("https.proxyHost", hostText);
    System.setProperty("https.proxyPort", portText);
    proxied = true;
}
/**
 * Switches the "http.proxySet" system property off and clears the
 * proxied flag. The other proxy properties are left as they are.
 */
public static void setDirectConnection(){
    System.setProperty("http.proxySet", "false");
    proxied = false;
}
/**
 * @return whether this node accepts direct TCP connections
 */
public boolean acceptTCPDirectConnections(){
    return acceptTCPDirectConnections;
}
/**
 * Returns the address other peers can use to connect to this node directly.
 *
 * @return "address:port" when direct connections are accepted and the local
 *         address is classified as public by isInternetPublicAddress;
 *         the empty string in every other case, including resolution errors
 */
public String getLocalInetAddress(){
    if (!this.acceptTCPDirectConnections) {
        return "";
    }
    try {
        InetAddress resolved = InetAddress.getByName(this.localInetAddress);
        if (isInternetPublicAddress(resolved)) {
            return this.localInetAddress + ":" + this.serverPort;
        }
        return "";
    } catch (Exception e) {
        return "";
    }
}
/**
 * Stores the local address, dropping one leading "/" if present.
 *
 * @param localInetAddress textual address, optionally prefixed with "/"
 */
public void setLocalInetAddress(String localInetAddress){
    this.localInetAddress = localInetAddress.startsWith("/")
        ? localInetAddress.substring(1)
        : localInetAddress;
}
/**
 * Picks a textual address of the local host.
 * Addresses that are neither public (per isInternetPublicAddress) nor
 * loopback are preferred; failing that, the first non-loopback address wins.
 *
 * @return the chosen address, or null when none qualifies or the lookup
 *         fails (the failure is logged)
 */
public String getLanAddress(){
    try {
        String hostName = InetAddress.getLocalHost().getHostName();
        InetAddress[] candidates = InetAddress.getAllByName(hostName);
        // First pass: non-public, non-loopback addresses.
        for (int i = 0; i < candidates.length; i++) {
            InetAddress candidate = candidates[i];
            if (isInternetPublicAddress(candidate) || candidate.isLoopbackAddress()) {
                continue;
            }
            String text = candidate.getHostAddress();
            return text.startsWith("/") ? text.substring(1) : text;
        }
        // Second pass: anything that is not loopback.
        for (int i = 0; i < candidates.length; i++) {
            InetAddress candidate = candidates[i];
            if (candidate.isLoopbackAddress()) {
                continue;
            }
            String text = candidate.getHostAddress();
            return text.startsWith("/") ? text.substring(1) : text;
        }
    } catch (Exception e) {
        _logger.error("Invalid InetAddress: ",e);
    }
    return null;
}
/**
 * Tells whether an address is treated as reachable from the public Internet.
 *
 * <p>For IPv4 the decision is table driven: a fixed list of first octets
 * (plus a few second/third octet refinements such as 10/8, 172.16/12,
 * 192.168/16 and 169.254/16) is classified as non-public; everything else
 * is public. Octets are compared as unsigned values, and each refined case
 * is decided on its own, so e.g. 172.15.0.1 is public while 172.16.0.1 is
 * not.
 *
 * <p>NOTE(review): the first-octet table looks like a snapshot of address
 * blocks that were unallocated when the code was written (e.g. 1, 2, 5,
 * 89-127); many of them are in use today. It is kept as is for backward
 * compatibility. The original also listed 173-187, but that comparison
 * could never match on a signed byte; since the range is allocated today,
 * the dead check is dropped instead of revived.
 *
 * <p>Non-IPv4 addresses are classified with the JDK predicates instead of
 * the IPv4 table: local, loopback, link-local, site-local, unique-local
 * (fc00::/7) and multicast addresses are non-public.
 *
 * @param address address to classify; null yields false
 * @return true when the address is considered public
 */
public static boolean isInternetPublicAddress(InetAddress address) {
    if (address == null) {
        return false;
    }
    byte[] raw = address.getAddress();
    if (raw.length != 4) {
        boolean uniqueLocal = (raw[0] & 0xFE) == 0xFC;
        return !(uniqueLocal
            || address.isAnyLocalAddress()
            || address.isLoopbackAddress()
            || address.isLinkLocalAddress()
            || address.isSiteLocalAddress()
            || address.isMulticastAddress());
    }
    // Java bytes are signed: mask to get the octet value 0..255.
    int head = raw[0] & 0xFF;
    int body = raw[1] & 0xFF;
    int tail = raw[2] & 0xFF;
    if (head >= 89 && head <= 127) {
        return false;
    }
    // Multicast (224-239) and reserved (240-255) space.
    if (head >= 224) {
        return false;
    }
    switch (head) {
        case 0: case 1: case 2: case 5: case 10:
        case 23: case 27: case 31: case 36: case 37:
        case 39: case 41: case 42: case 46: case 49:
        case 50: case 58: case 59: case 60:
        case 71: case 72: case 73: case 74: case 75:
        case 76: case 77: case 78: case 79:
        case 189: case 190: case 197: case 223:
            return false;
        case 128:
            return body != 0;
        case 169:
            // 169.254/16 is link-local.
            return body != 254;
        case 172:
            // 172.16/12 is private.
            return body < 16 || body > 31;
        case 191:
            return body != 255;
        case 192:
            // 192.0/16, 192.68.185/24 and the private 192.168/16.
            return !(body == 0 || (body == 68 && tail == 185) || body == 168);
        case 198:
            return body < 18 || body > 19;
        default:
            return true;
    }
}
/**
 * @return the application version string
 */
public static String getVersion() {
    return Ant.applicationVersion;
}
/**
 * @return the protocol version string
 */
public static String getProtocolVersion() {
    return Ant.protocolVersion;
}
/**
 * @return the application name
 */
public static String getApplicationName() {
    return Ant.applicationName;
}
/**
 * @return the network name
 */
public static String getNetName(){
    return Ant.netName;
}
/**
 * @return the identifier this node was constructed with
 */
public String getIdent() {
    return id;
}
/**
 * Returns an abbreviated form of this node's id, used mainly in log and
 * exception messages.
 *
 * <p>The id is supplied by the caller of the constructor without any length
 * validation, so ids of 10 characters or fewer (or null) are returned
 * unchanged instead of failing in substring.
 *
 * @return the first 10 characters of the id, or the whole id when shorter
 */
public String getShortId(){
    String ident = this.id;
    if (ident == null || ident.length() <= 10) {
        return ident;
    }
    return ident.substring(0, 10);
}
/**
 * @return the server port stored for this node
 */
public int getServerPort() {
    return serverPort;
}
/**
 * @return the live (not copied) list of failed messages
 */
public java.util.List getFailedMessages() {
    return failedMessages;
}
/**
 * Sets the neighbour limit. The limit is a static field, so the new value
 * applies to every Ant instance.
 *
 * @param mn new neighbour limit
 */
public void setMaxNeighbours(int mn) {
    Ant.maxNeighbours = mn;
}
/**
 * @return the current (static) neighbour limit
 */
public int getMaxNeighbours() {
    return Ant.maxNeighbours;
}
/**
 * Terminates a neighbour and drops it from the neighbour list. When the
 * neighbour was actually in the list, a net-modification alert is generated
 * for the routes lost with it. Any exception is logged, not propagated.
 *
 * @param n neighbour to remove
 */
public void removeNeighbour(NeighbourAnt n) {
    synchronized (neighbours) {
        try {
            n.terminate();
            boolean wasListed = this.neighbours.remove(n);
            if (wasListed) {
                this.generateNetModificationAlert(n);
            }
        } catch (Exception e) {
            _logger.error(this.getShortId() + "",e);
        }
    }
}
/**
 * Reacts to the loss of a neighbour: removes the routes that went through
 * it, drops every routing entry left without routes, fires "routeLost" for
 * each dropped node and, if anything was dropped, sends a
 * NetModificationAlert listing the dropped nodes to all neighbours.
 * Exceptions are logged, not propagated.
 *
 * @param disconnectedNeighbour the neighbour that went away
 */
public void generateNetModificationAlert(NeighbourAnt disconnectedNeighbour){
    try {
        ArrayList lostNodes = new ArrayList();
        for (Enumeration keys = this.routingTable.keys(); keys.hasMoreElements();) {
            String nodeID = (String) keys.nextElement();
            if (nodeID.equals(this.getIdent())) {
                continue;
            }
            RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(nodeID);
            if (rte == null) {
                continue;
            }
            if (!rte.removeRoute(disconnectedNeighbour.getIdent())) {
                continue;
            }
            if (rte.getRoutes().size() != 0) {
                continue;
            }
            // The node has no route left: forget it.
            lostNodes.add(nodeID);
            this.routingTable.remove(nodeID);
            String unreachableIP = rte.getIP();
            if (unreachableIP != null) {
                // Routes that went through the lost node's IP are gone as well.
                lostNodes.addAll(this.removeRoutes(unreachableIP, disconnectedNeighbour.getIdent()));
            }
            this.propertyChangeSupport.firePropertyChange("routeLost", disconnectedNeighbour.getIdent(), nodeID);
            _logger.info("[1]UNREACHABLE: "+nodeID.substring(0,10));
        }
        if (!lostNodes.isEmpty()) {
            NetModificationAlert alert = new NetModificationAlert(lostNodes);
            // Remember the alert id together with the time it was created.
            this.netModifications.put(alert.getAck_Id(), new Long(System.currentTimeMillis()));
            for (int i = 0; i < this.neighbours.size(); i++) {
                NeighbourAnt peer = (NeighbourAnt) this.neighbours.get(i);
                peer.route(alert);
            }
        }
    } catch (Exception e) {
        _logger.error("Error in processing netModification upon peer disconnection",e);
    }
}
/**
 * Reacts to a message that could not be delivered through a neighbour:
 * removes that neighbour from the routes towards the message's destination
 * and, when no route is left, drops the destination's routing entry, fires
 * "routeLost" and sends a NetModificationAlert to all neighbours.
 * Nothing happens when the destination is this node, is unknown, or still
 * has routes. Exceptions are logged, not propagated.
 *
 * <p>The routing entry is removed under the destination id, i.e. the key it
 * was looked up with. The previous code removed the key {@code neighbourId},
 * which left the now route-less entry in the table.
 *
 * @param m           message whose destination became unreachable
 * @param neighbourId id of the neighbour the message had been routed to
 */
public void generateNetModificationAlert(Message m, String neighbourId){
    try {
        String dest = m.getDest();
        if (dest.equals(this.getIdent())) {
            return;
        }
        RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(dest);
        if (rte == null || !rte.removeRoute(neighbourId) || rte.getRoutes().size() != 0) {
            return;
        }
        // Guard the abbreviation so a short id cannot abort the cleanup below.
        _logger.info("[2]UNREACHABLE: " + (dest.length() > 10 ? dest.substring(0, 10) : dest));
        this.routingTable.remove(dest);
        ArrayList unreachableNeighbours = new ArrayList();
        unreachableNeighbours.add(dest);
        String unreachableIP = rte.getIP();
        if (unreachableIP != null) {
            // Routes that went through the lost node's IP are gone as well.
            unreachableNeighbours.addAll(this.removeRoutes(unreachableIP, neighbourId));
        }
        this.propertyChangeSupport.firePropertyChange("routeLost", neighbourId, dest);
        NetModificationAlert alert = new NetModificationAlert(unreachableNeighbours);
        // Remember the alert id together with the time it was created.
        this.netModifications.put(alert.getAck_Id(), new Long(System.currentTimeMillis()));
        for (int x = 0; x < this.neighbours.size(); x++) {
            ((NeighbourAnt) this.neighbours.get(x)).route(alert);
        }
    }
    catch (Exception e) {
        _logger.error("Error in generating netModification Alert upon message deletion", e);
    }
}
/**
 * Announces that the given nodes are unreachable: drops their routing
 * entries (and the routes that went through their IPs), fires "routeLost"
 * for each, then sends a NetModificationAlert to the requirer (directly,
 * when direct TCP connections are accepted) and to all neighbours.
 *
 * <p>The passed list is modified in place: this node's own id is removed
 * from it and ids found by removeRoutes are appended. If nothing is left
 * after removing this node's own id, no alert is sent. Exceptions are
 * logged, not propagated; a failed direct send is best effort and only
 * logged at debug level.
 *
 * @param unreachableNeighbours ids of the unreachable nodes (mutated)
 * @param requirer              id/address the direct alert is sent to
 */
public void generateNetModificationAlert(ArrayList unreachableNeighbours, String requirer){
    try {
        if (unreachableNeighbours.size() == 0) {
            return;
        }
        // Walk backwards so removing the current index is safe; ids appended
        // by addAll land behind the start index and are not revisited.
        for (int x = unreachableNeighbours.size() - 1; x >= 0; x--) {
            Object nodeId = unreachableNeighbours.get(x);
            if (nodeId.equals(this.getIdent())) {
                unreachableNeighbours.remove(x);
                continue;
            }
            RoutingTableElement rte = (RoutingTableElement) this.routingTable.remove(nodeId);
            if (rte != null && rte.getIP() != null) {
                unreachableNeighbours.addAll(this.removeRoutes(rte.getIP(), requirer));
            }
            this.propertyChangeSupport.firePropertyChange("routeLost", null, nodeId);
            String label = nodeId.toString();
            _logger.info("[3]UNREACHABLE: " + (label.length() > 10 ? label.substring(0, 10) : label));
        }
        if (unreachableNeighbours.size() == 0) {
            // Only this node's own id was listed: nothing to announce.
            return;
        }
        NetModificationAlert alert = new NetModificationAlert(unreachableNeighbours);
        // Remember the alert id together with the time it was created.
        this.netModifications.put(alert.getAck_Id(), new Long(System.currentTimeMillis()));
        if (this.acceptTCPDirectConnections) {
            try {
                Neighbour n = new DirectNeighbour(requirer, this);
                n.send(alert);
            } catch (Exception e) {
                // Best effort: the alert still goes out through the neighbours below.
                _logger.debug("Direct delivery of netModification alert failed", e);
            }
        }
        for (int x = 0; x < this.neighbours.size(); x++) {
            ((NeighbourAnt) this.neighbours.get(x)).route(alert);
        }
    }
    catch (Exception e) {
        _logger.error("Error in generating netModification Alert upon message deletion", e);
    }
}
/**
 * Removes the route {@code ip} from every routing entry except this node's
 * own. Entries left without routes are dropped from the table, reported
 * through "routeLost", and their own IP is processed recursively.
 *
 * NOTE(review): unlike the callers, the recursive step does not check
 * rte.getIP() for null before recursing - confirm removeRoute(null) is harmless.
 *
 * @param ip       route identifier to remove
 * @param sourceId value reported as the old value of the "routeLost" event
 * @return ids of all nodes dropped from the routing table
 */
private ArrayList removeRoutes(String ip, String sourceId){
    ArrayList dropped = new ArrayList();
    for (Enumeration keys = this.routingTable.keys(); keys.hasMoreElements();) {
        String nodeID = (String) keys.nextElement();
        if (nodeID.equals(this.getIdent())) {
            continue;
        }
        RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(nodeID);
        if (rte == null) {
            continue;
        }
        if (!rte.removeRoute(ip)) {
            continue;
        }
        if (rte.getRoutes().size() != 0) {
            continue;
        }
        dropped.add(nodeID);
        this.routingTable.remove(nodeID);
        String unreachableIP = rte.getIP();
        dropped.addAll(this.removeRoutes(unreachableIP, sourceId));
        this.propertyChangeSupport.firePropertyChange("routeLost", sourceId, nodeID);
    }
    return dropped;
}
/**
 * Handles a NetModificationAlert received from a neighbour.
 * An alert whose id was already recorded is ignored. Otherwise, for every
 * node listed in the alert, the route through the sending neighbour is
 * removed; nodes left without routes are dropped and reported through
 * "routeLost". If any node was dropped, the alert's peer list is replaced by
 * the dropped nodes and the alert is forwarded to all other neighbours.
 * Exceptions are logged, not propagated.
 *
 * @param alert  the received alert (its peer list may be replaced)
 * @param source the neighbour the alert came from
 */
protected synchronized void processNetModificationAlert(NetModificationAlert alert, Neighbour source){
    try {
        if (this.netModifications.get(alert.getAck_Id()) != null) {
            return;
        }
        this.netModifications.put(alert.getAck_Id(), new Long(System.currentTimeMillis()));
        ArrayList dropped = new ArrayList();
        for (Enumeration keys = this.routingTable.keys(); keys.hasMoreElements();) {
            String nodeID = (String) keys.nextElement();
            if (nodeID.equals(this.getIdent())) {
                continue;
            }
            RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(nodeID);
            if (!alert.getUnreachablePeers().contains(nodeID)) {
                continue;
            }
            if (rte == null) {
                continue;
            }
            if (!rte.removeRoute(source.getIdent())) {
                continue;
            }
            if (rte.getRoutes().size() != 0) {
                continue;
            }
            dropped.add(nodeID);
            _logger.info("[4]UNREACHABLE: "+nodeID.substring(0,10));
            this.routingTable.remove(nodeID);
            String unreachableIP = rte.getIP();
            if (unreachableIP != null) {
                dropped.addAll(this.removeRoutes(unreachableIP, source.getIdent()));
            }
            this.propertyChangeSupport.firePropertyChange("routeLost", source.getIdent(), nodeID);
        }
        if (dropped.size() != 0) {
            alert.setUnreachablePeers(dropped);
            // Forward to everyone except the neighbour the alert came from.
            for (int i = 0; i < this.neighbours.size(); i++) {
                NeighbourAnt peer = (NeighbourAnt) this.neighbours.get(i);
                if (!peer.equals(source)) {
                    ((NeighbourAnt) this.neighbours.get(i)).route(alert);
                }
            }
        }
    }
    catch (Exception e) {
        _logger.error("Error in processing netModification Alert", e);
    }
}
/**
 * Looks a neighbour up by its id or by its remote id.
 *
 * @param id id to look for
 * @return the first matching neighbour, or null when there is none
 */
public NeighbourAnt getNeighbour(String id) {
    for (int i = 0; i < this.neighbours.size(); i++) {
        NeighbourAnt candidate = (NeighbourAnt) this.neighbours.get(i);
        if (candidate.getIdent().equals(id) || candidate.getRemoteId().equals(id)) {
            return candidate;
        }
    }
    return null;
}
/**
 * Maps the configured connection type to its rate threshold.
 * "56K", "ISDN" and unrecognised types yield Integer.MAX_VALUE.
 *
 * @return the threshold for Ant.ConnectionType
 */
public static int getRateThresold(){
    String type = Ant.ConnectionType;
    if (type.equals("DSL")) {
        return 3000;
    }
    if (type.equals("CABLE")) {
        return 1500;
    }
    if (type.equals("LAN T3")) {
        return 750;
    }
    if (type.equals("LAN T2")) {
        return 375;
    }
    if (type.equals("LAN T1") || type.equals("LAN or Fiber Net")) {
        return 186;
    }
    return Integer.MAX_VALUE;
}
/**
 * Counts the neighbours whose elapsed time is at or above the rate
 * threshold of the configured connection type.
 *
 * @return number of such neighbours
 */
public int getUnderRatedNeighbours() {
    int limit = getRateThresold();
    int count = 0;
    java.util.List peers = this.getNeighbours();
    for (int i = 0; i < peers.size(); i++) {
        NeighbourAnt peer = (NeighbourAnt) peers.get(i);
        if (peer.getTimeElapsed() >= limit) {
            count++;
        }
    }
    return count;
}
/**
 * Two Ant instances are equal when their ids are equal; any other object
 * is compared by identity.
 *
 * @param o object to compare with
 * @return whether {@code o} is considered equal to this node
 */
public boolean equals(Object o) {
    if (!(o instanceof Ant)) {
        return o == this;
    }
    Ant other = (Ant) o;
    return other.getIdent().equals(this.getIdent());
}
/**
 * @return the current number of neighbours
 */
public int getNeighboursNumber() {
    return neighbours.size();
}
/**
 * Registers an already established neighbour and fires "newNeighbour".
 *
 * @param n neighbour to add
 * @throws Exception when the neighbour limit is reached, the neighbour is
 *         already listed, or this node is disconnected
 */
public void addNeighbour(NeighbourAnt n) throws Exception {
    synchronized (neighbours) {
        if (this.neighbours.size() >= this.maxNeighbours) {
            throw new Exception(this.getShortId() +
                                ": Max neighbourg number reached");
        }
        if (this.neighbours.contains(n)) {
            throw new Exception(this.getShortId() + ": Neighbourg already connected");
        }
        if (this.isDisconnected()) {
            throw new Exception(this.getShortId() +
                                ": Strange thing in neighbour add process.");
        }
        this.neighbours.add(n);
    }
    _logger.debug(this.getIdent() + ": Added Neighbour by request IP(local): "+n.getIdent()+" IP(remote): "+n.getRemoteId());
    this.propertyChangeSupport.firePropertyChange("newNeighbour", null, n);
}
/**
 * Opens an outgoing connection to a peer and registers it as a neighbour.
 *
 * Returns without doing anything when this node is disconnected, when the
 * target looks like this node itself (see the checks below), or when a
 * matching neighbour already exists. A closed socket after the handshake is
 * only logged as a rejected connection.
 *
 * @param remoteAddress host name or address of the peer
 * @param port          port of the peer
 * @param isRequirer    passed through to the NeighbourAnt constructor
 * @param localhost     local address, compared against the resolved remote address
 * @throws Exception on resolution/connection errors, when the neighbour
 *         limit is reached or when the neighbour is already listed
 */
public void addP2PNeighbour(String remoteAddress, int port,
boolean isRequirer,
InetAddress localhost) throws Exception {
if(this.isDisconnected())
return;
InetAddress remote = InetAddress.getByName(remoteAddress);
// A loopback/wildcard/link-local/site-local target on our own server port is skipped
// (presumably to avoid connecting to ourselves - confirm).
if ( (remote.isLoopbackAddress() ||
remote.isAnyLocalAddress() ||
remote.isLinkLocalAddress() ||
remote.isSiteLocalAddress()) &&
port == this.getServerPort()){
return;
}
// Same host address as the given local address and same port: skipped as well.
if (localhost.getHostAddress().equals(remote.getHostAddress()) &&
port == this.getServerPort()){
return;
}
// Skip the connection when an existing neighbour matches "address port" by id or remote id,
// or sits on a local-type address and uses the same port.
for (int x = 0; x < this.neighbours.size(); x++) {
NeighbourAnt na = ( (NeighbourAnt)this.neighbours.get(x));
if (na.getIdent().equals(remoteAddress + " " + port) ||
na.getRemoteId().equals(remoteAddress + " " + port) ||
((na.s.getInetAddress().isLoopbackAddress() ||
na.s.getInetAddress().isAnyLocalAddress() ||
na.s.getInetAddress().isLinkLocalAddress() ||
na.s.getInetAddress().isSiteLocalAddress()) && (na.remoteServerPort == port || na.port == port))){
return;
}
}
SecureClientSocket scs = new SecureP2PClientSocket(remoteAddress, port, this.getServerPort(), this.proxied);
// Let listeners know when the socket reports a newer version.
if (scs.isNewVersionDetected()) {
this.getPropertyChangeSupport().firePropertyChange(
"newANtsVersionDetected", null,
scs.getNewerVersion());
}
if (!scs.isClosed()) {
_logger.info(scs.getInetAddress().getHostAddress() +": Local time elapsed: "+scs.getTimeElapsed()+"[Thresold: "+this.getRateThresold()+"]");
/*if(this.getUnderRatedNeighbours() >= Math.floor(Ant.maxNeighbours*underRateConnections) &&
scs.getTimeElapsed() >= this.getRateThresold()){
throw new Exception(scs.getInetAddress().getHostAddress() + ": Rejected neighbour cause it doesn't satisfy bandwith request: ["+this.getUnderRatedNeighbours()+"/"+Math.floor(Ant.maxNeighbours*underRateConnections)+"]");
}*/
NeighbourAnt n = new NeighbourAnt(this, remoteAddress, port,
scs.getLocalServerPort(),
scs.getCipherEnc(), scs.getCipherDec(),
scs.getSocket(), isRequirer, scs.getTimeElapsed());
synchronized (neighbours) {
if (this.neighbours.size() < this.maxNeighbours &&
!this.neighbours.contains(n)) {
this.neighbours.add(n);
n.start();
}
else {
// Rejected: the neighbour is terminated and flagged as failed before it is started,
// then the reason is reported to the caller.
n.terminate();
n.setFailure();
n.start();
if (this.neighbours.size() >= this.maxNeighbours){
throw new Exception(this.getShortId() +
": Max neighbourg number reached");
}
else if (this.neighbours.contains(n)){
throw new Exception(this.getShortId() +
": Neighbourg already connected");
}
throw new Exception(this.getShortId() +
": Strange thing in neighbour add process.");
}
}
_logger.debug(this.getIdent() + ": Added Neighbourg from address IP(local): "+n.getIdent()+" IP(remote): "+n.getRemoteId());
this.propertyChangeSupport.firePropertyChange("newNeighbour", null, n);
}
else {
_logger.info(this.getShortId() + ": Connection rejected...");
}
}
/**
 * @return the live (not copied) neighbour list
 */
public java.util.List getNeighbours() {
    return neighbours;
}
/**
 * Finds one of this node's own traced messages by its ack id.
 *
 * @param id ack id to look for
 * @return the first wrapper whose message has that ack id, or null
 */
public MessageWrapper getMessage(String id) {
    for (int i = 0; i < this.myMessages.size(); i++) {
        MessageWrapper candidate = (MessageWrapper) this.myMessages.get(i);
        if (candidate.getMessage().getAck_Id().equals(id)) {
            return candidate;
        }
    }
    return null;
}
/**
 * Learns routing hints from the result tuples of a processed query message
 * (type 0) that did not originate from this node. For every file tuple or
 * HTTP server info whose owner is not this node, the owner's routing entry
 * (created on demand) either gets a hint route through {@code requirer} or
 * has its IP set.
 *
 * NOTE(review): the file-tuple branch decides on m.getSourceAddress() but
 * stores qrft.getOwnerIP(), whereas the other branch and
 * traceDeliveredMessage test the tuple's own IP - confirm this is intended.
 *
 * @param m        message to inspect
 * @param requirer id of the neighbour the message came from
 */
public void traceHints(Message m, String requirer){
    if (m.getSource().equals(this.getIdent())) {
        return;
    }
    if (!(m instanceof QueryMessage && m.getType() == 0 && ((QueryMessage) m).getProcessed())) {
        return;
    }
    QueryMessage qm = (QueryMessage) m;
    for (int x = 0; qm.getTuples() != null && x < qm.getTuples().size(); x++) {
        Object tuple = qm.getTuples().get(x);
        if (tuple instanceof QueryRemoteFileTuple) {
            QueryRemoteFileTuple qrft = (QueryRemoteFileTuple) tuple;
            if (this.getIdent().equals(qrft.getOwnerID())) {
                continue;
            }
            RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(qrft.getOwnerID());
            boolean unknownOwner = (rte == null);
            if (unknownOwner) {
                rte = new RoutingTableElement();
            }
            if (this.acceptTCPDirectConnections && !m.getSourceAddress().equals("")) {
                rte.setIP(qrft.getOwnerIP());
            } else {
                rte.addHintRoute(requirer, qrft.getLastTimeSeen().longValue());
            }
            if (unknownOwner) {
                this.routingTable.put(qrft.getOwnerID(), rte);
            }
        }
        else if (tuple instanceof HttpServerInfo) {
            HttpServerInfo si = (HttpServerInfo) tuple;
            if (this.getIdent().equals(si.getOwnerId())) {
                continue;
            }
            RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(si.getOwnerId());
            boolean unknownOwner = (rte == null);
            if (unknownOwner) {
                rte = new RoutingTableElement();
            }
            if (this.acceptTCPDirectConnections && !si.getOwnerIp().equals("")) {
                rte.setIP(si.getOwnerIp());
            } else {
                rte.addHintRoute(requirer, si.getSeenOn().longValue());
            }
            if (unknownOwner) {
                this.routingTable.put(si.getOwnerId(), rte);
            }
        }
    }
}
/**
 * Updates the routing table after a message has been delivered.
 *
 * For a source that is neither this node nor empty, the source's entry gets
 * a traced route through the requirer, or its IP when direct connections are
 * accepted and the message carries a source address. For a destination that
 * is neither this node nor empty, the destination's entry gets a traced
 * route through the neighbour the message was routed to; in addition, for
 * query messages of type 2, every tuple owner other than this node gets a
 * hint route through that neighbour or has its IP set.
 * Missing routing entries are created on demand.
 *
 * @param wm wrapper of the delivered message
 */
public void traceDeliveredMessage(MessageWrapper wm) {
    Message msg = wm.getMessage();

    String source = msg.getSource();
    if (!source.equals(this.getIdent()) && !source.equals("")) {
        RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(source);
        boolean unknown = (rte == null);
        if (unknown) {
            rte = new RoutingTableElement();
        }
        if (this.acceptTCPDirectConnections && !msg.getSourceAddress().equals("")) {
            rte.setIP(msg.getSourceAddress());
        } else {
            rte.addTracedRoute(wm.getRequirer());
        }
        if (unknown) {
            this.routingTable.put(source, rte);
        }
    }

    String dest = msg.getDest();
    if (dest.equals(this.getIdent()) || dest.equals("")) {
        return;
    }
    RoutingTableElement destEntry = (RoutingTableElement) this.routingTable.get(dest);
    if (destEntry != null) {
        destEntry.addTracedRoute(wm.getRoutedTo());
    } else {
        destEntry = new RoutingTableElement();
        destEntry.addTracedRoute(wm.getRoutedTo());
        this.routingTable.put(dest, destEntry);
    }

    if (!(msg instanceof QueryMessage && msg.getType() == 2)) {
        return;
    }
    QueryMessage qm = (QueryMessage) msg;
    for (int x = 0; qm.getTuples() != null && x < qm.getTuples().size(); x++) {
        Object tuple = qm.getTuples().get(x);
        if (tuple instanceof QueryRemoteFileTuple) {
            QueryRemoteFileTuple qrft = (QueryRemoteFileTuple) tuple;
            if (this.getIdent().equals(qrft.getOwnerID())) {
                continue;
            }
            RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(qrft.getOwnerID());
            boolean unknownOwner = (rte == null);
            if (unknownOwner) {
                rte = new RoutingTableElement();
            }
            if (this.acceptTCPDirectConnections && !qrft.getOwnerIP().equals("")) {
                rte.setIP(qrft.getOwnerIP());
            } else {
                rte.addHintRoute(wm.getRoutedTo(), qrft.getLastTimeSeen().longValue());
            }
            if (unknownOwner) {
                this.routingTable.put(qrft.getOwnerID(), rte);
            }
        }
        else if (tuple instanceof QuerySupernodeTuple) {
            QuerySupernodeTuple qst = (QuerySupernodeTuple) tuple;
            if (this.getIdent().equals(qst.getOwnerID())) {
                continue;
            }
            RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(qst.getOwnerID());
            boolean unknownOwner = (rte == null);
            if (unknownOwner) {
                rte = new RoutingTableElement();
            }
            if (this.acceptTCPDirectConnections && !qst.getOwnerIP().equals("")) {
                rte.setIP(qst.getOwnerIP());
            } else {
                rte.addHintRoute(wm.getRoutedTo(), qst.getSeenOn().longValue());
            }
            if (unknownOwner) {
                this.routingTable.put(qst.getOwnerID(), rte);
            }
        }
        else if (tuple instanceof HttpServerInfo) {
            HttpServerInfo si = (HttpServerInfo) tuple;
            if (this.getIdent().equals(si.getOwnerId())) {
                continue;
            }
            RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(si.getOwnerId());
            boolean unknownOwner = (rte == null);
            if (unknownOwner) {
                rte = new RoutingTableElement();
            }
            if (this.acceptTCPDirectConnections && !si.getOwnerIp().equals("")) {
                rte.setIP(si.getOwnerIp());
            } else {
                rte.addHintRoute(wm.getRoutedTo(), si.getSeenOn().longValue());
            }
            if (unknownOwner) {
                this.routingTable.put(si.getOwnerId(), rte);
            }
        }
    }
}
/**
 * Records a message passing through this node.
 * The source's routing entry (created on demand) gets a traced route through
 * the requirer, or its IP when direct connections are accepted and the
 * message carries a source address. Routing hints are then extracted from
 * the message, and a fresh wrapper for it replaces any previous entry in the
 * in-transit list; when the list is at capacity the entry at index 0 is
 * dropped.
 *
 * @param m        the message being routed
 * @param requirer id of the neighbour the message came from
 * @param routedTo id of the neighbour the message is routed to
 */
public void traceMessage(Message m, String requirer, String routedTo) {
    MessageWrapper wm = new MessageWrapper(m, requirer);
    wm.setStoreFormat();
    wm.setRoutedTo(routedTo);

    RoutingTableElement rte = (RoutingTableElement) this.routingTable.get(m.getSource());
    boolean unknownSource = (rte == null);
    if (unknownSource) {
        rte = new RoutingTableElement();
    }
    if (this.acceptTCPDirectConnections && !m.getSourceAddress().equals("")) {
        rte.setIP(m.getSourceAddress());
    } else {
        rte.addTracedRoute(requirer);
    }
    if (unknownSource) {
        this.routingTable.put(m.getSource(), rte);
    }

    this.traceHints(m, requirer);

    int previous = this.inTransitMessages.indexOf(m);
    if (previous >= 0) {
        this.inTransitMessages.remove(previous);
    }
    if (!this.inTransitMessages.contains(m)) {
        boolean atCapacity = this.inTransitMessages.size() >= this.inTransitMessageSizeMax;
        this.inTransitMessages.add(wm);
        if (atCapacity) {
            this.inTransitMessages.remove(0);
        }
    }
}
/**
 * Records where one of this node's own messages was routed and moves its
 * wrapper to the end of the own-messages list (adding it when absent).
 *
 * @param wm       wrapper of the message
 * @param routedTo id of the neighbour the message was routed to
 */
public void traceMyMessage(MessageWrapper wm, String routedTo) {
    wm.setRoutedTo(routedTo);
    if (this.myMessages.indexOf(wm) >= 0) {
        this.myMessages.remove(wm);
    }
    this.myMessages.add(wm);
}
/**
 * Releases one routing slot by decrementing the shared counter.
 */
public static void decBeingRoutedMessages() {
    Ant.beingRoutedMessages = Ant.beingRoutedMessages - 1;
}
/**
 * Tries to take one routing slot.
 *
 * @return true and the counter is incremented when fewer than
 *         Ant.maxMessagesToRouteToghether messages are being routed;
 *         false (counter untouched) otherwise
 */
public static boolean verifyAndIncBeingRoutedMessages() {
    if (Ant.beingRoutedMessages >= Ant.maxMessagesToRouteToghether) {
        return false;
    }
    Ant.beingRoutedMessages++;
    return true;
}
/**
 * Starts routing a message, provided a routing slot is available.
 *
 * <p>The message is serialized once to log its size, a Router is created
 * for it and given a thread priority that depends on the message type, and
 * the router is then started on its own thread (Router.PARALLEL) or run
 * inline.
 *
 * <p>If anything fails after the slot was taken but before the router is
 * started, the slot is released again; otherwise the counter would stay
 * incremented forever. NOTE(review): this assumes the slot is otherwise
 * released by the Router once it has been started - confirm.
 *
 * @param m        message to route
 * @param requirer id of the neighbour (or local requester) the message came from
 * @return the router handling the message, or null when too many messages
 *         are already being routed
 * @throws Exception when the message cannot be serialized or the router
 *         cannot be set up or run
 */
public Router activateNewRouterProcess(Message m, String requirer) throws Exception {
    if (!Ant.verifyAndIncBeingRoutedMessages()) {
        _logger.error("Too many messages being routed!");
        return null;
    }
    Router r;
    try {
        String source = m.getSource();
        if (source.length() > 10) source = source.substring(0, 10);
        String dest = m.getDest();
        if (dest.length() > 10) dest = dest.substring(0, 10);
        // Serialize only to report the message size in the log line.
        java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
        java.io.ObjectOutputStream oss = new java.io.ObjectOutputStream(bos);
        oss.writeObject(m);
        oss.close();
        bos.close();
        _logger.info("Routing["+m.getType()+"]: "+m.getClass().toString()+" from " + source + " to " + dest + " size " + (bos.toByteArray().length / 1024.0) + "KB");
        r = new Router(this, m, requirer);
        /* Priority experiment */
        if (m instanceof ControlMessage ||
            m instanceof FileTransferEndControlMessage ||
            m instanceof FileTransferErrorControlMessage ||
            m instanceof FileSizePullErrorControlMessage ||
            m instanceof FileInfosPullErrorControlMessage ||
            m instanceof HttpTransferEndControlMessage ||
            m instanceof HttpInterruptTransferMessage ||
            m instanceof SecureConnectionErrorControlMessage ||
            m instanceof NetModificationAlert) {
            r.setPriority(10);
        }
        else if (m instanceof SecurityRequestMessage ||
                 m instanceof SecurityResponseMessage ||
                 m instanceof FilePullMessage ||
                 m instanceof FilePushMessage ||
                 m instanceof FileInfosPullMessage ||
                 m instanceof FileInfosPushMessage ||
                 m instanceof HttpRequestMessage) {
            r.setPriority(10);
        }
        else if (m instanceof FileSizePullMessage ||
                 m instanceof FileSizePushMessage) {
            r.setPriority(8);
        }
        else if (m instanceof PrivateChatMessage) {
            r.setPriority(7);
        }
        else if (m instanceof QueryMessage) {
            QueryMessage qm = (QueryMessage) m;
            if (qm.getQuery() instanceof QueryStringItem ||
                qm.getQuery() instanceof QueryHashItem) {
                r.setPriority(10); //was 6
            }
            else if (qm.getQuery() instanceof QueryFileListItem ||
                     qm.getQuery() instanceof QuerySupernodeItem ||
                     qm.getQuery() instanceof QueryInetAddressItem ||
                     qm.getQuery() instanceof QueryRandomItem) {
                r.setPriority(6);
            }
            else {
                r.setPriority(10);
            }
        }
        else if (m instanceof HttpResponsePartMessage) {
            r.setPriority(6);//was 4
        }
        else if (m instanceof FilePartMessage) {
            r.setPriority(6);// was 3
        }
        else if (m instanceof Message) {
            r.setPriority(6);
        }
    } catch (Exception e) {
        // No router will ever run for this message: give the slot back.
        Ant.decBeingRoutedMessages();
        throw e;
    }
    if (Router.PARALLEL) {
        r.start();
    } else {
        r.run();
    }
    return r;
}
/**
 * Dispatches a message to the matching routing strategy: external messages
 * go through routeExternalMessage, local retransmissions through
 * retransmitMessage, and everything else through routeMyMessage.
 *
 * @param wm             wrapper of a locally originated message
 * @param m              externally received message
 * @param requirer       id of the neighbour an external message came from
 * @param r              router handling an external message
 * @param external       whether the message comes from another node
 * @param retransmission whether a local message is being retransmitted
 * @return the sender threads created by the chosen strategy
 * @throws Exception when the chosen strategy fails
 */
public SenderThread[] route(MessageWrapper wm, Message m, String requirer, Router r, boolean external, boolean retransmission)throws
Exception {
    if (external) {
        return this.routeExternalMessage(m, requirer, r);
    }
    if (retransmission) {
        return this.retransmitMessage(wm);
    }
    return this.routeMyMessage(wm);
}
/**
 * Routes a message that was handed to this node by the router (the "external" path of route()).
 *
 * Message types as they are used in this file: 0 = ordinary message, 1 = acknowledgement,
 * 2 = broadcast (flooded) message.
 *
 * The method distinguishes three situations:
 *   1. the message is one of this node's own pending messages (found in myMessages) that came back;
 *   2. the message is addressed to another node (forward, flood, or re-route it);
 *   3. the message is addressed to this node (acknowledge and/or process it).
 *
 * Every Exception raised inside the body, including the ones the body throws itself, is caught
 * by the catch at the bottom, logged, and turned into a null return; so although the signature
 * declares "throws Exception", nothing visible here lets an Exception escape.
 *
 * @param m        the message to route
 * @param requirer identifier that is excluded when flooding and when searching a route, and that is
 *                 used as fallback hop for acknowledgements (presumably the neighbour that delivered
 *                 the message -- confirm against the caller)
 * @param r        router supplying queryDieProbability, routeProbability and
 *                 querySeedModificationProbability; also passed on to processMessage
 * @return the sender threads returned by the chosen neighbour(s) (possibly an empty array when
 *         flooding reached nobody), or null when nothing was forwarded or an error was logged
 */
private SenderThread[] routeExternalMessage(Message m, String requirer, Router r) throws
Exception {
try {
// Sign only messages whose (non-empty) source is this node; a signing failure is logged, not fatal.
if (!m.getSource().equals("") && m.getSource().equals(this.getIdent())) {
try {
MessageSigner.getInstance().sign(m);
}
catch (Exception e) {
_logger.error("Failed signing broadcast message", e);
}
}
// Optional random pause of up to Router.delay milliseconds before routing.
if (Router.delay > 0)
Thread.sleep( (long) Math.floor(Router.delay * Math.random()));
// Without neighbours nothing can be forwarded: announce the destination as unreachable and fail.
if (this.neighbours.size() == 0) {
ArrayList unreachableNeighbours = new ArrayList();
unreachableNeighbours.add(m.getDest());
this.generateNetModificationAlert(unreachableNeighbours, requirer);
throw new Exception(this.getShortId() +
": Destination node " + m.getDest().substring(0,10) +
" unreachable for message from node " + (m.getSource().length() > 0 ? m.getSource().substring(0,10) : m.getSource()) +
" with id " +
m.getAck_Id() + " type = " + m.getType() +
"\nReason: No Route Found " + "Node " + this.getShortId() +
" has no neighbours");
}
// ---- Case 1: the message matches an entry of myMessages, i.e. one of our own messages returned.
// NOTE(review): myMessages is read as a list of MessageWrapper below, yet it is searched with a
// Message; this presumably relies on the equals() implementations of those classes -- verify.
if (this.myMessages.contains(m)) {
if(m.getType() != 2){
int index = this.myMessages.indexOf(m);
MessageWrapper wm = (MessageWrapper)this.myMessages.get(index);
/*if (wm.getRetrasmissions() < m.getRetrasmissions() && m.getRetrasmissions() < Ant.maxRetransmissionsForceDirection) {
wm.notToBeConsidered = new ArrayList();//Collections.synchronizedList(new ArrayList())
}
else*/ if (!wm.notToBeConsidered.contains(wm.getRoutedTo())) {
// The hop used last time did not work: exclude it and announce the change.
wm.notToBeConsidered.add(wm.getRoutedTo());
this.generateNetModificationAlert(wm.getMessage(), wm.getRoutedTo());
}
// Try another route that avoids every excluded hop.
Neighbour selectedNeighbour = this.checkForRoute(wm, wm.notToBeConsidered);
if(selectedNeighbour != null){
SenderThread[] st = new SenderThread[1];
st[0] = selectedNeighbour.route(m);
return st;
}
if(wm.notToBeConsidered.size() == 0){
ArrayList unreachableNeighbours = new ArrayList();
unreachableNeighbours.add(wm.getMessage().getDest());
this.generateNetModificationAlert(unreachableNeighbours, requirer);
}
throw new Exception(this.getShortId() +
": Destination node " + m.getDest().substring(0,10) +
" unreachable for message from node " +
(m.getSource().length() > 0 ? m.getSource().substring(0,10) : m.getSource()) + " with id " +
m.getAck_Id() +
" type = " + m.getType() +
"\nReason: No Route Found " + "Node " +
this.getShortId() + " excluded all its " +
wm.notToBeConsidered.size() + " neghbours");
}
else { // m.getType() == 2
// Our own broadcast came back: drop it.
_logger.debug(this.getShortId() +
": Broadcast Message returned no more node avaiable to send. Killing...");
return null;
}
}
// ---- Case 2: the message is addressed to some other node.
else if (!m.getDest().equals(this.getIdent())) {
// 2a: first time this message is seen here (not yet in inTransitMessages).
if (!this.inTransitMessages.contains(m)) {
if (m.getType() == 1) {
// An acknowledgement: look for the in-transit message (type 0 or 2) carrying the same ack id
// and send the ack back towards where that message came from.
for (int k = 0; k < this.inTransitMessages.size(); k++) {
MessageWrapper inTransit = (MessageWrapper)this.inTransitMessages.get(k);
if ( (inTransit.getMessage().getType() == 0 ||
inTransit.getMessage().getType() == 2) &&
inTransit.getMessage().getAck_Id().equals(m.getAck_Id())) {
Neighbour n = null;
// Prefer a direct connection (to the requirer when no source address is known, otherwise to the
// source address); if that cannot be created fall back to the neighbour that delivered the
// original message.
if(inTransit.getMessage().getSourceAddress().equals("")){
try{
n = new DirectNeighbour(inTransit.getRequirer(), this);
}catch(Exception e){
n = this.getNeighbour(inTransit.getRequirer());
}
}
else{
try{
n = new DirectNeighbour(inTransit.getMessage().getSourceAddress(), this);
}catch(Exception e){
n = this.getNeighbour(inTransit.getRequirer());
}
}
if (n != null) {
inTransit.getMessage().invalidate();
if(inTransit.getMessage().getType() == 2){
// Answer to a broadcast: the broadcast stays in transit; a separate tracer copy of the ack is
// recorded as delivered.
Message broadTracer;
if (inTransit.getMessage() instanceof QueryMessage)
broadTracer = new QueryMessage( (QueryMessage) m);
else
broadTracer = new Message(m);
broadTracer.fillMessageProperties(inTransit.getMessage().getSource(), inTransit.getMessage().getSourceAddress() , m.getSource(), 2, m.getAck_Id());
MessageWrapper broadTracerWrapper = new MessageWrapper(broadTracer, inTransit.getRequirer());
broadTracerWrapper.setRoutedTo(requirer);
this.traceDeliveredMessage(broadTracerWrapper);
}else{
// Answer to an ordinary message: it is no longer in transit. Removing by index inside the scan is
// harmless here because the method returns right after.
this.inTransitMessages.remove(k);
this.traceDeliveredMessage(inTransit);
}
_logger.debug(this.getShortId() +
": C Forwarding message with id = " +
m.getAck_Id() + " type = " +
m.getType() +
" to id = " +
n.getIdent());
SenderThread[] st = new SenderThread[1];
st[0] = n.route(m);
return st;
}else{
throw new NullNeighbourException(this.getShortId() +
": Ack forwarding error [" +inTransit.getRequirer()+"] Dest "+
m.getDest() +
" source " +
m.getSource() + " with id " +
m.getAck_Id() +
" type = " + m.getType() +
"\nReason: No Route Found Node " +
this.getShortId());
}
}
}
// No in-transit message matches this acknowledgement.
throw new NullNeighbourException(this.getShortId() +
": Ack message error: dest " +
m.getDest() +
" source " +
m.getSource() + " with id " +
m.getAck_Id() +
" type = " + m.getType() +
"\nReason: No Route Found Node " +
this.getShortId());
}
else if (m.getType() == 2) {
// A broadcast seen for the first time: remember it, then decide whether it dies here.
// NOTE(review): the cast assumes every type-2 message is a QueryMessage; anything else raises a
// ClassCastException that ends up in the catch-all below.
this.traceMessage(m, requirer, "");
double mustDie = ((QueryMessage)m).getNextRandomDoubleAndRollback() * 100;
// The message is processed locally and flooded only when it does not "die" at this node.
if(!(mustDie < r.queryDieProbability)) this.processMessage(m, r);
ArrayList threadsList = new ArrayList();
int floodedMessages = 0;
// Work on a shuffled copy so the neighbours are visited in random order.
List neighboursShuffled = (ArrayList)((ArrayList)this.getNeighbours()).clone();
Collections.shuffle(neighboursShuffled);
for(int x = 0; x < neighboursShuffled.size() && !(mustDie < r.queryDieProbability); x++){
// The first copy goes out with probability routeProbability percent; every further copy uses
// 100 * (routeProbability/100)^(floodedMessages+1), so the chance shrinks as more copies are sent.
double mustRoute = Math.random() * 100;
if(mustRoute < (floodedMessages > 0 ?
100 * Math.pow(r.routeProbability / 100.0, (floodedMessages + 1.0)):
r.routeProbability)){
// The counter grows even when the chosen neighbour turns out to be the requirer and is skipped.
floodedMessages++;
QueryMessage copy = new QueryMessage((QueryMessage)m);
double modifySeed = Math.random() * 100;
// With probability querySeedModificationProbability percent the copy's random sequence is advanced.
if(modifySeed < r.querySeedModificationProbability) copy.getNextRandomDouble();
NeighbourAnt n = (NeighbourAnt)neighboursShuffled.get(x);
// Never flood back to the node the message came from.
if (n != null && !n.getIdent().equals(requirer)) {
try{
SenderThread broadSender = n.route(copy);
threadsList.add(broadSender);
_logger.debug(this.getShortId() +
": Flooding message with id = " +
m.getAck_Id() + " type = " +
m.getType() +
" to id = " +
n.getIdent());
}catch(Exception e){
_logger.error("Cannot flood to neighbour",e);
}
}
}
}
SenderThread[] st = new SenderThread[threadsList.size()];
for(int x = 0; x < threadsList.size(); x++)
st[x] = (SenderThread)threadsList.get(x);
return st;
}else{
// Ordinary message for another node: pick a route with nothing excluded yet.
Neighbour selectedNeighbour = this.checkForRoute(m, requirer, new ArrayList(), false);
if(selectedNeighbour != null){
SenderThread[] st = new SenderThread[1];
st[0] = selectedNeighbour.route(m);
return st;
}
ArrayList unreachableNeighbours = new ArrayList();
unreachableNeighbours.add(m.getDest());
this.generateNetModificationAlert(unreachableNeighbours, requirer);
throw new NullNeighbourException(this.getShortId() +
": Destination node " +
m.getDest().substring(0,10) +
" unreachable for message from node " +
(m.getSource().length() > 0 ? m.getSource().substring(0,10) : m.getSource()) + " with id " +
m.getAck_Id() +
" type = " + m.getType() +
"\nReason: No Route Found with no excluded neghbours");
}
}
// 2b: the message is already listed in inTransitMessages, i.e. it returned to this node.
else {
int index = this.inTransitMessages.indexOf(m);
MessageWrapper wm = (MessageWrapper)this.inTransitMessages.get(index);
if(m.getType() != 2){
/*if (wm.getRetrasmissions() < m.getRetrasmissions() &&
m.getRetrasmissions() < Ant.maxRetransmissionsForceDirection) {
wm.notToBeConsidered = new ArrayList();//Collections.synchronizedList(new ArrayList())
}
else */if (!wm.notToBeConsidered.contains(wm.getRoutedTo())) {
// Exclude the hop that was used before and announce the change.
wm.notToBeConsidered.add(wm.getRoutedTo());
this.generateNetModificationAlert(wm.getMessage(), wm.getRoutedTo());
}
_logger.debug(this.getShortId() +
": Message with id = " +
m.getAck_Id() + " type = " + m.getType() +
" returned. Not considering node " +
wm.getRoutedTo());
// alreadySeen = true: checkForRoute updates the existing in-transit entry instead of tracing anew.
Neighbour selectedNeighbour = this.checkForRoute(m, requirer, wm.notToBeConsidered, true);
if(selectedNeighbour != null){
SenderThread[] st = new SenderThread[1];
st[0] = selectedNeighbour.route(m);
return st;
}
if(wm.notToBeConsidered.size() == 0){
ArrayList unreachableNeighbours = new ArrayList();
unreachableNeighbours.add(m.getDest());
this.generateNetModificationAlert(unreachableNeighbours, requirer);
}
throw new NullNeighbourException(this.getShortId() +
": Destination node " +
m.getDest().substring(0,10) +
" unreachable for message from node " +
(m.getSource().length() > 0 ? m.getSource().substring(0,10) : m.getSource()) + " with id " +
m.getAck_Id() +
" type = " + m.getType() +
"\nReason: No Route Found Node " +
this.getShortId() +
" excluded all its " +
wm.notToBeConsidered.size() +
" neghbours " + m);
}else{ // m.getType() == 2
// A flooded message that was already handled here: trace it again and drop it.
this.traceMessage(m, requirer, "");
_logger.debug(this.getShortId() +
": D Flooded message with id = " +
m.getAck_Id() + " type = " +
m.getType() +
" returned. Killing!");
return null;
}
}
}
// ---- Case 3: the message is addressed to this node.
else if (m.getDest().equals(this.getIdent())) {
// 3a: ordinary message with a visible source -> record it, send an acknowledgement, process it.
if (m.getType() == 0 && !m.getSource().equals("")) {
_logger.debug(this.getShortId() +
": Received Message sending Ack for message with id = " +
m.getAck_Id() + " type = " + m.getType());
MessageWrapper toBeStored = new MessageWrapper(m, requirer);
this.traceHints(m, requirer);
this.traceDeliveredMessage(toBeStored);
// The acknowledgement is a type-1 message from this node to the sender, reusing the ack id.
Message ackMessage = new Message();
ackMessage.fillMessageProperties(this.getIdent(), "",
m.getSource(), 1,
m.getAck_Id()
);
MessageWrapper wm = new MessageWrapper(ackMessage, requirer);
if (!wm.getMessage().getSource().equals("") && wm.getMessage().getSource().equals(this.getIdent())) {
try {
MessageSigner.getInstance().sign(wm.getMessage());
}
catch (Exception e) {
_logger.error("Failed signing message", e);
}
}
SenderThread[] st = null;
Neighbour n = null;
// Same hop selection as for forwarded acks: a direct connection first, the delivering neighbour
// as fallback.
if(m.getSourceAddress().equals("")){
try{
n = new DirectNeighbour(requirer, this);
}catch(Exception e){
n = this.getNeighbour(requirer);
}
}
else{
try{
n = new DirectNeighbour(m.getSourceAddress(), this);
}catch(Exception e){
n = this.getNeighbour(requirer);
}
}
if (n != null) {
st = new SenderThread[1];
st[0] = n.route(new Message(wm));
_logger.debug(this.getShortId() +
": D1 Forwarding message with id = " +
wm.getMessage().getAck_Id() + " type = " +
wm.getMessage().getType() +
" to id = " + requirer);
}
else {
// The ack cannot be sent; because of the throw the message is not processed either.
ArrayList unreachableNeighbours = new ArrayList();
unreachableNeighbours.add(wm.getMessage().getDest());
this.generateNetModificationAlert(unreachableNeighbours, this.getIdent());
throw new NullNeighbourException(this.getShortId() +
": Destination node " +
wm.getMessage().getDest().substring(0,10) +
" unreachable for message from node " +
(wm.getMessage().getSource().length() > 0 ? wm.getMessage().getSource().substring(0,10) : wm.getMessage().getSource()) + " with id " +
wm.getMessage().getAck_Id() +
" type = " + wm.getMessage().getType() +
"\nReason: No Route Found Node " +
this.getShortId() +
" excluded all its " +
wm.notToBeConsidered.size() +
" neghbours");
}
this.processMessage(m, r);
return st;
}
// 3b: message or ack with a masked (empty) source -> no acknowledgement is possible, just process.
else if ( (m.getType() == 0 || m.getType() == 1) &&
m.getSource().equals("")) {
_logger.debug(this.getShortId() +
": Received Message with masked source id = " +
m.getAck_Id() + " type = " + m.getType());
this.processMessage(m, r);
return null;
}
// 3c: acknowledgement for one of this node's messages.
else if (m.getType() == 1) {
if (this.getMessage(m.getAck_Id()) != null) {
MessageWrapper movingMessage = null;
movingMessage = (MessageWrapper)this.getMessage(m.getAck_Id());
if (movingMessage != null) {
if(movingMessage.getMessage().getType() == 2){
// Answer to our own broadcast: the broadcast stays pending; an invalidated tracer copy is
// recorded as delivered.
Message broadTracer;
if(m instanceof QueryMessage)
broadTracer = new QueryMessage((QueryMessage)m);
else
broadTracer = new Message(m);
broadTracer.fillMessageProperties(m.getDest(), movingMessage.getMessage().getSourceAddress(), m.getSource(), 2, m.getAck_Id());
broadTracer.invalidate();
MessageWrapper broadTracerWrapper = new MessageWrapper(broadTracer, this.getIdent());
// NOTE(review): assumes the current thread is a Router; if this method can run on another thread
// the cast raises a ClassCastException that ends up in the catch-all below -- confirm.
broadTracerWrapper.setRoutedTo(((Router)Thread.currentThread()).getRequirer());
this.traceDeliveredMessage(broadTracerWrapper);
}else{
// Answer to an ordinary message: it is delivered, so stop tracking it as pending.
movingMessage.getMessage().invalidate();
this.myMessages.remove(movingMessage);
this.traceDeliveredMessage(movingMessage);
}
}
_logger.debug(this.getShortId() +
": Message succesfully routed id = " +
m.getAck_Id() +
" type = " + m.getType());
this.processMessage(m, r);
return null;
}
else {
// The acknowledged message is no longer known here; the ack is still processed.
_logger.debug(this.getShortId() +
": (Retransmission) Message succesfully routed id = " +
m.getAck_Id() +
" type = " + m.getType());
this.processMessage(m, r);
return null;
}
}
else {
// Addressed to this node but none of the handled type/source combinations.
throw new Exception(this.getShortId() +
": There is something strange(2) for message with id = " +
m.getAck_Id() + " type = " + m.getType());
}
}
else {
// Reached only when the two destination comparisons above disagree with each other.
throw new Exception(this.getShortId() +
": There is something strange(3) for message with id = " +
m.getAck_Id() +
" type = " + m.getType());
}
}
catch (Exception e) {
// Catch-all: every failure above (including the deliberate throws) is logged and reported as null.
_logger.error(this.getShortId() + " routing error["+requirer+"]: ", e);
return null;
}
}
/**
 * Performs the first transmission of a message created by this node.
 *
 * Messages whose type is not 2 are handed to the single neighbour selected by
 * {@code checkForRoute}; type-2 (broadcast) messages are flooded to a random subset of the
 * neighbours.
 *
 * @param wm wrapper around the outgoing message
 * @return the sender threads that were started; for a broadcast the array may be empty
 * @throws Exception when no route exists for a non-broadcast message, or when a callee fails
 */
private SenderThread[] routeMyMessage(MessageWrapper wm) throws Exception {
    final Message outgoing = wm.getMessage();

    // Only messages that carry this node as visible source get signed; a failure is logged only.
    final String origin = outgoing.getSource();
    if (!origin.equals("") && origin.equals(this.getIdent())) {
        try {
            MessageSigner.getInstance().sign(outgoing);
        }
        catch (Exception e) {
            _logger.error("Failed signing broadcast message", e);
        }
    }

    // ---- Point-to-point delivery --------------------------------------------------------------
    if (outgoing.getType() != 2) {
        final Neighbour nextHop = this.checkForRoute(wm, wm.notToBeConsidered);
        if (nextHop == null) {
            // Nothing was excluded and still no route: announce the destination as unreachable.
            if (wm.notToBeConsidered.size() == 0) {
                ArrayList unreachableNeighbours = new ArrayList();
                unreachableNeighbours.add(outgoing.getDest());
                this.generateNetModificationAlert(unreachableNeighbours, this.getIdent());
            }
            throw new Exception(this.getShortId() +
                                ": Destination unreachable for message with id = " +
                                outgoing.getAck_Id().substring(0,10) + " type = " +
                                outgoing.getType());
        }
        return new SenderThread[] { nextHop.route(outgoing) };
    }

    // ---- Broadcast: flood copies to randomly chosen neighbours ---------------------------------
    final ArrayList startedSenders = new ArrayList();
    // NOTE(review): unlike the flood loop in routeExternalMessage, this counter is never
    // incremented, so the decaying-probability branch below is never taken and every neighbour
    // is tried with the plain routeProbability. Kept as is; confirm whether that is intended.
    int floodedMessages = 0;
    // The three percentages are derived from the clock: 25..74, 5..14 and 50..74.
    final int routeProbability = 25 + (int)(System.currentTimeMillis() % 50);
    final int queryDieProbability = 5 + (int)(System.currentTimeMillis() % 10);
    final int querySeedModificationProbability = 50 + (int)(System.currentTimeMillis() % 25);
    final double mustDie = ((QueryMessage) outgoing).getNextRandomDoubleAndRollback() * 100;
    final List shuffled = (ArrayList)((ArrayList)this.getNeighbours()).clone();
    Collections.shuffle(shuffled);

    // When the query "dies" at its origin no copy is sent at all.
    final boolean survives = !(mustDie < queryDieProbability);
    if (survives) {
        for (int i = 0; i < shuffled.size(); i++) {
            final NeighbourAnt target = (NeighbourAnt) shuffled.get(i);
            final double mustRoute = Math.random() * 100;
            final double threshold;
            if (floodedMessages > 0) {
                threshold = 100 * Math.pow(routeProbability / 100.0, (floodedMessages + 1.0));
            }
            else {
                threshold = routeProbability;
            }
            if (!(mustRoute < threshold)) {
                continue;
            }
            try {
                final QueryMessage copy = new QueryMessage((QueryMessage) outgoing);
                final double modifySeed = Math.random() * 100;
                // Sometimes advance the copy's random sequence before sending it.
                if (modifySeed < querySeedModificationProbability) {
                    copy.getNextRandomDouble();
                }
                startedSenders.add(target.route(copy));
                _logger.debug(this.getShortId() +
                              ": Flooding message with id = " +
                              outgoing.getAck_Id() + " type = " +
                              outgoing.getType() +
                              " to id = " +
                              target.getIdent());
            }
            catch (Exception e) {
                _logger.error("Cannot flood to neighbour", e);
            }
        }
    }

    // A broadcast is traced with an empty "routed to" value.
    this.traceMyMessage(wm, "");
    return (SenderThread[]) startedSenders.toArray(new SenderThread[startedSenders.size()]);
}
/**
 * Sets the priority of the calling thread according to the class of {@code m} and returns the
 * priority the thread had before, so that the caller can restore it.
 *
 * Priorities used: 10 for control, security, pull/push and HTTP request messages and for most
 * queries; 8 for file-size pull/push; 7 for private chat; 6 for file-list, supernode,
 * inet-address and random queries and for every other message. A null message leaves the
 * priority untouched.
 *
 * @param m the message about to be sent
 * @return the previous priority of the current thread
 */
private int changeThreadPriority(Message m){
    final Thread self = Thread.currentThread();
    final int previous = self.getPriority();

    final boolean urgent =
        m instanceof ControlMessage ||
        m instanceof FileTransferEndControlMessage ||
        m instanceof FileTransferErrorControlMessage ||
        m instanceof FileSizePullErrorControlMessage ||
        m instanceof FileInfosPullErrorControlMessage ||
        m instanceof HttpTransferEndControlMessage ||
        m instanceof HttpInterruptTransferMessage ||
        m instanceof SecureConnectionErrorControlMessage ||
        m instanceof NetModificationAlert ||
        m instanceof SecurityRequestMessage ||
        m instanceof SecurityResponseMessage ||
        m instanceof FilePullMessage ||
        m instanceof FilePushMessage ||
        m instanceof FileInfosPullMessage ||
        m instanceof FileInfosPushMessage ||
        m instanceof HttpRequestMessage;

    int wanted = -1; // -1 means "leave the priority alone"
    if (urgent) {
        wanted = 10;
    }
    else if (m instanceof FileSizePullMessage || m instanceof FileSizePushMessage) {
        wanted = 8;
    }
    else if (m instanceof PrivateChatMessage) {
        wanted = 7;
    }
    else if (m instanceof QueryMessage) {
        final Object query = ((QueryMessage) m).getQuery();
        final boolean searchQuery =
            query instanceof QueryStringItem || query instanceof QueryHashItem;
        final boolean housekeepingQuery = !searchQuery &&
            (query instanceof QueryFileListItem ||
             query instanceof QuerySupernodeItem ||
             query instanceof QueryInetAddressItem ||
             query instanceof QueryRandomItem);
        // String/hash queries were 7 and housekeeping queries were 2 in an earlier revision.
        wanted = housekeepingQuery ? 6 : 10;
    }
    else if (m instanceof Message) {
        // Covers HTTP response parts (was 4), file parts (was 3) and any other non-null message.
        wanted = 6;
    }

    if (wanted != -1) {
        self.setPriority(wanted);
    }
    return previous;
}
/**
 * Sends {@code message} to {@code dest} as an ordinary (type 0) message.
 *
 * A message id of the form {@code <ident>@<digest of the current time>} is generated (a random
 * number is used instead if that fails), the message properties are filled in, and the wrapped
 * message is handed to {@code route}. The sender threads that routing returns are supervised by
 * a background thread that interrupts any of them still alive after {@code Router.routeTimeout}.
 *
 * The calling thread's priority is changed for the duration of the call and is always restored
 * on exit, even when an unexpected exception escapes. If the thread is interrupted while the id
 * is generated, the message is still sent and the interrupt status is re-asserted before
 * returning instead of being lost.
 *
 * @param message               the message to send
 * @param dest                  identifier of the destination node
 * @param disableAutoRetransmit true to disable automatic retransmission on the wrapper
 * @param maskSource            true to send with an empty source and source address
 * @return the wrapper tracking the message, or null when routing failed
 * @throws InterruptedException declared for callers; the visible body does not throw it
 */
public MessageWrapper sendMessage(Message message, String dest,
                                  boolean disableAutoRetransmit,
                                  boolean maskSource) throws
    InterruptedException {
    final int oldPriority = this.changeThreadPriority(message);
    boolean interrupted = false;
    try {
        String messageID;
        try {
            // The id is derived from the clock; the short sleeps surround the timestamp read.
            Thread.sleep(2);
            messageID = this.getIdent() + "@" +
                (new DigestManager()).getDigest(System.currentTimeMillis() + "");
            Thread.sleep(2);
        }
        catch (InterruptedException e) {
            // Remember the interrupt and carry on with the fallback id, as any other failure does.
            interrupted = true;
            messageID = Math.floor(Math.random() * Math.pow(10, 32)) + "";
        }
        catch (Exception e) {
            messageID = Math.floor(Math.random() * Math.pow(10, 32)) + "";
        }

        if (!maskSource)
            message.fillMessageProperties(this.getIdent(), this.getLocalInetAddress(), dest, 0, messageID);
        else
            message.fillMessageProperties("", "", dest, 0, messageID);

        _logger.debug("There are " + this.failedMessages.size() +
                      " messages failed!");
        _logger.debug("There are " + this.myMessages.size() +
                      " messages pending!");
        _logger.debug(this.getShortId() + ": Sending Message from id = " +
                      this.getShortId() + " to id = " + dest + " message id = " +
                      messageID);

        MessageWrapper wm = new MessageWrapper(message, this.getIdent());
        if (disableAutoRetransmit) {
            wm.disableAutoRetransmit();
        }

        try {
            SenderThread[] st = this.route(wm, null, null, null, false, false);
            if (st != null) {
                final SenderThread[] stFinal = st;
                // Supervisor: wait for each sender up to the route timeout, then cut it off.
                Thread waiter = new Thread() {
                    public void run() {
                        for (int x = 0; x < stFinal.length; x++) {
                            try {
                                stFinal[x].join(Router.routeTimeout);
                            }
                            catch (Exception e) {
                                _logger.error("Cannot join send process", e);
                            }
                            if (stFinal[x].isAlive()) {
                                stFinal[x].interruptSending();
                            }
                        }
                    }
                };
                waiter.start();
            }
            return wm;
        }
        catch (Exception e) {
            _logger.error(this.getShortId() + "", e);
            return null;
        }
    }
    finally {
        // Runs on every exit path, so the elevated priority can never leak to the caller.
        Thread.currentThread().setPriority(oldPriority);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Sends {@code message} as a broadcast (type 2) message.
 *
 * A message id of the form {@code <ident>@<digest of the current time>} is generated (a random
 * number is used instead if that fails) and a random 10-byte value, hex encoded, is used as the
 * destination. Automatic retransmission is disabled on the wrapper. The sender threads that
 * routing returns are supervised by a background thread that interrupts any of them still alive
 * after {@code Router.routeTimeout}.
 *
 * The calling thread's priority is changed for the duration of the call and is always restored
 * on exit, even when an unexpected exception escapes. If the thread is interrupted while the id
 * is generated, the message is still sent and the interrupt status is re-asserted before
 * returning instead of being lost.
 *
 * @param message the message to broadcast
 * @return the wrapper tracking the message; unlike {@code sendMessage} it is returned even when
 *         routing failed
 */
public MessageWrapper sendBroadcastMessage(Message message) {
    final int oldPriority = this.changeThreadPriority(message);
    boolean interrupted = false;
    try {
        String messageID;
        try {
            // The id is derived from the clock; the short sleeps surround the timestamp read.
            Thread.sleep(2);
            messageID = this.getIdent() + "@" +
                (new DigestManager()).getDigest(System.currentTimeMillis() + "");
            Thread.sleep(2);
        }
        catch (InterruptedException e) {
            // Remember the interrupt and carry on with the fallback id, as any other failure does.
            interrupted = true;
            messageID = Math.floor(Math.random() * Math.pow(10, 32)) + "";
        }
        catch (Exception e) {
            messageID = Math.floor(Math.random() * Math.pow(10, 32)) + "";
        }

        // Broadcasts carry a random destination instead of a real node identifier.
        SecureRandom sr = new SecureRandom();
        byte[] randomDest = new byte[10];
        sr.nextBytes(randomDest);
        String dest = Base16.toHexString(randomDest);
        message.fillMessageProperties(this.getIdent(), this.getLocalInetAddress(), dest, 2, messageID);

        MessageWrapper wm = new MessageWrapper(message, this.getIdent());
        wm.disableAutoRetransmit();

        try {
            SenderThread st[] = this.route(wm, null, null, null, false, false);
            if (st != null) {
                final SenderThread[] stFinal = st;
                // Supervisor: wait for each sender up to the route timeout, then cut it off.
                Thread waiter = new Thread() {
                    public void run() {
                        for (int x = 0; x < stFinal.length; x++) {
                            try {
                                stFinal[x].join(Router.routeTimeout);
                            }
                            catch (Exception e) {
                                _logger.error("Cannot join send process", e);
                            }
                            if (stFinal[x].isAlive()) {
                                stFinal[x].interruptSending();
                            }
                        }
                    }
                };
                waiter.start();
            }
        }
        catch (Exception e) {
            _logger.error(this.getShortId() + "", e);
        }
        return wm;
    }
    finally {
        // Runs on every exit path, so the elevated priority can never leak to the caller.
        Thread.currentThread().setPriority(oldPriority);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Sends one of this node's own messages again after a one-second pause, avoiding the neighbour
 * that was used for the previous attempt.
 *
 * @param wm wrapper of the message to retransmit
 * @return a one-element array with the sender thread, or null when the retransmission limit is
 *         reached, no route is left, or any other error occurred (the error is logged)
 */
private SenderThread[] retransmitMessage(MessageWrapper wm) {
    // Back off before retrying; a failure of the pause itself is only logged.
    try {
        Thread.sleep(1000);
    }
    catch (Exception e) {
        _logger.error(this.getShortId() + "",e);
    }

    try {
        final Message pending = wm.getMessage();

        // Sign again when the message visibly originates from this node.
        final String origin = pending.getSource();
        if (!origin.equals("") && origin.equals(this.getIdent())) {
            try {
                MessageSigner.getInstance().sign(pending);
            }
            catch (Exception e) {
                _logger.error("Failed signing broadcast message", e);
            }
        }

        if (wm.getRetrasmissions() >= Ant.maxRetransmissions) {
            throw new Exception("Retransmissions exceeeded");
        }

        _logger.debug(this.getShortId() + ": Retransmitting Message from id = " +
                      this.getShortId() + " to id = " + pending.getDest() +
                      " message id = " + pending.getAck_Id());
        pending.resetDelivered();

        // The hop tried last time is excluded from now on and the change is announced once.
        final boolean alreadyExcluded = wm.notToBeConsidered.contains(wm.getRoutedTo());
        if (!alreadyExcluded) {
            wm.notToBeConsidered.add(wm.getRoutedTo());
            this.generateNetModificationAlert(pending, wm.getRoutedTo());
        }

        final Neighbour nextHop = this.checkForRoute(wm, wm.notToBeConsidered);
        if (nextHop != null) {
            return new SenderThread[] { nextHop.route(pending) };
        }

        // No route although nothing is excluded: the destination itself is unreachable.
        if (wm.notToBeConsidered.size() == 0) {
            ArrayList unreachableNeighbours = new ArrayList();
            unreachableNeighbours.add(pending.getDest());
            this.generateNetModificationAlert(unreachableNeighbours, this.getIdent());
        }
        throw new Exception(this.getShortId() +
                            ": Destination (retransmission) unreachable for message with id = " +
                            pending.getAck_Id().substring(0,10) + " type = " +
                            pending.getType());
    }
    catch (Exception e) {
        _logger.error(this.getShortId() + "",e);
        return null;
    }
}
/**
 * Chooses the neighbour through which one of this node's own messages should be sent.
 *
 * If the routing table entry of the destination has an IP, a direct connection to it is tried
 * first. Otherwise (or when that fails) the entry's known routes are scanned and the usable
 * neighbour with the fewest queued messages is chosen, skipping hops listed in
 * {@code notToBeConsidered}. On success the entry is marked as used and, unless the message is
 * an acknowledgement (type 1), the message is traced together with the chosen hop.
 *
 * @param wm                wrapper of the message to route
 * @param notToBeConsidered identifiers of hops that must not be selected from the route list
 * @return the chosen neighbour, or null when no usable route exists
 * @throws Exception propagated from the callees
 */
public Neighbour checkForRoute(MessageWrapper wm, List notToBeConsidered) throws Exception{
    Neighbour chosen = null;
    String chosenId = null;
    final RoutingTableElement entry =
        (RoutingTableElement) this.routingTable.get(wm.getMessage().getDest());

    // First choice: talk to the destination directly when its address is known.
    if (entry != null && entry.getIP() != null) {
        try {
            chosen = new DirectNeighbour(entry.getIP(), this);
            chosenId = chosen.getIdent();
        }
        catch (Exception e) {
            // The direct attempt failed: flag the entry as firewalled.
            entry.firewalled = true;
        }
    }

    // Second choice: the least loaded usable hop among the known routes.
    if (chosen == null && entry != null) {
        for (int i = 0; i < entry.getRoutes().size(); i++) {
            final String hop = (String) entry.getRoutes().get(i);
            Neighbour candidate;
            try {
                candidate = new DirectNeighbour(hop, this);
            }
            catch (Exception e) {
                candidate = this.getNeighbour(hop);
            }
            final boolean usable = candidate != null && !notToBeConsidered.contains(hop);
            if (usable &&
                (chosen == null || chosen.getQueuedMessages() > candidate.getQueuedMessages())) {
                chosen = candidate;
                chosenId = hop;
            }
        }
    }

    if (chosen == null || chosenId == null) {
        return chosen;
    }

    entry.setLastTimeUsed();
    if (wm.getMessage().getType() != 1) {
        this.traceMyMessage(wm, chosenId);
    }
    _logger.debug(this.getShortId() + ": Routing message with id = " +
                  wm.getMessage().getAck_Id() + " type = " +
                  wm.getMessage().getType() + " to id = " +
                  chosenId);
    return chosen;
}
/**
 * Tells whether the routing table knows a way to reach {@code dest}.
 *
 * @param dest identifier of the destination node
 * @return true when the table has an entry for {@code dest} that carries an IP or at least one
 *         route; false otherwise
 */
public boolean existRouteTo(String dest){
    final RoutingTableElement entry = (RoutingTableElement) this.routingTable.get(dest);
    if (entry == null) {
        return false;
    }
    return entry.getIP() != null || entry.getRoutes().size() > 0;
}
/**
 * Chooses the neighbour through which a message in transit should be forwarded.
 *
 * If the routing table entry of the destination has an IP, a direct connection to it is tried
 * first. Otherwise (or when that fails) the entry's known routes are scanned and the usable
 * neighbour with the fewest queued messages is chosen, skipping {@code requirer} and every hop
 * listed in {@code notToBeConsidered}. On success the entry is marked as used and the choice is
 * recorded: on the existing in-transit entry when {@code alreadySeen} is true, through
 * {@code traceMessage} otherwise.
 *
 * @param m                 the message to forward
 * @param requirer          identifier that must not be chosen from the route list
 * @param notToBeConsidered identifiers of further hops that must not be chosen
 * @param alreadySeen       true when {@code m} is already listed in the in-transit messages
 * @return the chosen neighbour, or null when no usable route exists
 * @throws Exception propagated from the callees
 */
public Neighbour checkForRoute(Message m, String requirer, List notToBeConsidered, boolean alreadySeen) throws Exception{
    Neighbour chosen = null;
    String chosenId = null;
    final RoutingTableElement entry = (RoutingTableElement) this.routingTable.get(m.getDest());

    // First choice: talk to the destination directly when its address is known.
    if (entry != null && entry.getIP() != null) {
        try {
            chosen = new DirectNeighbour(entry.getIP(), this);
            chosenId = chosen.getIdent();
        }
        catch (Exception e) {
            // The direct attempt failed: flag the entry as firewalled.
            entry.firewalled = true;
        }
    }

    // Second choice: the least loaded usable hop among the known routes.
    if (chosen == null && entry != null) {
        for (int i = 0; i < entry.getRoutes().size(); i++) {
            final String hop = (String) entry.getRoutes().get(i);
            Neighbour candidate;
            try {
                candidate = new DirectNeighbour(hop, this);
            }
            catch (Exception e) {
                candidate = this.getNeighbour(hop);
            }
            final boolean usable = candidate != null &&
                !notToBeConsidered.contains(hop) &&
                !hop.equals(requirer);
            if (usable &&
                (chosen == null || chosen.getQueuedMessages() > candidate.getQueuedMessages())) {
                chosen = candidate;
                chosenId = hop;
            }
        }
    }

    if (chosen == null || chosenId == null) {
        return chosen;
    }

    entry.setLastTimeUsed();
    if (alreadySeen) {
        // Update the hop stored on the in-transit entry of this message.
        final int position = this.inTransitMessages.indexOf(m);
        final MessageWrapper tracked = (MessageWrapper) this.inTransitMessages.get(position);
        tracked.setRoutedTo(chosenId);
    }
    else {
        this.traceMessage(m, requirer, chosenId);
    }
    _logger.debug(this.getShortId() + ": Routing message with id = " +
                  m.getAck_Id() + " type = " +
                  m.getType() + " to id = " +
                  chosenId);
    return chosen;
}
/**
 * Asks the UPnP manager to map the server port when a NAT is present.
 *
 * When the mapping yields a positive port, the current address and port are saved in the
 * "natted" fields and replaced by the NAT address and the mapped port. Any exception is logged
 * and otherwise ignored.
 */
public void uPnPMapping(){
    try {
        final UPnPManager manager = UPnPManager.instance();
        final int mappedPort = manager.isNATPresent() ? manager.mapPort(this.serverPort) : 0;
        if (mappedPort <= 0) {
            return; // no NAT, or nothing was mapped: keep the current address and port
        }
        // Keep the LAN-side values before overwriting them with the public ones.
        this.localNattedInetAddress = this.localInetAddress;
        this.localInetAddress = manager.getNATAddress().getHostAddress();
        this.localNattedPort = this.serverPort;
        this.serverPort = mappedPort;
    }
    catch (Exception ex) {
        _logger.error("", ex);
    }
}
/**
 * Main loop of the node.
 *
 * Opens the secure server on the server port, optionally starts the UPnP device, publishes the
 * LAN address to the UPnP manager and then loops until {@code terminate} is set. Each pass of
 * the loop:
 *   - sleeps three times for 60 x 1000 ms, doing neighbour housekeeping after each minute
 *     (duplicates, dead or disconnected neighbours, neighbours whose last probe is too old);
 *   - trims the list of failed messages;
 *   - expires old network-modification records and unused routing table entries;
 *   - retransmits, or gives up on, own messages that exceeded the message timeout.
 *
 * Errors inside one pass are logged and the loop continues; an error during setup sets
 * {@code terminate} and ends the method.
 */
public void run() {
try {
ss = new SecureServer(this, this.serverPort); //Does also UPnP port mapping
// Without a server socket the node cannot accept connections: give up silently.
if (ss.getServerSocket() == null) return;
if(this.upnp) UPnPManager.instance().startDevice();
//Update LAN address
ants.p2p.query.ServerInfo si = new ants.p2p.query.ServerInfo("", getLanAddress(), new Integer(getServerPort()), "");
_logger.info("Updating UPnP LAN address: "+si);
ants.p2p.utils.net.UPnPManager.instance().setCurrentLanAddress(si);
while (!terminate) {
try {
System.gc();
_logger.info("Running ANts main loop...");
// Three one-minute rounds; neighbour housekeeping runs at the end of each round.
for (int z = 0; z < 3; z++) {
int sleepTime = 1000;
// NOTE(review): an InterruptedException raised here is handled by the per-pass catch below,
// which logs it and lets the loop go on; the interrupt itself is lost -- confirm this is wanted.
for(int indx = 0; indx < 60; indx++){
this.sleep(sleepTime);
//this.propertyChangeSupport.firePropertyChange("updatePacketCompressionStats", null, null);
}
try {
// Pass 1: drop neighbours that are equal to an earlier entry of the list.
for (int x = this.neighbours.size() - 1; x >= 0; x--) {
for (int y = x - 1; y >= 0; y--) {
if (this.neighbours.get(x).equals(this.neighbours.get(y))) {
this.removeNeighbour( (NeighbourAnt)this.neighbours.get(x));
_logger.info("Removing duplicated neighbour...");
break;
}
}
}
// Pass 2: drop dead neighbours and record up/download activity of the others.
// The flags are sized once, before the scan; they are only consumed by the commented-out call
// to checkNeighboursActivity below.
boolean[] activeUploadNeighbour = new boolean[this.neighbours.size()];
boolean[] activeDownloadNeighbour = new boolean[this.neighbours.size()];
// NOTE(review): if removeNeighbour shrinks this.neighbours, the element that slides into
// position x is skipped by the x++ of this forward scan -- verify.
for (int x = 0; x < this.neighbours.size(); x++) {
NeighbourAnt toBeRemoved = (NeighbourAnt)this.neighbours.get(x);
_logger.info(toBeRemoved + " Alive: " + toBeRemoved.isAlive() + " Connected:"+toBeRemoved.isConnected());
if (! toBeRemoved.isAlive() ||
! toBeRemoved.isConnected()) {
this.removeNeighbour(toBeRemoved);
propertyChangeSupport.firePropertyChange("removedNeighbour", null, toBeRemoved);
_logger.info("Removing stuck neighbour...");
}else if(toBeRemoved.getLastProbedAt() < System.currentTimeMillis() - probeCheckInterval) {
// Last probe is older than probeCheckInterval: the neighbour is terminated here (it is not
// passed to removeNeighbour in this branch).
toBeRemoved.terminate();
propertyChangeSupport.firePropertyChange("removedNeighbour", null, toBeRemoved);
_logger.info("Removing stuck neighbour...");
}else{
// A neighbour counts as active in one direction when its last activity in that direction is
// not older than messageTimeout * maxRetransmissions.
NeighbourAnt current = (NeighbourAnt)this.neighbours.get(x);
if (System.currentTimeMillis() - current.getLastActiveDownloadTime() >
Ant.messageTimeout * Ant.maxRetransmissions) {
activeDownloadNeighbour[x] = false;
}else{
activeDownloadNeighbour[x] = true;
}
if(System.currentTimeMillis() - current.getLastActiveUploadTime() >
Ant.messageTimeout * Ant.maxRetransmissions){
activeUploadNeighbour[x] = false;
}else{
activeUploadNeighbour[x] = true;
}
//this.checkNeighboursActivity(activeDownloadNeighbour, activeUploadNeighbour);
}
}
}
catch (Exception e) {
_logger.error(this.getShortId() + ": Error in removing stuck neighbour", e);
}
}
// Keep at most maxFailedMessageToTrace failed messages, discarding from the front of the list.
//synchronized(this){
while (this.failedMessages.size() > Ant.maxFailedMessageToTrace) {
this.failedMessages.remove(0);
}
//}
// Forget network-modification records older than netModificationsTimeout.
Enumeration netMods = this.netModifications.keys();
while (netMods.hasMoreElements()) {
String mod = (String) netMods.nextElement();
if (System.currentTimeMillis() -
( (Long)this.netModifications.get(mod)).longValue() >
netModificationsTimeout) {
this.netModifications.remove(mod);
}
}
// Drop routing table entries not used for routeInactiveTimeout and announce them in one alert.
Enumeration routingTableElementKeys = this.routingTable.keys();
ArrayList un = new ArrayList();
while (routingTableElementKeys.hasMoreElements()) {
String routingTableElementKey = (String) routingTableElementKeys.nextElement();
if (System.currentTimeMillis() -
( (RoutingTableElement)this.routingTable.get(routingTableElementKey)).getLastTimeUsed() >
routeInactiveTimeout) {
this.routingTable.remove(routingTableElementKey);
un.add(routingTableElementKey);
}
}
this.generateNetModificationAlert(un, this.getIdent());
// Own messages still waiting: retransmit the ones that timed out, or give up on them.
//synchronized (this.myMessages) {
if (this.myMessages.size() != 0) {
_logger.debug(this.getShortId() + ": Processing " +
this.myMessages.size() +
" messages for eventual retransmission...");
_logger.debug("There are " + this.failedMessages.size() +
" messages failed!");
ArrayList toBeRemoved = new ArrayList();
for (int x = 0; x < this.myMessages.size(); x++) {
MessageWrapper wm = (MessageWrapper)this.myMessages.get(x);
if (wm.getLifetime() > Ant.messageTimeout) {
if (wm.getRetrasmissions() + 1 < Ant.maxRetransmissions) {
// Still below the retransmission limit: send again through route(..., retransmission = true).
wm.retrasmitted(this);
SenderThread[] st = this.route(wm,null,null,null,false,true);
if (st != null) {
final SenderThread[] stFinal = st;
// Supervisor thread: waits up to Router.routeTimeout for each sender, then interrupts it.
// Its loop variable x is a separate variable from the x of the enclosing loop.
Thread waiter = new Thread() {
public void run() {
for (int x = 0; x < stFinal.length; x++) {
try {
stFinal[x].join(Router.routeTimeout);
}
catch (Exception e) {
_logger.error("Cannot join send process", e);
}
if (stFinal[x].isAlive()) {
stFinal[x].interruptSending();
}
}
}
};
waiter.start();
}
_logger.debug("Retransmited Message: " +
wm.getMessage().getAck_Id() +
wm.getMessage().getType());
}
else {
// Retransmission limit reached: stop tracking the message. Only ordinary messages with a
// visible source are recorded as failed; broadcasts and masked messages are just logged.
if (wm.getMessage().getType() == 2) {
_logger.info(this.getShortId() +
": Broadcast message timed out " +
wm.getMessage().getDest());
}
else if (wm.getMessage().getType() == 0 &&
wm.getMessage().getSource().equals("")) {
_logger.info(this.getShortId() +
": Masked message timed out " +
wm.getMessage().getDest());
}
else {
_logger.info(this.getShortId() +
": Destination unreachable " +
wm.getMessage().getDest().substring(0,10));
//synchronized (this.failedMessages) {
this.failedMessages.add(wm);
//}
}
toBeRemoved.add(wm);
}
}
}
// Removal is deferred to here so that the scan above is not disturbed.
synchronized(this){
for (int x = toBeRemoved.size() - 1; x >= 0; x--) {
MessageWrapper wm = (MessageWrapper) toBeRemoved.get(x);
if (this.myMessages.remove(wm)) {
_logger.info("Removed Message from myMessages: " +
wm.getMessage().getAck_Id().substring(0,10) +
wm.getMessage().getType());
}
}
}
//this.myMessages.trimToSize();
}
//}
}catch (Exception cycleException) {
// A failure in one pass is logged; the while loop then starts the next pass.
_logger.error("ANt(ID_" + this.getShortId() + ") main cycle error!",
cycleException);
}
}
_logger.error("ANts MAIN ENGINE TERMINATED********************************************************");
}
catch (Exception e) {
// Setup failed (server creation, UPnP start or LAN address update): mark the node as terminated.
this.terminate = true;
_logger.info(this.getShortId() + " - Failure in setting up Ant server port", e);
_logger.error("ANts MAIN ENGINE TERMINATED********************************************************");
}
}
/**
 * Removes at most one neighbour, chosen from the activity flags of the current neighbours.
 *
 * With two thirds of {@code Ant.maxNeighbours} as threshold:
 *   - when the number of neighbours inactive in both directions exceeds the threshold, an idle
 *     neighbour is removed;
 *   - otherwise, when the number of active neighbours exceeds the threshold and all neighbours
 *     are uploading (or all are downloading) with an imbalance above the threshold, a
 *     neighbour active in that single direction only is removed.
 * Among the matching neighbours the one with the largest {@code getTimeElapsed()} is picked.
 *
 * @param activeDownload per-neighbour download activity flags, indexed like the neighbour list
 * @param activeUpload   per-neighbour upload activity flags, indexed like the neighbour list
 */
private void checkNeighboursActivity(boolean[] activeDownload, boolean[] activeUpload){
    final int maxActiveOneWay = (int)Math.floor(Ant.maxNeighbours * 2.0 / 3);

    // Tally downloads, uploads and neighbours active in at least one direction.
    int activeDown = 0;
    int activeUp = 0;
    int activeNumber = 0;
    for (int i = 0; i < activeDownload.length; i++) {
        if (activeDownload[i]) {
            activeDown++;
        }
        if (activeUpload[i]) {
            activeUp++;
        }
        if (activeUpload[i] || activeDownload[i]) {
            activeNumber++;
        }
    }

    // Too many idle neighbours: drop an idle one.
    if ((this.neighbours.size() - activeNumber) > Ant.maxNeighbours * 2.0 / 3) {
        this.removeSlowestMatchingNeighbour(activeDownload, activeUpload, false, false,
                                            " is not active up/down, removing...");
        return;
    }

    if (!(activeNumber > Ant.maxNeighbours * 2.0 / 3)) {
        return;
    }

    // Many active neighbours but heavily one-directional: drop one that works one way only.
    if (activeUp == this.neighbours.size() &&
        (activeUp - activeDown) > maxActiveOneWay) {
        this.removeSlowestMatchingNeighbour(activeDownload, activeUpload, false, true,
                                            " is not active down, removing...");
    }
    else if (activeDown == this.neighbours.size() &&
             (activeDown - activeUp) > maxActiveOneWay) {
        this.removeSlowestMatchingNeighbour(activeDownload, activeUpload, true, false,
                                            " is not active up, removing...");
    }
}

/**
 * Removes, among the neighbours whose flags equal the requested download/upload pattern, the
 * one with the largest positive {@code getTimeElapsed()}; does nothing when none qualifies.
 */
private void removeSlowestMatchingNeighbour(boolean[] activeDownload, boolean[] activeUpload,
                                            boolean wantDownload, boolean wantUpload,
                                            String reason) {
    int victim = -1;
    long worstElapsed = 0;
    for (int i = 0; i < this.neighbours.size(); i++) {
        final NeighbourAnt candidate = (NeighbourAnt) this.neighbours.get(i);
        if (activeDownload[i] == wantDownload && activeUpload[i] == wantUpload &&
            candidate.getTimeElapsed() > worstElapsed) {
            worstElapsed = candidate.getTimeElapsed();
            victim = i;
        }
    }
    if (victim >= 0) {
        _logger.info("Neighbour " + this.neighbours.get(victim) + reason);
        this.removeNeighbour((NeighbourAnt) this.neighbours.get(victim));
    }
}
/**
 * Hook invoked for every message that this node accepts for local processing.
 * This implementation only writes a debug line describing the message.
 *
 * @param m the received message
 * @param r the router that handled the message (not used here)
 * @throws Exception declared for overriding implementations; not thrown by this body
 */
protected void processMessage(Message m, Router r) throws Exception {
    final String summary = this.getShortId() +
        ": Received Message = " +
        m.getAck_Id() +
        " type = " + m.getType() +
        " from a peer running version: " + m.getVersion();
    _logger.debug(summary);
}
/**
 * Gives access to the property-change support of this node.
 *
 * @return the {@code PropertyChangeSupport} instance held by this object
 */
public PropertyChangeSupport getPropertyChangeSupport() {
    final PropertyChangeSupport support = this.propertyChangeSupport;
    return support;
}
/**
 * Shuts this node down: terminates every neighbour, closes the server socket, stops the UPnP
 * device and, when port mappings exist, clears them and restores the address and port saved in
 * the "natted" fields.
 *
 * The node is always marked as terminated when this method returns, even if one of the
 * shutdown steps throws; the failure is logged. Without that guarantee a failed step would
 * leave the main loop of {@code run()} spinning on a half-closed node.
 */
public void disconnect() {
    try {
        for (int x = 0; x < this.neighbours.size(); x++) {
            NeighbourAnt na = ( (NeighbourAnt)this.neighbours.get(x));
            na.terminate();
        }
        // ss is assigned in run(); guard against disconnect() being called before that happened.
        if (ss != null && ss.getServerSocket() != null) {
            ss.getServerSocket().close();
        }
        UPnPManager.instance().stopDevice();
        if (UPnPManager.instance().mappingsExist()) {
            UPnPManager.instance().clearMappingsOnShutdown();
            // Put back the address and port that were in use before the mapping.
            this.localInetAddress = this.localNattedInetAddress;
            this.serverPort = this.localNattedPort;
        }
    }
    catch (Exception e) {
        _logger.error(this.getShortId() + "",e);
    }
    finally {
        // Set unconditionally so that run() leaves its loop and isDisconnected() reports true.
        this.terminate = true;
    }
}
/**
 * Reports whether this node has been marked as terminated.
 *
 * @return the current value of the {@code terminate} flag
 */
public boolean isDisconnected() {
    final boolean terminated = this.terminate;
    return terminated;
}
}
The table below shows all metrics for Ant.java.




