OpenNapServerRunner.java
| Index Score | ||
|---|---|---|
![]() |
![]() |
org.xnap.plugin.opennap.net |
![]() |
![]() |
XNap 3 |
View: Reasons, Metrics, Source Code
These are the metrics that contribute to the Enerjy Score for this file, ranked by impact. So the metrics listed at the top influence the score to a greater extent than the metrics listed at the bottom.
/*
* XNap - A P2P framework and client.
*
* See the file AUTHORS for copyright information.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.xnap.plugin.opennap.net;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.Socket;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.StringTokenizer;
import org.apache.log4j.Logger;
import org.xnap.XNap;
import org.xnap.net.NetHelper;
import org.xnap.plugin.opennap.OpenNapPlugin;
import org.xnap.plugin.opennap.net.msg.MessageFactory;
import org.xnap.plugin.opennap.net.msg.MessageHandler;
import org.xnap.plugin.opennap.net.msg.MessageStrings;
import org.xnap.plugin.opennap.net.msg.client.ClientMessage;
import org.xnap.plugin.opennap.net.msg.client.LoginMessage;
import org.xnap.plugin.opennap.net.msg.client.NewUserLoginMessage;
import org.xnap.plugin.opennap.net.msg.client.NickCheckMessage;
import org.xnap.plugin.opennap.net.msg.server.BrowseResponseMessage;
import org.xnap.plugin.opennap.net.msg.server.EndBrowseMessage;
import org.xnap.plugin.opennap.net.msg.server.EndSearchMessage;
import org.xnap.plugin.opennap.net.msg.server.ErrorMessage;
import org.xnap.plugin.opennap.net.msg.server.InvalidMessageException;
import org.xnap.plugin.opennap.net.msg.server.InvalidNickMessage;
import org.xnap.plugin.opennap.net.msg.server.LoginAckMessage;
import org.xnap.plugin.opennap.net.msg.server.LoginErrorMessage;
import org.xnap.plugin.opennap.net.msg.server.NickAlreadyRegisteredMessage;
import org.xnap.plugin.opennap.net.msg.server.NickNotRegisteredMessage;
import org.xnap.plugin.opennap.net.msg.server.SearchResponseMessage;
import org.xnap.plugin.opennap.net.msg.server.ServerMessage;
import org.xnap.plugin.opennap.util.*;
import org.xnap.util.Preferences;
import org.xnap.util.PriorityQueue;
import org.xnap.util.State;
import org.xnap.util.StringHelper;
/**
 * Thread that owns the connection to a single {@link OpenNapServer}: it
 * opens the socket, logs in, reads server messages in a loop and hands
 * them to the {@link MessageHandler}. Searches and browses are queued and
 * sent one at a time.
 *
 * <p>Messages are framed with a 4 byte header: a 16 bit payload length
 * followed by a 16 bit message type, both least significant byte first.
 */
public class OpenNapServerRunner extends Thread {

    //--- Constant(s) ---

    /**
     * Socket read timeout in milliseconds that is in effect from the moment
     * the socket is connected until the login has been acknowledged.
     */
    public static final int CONNECT_TIMEOUT = 1 * 30 * 1000;

    /**
     * Maximum number of attempts to connect to the host name of the server
     * while the connection is being refused.
     */
    public static final int MAX_RETRY_ON_CONNECTION_REFUSED = 3;

    /**
     * Socket read timeout that is set once the login has been acknowledged.
     */
    public static final int SOCKET_TIMEOUT = 0;

    /**
     * Largest payload that fits into the 16 bit length field of the header.
     */
    private static final int MAX_PACKET_LENGTH = 0xFFFF;

    //--- Data field(s) ---

    private static final Logger logger
        = Logger.getLogger(OpenNapServerRunner.class);

    /**
     * Volatile because {@link #disconnect()} reads it from a foreign thread.
     */
    private volatile Socket socket;
    private InputStream in;
    private OutputStream out;
    private OpenNapServer server;

    /**
     * Remembers the last <code>ErrorMessage</code>.
     */
    private String lastError;

    private OpenNapSearch currentSearch;
    private OpenNapBrowse currentBrowse;

    /**
     * Queue of pending searches.
     */
    private PriorityQueue searchQueue = new PriorityQueue();

    /**
     * Queue of pending browses.
     */
    private LinkedList browseQueue = new LinkedList();

    /**
     * Queue of pending messages.
     */
    private SendQueue sendQueue = new SendQueue(this);

    /**
     * Serializes writes to the output stream.
     */
    private Object sendLock = new Object();

