From 08519bb0f327ed9b323e38808ab80aeb5b12264a Mon Sep 17 00:00:00 2001 From: CyB3RC0nN0R Date: Fri, 3 Jan 2020 17:19:40 +0200 Subject: [PATCH] Added listener mechanism for socket registration and cancellation --- .../jenkov/nioserver/ISocketIdListener.java | 25 +++++++++++++++++++ .../java/com/jenkov/nioserver/Server.java | 10 +++++--- .../java/com/jenkov/nioserver/Socket.java | 1 - .../com/jenkov/nioserver/SocketProcessor.java | 8 ++++++ 4 files changed, 40 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/jenkov/nioserver/ISocketIdListener.java diff --git a/src/main/java/com/jenkov/nioserver/ISocketIdListener.java b/src/main/java/com/jenkov/nioserver/ISocketIdListener.java new file mode 100644 index 0000000..6368e57 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/ISocketIdListener.java @@ -0,0 +1,25 @@ +package com.jenkov.nioserver; + +/** + * Project: java-nio-server
+ * File: ISocketIdListener.java
+ * Created: 03.01.2020
+ * + * @author Kai S. K. Engelbart + */ +public interface ISocketIdListener { + + /** + * Is invoked when a new {@link Socket} is registered by the server + * + * @param socketId the ID of the newly registered socket + */ + void socketRegistered(long socketId); + + /** + * Is invoked when a {@link Socket} is cancelled by the server + * + * @param socketId the ID of the cancelled socket + */ + void socketCancelled(long socketId); +} diff --git a/src/main/java/com/jenkov/nioserver/Server.java b/src/main/java/com/jenkov/nioserver/Server.java index 2926384..2eb9e71 100644 --- a/src/main/java/com/jenkov/nioserver/Server.java +++ b/src/main/java/com/jenkov/nioserver/Server.java @@ -13,7 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue; */ public class Server { - private SocketAcceptor socketAccepter; + private SocketAcceptor socketAcceptor; private SocketProcessor socketProcessor; private int tcpPort; @@ -30,17 +30,21 @@ public class Server { Queue socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig - socketAccepter = new SocketAcceptor(tcpPort, socketQueue); + socketAcceptor = new SocketAcceptor(tcpPort, socketQueue); MessageBuffer readBuffer = new MessageBuffer(); MessageBuffer writeBuffer = new MessageBuffer(); socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor); - Thread accepterThread = new Thread(socketAccepter); + Thread accepterThread = new Thread(socketAcceptor); Thread processorThread = new Thread(socketProcessor); accepterThread.start(); processorThread.start(); } + + public SocketAcceptor getSocketAcceptor() { return socketAcceptor; } + + public SocketProcessor getSocketProcessor() { return socketProcessor; } } \ No newline at end of file diff --git a/src/main/java/com/jenkov/nioserver/Socket.java b/src/main/java/com/jenkov/nioserver/Socket.java index 9cad6ea..9361986 100644 --- a/src/main/java/com/jenkov/nioserver/Socket.java +++ b/src/main/java/com/jenkov/nioserver/Socket.java @@ -47,5 +47,4 @@ public class Socket { return totalBytesWritten; } - } diff --git a/src/main/java/com/jenkov/nioserver/SocketProcessor.java b/src/main/java/com/jenkov/nioserver/SocketProcessor.java index f05e76b..27575f2 100644 --- a/src/main/java/com/jenkov/nioserver/SocketProcessor.java +++ b/src/main/java/com/jenkov/nioserver/SocketProcessor.java @@ -51,6 +51,8 @@ public class SocketProcessor implements Runnable { private Set emptyToNonEmptySockets = new HashSet<>(); private Set nonEmptyToEmptySockets = new HashSet<>(); + private Set socketIdListeners = new HashSet<>(); + public SocketProcessor(Queue inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException { this.inboundSocketQueue = inboundSocketQueue; @@ -97,6 +99,7 @@ public class SocketProcessor implements Runnable { newSocket.messageWriter = new MessageWriter(); socketMap.put(newSocket.socketId, newSocket); + socketIdListeners.forEach(l -> l.socketRegistered(nextSocketId - 1)); SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ); key.attach(newSocket); @@ -121,6 +124,10 @@ public class SocketProcessor implements Runnable { } } + public void registerSocketIdListener(ISocketIdListener listener) { + socketIdListeners.add(listener); + } + private void readFromSocket(SelectionKey key) throws IOException { Socket socket = (Socket) key.attachment(); socket.messageReader.read(socket, this.readByteBuffer); @@ -138,6 +145,7 @@ public class SocketProcessor implements Runnable { if (socket.endOfStreamReached) { System.out.println("Socket closed: " + socket.socketId); socketMap.remove(socket.socketId); + socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId)); key.attach(null); key.cancel(); key.channel().close();