ReadWorker.java
| Index Score | ||
|---|---|---|
![]() |
![]() |
org.furthurnet.furi |
![]() |
![]() |
Furthurnet |
View: Reasons, Metrics, Source Code
These are the metrics that contribute to the Enerjy Score for this file, ranked by impact. So the metrics listed at the top influence the score to a greater extent that the metrics listed at the bottom.
/*
* FURI - A distributed peer-to-peer file sharing system.
* Copyright (C) 2000-2002 William W. Wong, Furthur Network
* williamw@jps.net
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.furthurnet.furi;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Vector;
import org.furthurnet.datastructures.supporting.Common;
import org.furthurnet.datastructures.supporting.IntegerPointer;
import org.furthurnet.servergui.FileSetInfo;
import org.furthurnet.xmlparser.IrcSpec;
import org.furthurnet.xmlparser.XmlTags;
public class ReadWorker implements Runnable
{
// FURTHUR CONNECT/0.4\n\n - JMA 8/24/01
private static final String sRequestSignature = "FURTHUR CONNECT/1.1";
private static final String sReplySignature = "FURTHUR 1.1 OK";
protected Host mRemoteHost;
private byte[] mHeaderBuf; // pre-allocated buffer for
// repeated uses.
private FlexBuf mBuf1; // general buf for read/send
private FlexBuf mBuf2; // general buf for read/send
private FlexBuf mBodyBuf; // Buf for reading msg body
private int mSleep = 1;
private int mSleepBeforeStart = 0;
protected ServiceManager mManager = ServiceManager.getManager();
protected HostManager mHostMgr = ServiceManager.getHostManager();
protected SendManager mSendMgr = ServiceManager.getSendManager();
private ShareManager mShareMgr = ServiceManager.getShareManager();
private MsgManager mMsgMgr = ServiceManager.getMsgManager();
private IrcManager mIrcMgr = ServiceManager.getIrcManager();
private MainFrame mainFrame = mManager.getMainFrame();
private int remoteListeningPort = 0;
// JMA
private String myVersion = null;
private static final int mHeaderBufSize = 1024;
public ReadWorker(Host remoteHost)
{
myVersion = Res.getStr("Program.Version");
mRemoteHost = remoteHost;
//mHeaderBuf = new byte[MsgHeader.sDataLength];
mHeaderBuf = new byte[mHeaderBufSize];
mBuf1 = new FlexBuf();
mBuf2 = new FlexBuf();
mBodyBuf = new FlexBuf();
}
public ReadWorker(Host remoteHost, int sleepBeforeStart)
{
this(remoteHost);
mSleepBeforeStart = sleepBeforeStart;
(new Thread(this)).start();
}
public void run()
{
FurthurThread.logPid("furi.ReadWorker " + hashCode());
if (! mRemoteHost.acquireByWorker())
{
// Already has a worker working on it. Just exit.
return;
}
if (mSleepBeforeStart > 0)
{
try
{
Thread.sleep(mSleepBeforeStart);
}
catch (Exception e)
{
}
}
// JMA 9/26
// Definite hack. On a new connection, pass the remote
// listening port in as a negative sleep time
if (mSleepBeforeStart < 0)
remoteListeningPort = -mSleepBeforeStart;
try
{
if (mRemoteHost.getType() == Host.sTypeOutgoing)
{
// Connect to remote host.
mRemoteHost.setStatus(Host.sStatusConnecting, "");
try
{
connectToRemoteHost();
if (mRemoteHost.getStatus() == Host.sStatusTimeout)
{
// Connecting has been taken too long.
throw new Exception("Timed out.");
}
}
catch (Exception e3)
{
mHostMgr.setHostCaughtConnectionFailed(mRemoteHost.getHostAddr(), true);
throw e3;
}
// Negotiate handshake
mRemoteHost.setStatus(Host.sStatusConnecting, "Negotiate handshake.");
negotiateHandShakeAsClient();
// Connection to remote gnutella host is completed at this point.
mRemoteHost.setStatus(Host.sStatusConnected, "");
mHostMgr.addConnectedHost(mRemoteHost);
// IDN 8/30/02: Always start up a FirewallTest
// It tries to connect to the other host so they learn their
// firewall status. It is not for discovering our own status.
(new FirewallTest(mRemoteHost.getHostAddr(), -1)).start();
// queue first Init msg to send.
mSendMgr.queueMsgToSend(mRemoteHost, mMsgMgr.getMyMsgInit(),
false);
processIncomingData();
}
else if (mRemoteHost.getType() == Host.sTypeIncoming)
{
ConnectionRequest req = negotiateHandShakeAsServer();
// See if the incoming connection is from a gnutella client.
if (req.getMethod().equals(ConnectionRequest.sGnutellaConnect))
{
// Incoming connection is from a gnutella client.
if (mHostMgr.getConnectedHostCount() >= ServiceManager.getCfg().mNetMaxConnection)
{
// Exceed total connection limit (both incoming and outgoing).
// Refuse additional incoming connection.
if (!mainFrame.isAddressServer())
return;
// JMA - 9/28/01
// accept new and disconnect old
ServiceManager.getHostManager().disconnectFirstConnectedHost();
}
// same subnet and same listening port is shady, probably myself - disconnect
if ((remoteListeningPort == ServiceManager.getCfg().mListeningPort) && (localSubnetConnection(mRemoteHost.getHostAddr())))
return;
// IDN 8/30/02: Always start up a FirewallTest
// It tries to connect to the other host so they learn their
// firewall status. It is not for discovering our own
// status.
(new FirewallTest(mRemoteHost.getHostAddr(),
remoteListeningPort)).start();
byte[] buf = mBuf1.getBuf(1024);
int len;
// Send reply signature back to remote host
len = IOUtil.serializeString(sReplySignature + "\n\n", buf, 0);
mRemoteHost.getOs().write(buf, 0, len);
mHostMgr.addIncomingHost(mRemoteHost);
mHostMgr.addConnectedHost(mRemoteHost);
processIncomingData();
}
else
{
throw new Exception("Unknown incoming request type.");
}
}
else if (mRemoteHost.getType() == Host.sTypeIRC)
{
processIRC();
}
else
{
throw new Exception("Invalid host type");
}
}
catch (Exception e)
{
mRemoteHost.setStatus(Host.sStatusError, e.getMessage());
// System.out.println(e.getMessage());
// e.printStackTrace();
}
finally
{
// Close the connection. Need to close even for error.
// Half-closed connection is bad for TCP.
if (mRemoteHost != null)
{
mRemoteHost.closeConnection();
mRemoteHost.releaseFromWorker();
}
}
}
protected void connectToRemoteHost() throws IOException
{
try
{
URL tmpUrl = new URL("http://" + mRemoteHost.getHostAddr());
mRemoteHost.markConnectionStartTime();
Socket sock = new Socket(tmpUrl.getHost(), tmpUrl.getPort());
// I am connected to the remote host at this point.
mRemoteHost.setSock(sock);
mRemoteHost.setOs(sock.getOutputStream());
mRemoteHost.setIs(sock.getInputStream());
}
catch (UnknownHostException e)
{
throw new IOException("Unknown host.");
}
}
protected void negotiateHandShakeAsClient()
throws Exception
{
// Send the first handshake greeting to the remote host.
String greeting = firewallTestInfo() + ServiceManager.getNetworkManager().getFullGreeting() + "\n\n";
byte[] buf = mBuf1.getBuf(1024);
int len = IOUtil.serializeString(greeting, buf, 0);
mRemoteHost.getOs().write(buf, 0, len);
try
{
while (true)
{
// Read reply from server.
StringBuffer strBuf = new StringBuffer();
len = IOUtil.readToCRLF(mRemoteHost.getIs(), buf, 1024, 0);
IOUtil.deserializeString(buf, 0, len, strBuf);
String reply = strBuf.toString();
if (reply.equals(sReplySignature))
{
// Normal Gnutella reply.
return;
}
// I don't recognize any other reply.
throw new Exception("Not a Furthur Client.");
}
}
catch (IOException e) {
throw new IOException("Disconnected from remote host during initial handshake");
}
catch (Exception e)
{
throw e;
}
}
private ConnectionRequest negotiateHandShakeAsServer()
throws Exception
{
if (!ServiceManager.getNetworkManager().getJoined())
{
throw new Exception("Network not joined.");
}
if (mHostMgr.isHostIgnored(mRemoteHost.getHostAddr()))
{
throw new Exception("Host is ignored.");
}
// should be "GNUTELLA CONNECT/0.4\n\n"
InputStream is = mRemoteHost.getIs();
if (is == null)
{
// IS from getIs() has been cleared when connection closed.
throw new IOException("Disconnected from remote host during initial handshake");
}
byte[] buf = mBuf1.getBuf(1024);
int lenRead = IOUtil.readToCRLF(is, buf, 1024, 0);
StringBuffer strBuf = new StringBuffer();
IOUtil.deserializeString(buf, 0, lenRead, strBuf);
return (new ConnectionRequest(strBuf.toString()));
}
protected void processIncomingData()
throws Exception
{
mRemoteHost.setStatus(Host.sStatusConnected, "");
try
{
while (true)
{
MsgHeader header = readHeader();
byte[] body = readBody(header);
mRemoteHost.incReceivedCount();
mHostMgr.incStatMsgCount(1);
mHostMgr.incStatTakenCount(header.getHopsTaken());
switch (header.getFunction())
{
case MsgHeader.sInit:
handleInit(header, body);
break;
case MsgHeader.sInitResponse:
handleInitResponse(header, body);
break;
case MsgHeader.sPushRequest:
handlePushRequest(header, body);
break;
case MsgHeader.sQuery:
handleQuery(header, body);
break;
case MsgHeader.sQueryResponse:
handleQueryResponse(header, body);
break;
default:
mainFrame.incInvalidMsgs();
handleUnknown(header, body);
break;
}
}
}
catch (Exception e)
{
if (mRemoteHost.getSock() != null)
{
mRemoteHost.setStatus(Host.sStatusError, e.getMessage());
}
}
}
private MsgHeader readHeader()
throws Exception
{
int lenRead;
int len;
InputStream is = mRemoteHost.getIs();
int status = -1;
int tries = 0;
int offset;
// resolve: reuse msg header from free list.
MsgHeader header = new MsgHeader(new GUID(true));
if (is == null)
{
// IS from getIs() has been cleared when connection closed.
throw new Exception("Connection closed by remote host");
}
while (status == -1) {
lenRead = 0;
offset = tries;
if (offset + MsgHeader.sDataLength > mHeaderBufSize) {
mainFrame.incInvalidMsgs();
mRemoteHost.log("Couldn't sync with remote host");
throw new Exception("Couldn't sync up with remote host");
}
if (tries > 0) {
len = is.read(mHeaderBuf, offset + MsgHeader.sDataLength - 1, 1);
if (len != 1) {
throw new Exception("Connection closed by remote host");
}
lenRead = 1;
}
else {
while (lenRead < MsgHeader.sDataLength)
{
len = is.read(mHeaderBuf, lenRead, MsgHeader.sDataLength - lenRead);
if (len == 0 || len == -1) {
throw new Exception("Connection closed by remote host");
}
lenRead += len;
}
}
mHostMgr.incBytesCount(lenRead);
mHostMgr.downstreamMonitor.addBytes(lenRead);
// Headers are never compressed
status = header.deserialize(mHeaderBuf, offset, false);
//
// If we receive a status of -1, it means there was bogus data in the header,
// probably the result of a dropped packet or something that caused us to now
// read from, say, the middle of a data stream instead of the beginning of a
// message. We'll try to sync back up by reading one byte at a time to see if
// we can find a valid message header.
//
if (status == -1)
tries++;
}
//if (tries > 0) {
// System.out.println("Sync after " + tries + " bytes shifted");
// System.out.println(header.toString());
//}
header.setArrivalTime(System.currentTimeMillis());
header.setFromHost(mRemoteHost);
mSleep = mHostMgr.throttleControl(mSleep, 4000, mHostMgr.getRate(),
ServiceManager.getCfg().mNetMaxRate);
return header;
}
private byte[] readBody(MsgHeader header)
throws Exception
{
int lenToRead;
int lenRead = 0;
int len;
int size;
byte[] body;
InputStream is = mRemoteHost.getIs();
lenToRead = header.getDataLen();
body = mBodyBuf.getBuf(lenToRead);
if (is == null)
{
// IS from getIs() has been cleared when connection closed.
// mRemoteHost.log("Disconnected from remote host during reading msg body");
throw new Exception("Connection closed by remote host");
}
while (lenRead < lenToRead)
{
size = lenToRead - lenRead;
if (size > 1024 * 50)
size = 1024 * 50;
len = is.read(body, lenRead, size);
if (len == 0 || len == -1)
{
// mRemoteHost.log("Disconnected from remote host during reading message body.");
throw new Exception("Connection closed by remote host");
}
lenRead += len;
mHostMgr.incBytesCount(len);
mSleep = mHostMgr.throttleControl(mSleep, 4000, mHostMgr.getRate(),
ServiceManager.getCfg().mNetMaxRate);
}
mHostMgr.downstreamMonitor.addBytes(lenRead);
return body;
}
private void handleInit(MsgHeader header, byte[] body)
throws Exception
{
MsgInit msg = new MsgInit(header);
msg.deserialize(body, 0, false); // INIT is never compressed
// mRemoteHost.log("Got msg " + msg);
// See if I have seen this Init before. Drop msg if duplicate.
if (mMsgMgr.checkAndAddMsgSeen(msg))
{
mRemoteHost.incDropCount();
mHostMgr.incStatDropCount(1);
// mRemoteHost.log("Seen " + msg);
return;
}
// Add the Init msg to the routing table so that I know where
// to route the InitResponse back.
mMsgMgr.addToRoutingTable(msg.getHeader().getMsgID(), mRemoteHost);
// Forward the Init msg to other connected hosts except the one sent it.
mHostMgr.forwardMsg(msg, mRemoteHost);
// Get my host:port for InitResponse.
Listener listener = ServiceManager.getListener();
// Construct InitResponse msg. Copy the original Init's GUID.
MsgHeader newHeader = new MsgHeader(header.getMsgID());
MsgInitResponse response = new MsgInitResponse(newHeader);
newHeader.setTTL(header.getHopsTaken() + 1); // Will take as many hops to get back.
response.setPort((short)listener.getListeningPort());
response.setIP(listener.getSelfHostAddress());
response.setFileCount(mShareMgr.getFileCount());
response.setTotalSize(mShareMgr.getTotalFileSize());
mSendMgr.queueMsgToSend(mRemoteHost, response, false);
}
private void handleInitResponse(MsgHeader header, byte[] body)
throws Exception
{
MsgInitResponse msg = new MsgInitResponse(header);
msg.deserialize(body, 0, false); // INIT RESPONSE is never compressed
// mRemoteHost.log("Got msg " + msg);
//
// Check for compression support
//
mRemoteHost.setSupportsCompression(msg);
if (!mHostMgr.isHostInvalid(msg.getIP().getHostAddress()))
{
mHostMgr.addAndSaveHostCaught(msg.getIP().getHostAddress() + ":" + msg.getPort());
}
// Handle init response to my origianl init.
MsgInit msgInit = mMsgMgr.getMyMsgInit();
if (msg.getHeader().getMsgID().equals(msgInit.getHeader().getMsgID()) && !mHostMgr.isNoMoreStats())
{
// This InitResponse is for me.
// mRemoteHost.log("Got response to my init. " + msg);
mHostMgr.incStatHosts(1);
mHostMgr.incStatFiles(msg.getFileCount());
mHostMgr.incStatSize(msg.getTotalSize());
return;
}
// See if the response is to my latency ping to neighbor.
if (mRemoteHost.checkPingResponse(msg))
{
// Yes. Processed.
return;
}
// InitResponse is a response to an Init msg.
// Did I forward that Init msgID before?
Host returnHost = mMsgMgr.getRouting(msg.getHeader().getMsgID());
if (returnHost == null)
{
// mRemoteHost.log("Don't route the InitResponse since I didn't forward the Init msg. " + msg);
return;
}
// Ok, I did forward the Init msg on behalf of returnHost.
// The InitResponse is for returnHost. Better route it back.
if (mHostMgr.decTTL(msg))
{
// mRemoteHost.log("TTL expired " + msg);
return;
}
mSendMgr.queueMsgToSend(returnHost, msg, false);
}
// JMA 8/28/01 - remove functionality - only local searching
private void handleQuery(MsgHeader header, byte[] body)
throws Exception
{
MsgQuery msg = new MsgQuery(header);
msg.deserialize(body, 0, mRemoteHost.getSupportsCompression());
// mRemoteHost.log("Got msg " + msg);
// See if I have seen this Query before. Drop msg if duplicate.
// but since it may be from duplicate hosts, don't count it as
// a duplicate
// mRemoteHost.incDropCount();
// mHostMgr.incStatDropCount(1);
if (mMsgMgr.checkAndAddMsgSeen(msg))
{
// mRemoteHost.log("Seen " + msg);
return;
}
// Add the Query msg to the routing table so that I know where
// to route the QueryResponse back.
mMsgMgr.addToRoutingTable(msg.getHeader().getMsgID(), mRemoteHost);
// Add to the net search history.
mMsgMgr.addNetSearch(msg); // This might be causes the memory increase
// Forward the Query msg to other connected hosts except the one sent it.
mHostMgr.forwardMsg(msg, mRemoteHost);
// Perform search on my list.
if (msg.getMinSpeed() > ServiceManager.getCfg().mNetMySpeed)
{
return;
}
// Search the sharefile database and get groups of sharefiles.
Vector list = null;
IntegerPointer isIdSearch = new IntegerPointer(0);
try
{
list = mShareMgr.searchFile(msg.getSearchString(), isIdSearch);
}
catch (Exception e)
{
return;
}
if (list.size() == 0)
{
// mRemoteHost.log("Not found");
return;
}
Listener listener = ServiceManager.getListener();
// build a MsgQueryResponse packet
// Construct QueryResponse msg. Copy the original Init's GUID.
MsgHeader newHeader = new MsgHeader(header.getMsgID());
MsgQueryResponse response = new MsgQueryResponse(newHeader);
int recCount = list.size();
if (recCount > 100)
recCount = 100;
newHeader.setTTL(header.getHopsTaken() + 1); // Will take as many hops to get back.
response.setNumRecords(recCount);
response.setRemoteHostSpeed(ServiceManager.getCfg().mNetMySpeed);
response.setRemoteClientID(mManager.getClientID());
double speed = ServiceManager.getCfg().mSpeedIndex;
FileSetInfo sfile = null;
for (int i = 0; i < recCount; i++)
{
sfile = (FileSetInfo)list.elementAt(i);
int size = (int)(sfile.totalBytes);
String resStr = generateQueryHitXml(sfile.fileSetName,
sfile.fileId, sfile.totalBytes,
speed, sfile.furthurAttributes,
isIdSearch.value == 1);
response.addMsgRecord(new MsgResRecord(0, size, resStr));
}
response.setRemoteListeningPort((short)listener.getListeningPort());
response.setRemoteHost(listener.getSelfHostAddress());
// Send message
mSendMgr.queueMsgToSend(mRemoteHost, response, false);
}
private String generateQueryHitXml(String fileSetName, String fileSetId,
long size, double speed,
String attributes, boolean isIdSearch)
{
if (isIdSearch)
attributes = ""; // don't send the attributes for an id query
return "<" + XmlTags.QUERY_RESULT_ITEM + " " +
XmlTags.NAME + "=\"" + fileSetName + "\"" + " " +
XmlTags.ID + "=\"" + fileSetId + "\"" + " " +
// these 2 are bogus (don't break old clients though)
XmlTags.SIGNATURE + "=\"" + "null" + "\"" + " " +
XmlTags.USER + "=\"" + "admin" + "\"" + " " +
XmlTags.SIZE + "=\"" + new Long(size).toString() + "\"" + " " +
XmlTags.SPEED + "=\"" +
new Double(speed).toString() + "\"" + " " +
XmlTags.FIREWALL + "=\"" + firewallCode() + "\"" + " " +
XmlTags.ANTELOPE + "=\"" + antelopeCode() + "\"" + " " +
XmlTags.REMOTE_ID + "=\"" +
mManager.getClientID().toHexString() + "\"" + " " +
XmlTags.FURTHUR_VERSION + "=\"" + myVersion + "\"" + " " +
XmlTags.PCP_IP + "=\"" +
ServiceManager.getListener().getMyIp() + "\"" + " " +
XmlTags.PCP_PORT + "=\"" +
Integer.toString(mainFrame.getPcpServiceMgrPort()) + "\"" + " " +
XmlTags.OPEN_PORT + "=\"" +
Integer.toString(ServiceManager.getListener().getListeningPort()) + "\"" + " " +
XmlTags.NICKNAME + "=\"" +
ServiceManager.getCfg().mIrcNickname + ServiceManager.getCfg().mIrcNickIdentifier + "\"" + ">" +
// XmlTags.PCP_PASSWORD + "=\"" + "" + "\"" + ">" +
attributes + "</" + XmlTags.QUERY_RESULT_ITEM + ">";
}
public static String firewallCode()
{
if (ServiceManager.getManager().getMainFrame().isFirewall())
return "F"; // firewall
else
return "N"; // not firewall
}
private String antelopeCode()
{
if (mainFrame.canAcceptAntelope(1))
return "Y"; // firewall
else
return "N"; // not firewall
}
private void handleQueryResponse(MsgHeader header, byte[] body)
throws Exception
{
MsgQueryResponse msg = new MsgQueryResponse(header);
msg.deserialize(body, 0, mRemoteHost.getSupportsCompression());
//System.out.println("Got msg " + msg);
//System.out.println("len " + (i +=body.length));
mMsgMgr.addToPushRoutingTable(msg.getRemoteClientID(), mRemoteHost);
mMsgMgr.processQueryResponse(mRemoteHost, msg);
// QueryResponse is a response to a Query msg.
// Did I forward that Query msgID before?
Host returnHost = mMsgMgr.getRouting(msg.getHeader().getMsgID());
if (returnHost == null)
{
// mRemoteHost.log("Don't route the QueryResponse since I didn't forward the Query msg. " + msg);
return;
}
// Ok, I did forward the Query msg on behalf of returnHost.
// The QueryResponse is for returnHost. Better route it back.
if (mHostMgr.decTTL(msg))
{
// mRemoteHost.log("TTL expired " + msg);
return;
}
mSendMgr.queueMsgToSend(returnHost, msg, false);
}
private void handlePushRequest(MsgHeader header, byte[] body)
throws Exception
{
MsgPushRequest msg = new MsgPushRequest(header);
msg.deserialize(body, 0, mRemoteHost.getSupportsCompression());
// mRemoteHost.log("Got msg " + msg);
if (mManager.getClientID().equals(msg.getClientID()))
{
mainFrame.pushFile(msg);
return;
}
Host returnHost = mMsgMgr.getPushRouting(msg.getClientID());
if (returnHost == null)
{
// mRemoteHost.log("Don't route the PushRequest since I didn't forward the QueryResponse msg. " + msg);
return;
}
// Ok, I did forward the QueryResponse msg on behalf of returnHost.
// The PushRequest is for the returnHost. Better route it back.
if (mHostMgr.decTTL(msg))
{
// mRemoteHost.log("TTL expired " + msg);
return;
}
mSendMgr.queueMsgToSend(returnHost, msg, false);
}
private void handleUnknown(MsgHeader header, byte[] body)
throws Exception
{
MsgUnknown msg = new MsgUnknown(header);
msg.deserialize(body, 0, false); // Ignore compression
// mRemoteHost.log("Got unknown msg " + msg);
// See if I have seen this msg before. Drop msg if duplicate.
if (mMsgMgr.checkAndAddMsgSeen(msg))
{
mRemoteHost.incDropCount();
mHostMgr.incStatDropCount(1);
// mRemoteHost.log("Seen " + msg);
return;
}
// Forward the msg to other connected hosts except the one sent it.
mHostMgr.forwardMsg(msg, mRemoteHost);
}
private void processIRC()
throws Exception
{
//method modified by JAS 12/11/01 to lookup hosts in XML
IrcSpec spec = mainFrame.getIrcSpec();
boolean connected = false;
String version = Common.replaceAll(Res.getStr("Program.Version"), ".", "");
String userName = new String(ServiceManager.getCfg().mIrcUserName + version);
for (int i = 0; i < spec.numAddresses(); i++)
{
try
{
String hostAddr = spec.getAddress(i);
mRemoteHost.setHostAddr(hostAddr);
mRemoteHost.setStatus(Host.sStatusConnecting, "");
connectToRemoteHost();
mRemoteHost.setStatus(Host.sStatusConnecting,
"Negotiate handshake.");
StringBuffer param = new StringBuffer();
param.append(userName);
param.append(" ");
param.append(StrUtil.parseHost(ServiceManager.getListener().getMyServerAddress()));
param.append(" ");
param.append(StrUtil.parseHost(hostAddr));
param.append(" :");
param.append(userName);
MsgIRC msg;
msg = new MsgIRC(null, "NICK", ServiceManager.getCfg().mIrcNickname
+ ServiceManager.getCfg().mIrcNickIdentifier);
mIrcMgr.sendMsg(msg);
msg = new MsgIRC(null, "USER", param.toString());
mIrcMgr.sendMsg(msg);
connected = true;
break;
}
catch (Exception e)
{
ServiceManager.log(e);
}
}
if (! connected)
{
throw new Exception("Failed to connect to IRC servers '" +
spec.addressesToString() + "'");
}
mRemoteHost.setStatus(Host.sStatusConnected, "");
InputStream is = mRemoteHost.getIs();
while (true)
{
byte[] buf = mBuf1.getBuf(1024);
int lenRead = IOUtil.readToCROrLF(is, buf, 1024, 0);
if (lenRead == 0)
continue;
StringBuffer strBuf = new StringBuffer();
IOUtil.deserializeString(buf, 0, lenRead, strBuf);
String text = strBuf.toString();
// System.out.println(text);
mIrcMgr.processIncomingMsg(text, new MsgIRC(text));
mHostMgr.incBytesCount(lenRead);
mHostMgr.downstreamMonitor.addBytes(lenRead);
mSleep = mHostMgr.throttleControl(mSleep, 4000, mHostMgr.getRate(),
ServiceManager.getCfg().mNetMaxRate);
}
}
protected String firewallTestInfo()
{
// when I first establish a connection with a remote host, it will test
// whether or not I am a firewall by making a dummy connection to me.
// I specify my listening port as a negative port number
// to signify this.
return "-" + Integer.toString(ServiceManager.getListener().getListeningPort()) + "\n";
}
private boolean localSubnetConnection(String ip)
{
try
{
String self = ServiceManager.getListener().getSelfHostname();
String remoteSubnet = ip.substring(0, ip.lastIndexOf("."));
String localSubnet = self.substring(0, self.lastIndexOf("."));
if ((ip.equals("0.0.0.0")) || (ip.equals("127.0.0.1")) || (remoteSubnet.equals(localSubnet)))
return true;
}
catch (Exception e)
{}
return false;
}
}
The table below shows all metrics for ReadWorker.java.