    /**
     * If set to true this runner is to die. Volatile because it is set by
     * {@link #disconnect()} without holding the monitor and polled by the
     * receive loop.
     */
    private volatile boolean die = false;

    /**
     * The state description why the runner has died.
     */
    private String dieReason;

    private boolean useUTF8Encoding;

    //--- Constructor(s) ---

    /**
     * Creates a runner for <code>server</code>. The thread is not started.
     */
    public OpenNapServerRunner(OpenNapServer server)
    {
        super("OpenNapServer " + server.toString());

        this.server = server;
        this.useUTF8Encoding
            = OpenNapPlugin.getPreferences().getUseUTF8Encoding();
    }

    //--- Method(s) ---

    /**
     * Queues a browse and starts it right away if no other browse is
     * active. If the runner is already dead the request is failed.
     */
    public synchronized void enqueue(OpenNapBrowse browse)
    {
        if (die) {
            browse.getRequest().failed();
        }
        else {
            browseQueue.add(browse);
            allowBrowse();
        }
    }

    /**
     * Queues a search according to its priority and starts it right away
     * if no other search is active. If the runner is already dead the
     * request is failed.
     */
    public synchronized void enqueue(OpenNapSearch search)
    {
        if (die) {
            search.getRequest().failed();
        }
        else {
            searchQueue.add(search, search.getPriority());
            allowSearch();
        }
    }

    /**
     * Removes a browse from the queue of pending browses.
     */
    public synchronized void dequeue(OpenNapBrowse browse)
    {
        browseQueue.remove(browse);
    }

    /**
     * Invoked by {@link OpenNapSearch} when search did not take place or
     * was aborted.
     */
    public synchronized void dequeue(OpenNapSearch search)
    {
        searchQueue.remove(search);
    }

    /**
     * Marks this runner as dead and publishes <code>newState</code> and
     * <code>description</code> to the server. Only the first invocation
     * has an effect, later ones are logged as an error.
     */
    public synchronized void die(org.xnap.util.State newState, String description)
    {
        if (die) {
            // ups looks like we died multiple times
            logger.error("Already dead");
            return;
        }

        // store the reason before raising the flag, so that a thread that
        // observes the flag also sees the reason
        dieReason = description;
        die = true;
        server.setState(newState, description);
    }

    /**
     * Same as {@link #die(org.xnap.util.State, String)} but silently does
     * nothing if the runner is already dead. Used internally where a
     * second death is expected, for instance when a read fails because
     * {@link #disconnect()} has closed the socket.
     */
    private synchronized void dieIfAlive(org.xnap.util.State newState,
                                         String description)
    {
        if (!die) {
            die(newState, description);
        }
    }

    /**
     * Asks the runner to terminate. Closing the socket makes a read that
     * is blocked in the runner thread fail immediately.
     */
    public void disconnect()
    {
        die = true;

        // the socket does not exist yet while the runner is still connecting
        Socket s = socket;
        if (s != null) {
            try {
                s.close();
            }
            catch (IOException e) {
                logger.debug("Could not close socket", e);
            }
        }

        this.interrupt();
    }

    /**
     * Releases the socket and the streams and fails the active and all
     * pending browses and searches. Invoked by {@link #run()} when the
     * runner terminates.
     */
    private synchronized void close()
    {
        if (socket != null) {
            try {
                socket.close();
            }
            catch (IOException e) {
                logger.debug("Could not close socket", e);
            }
        }
        if (in != null) {
            try {
                in.close();
            }
            catch (IOException e) {
                logger.debug("Could not close input stream", e);
            }
        }
        if (out != null) {
            try {
                out.close();
            }
            catch (IOException e) {
                logger.debug("Could not close output stream", e);
            }
        }

        if (currentBrowse != null) {
            currentBrowse.getRequest().failed();
            currentBrowse = null;
        }
        // the queues are drained instead of iterated, so that a callback
        // that modifies a queue can not break the loop
        while (!browseQueue.isEmpty()) {
            ((OpenNapBrowse)browseQueue.removeFirst()).getRequest().failed();
        }

        if (currentSearch != null) {
            currentSearch.getRequest().failed();
            currentSearch = null;
        }
        while (searchQueue.size() > 0) {
            ((OpenNapSearch)searchQueue.pop()).getRequest().failed();
        }
    }

    public SendQueue getSendQueue()
    {
        return sendQueue;
    }

