OvernetCore.java
| Index Score | ||
|---|---|---|
![]() |
![]() |
org.xnap.plugin.overnet.net |
![]() |
![]() |
XNap 3 |
View: Reasons, Metrics, Source Code
These are the metrics that contribute to the Enerjy Score for this file, ranked by impact. So the metrics listed at the top influence the score to a greater extent than the metrics listed at the bottom.
/*
* XNap - A P2P framework and client.
*
* See the file AUTHORS for copyright information.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.xnap.plugin.overnet.net;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Hashtable;
import java.util.Iterator;
import org.apache.log4j.Logger;
import org.xnap.XNap;
import org.xnap.event.StateListener;
import org.xnap.event.StateSupport;
import org.xnap.plugin.overnet.OvernetPlugin;
import org.xnap.plugin.overnet.net.msg.client.*;
import org.xnap.plugin.overnet.net.msg.*;
import org.xnap.plugin.overnet.net.msg.client.GetOptionsMessage;
import org.xnap.plugin.overnet.net.msg.client.GetServerListMessage;
import org.xnap.plugin.overnet.net.msg.client.GetSharedFilesMessage;
import org.xnap.plugin.overnet.net.msg.client.LoginMessage;
import org.xnap.plugin.overnet.net.msg.client.SendDownloadStatusMessage;
import org.xnap.plugin.overnet.net.msg.client.SendUploadStatusMessage;
import org.xnap.plugin.overnet.net.msg.client.StopDownloadStatusMessage;
import org.xnap.plugin.overnet.net.msg.client.StopUploadStatusMessage;
import org.xnap.plugin.overnet.net.msg.core.OvernetCoreMessage;
import org.xnap.plugin.overnet.net.msg.core.MessageFactory;
import org.xnap.plugin.overnet.util.OvernetPreferences;
import org.xnap.search.SearchManager;
import org.xnap.util.FiniteStateMachine;
import org.xnap.util.Preferences;
import org.xnap.util.Scheduler;
import org.xnap.util.State;
/**
 * Represents a connection to the overnet core.
 *
 * This class handles all the low level writing and reading of messages. The
 * transitions from one connection state to the next are handled in its
 * private inner class <code>StateMachine</code>.
 */
