Closing socket if MessageReader fails

This commit is contained in:
Kai S. K. Engelbart 2020-02-09 15:09:00 +01:00
parent 5cf2959e1a
commit 028399a3e6

View File

@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.HashMap; import java.util.*;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
/** /**
* Project: <strong>java-nio-server</strong><br> * Project: <strong>java-nio-server</strong><br>
@ -59,7 +52,7 @@ public class SocketProcessor implements Runnable {
this.readMessageBuffer = readMessageBuffer; this.readMessageBuffer = readMessageBuffer;
this.writeMessageBuffer = writeMessageBuffer; this.writeMessageBuffer = writeMessageBuffer;
writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue); writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
this.messageReaderFactory = messageReaderFactory; this.messageReaderFactory = messageReaderFactory;
@ -69,6 +62,7 @@ public class SocketProcessor implements Runnable {
writeSelector = Selector.open(); writeSelector = Selector.open();
} }
@Override
public void run() { public void run() {
while (true) { while (true) {
try { try {
@ -112,7 +106,7 @@ public class SocketProcessor implements Runnable {
int readReady = readSelector.selectNow(); int readReady = readSelector.selectNow();
if (readReady > 0) { if (readReady > 0) {
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys(); Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
@ -130,19 +124,25 @@ public class SocketProcessor implements Runnable {
private void readFromSocket(SelectionKey key) throws IOException { private void readFromSocket(SelectionKey key) throws IOException {
Socket socket = (Socket) key.attachment(); Socket socket = (Socket) key.attachment();
socket.messageReader.read(socket, this.readByteBuffer); boolean cancelled = false;
try {
socket.messageReader.read(socket, readByteBuffer);
List<Message> fullMessages = socket.messageReader.getMessages(); List<Message> fullMessages = socket.messageReader.getMessages();
if (fullMessages.size() > 0) { if (fullMessages.size() > 0) {
for (Message message : fullMessages) { for (Message message : fullMessages) {
message.socketId = socket.socketId; message.socketId = socket.socketId;
messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an // the message processor will eventually push outgoing messages into an
// IMessageWriter for this socket. // IMessageWriter for this socket.
messageProcessor.process(message, writeProxy);
} }
fullMessages.clear(); fullMessages.clear();
} }
} catch (IOException e) {
if (socket.endOfStreamReached) { cancelled = true;
} finally {
if (cancelled || socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId); System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId); socketMap.remove(socket.socketId);
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId)); socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
@ -151,6 +151,7 @@ public class SocketProcessor implements Runnable {
key.channel().close(); key.channel().close();
} }
} }
}
public void writeToSockets() throws IOException { public void writeToSockets() throws IOException {
@ -164,10 +165,10 @@ public class SocketProcessor implements Runnable {
registerNonEmptySockets(); registerNonEmptySockets();
// Select from the Selector. // Select from the Selector.
int writeReady = this.writeSelector.selectNow(); int writeReady = writeSelector.selectNow();
if (writeReady > 0) { if (writeReady > 0) {
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys(); Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
@ -175,9 +176,9 @@ public class SocketProcessor implements Runnable {
Socket socket = (Socket) key.attachment(); Socket socket = (Socket) key.attachment();
socket.messageWriter.write(socket, this.writeByteBuffer); socket.messageWriter.write(socket, writeByteBuffer);
if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); } if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
keyIterator.remove(); keyIterator.remove();
} }
@ -193,7 +194,7 @@ public class SocketProcessor implements Runnable {
private void cancelEmptySockets() { private void cancelEmptySockets() {
for (Socket socket : nonEmptyToEmptySockets) { for (Socket socket : nonEmptyToEmptySockets) {
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector); SelectionKey key = socket.socketChannel.keyFor(writeSelector);
key.cancel(); key.cancel();
} }
nonEmptyToEmptySockets.clear(); nonEmptyToEmptySockets.clear();