    /**
     * Reads exactly <code>length</code> bytes from the input stream.
     *
     * @exception IOException if the stream ends before all bytes could be
     *            read or the read fails
     */
    private byte[] read(int length) throws IOException
    {
        byte[] textBuf = new byte[length];
        int read = 0;
        while (read < length) {
            int c = in.read(textBuf, read, length - read);
            if (c == -1) {
                throw new IOException("Connection closed");
            }
            read += c;
        }
        return textBuf;
    }

    /**
     * Receives the next message. Invoked by {@link #runQueue()}. Packets
     * that can not be parsed are logged and skipped.
     *
     * @return the message; null, if the runner has died
     */
    private ServerMessage recv()
    {
        Packet p = null;
        while (!die) {
            try {
                p = recvPacket();
                return MessageFactory.create(server, p.id, p.data);
            }
            catch (IOException e) {
                logger.debug(server.getHost() + ":" + server.getPort()
                             + " recv ", e);
                // prefer the last error sent by the server as the reason
                dieIfAlive(org.xnap.util.State.FAILED, (lastError != null)
                           ? lastError
                           : NetHelper.getErrorMessage(e));
            }
            catch (InvalidMessageException e) {
                logger.error(server.getHost() + ":" + server.getPort()
                             + " recv: " + p, e);
            }
        }
        return null;
    }

    /**
     * Reads the header and the payload of the next packet.
     */
    private Packet recvPacket() throws IOException
    {
        byte[] header = read(4);

        // byte types are signed, mask before shifting
        int length = ((header[1] & 0xFF) << 8) | (header[0] & 0xFF);
        int id = ((header[3] & 0xFF) << 8) | (header[2] & 0xFF);

        byte[] data = read(length);
        String content;
        if (length == 0) {
            content = "";
        }
        else if (useUTF8Encoding) {
            content = StringHelper.toString(data);
        }
        else {
            content = new String(data);
        }

        logger.debug("< " + server.getHost() + ":" + server.getPort()
                     + " (" + id + ") [" +
                     MessageFactory.getMessageName(id) + "] " + content);

        return new Packet(id, content);
    }

    /**
     * This method should only be invoked by the associated
     * {@link SendQueue} object.
     */
    void send(ClientMessage msg) throws IOException
    {
        String data = msg.getData(server.getVersion());
        sendPacket(new Packet(msg.getType(), data));
    }

    /**
     * Encodes and writes a packet. If the write fails the runner dies and
     * the exception is rethrown.
     *
     * @exception IOException if the payload does not fit into a packet, in
     *            which case nothing is written and the runner stays
     *            alive, or if the stream is not available or the write
     *            fails
     */
    private void sendPacket(Packet p) throws IOException
    {
        logger.debug
            ("> " + server.getHost() + ":" + server.getPort() + " "
             + MessageStrings.getMessageName(p.id) + "(" + p.id + ") "
             + p.data);

        byte[] content
            = (useUTF8Encoding)
            ? p.data.getBytes("UTF-8")
            : p.data.getBytes();

        // a longer payload would be written with a truncated length field
        // and the server would misread everything that follows
        if (content.length > MAX_PACKET_LENGTH) {
            throw new IOException(XNap.tr("Message too long"));
        }

        try {
            if (out == null) {
                // FIX: should not happen
                throw new IOException(XNap.tr("Invalid stream state"));
            }

            byte[] data = new byte[2 + 2 + content.length];
            data[0] = (byte)content.length;
            data[1] = (byte)(content.length >> 8);
            data[2] = (byte)p.id;
            data[3] = (byte)(p.id >> 8);
            System.arraycopy(content, 0, data, 4, content.length);

            synchronized (sendLock) {
                out.write(data);
                out.flush();
            }
        }
        catch (IOException e) {
            logger.debug(server.getHost() + ":" + server.getPort() +
                         " sendPacket " + p, e);
            dieIfAlive(org.xnap.util.State.FAILED,
                       NetHelper.getErrorMessage(e));
            throw e;
        }
    }

    /**
     * Connects to the server, resolving the real host first if the server
     * is a redirector. On failure the runner dies.
     */
    private void connect()
    {
        try {
            if (server.isRedirector()) {
                if (fetchHost(server.getHost(), server.getPort())) {
                    connect(null, server.getRedirectedHost(),
                            server.getRedirectedPort());
                }
                else {
                    dieIfAlive(org.xnap.util.State.FAILED, "Invalid response");
                }
            }
            else {
                connect(server.getIP(), server.getHost(), server.getPort());
            }
        }
        catch (ConnectException e) {
            // connection refused, we will try again later
            dieIfAlive(org.xnap.util.State.FAILED,
                       NetHelper.getErrorMessage(e));
        }
        catch (IOException e) {
            // unrecoverable error
            dieIfAlive(org.xnap.util.State.ERROR,
                       NetHelper.getErrorMessage(e));
            if (server.canAutoRemove()) {
                OpenNapPlugin.getServerManager().remove(server);
            }
        }
    }

