diff --git a/src/main/java/com/jenkov/nioserver/SocketProcessor.java b/src/main/java/com/jenkov/nioserver/SocketProcessor.java
index 27575f2..92ad574 100644
--- a/src/main/java/com/jenkov/nioserver/SocketProcessor.java
+++ b/src/main/java/com/jenkov/nioserver/SocketProcessor.java
@@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
+import java.util.*;
/**
* Project: java-nio-server
@@ -59,7 +52,7 @@ public class SocketProcessor implements Runnable {
this.readMessageBuffer = readMessageBuffer;
this.writeMessageBuffer = writeMessageBuffer;
- writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue);
+ writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
this.messageReaderFactory = messageReaderFactory;
@@ -69,6 +62,7 @@ public class SocketProcessor implements Runnable {
writeSelector = Selector.open();
}
+ @Override
public void run() {
while (true) {
try {
@@ -112,7 +106,7 @@ public class SocketProcessor implements Runnable {
int readReady = readSelector.selectNow();
if (readReady > 0) {
- Set selectedKeys = this.readSelector.selectedKeys();
+ Set selectedKeys = readSelector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
@@ -130,25 +124,34 @@ public class SocketProcessor implements Runnable {
private void readFromSocket(SelectionKey key) throws IOException {
Socket socket = (Socket) key.attachment();
- socket.messageReader.read(socket, this.readByteBuffer);
+ boolean cancelled = false;
- List fullMessages = socket.messageReader.getMessages();
- if (fullMessages.size() > 0) {
- for (Message message : fullMessages) {
- message.socketId = socket.socketId;
- messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an
- // IMessageWriter for this socket.
+ try {
+ socket.messageReader.read(socket, readByteBuffer);
+
+ List fullMessages = socket.messageReader.getMessages();
+ if (fullMessages.size() > 0) {
+ for (Message message : fullMessages) {
+ message.socketId = socket.socketId;
+ // the message processor will eventually push outgoing messages into an
+ // IMessageWriter for this socket.
+ messageProcessor.process(message, writeProxy);
+ }
+ fullMessages.clear();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.err.println("An exception occurred while reading from a socket. Cancelling socket...");
+ cancelled = true;
+ } finally {
+ if (cancelled || socket.endOfStreamReached) {
+ System.out.println("Socket closed: " + socket.socketId);
+ socketMap.remove(socket.socketId);
+ socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
+ key.attach(null);
+ key.cancel();
+ key.channel().close();
}
- fullMessages.clear();
- }
-
- if (socket.endOfStreamReached) {
- System.out.println("Socket closed: " + socket.socketId);
- socketMap.remove(socket.socketId);
- socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
- key.attach(null);
- key.cancel();
- key.channel().close();
}
}
@@ -164,10 +167,10 @@ public class SocketProcessor implements Runnable {
registerNonEmptySockets();
// Select from the Selector.
- int writeReady = this.writeSelector.selectNow();
+ int writeReady = writeSelector.selectNow();
if (writeReady > 0) {
- Set selectionKeys = this.writeSelector.selectedKeys();
+ Set selectionKeys = writeSelector.selectedKeys();
Iterator keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
@@ -175,9 +178,9 @@ public class SocketProcessor implements Runnable {
Socket socket = (Socket) key.attachment();
- socket.messageWriter.write(socket, this.writeByteBuffer);
+ socket.messageWriter.write(socket, writeByteBuffer);
- if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); }
+ if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
keyIterator.remove();
}
@@ -193,7 +196,7 @@ public class SocketProcessor implements Runnable {
private void cancelEmptySockets() {
for (Socket socket : nonEmptyToEmptySockets) {
- SelectionKey key = socket.socketChannel.keyFor(this.writeSelector);
+ SelectionKey key = socket.socketChannel.keyFor(writeSelector);
key.cancel();
}
nonEmptyToEmptySockets.clear();