Merge pull request #3 from informatik-ag-ngl/b/cancelled_socket

Fixed unresponsive behavior after cancelled socket
This commit is contained in:
Kai S. K. Engelbart 2020-02-09 15:34:35 +01:00 committed by GitHub
commit 9d430dbb75

View File

@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.HashMap; import java.util.*;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
/** /**
* Project: <strong>java-nio-server</strong><br> * Project: <strong>java-nio-server</strong><br>
@ -59,7 +52,7 @@ public class SocketProcessor implements Runnable {
this.readMessageBuffer = readMessageBuffer; this.readMessageBuffer = readMessageBuffer;
this.writeMessageBuffer = writeMessageBuffer; this.writeMessageBuffer = writeMessageBuffer;
writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue); writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
this.messageReaderFactory = messageReaderFactory; this.messageReaderFactory = messageReaderFactory;
@ -69,6 +62,7 @@ public class SocketProcessor implements Runnable {
writeSelector = Selector.open(); writeSelector = Selector.open();
} }
@Override
public void run() { public void run() {
while (true) { while (true) {
try { try {
@ -112,7 +106,7 @@ public class SocketProcessor implements Runnable {
int readReady = readSelector.selectNow(); int readReady = readSelector.selectNow();
if (readReady > 0) { if (readReady > 0) {
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys(); Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
@ -130,25 +124,34 @@ public class SocketProcessor implements Runnable {
private void readFromSocket(SelectionKey key) throws IOException { private void readFromSocket(SelectionKey key) throws IOException {
Socket socket = (Socket) key.attachment(); Socket socket = (Socket) key.attachment();
socket.messageReader.read(socket, this.readByteBuffer); boolean cancelled = false;
List<Message> fullMessages = socket.messageReader.getMessages(); try {
if (fullMessages.size() > 0) { socket.messageReader.read(socket, readByteBuffer);
for (Message message : fullMessages) {
message.socketId = socket.socketId; List<Message> fullMessages = socket.messageReader.getMessages();
messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an if (fullMessages.size() > 0) {
// IMessageWriter for this socket. for (Message message : fullMessages) {
message.socketId = socket.socketId;
// the message processor will eventually push outgoing messages into an
// IMessageWriter for this socket.
messageProcessor.process(message, writeProxy);
}
fullMessages.clear();
}
} catch (IOException e) {
e.printStackTrace();
System.err.println("An exception occurred while reading from a socket. Cancelling socket...");
cancelled = true;
} finally {
if (cancelled || socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId);
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
key.attach(null);
key.cancel();
key.channel().close();
} }
fullMessages.clear();
}
if (socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId);
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
key.attach(null);
key.cancel();
key.channel().close();
} }
} }
@ -164,10 +167,10 @@ public class SocketProcessor implements Runnable {
registerNonEmptySockets(); registerNonEmptySockets();
// Select from the Selector. // Select from the Selector.
int writeReady = this.writeSelector.selectNow(); int writeReady = writeSelector.selectNow();
if (writeReady > 0) { if (writeReady > 0) {
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys(); Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
@ -175,9 +178,9 @@ public class SocketProcessor implements Runnable {
Socket socket = (Socket) key.attachment(); Socket socket = (Socket) key.attachment();
socket.messageWriter.write(socket, this.writeByteBuffer); socket.messageWriter.write(socket, writeByteBuffer);
if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); } if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
keyIterator.remove(); keyIterator.remove();
} }
@ -193,7 +196,7 @@ public class SocketProcessor implements Runnable {
private void cancelEmptySockets() { private void cancelEmptySockets() {
for (Socket socket : nonEmptyToEmptySockets) { for (Socket socket : nonEmptyToEmptySockets) {
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector); SelectionKey key = socket.socketChannel.keyFor(writeSelector);
key.cancel(); key.cancel();
} }
nonEmptyToEmptySockets.clear(); nonEmptyToEmptySockets.clear();