    /**
     * Opens the socket to the server and connects the streams.
     * If host1 != null, it is tried first.
     *
     * @param host1 the ip address of the server or null if unknown
     * @param host2 the hostname of the server
     * @param port the port of the server
     * @exception IOException if the socket could not be opened; failures
     *            after that point make the runner die instead
     */
    private void connect(String host1, String host2, int port)
        throws IOException
    {
        if (host1 != null) {
            try {
                socket = new Socket(host1, port);
            }
            catch (IOException e) {
                // the address did not work, forget it and use the host name
                server.setIP(null);
            }
        }

        for (int i = 0; socket == null
                 && i < MAX_RETRY_ON_CONNECTION_REFUSED; i++) {
            try {
                socket = new Socket(host2, port);
            }
            catch (ConnectException e) {
                if (die || i == MAX_RETRY_ON_CONNECTION_REFUSED - 1) {
                    throw e;
                }
                try {
                    Thread.sleep(10);
                }
                catch (InterruptedException e2) {
                    // disconnect() interrupts this thread, stop retrying
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }

        try {
            socket.setSoTimeout(CONNECT_TIMEOUT);

            in = new BufferedInputStream(socket.getInputStream());
            out = socket.getOutputStream();

            server.setStateDescription(XNap.tr("Logging in") + "...");

            if (server.isNewUser()) {
                send(new NickCheckMessage(server.getNick()));
            }
            else {
                login(false);
            }
        }
        catch (IOException e) {
            // if sending failed the runner has already died with the
            // actual error message
            dieIfAlive(org.xnap.util.State.FAILED, XNap.tr("Login failed"));
        }
    }

    /**
     * Connects to a redirector server and reads the host information.
     * The response is expected to have the form <code>host:port</code>,
     * surrounding whitespace such as a trailing line break is ignored.
     * The redirected host and port of the server are only updated if the
     * whole response is valid.
     *
     * @return true, if successful; false, if failed
     */
    public boolean fetchHost(String host, int port) throws IOException
    {
        Socket redirector = null;
        InputStream response = null;
        try {
            redirector = new Socket(host, port);
            response = new BufferedInputStream(redirector.getInputStream());

            byte[] b = new byte[1024];
            int c = response.read(b, 0, b.length);
            if (c > 0) {
                // trim instead of chopping the last byte: the line break
                // may be missing or may consist of two characters
                String line = new String(b, 0, c).trim();
                StringTokenizer t = new StringTokenizer(line, ":");
                if (t.countTokens() == 2) {
                    try {
                        String redirectedHost = t.nextToken().trim();
                        int redirectedPort
                            = Integer.parseInt(t.nextToken().trim());
                        server.setRedirectedHost(redirectedHost);
                        server.setRedirectedPort(redirectedPort);
                        return true;
                    }
                    catch (NumberFormatException e) {
                        logger.debug("Invalid redirector response: " + line,
                                     e);
                    }
                }
            }
        }
        finally {
            if (response != null) {
                try {
                    response.close();
                }
                catch (IOException e) {
                    logger.debug("Could not close redirector stream", e);
                }
            }
            if (redirector != null) {
                try {
                    redirector.close();
                }
                catch (IOException e) {
                    logger.debug("Could not close redirector socket", e);
                }
            }
        }

        return false;
    }

    /**
     * Sends the login message to the server.
     *
     * @param newUser if true, the nick is registered as a new user
     */
    private void login(boolean newUser) throws IOException
    {
        Preferences prefs = Preferences.getInstance();

        String nick = server.getNick();
        String password = server.getPassword();
        int port = server.getLocalPort();
        String email = server.getEmail();
        String info = OpenNapPlugin.getPreferences().getClientInfo();
        int linkSpeed = OpenNapLinkType.getIndexOfType(prefs.getLinkSpeed());

        if (newUser) {
            send(new NewUserLoginMessage
                 (nick, password, port, info, linkSpeed, email));
        }
        else {
            send(new LoginMessage(nick, password, port, info, linkSpeed));
        }
    }

    /**
     * Polls the input stream for messages and passes them to the
     * {@link MessageHandler}. Returns when the runner has died.
     */
    private void runQueue()
    {
        while (!die) {
            ServerMessage msg = recv();

            if (die) {
                return;
            }

            if (msg instanceof NickNotRegisteredMessage) {
                // the nick is free, register it
                try {
                    login(true);
                }
                catch (IOException e) {
                    dieIfAlive(org.xnap.util.State.FAILED,
                               e.getLocalizedMessage());
                }
            }
            else if (msg instanceof NickAlreadyRegisteredMessage) {
                dieIfAlive(org.xnap.util.State.FAILED,
                           XNap.tr("Nick already registered"));
            }
            else if (msg instanceof InvalidNickMessage) {
                dieIfAlive(org.xnap.util.State.FAILED,
                           XNap.tr("Invalid nick"));
            }
            else if (msg instanceof LoginAckMessage) {
                server.setNewUser(false);
                try {
                    socket.setSoTimeout(SOCKET_TIMEOUT);
                }
                catch (IOException e) {
                    logger.warn("Could not set socket timeout", e);
                }
                server.setState(org.xnap.util.State.CONNECTED);
            }
            else if (msg instanceof LoginErrorMessage) {
                dieIfAlive(org.xnap.util.State.FAILED,
                           ((LoginErrorMessage)msg).message);
            }
            else if (msg instanceof BrowseResponseMessage) {
                if (currentBrowse != null) {
                    currentBrowse.received((BrowseResponseMessage)msg);
                }
            }
            else if (msg instanceof EndBrowseMessage) {
                if (currentBrowse != null) {
                    synchronized (this) {
                        currentBrowse.finished();
                        currentBrowse = null;
                    }
                    updateStatus();
                }
            }
            else if (msg instanceof SearchResponseMessage) {
                if (currentSearch != null) {
                    currentSearch.received((SearchResponseMessage)msg);
                }
            }
            else if (msg instanceof EndSearchMessage) {
                if (currentSearch != null) {
                    synchronized (this) {
                        currentSearch.finished();
                        currentSearch = null;
                    }
                    updateStatus();
                }
            }
            else if (msg instanceof ErrorMessage) {
                lastError = ((ErrorMessage)msg).message;
                if (lastError != null
                    && (lastError.startsWith("You have been killed by server")
                        || lastError.startsWith("You were killed by server"))) {
                    // handle server kicks
                    dieIfAlive(org.xnap.util.State.FAILED, lastError);
                }
            }

            OpenNapPlugin.getMessageHandler().handle(msg);

            // do not start new requests on a connection that has just died
            if (!die) {
                allowSearch();
                allowBrowse();
            }
        }
    }

    /**
     * Connects, processes messages until the runner dies and finally
     * releases all resources and marks the server as disconnected.
     */
    public void run()
    {
        Thread.currentThread().setPriority(Thread.MIN_PRIORITY);

        try {
            connect();
            runQueue();
        }
        finally {
            // make sure nothing is queued anymore, even if the loop was
            // left by an unexpected exception
            die = true;
            close();

            server.setState(org.xnap.util.State.DISCONNECTED, dieReason);
            logger.debug(server.toString() + " has died");
        }
    }

    /**
     * Updates the state description of the server to reflect the active
     * and pending requests.
     */
    private void updateStatus()
    {
        StringBuffer sb = new StringBuffer(XNap.tr("connected"));
        if (currentSearch != null) {
            sb.append(", ");
            sb.append(XNap.tr("searching"));
        }
        if (searchQueue.size() > 0) {
            sb.append(", ");
            sb.append(XNap.tr("{0} searches pending", searchQueue.size()));
        }
        if (currentBrowse != null) {
            sb.append(", ");
            sb.append(XNap.tr("browsing"));
        }
        server.setStateDescription(sb.toString());
    }

    /**
     * Starts the next pending browse unless a browse is already active.
     */
    private synchronized void allowBrowse()
    {
        if (currentBrowse != null || browseQueue.size() == 0) {
            return;
        }

        currentBrowse = (OpenNapBrowse)browseQueue.removeFirst();
        MessageHandler.send(server, currentBrowse.getRequest());
        updateStatus();
    }

    /**
     * Starts the next pending search unless a search is already active.
     */
    private synchronized void allowSearch()
    {
        if (currentSearch != null || searchQueue.size() == 0) {
            return;
        }

        currentSearch = (OpenNapSearch)searchQueue.pop();
        MessageHandler.send(server, currentSearch.getRequest());
        updateStatus();
    }

}
The table below shows all metrics for OpenNapServerRunner.java.