public class OvernetCore
{
//--- Constant(s) ---

/**
 * Marker byte expected at the start of every incoming packet; packets
 * starting with anything else are rejected by <code>check</code>.
 */
public static final byte ED2K_BYTE = (byte)0xe3;

/**
 * Transition table handed to the state machine. Presumably each row
 * lists a source state followed by the states reachable from it (the
 * exact format is defined by
 * <code>FiniteStateMachine.createStateTable</code>).
 */
private static final Hashtable TRANSITION_TABLE;
static {
    State[][] table = new State[][] {
        { State.DISCONNECTED, State.CONNECTING, },
        { State.CONNECTING, State.CONNECTED, State.DISCONNECTED },
        { State.CONNECTED, State.DISCONNECTING, State.DISCONNECTED },
        { State.DISCONNECTING, State.DISCONNECTED }
    };
    TRANSITION_TABLE = FiniteStateMachine.createStateTable(table);
}

//--- Data Field(s) ---

/**
 * The non blocking channel for this connection.
 */
private SocketChannel channel;

/**
 * The buffer used for reading from the channel. It starts at 1024 bytes
 * and is replaced by a larger one when a bigger packet is announced.
 */
private ByteBuffer buffer = ByteBuffer.allocate(1024);

/**
 * Selector the channel is registered with. It is assigned on the
 * selector thread and read by <code>stop()</code> and the interest-op
 * helpers, hence volatile so other threads see the current instance.
 */
private volatile Selector selector;

/**
 * Set by <code>stop()</code> to make the select loop terminate. Volatile
 * because it is written by the stopping thread and read by the selector
 * thread.
 */
private volatile boolean die = false;

/**
 * Queues the messages to be written out.
 */
private MessageQueue sendQueue = new MessageQueue();

/**
 * Messages are forwarded to the message handler after their creation.
 */
private MessageHandler messageHandler;

private OvernetPreferences prefs = OvernetPreferences.getInstance();

/**
 * Delay connection time.
 */
private int delay = 0;

/** Tracks the connection state and reacts to state changes. */
private StateMachine sm = new StateMachine();

/** Listeners notified after every state change made through this class. */
private StateSupport listeners = new StateSupport(this);

private static final Logger logger = Logger.getLogger(OvernetCore.class);
//--- Constructor(s) ---
/**
 * Creates a core object that forwards every valid received message to
 * the given handler. The read buffer is switched to little endian byte
 * order.
 *
 * @param handler receiver of incoming messages, must not be null
 * @throws NullPointerException if <code>handler</code> is null
 */
public OvernetCore(MessageHandler handler)
{
    // reject a missing handler before touching any other state
    if (handler == null) {
        throw new NullPointerException();
    }
    this.messageHandler = handler;
    this.buffer.order(ByteOrder.LITTLE_ENDIAN);
}
/**
 * Connects to the overnet core without any delay. Equivalent to
 * <code>connect(0)</code>.
 */
public void connect()
{
    final int noDelay = 0;
    connect(noDelay);
}
/**
 * Connects to the core after a certain delay.
 *
 * The delay is stored first and the state is then switched to
 * connecting right away; the state machine decides whether the actual
 * connection attempt is scheduled or started immediately.
 *
 * @param delay the delay before connecting, passed on to
 *              <code>Scheduler.run</code> when greater than zero (unit
 *              as defined by the scheduler)
 */
public void connect(int delay)
{
    // remember the delay before the state change reads it
    this.delay = delay;
    this.setState(State.CONNECTING);
}
/**
 * Reads pending bytes from the channel into the read buffer and
 * dispatches every complete packet found in it to the message handler.
 *
 * An end of stream stops the connection. An I/O failure of the channel
 * is not swallowed: it propagates to the caller so the select loop ends
 * instead of polling a broken channel over and over.
 *
 * @throws IOException if reading from the channel fails or an invalid
 *                     packet is encountered
 */
private void read() throws IOException
{
    if (channel.read(buffer) == -1) {
        // peer closed the connection
        stop();
        return;
    }

    byte type;
    // check() flips the buffer for reading and returns -1 as soon as no
    // complete packet is available
    while ((type = check(buffer)) != -1) {
        // logger.debug("read " + (int)type);
        OvernetCoreMessage msg = MessageFactory.create(type, buffer);
        if (msg != null && msg.isValid()) {
            messageHandler.handle(msg);
        }

        // skip the packet regardless of how much the factory consumed:
        // 5 header bytes (marker + size) plus the announced size
        int size = buffer.getInt(1);
        // logger.debug("Computed size " + size + " as to b.limit() "
        // + buffer.limit());
        buffer.position(Math.min(size + 5, buffer.limit()));
        // move the unread rest to the front, back to write mode
        buffer.compact();
    }

    // undo the flip of the last check() so the next channel read appends
    buffer.compact();
}
/**
 * Checks whether the buffer holds a complete packet at its start.
 *
 * The buffer is flipped for reading and left at position 0. A packet
 * consists of the marker byte, a 4 byte size and <code>size</code>
 * further bytes, the first of which is the message type.
 *
 * @param buffer the buffer in write mode, as left by a channel read or
 *               <code>compact()</code>
 * @return the message type of the complete packet, or -1 if more data
 *         is required (the read buffer is enlarged when the announced
 *         packet does not fit into it)
 * @throws IOException if the packet does not start with
 *                     {@link #ED2K_BYTE} or announces an impossible size
 */
private byte check(ByteBuffer buffer) throws IOException
{
    buffer.flip();

    // marker (1) + size (4) + type (1)
    if (buffer.remaining() < 6) {
        return -1;
    }

    byte b = buffer.get();
    int size = buffer.getInt();
    byte type = buffer.get();
    buffer.rewind();

    if (b != ED2K_BYTE) {
        logger.debug("invalid packet" + toString(buffer.array())
                     + " offset " + buffer.arrayOffset()
                     + " position " + buffer.position()
                     + " limit " + buffer.limit());
        throw new IOException("Invalid Packet");
    }

    // A negative size, or one where size + 5 overflows, would slip past
    // the length checks below and later produce a negative buffer
    // position in the caller.
    if (size < 0 || size > Integer.MAX_VALUE - 5) {
        logger.debug("invalid packet size " + size);
        throw new IOException("Invalid Packet");
    }

    if (buffer.capacity() < size + 5) {
        // the packet can never fit, grow the buffer and wait for more
        resizeBuffer(size + 5);
        return -1;
    }

    if (buffer.remaining() < size + 5) {
        logger.debug(type + " message of size " + size
                     + " not fully read");
        return -1;
    }

    return type;
}
/**
 * Replaces the read buffer by a new little endian buffer of the given
 * capacity. The remaining bytes of the old buffer are copied to the
 * start of the new one, whose limit is set to the old buffer's limit.
 *
 * @param size capacity of the new buffer in bytes
 */
private void resizeBuffer(int size)
{
    logger.debug("reallocating buffer to " + size);

    byte[] storage = new byte[size];
    ByteBuffer grown = ByteBuffer.wrap(storage);
    grown.order(ByteOrder.LITTLE_ENDIAN);

    // carry over the unread content, then mirror the old limit
    int pending = buffer.remaining();
    buffer.get(storage, 0, pending);
    grown.limit(buffer.limit());

    buffer = grown;
}
/**
 * Renders a byte array as its signed decimal values, each followed by a
 * single space. Used for debug output only.
 *
 * @param array the bytes to render
 * @return the rendered values, empty for an empty array
 */
private String toString(byte[] array)
{
    StringBuffer out = new StringBuffer(array.length * 2);
    int index = 0;
    while (index < array.length) {
        out.append(array[index]).append(" ");
        index++;
    }
    return out.toString();
}
/**
 * Writes the next queued message, if any, to the channel and adjusts
 * the write interest afterwards: it stays registered while the queue
 * still reports remaining data and is removed otherwise.
 *
 * @throws IOException if writing to the channel fails
 */
private synchronized void write() throws IOException
{
    OvernetClientMessage next = sendQueue.getNextMessage();
    if (next != null) {
        channel.write(next.getBuffer());
    }

    if (!sendQueue.hasRemaining()) {
        // everything flushed, no need to be woken up for writing
        removeOp(SelectionKey.OP_WRITE);
        return;
    }

    // data left over, ask the selector to tell us when we can continue
    addOp(SelectionKey.OP_WRITE);
}
/**
 * Adds an operation to the channel's interest set. If the channel has
 * no key for the selector yet it is registered with just this operation.
 * A closed channel switches the state to disconnected.
 *
 * @param op one of the <code>SelectionKey.OP_*</code> bits
 */
private void addOp(int op)
{
    SelectionKey existing = channel.keyFor(selector);
    try {
        int ops = op;
        if (existing != null) {
            // keep whatever is registered already
            ops |= existing.interestOps();
        }
        channel.register(selector, ops);
    }
    catch (ClosedChannelException closed) {
        logger.debug("addOp", closed);
        setState(State.DISCONNECTED, closed.getLocalizedMessage());
    }
}
/**
 * Removes an operation from the channel's interest set. Nothing happens
 * if the channel has no key for the selector. A closed channel switches
 * the state to disconnected.
 *
 * @param op one of the <code>SelectionKey.OP_*</code> bits
 */
private void removeOp(int op)
{
    SelectionKey key = channel.keyFor(selector);
    try {
        if (key != null) {
            // Clear the bit with AND NOT. XOR would toggle it and thereby
            // register the operation whenever it was not set before.
            channel.register(selector, key.interestOps() & ~op);
        }
    }
    catch (ClosedChannelException ce) {
        logger.debug("removeOp", ce);
        setState(State.DISCONNECTED, ce.getLocalizedMessage());
    }
}
/**
 * Appends the message to the <code>sendQueue</code> and immediately
 * tries to write via {@link #write()}. A write failure stops the
 * connection and is logged; it is not reported to the caller.
 *
 * @param cm the message to send
 */
private void write(OvernetClientMessage cm)
{
    sendQueue.add(cm);

    try {
        this.write();
    }
    catch (IOException failure) {
        // shut down first, then record what went wrong
        stop();
        logger.debug("error writing", failure);
    }
}
/**
 * Sends a message to the overnet core in a non-blocking way, using the
 * core instance owned by the plugin singleton.
 *
 * @param cm the message to send
 */
public static void send(OvernetClientMessage cm)
{
    // logger.debug("Sending " + cm.getClass().getName());
    OvernetPlugin.getInstance().getCore().write(cm);
}
/**
 * Stops the connection: raises the termination flag and, if a selector
 * exists, wakes it up so that the select loop notices the flag.
 */
public void stop()
{
    die = true;

    if (selector == null) {
        // no select loop has been started yet, nothing to wake up
        return;
    }
    selector.wakeup();
}
/**
 * Registers a listener with the state support of this core.
 *
 * @param listener the listener to add
 */
public void addStateListener(StateListener listener)
{
    this.listeners.addStateListener(listener);
}
/**
 * Unregisters a listener from the state support of this core.
 *
 * @param listener the listener to remove
 */
public void removeStateListener(StateListener listener)
{
    this.listeners.removeStateListener(listener);
}
/**
 * Returns the current connection state as held by the state machine.
 *
 * @return the current state
 */
public State getState()
{
    final State current = sm.getState();
    return current;
}
/**
 * Returns the description the state machine currently reports.
 *
 * @return the state machine's description
 */
public String getDescription()
{
    final String description = sm.getDescription();
    return description;
}
/**
 * Switches the state machine to a new state and notifies the state
 * listeners afterwards.
 *
 * @param newState the state to enter
 */
private void setState(State newState)
{
    this.sm.setState(newState);
    this.listeners.fireStateChanged();
}
/**
 * Switches the state machine to a new state with an accompanying
 * message and notifies the state listeners afterwards.
 *
 * @param newState the state to enter
 * @param message  text passed on to the state machine together with the
 *                 new state
 */
private void setState(State newState, String message)
{
    this.sm.setState(newState, message);
    this.listeners.fireStateChanged();
}
//--- Inner Class(es) ---
private class StateMachine extends FiniteStateMachine implements Runnable
{
//---- Data Field(s) ---

/** The thread that runs the select loop of the current connection. */
private Thread runner;

//--- Constructor(s) ---

/**
 * Creates the state machine in the disconnected state, governed by the
 * transition table of the enclosing class.
 */
public StateMachine()
{
    super(State.DISCONNECTED, TRANSITION_TABLE);
}
/**
 * Reacts to a state change by running the action that belongs to the
 * new state: connecting (possibly delayed via the scheduler), the
 * post-connect setup, the disconnect handshake or the final cleanup.
 *
 * @param oldState the state that was left (not used)
 * @param newState the state that was entered
 */
protected synchronized void stateChanged(State oldState,
                                         State newState)
{
    if (newState == State.CONNECTED) {
        connected();
        return;
    }
    if (newState == State.DISCONNECTING) {
        disconnect();
        return;
    }
    if (newState == State.DISCONNECTED) {
        close();
        return;
    }
    if (newState != State.CONNECTING) {
        // no action for any other state
        return;
    }

    if (delay <= 0) {
        connect();
        return;
    }

    // postpone the connection attempt as requested
    Scheduler.run(delay, new Runnable() {
            public void run()
            {
                StateMachine.this.connect();
            }
        });
}
/**
 * Opens a non-blocking channel, initiates the connection to the
 * configured core host and port, and starts the selector thread that
 * completes the connection.
 *
 * If any of this fails, the state machine is switched to disconnected
 * with the error text and no selector thread is started, since there is
 * no usable pending connection for it to work on.
 *
 * NOTE(review): the failure path changes the state on the state machine
 * directly, so the listeners of the enclosing class are not notified
 * from here. Confirm this is intended for the delayed connect case.
 */
private void connect()
{
    try {
        // start from a clean slate for the new connection
        sendQueue.clear();
        die = false;

        channel = SocketChannel.open();
        channel.configureBlocking(false);

        InetSocketAddress isa = new InetSocketAddress
            (prefs.getCoreHost(), prefs.getCorePort());
        if (isa.isUnresolved()) {
            throw new IOException("Address could not be resolved");
        }
        channel.connect(isa);
    }
    catch (IOException ie) {
        logger.debug("connecting failed", ie);
        this.setState(State.DISCONNECTED, ie.getLocalizedMessage());
        // do not start a selector thread for a failed attempt
        return;
    }

    // start selector thread here
    runner = new Thread(this, "Overnet core connection");
    runner.start();
}
/**
 * Runs once the connection is established: switches the interest set
 * from connect to read, sends the login and the initial commands and
 * requests, and registers the plugin with the search manager.
 */
private void connected()
{
    // the connect phase is over, from now on we want to read
    removeOp(SelectionKey.OP_CONNECT);
    addOp(SelectionKey.OP_READ);

    /* todo: move this somewhere else, it's not the core's task to
       send messages on its own. */
    write(new LoginMessage(prefs.getUsername(), prefs.getPassword()));
    write(new CommandMessage("g"));

    if (prefs.getUseXNapDownloadDirs()) {
        // use the directories from the global preferences
        Preferences global = Preferences.getInstance();
        write(new CommandMessage("temp " + global.getIncompleteDir()));
        write(new CommandMessage("in " + global.getDownloadDir()));
    }

    write(new GetOptionsMessage());
    write(new SendDownloadStatusMessage());
    write(new SendUploadStatusMessage());
    write(new GetServerListMessage());
    // write(new GetSharedDirsMessage());
    // write(new GetSharedFilesMessage());

    SearchManager.getInstance().add(OvernetPlugin.getInstance());
}
/**
 * Body of the selector thread. Runs the select loop until it ends;
 * a regular end leads through disconnecting to disconnected, an I/O
 * error switches to disconnected right away with the error text.
 */
public void run()
{
    try {
        select();
    }
    catch (IOException failure) {
        logger.debug("overnet core connection", failure);
        OvernetCore.this.setState(State.DISCONNECTED,
                                  failure.getLocalizedMessage());
        return;
    }

    // the loop ended without an error: shut down in an orderly fashion
    OvernetCore.this.setState(State.DISCONNECTING);
    OvernetCore.this.setState(State.DISCONNECTED);
}
/**
 * The select loop. Opens a selector, registers the channel for the
 * connect operation and then handles ready keys until the selector
 * returns without ready keys or the connection has been stopped.
 *
 * A connectable key finishes the pending connection and switches the
 * state to connected; a readable key triggers a read, a writable key a
 * write. Only one of these is handled per key and round.
 *
 * @throws IOException if the selector fails, the connection cannot be
 *                     completed, or reading or writing fails. A refused
 *                     connection is reported as a
 *                     <code>ConnectException</code> carrying a user
 *                     oriented explanation, with the original exception
 *                     attached as its cause.
 */
private void select() throws IOException
{
    selector = Selector.open();
    channel.register(selector, SelectionKey.OP_CONNECT);

    while (selector.select() > 0 && !die) {
        for (Iterator i = selector.selectedKeys().iterator();
             i.hasNext();) {
            SelectionKey key = (SelectionKey)i.next();
            // selected keys have to be removed by hand
            i.remove();

            SocketChannel sc = (SocketChannel)key.channel();
            if (key.isConnectable()) {
                if (sc.isConnectionPending()) {
                    logger.debug("finish connect");
                    try {
                        if (sc.finishConnect()) {
                            OvernetCore.this.setState(State.CONNECTED);
                        }
                        else {
                            throw new
                                IOException("finish connect failed");
                        }
                    }
                    catch (ConnectException ce) {
                        logger.debug("connect exp", ce);
                        ConnectException explained = new ConnectException(XNap.tr("Could not connect to overnet daemon\nThis can have several reasons:\n-overnet is not installed.This plugin requires an external program called overnet that handles the network connection. Please see http://www.overnet.com/ for more information and download links.\n-The plugin settings are incorrect, e.g. the overnet daemon port is wrong. Please check the settings under Settings -> Configure Plugins.\n-The overnet daemon is not running: Please start it manually or configure XNap to start it for you and restart XNap.\n"));
                        // keep the low level reason for diagnostics
                        explained.initCause(ce);
                        throw explained;
                    }
                }
            }
            else if (key.isReadable()) {
                read();
            }
            else if (key.isWritable()) {
                write();
            }
        }
    }
}
/**
 * Sends the messages that announce a deliberate shutdown to the core:
 * stop download status, stop upload status and a shared files request,
 * in this order.
 */
private void disconnect()
{
    // that's what the gtk gui sends on deliberate shutdown
    // this shouldn't be sent here
    OvernetCore.this.write(new StopDownloadStatusMessage());
    OvernetCore.this.write(new StopUploadStatusMessage());
    OvernetCore.this.write(new GetSharedFilesMessage());
}
/**
 * Cleans up and closes the channel, then removes the plugin from the
 * search manager.
 *
 * Shutting down the two stream directions is best effort. It fails, for
 * example, on a socket that was never connected, and must not prevent
 * the socket from being closed, so closing is attempted separately.
 */
private void close()
{
    if (channel != null) {
        logger.debug("closing channel");
        try {
            channel.socket().shutdownInput();
            channel.socket().shutdownOutput();
        }
        catch (IOException ie) {
            logger.debug("closing channel", ie);
        }

        // always release the socket, even if the shutdown above failed
        try {
            channel.socket().close();
        }
        catch (IOException ie) {
            logger.debug("closing channel", ie);
        }
    }

    SearchManager.getInstance().remove(OvernetPlugin.getInstance());
}
}
}
The table below shows all metrics for OvernetCore.java.




