Added listener mechanism for socket registration and cancellation

This commit is contained in:
Kai S. K. Engelbart 2020-01-03 17:19:40 +02:00
parent dd7d232004
commit 08519bb0f3
4 changed files with 40 additions and 4 deletions

View File

@ -0,0 +1,25 @@
package com.jenkov.nioserver;
/**
* Project: <strong>java-nio-server</strong><br>
* File: <strong>ISocketIdListener.java</strong><br>
* Created: <strong>03.01.2020</strong><br>
*
* @author Kai S. K. Engelbart
*/
public interface ISocketIdListener {
/**
* Is invoked when a new {@link Socket} is registered by the server
*
* @param socketId the ID of the newly registered socket
*/
void socketRegistered(long socketId);
/**
* Is invoked when a {@link Socket} is cancelled by the server
*
* @param socketId the ID of the cancelled socket
*/
void socketCancelled(long socketId);
}

View File

@ -13,7 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue;
*/ */
public class Server { public class Server {
private SocketAcceptor socketAccepter; private SocketAcceptor socketAcceptor;
private SocketProcessor socketProcessor; private SocketProcessor socketProcessor;
private int tcpPort; private int tcpPort;
@ -30,17 +30,21 @@ public class Server {
Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig
socketAccepter = new SocketAcceptor(tcpPort, socketQueue); socketAcceptor = new SocketAcceptor(tcpPort, socketQueue);
MessageBuffer readBuffer = new MessageBuffer(); MessageBuffer readBuffer = new MessageBuffer();
MessageBuffer writeBuffer = new MessageBuffer(); MessageBuffer writeBuffer = new MessageBuffer();
socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor); socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
Thread accepterThread = new Thread(socketAccepter); Thread accepterThread = new Thread(socketAcceptor);
Thread processorThread = new Thread(socketProcessor); Thread processorThread = new Thread(socketProcessor);
accepterThread.start(); accepterThread.start();
processorThread.start(); processorThread.start();
} }
public SocketAcceptor getSocketAcceptor() { return socketAcceptor; }
public SocketProcessor getSocketProcessor() { return socketProcessor; }
} }

View File

@ -47,5 +47,4 @@ public class Socket {
return totalBytesWritten; return totalBytesWritten;
} }
} }

View File

@ -51,6 +51,8 @@ public class SocketProcessor implements Runnable {
private Set<Socket> emptyToNonEmptySockets = new HashSet<>(); private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>(); private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
private Set<ISocketIdListener> socketIdListeners = new HashSet<>();
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer,
IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException { IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
this.inboundSocketQueue = inboundSocketQueue; this.inboundSocketQueue = inboundSocketQueue;
@ -97,6 +99,7 @@ public class SocketProcessor implements Runnable {
newSocket.messageWriter = new MessageWriter(); newSocket.messageWriter = new MessageWriter();
socketMap.put(newSocket.socketId, newSocket); socketMap.put(newSocket.socketId, newSocket);
socketIdListeners.forEach(l -> l.socketRegistered(nextSocketId - 1));
SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ); SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ);
key.attach(newSocket); key.attach(newSocket);
@ -121,6 +124,10 @@ public class SocketProcessor implements Runnable {
} }
} }
public void registerSocketIdListener(ISocketIdListener listener) {
socketIdListeners.add(listener);
}
private void readFromSocket(SelectionKey key) throws IOException { private void readFromSocket(SelectionKey key) throws IOException {
Socket socket = (Socket) key.attachment(); Socket socket = (Socket) key.attachment();
socket.messageReader.read(socket, this.readByteBuffer); socket.messageReader.read(socket, this.readByteBuffer);
@ -138,6 +145,7 @@ public class SocketProcessor implements Runnable {
if (socket.endOfStreamReached) { if (socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId); System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId); socketMap.remove(socket.socketId);
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
key.attach(null); key.attach(null);
key.cancel(); key.cancel();
key.channel().close(); key.channel().close();