Compare commits

11 Commits

Author SHA1 Message Date
9d430dbb75 Merge pull request #3 from informatik-ag-ngl/b/cancelled_socket
Fixed unresponsive behavior after cancelled socket
2020-02-09 15:34:35 +01:00
6c3ceaae36 Printing socket exception to console 2020-02-09 15:16:59 +01:00
028399a3e6 Closing socket if MessageReader fails 2020-02-09 15:09:00 +01:00
5cf2959e1a Fixed consecutive message writing 2020-01-11 10:53:01 +01:00
27ac6b43c0 Generating sources and Javadoc in JAR export 2020-01-06 14:38:14 +01:00
0044e5c200 Fixed Maven build error caused by release flag 2020-01-06 10:29:21 +01:00
35bce6dae7 Added getters for SocketAcceptor and SocketProcessor to Server 2020-01-03 17:55:37 +02:00
ef44963f20 Merge branch 'master' of https://github.com/informatik-ag-ngl/java-nio-server.git 2020-01-03 17:19:50 +02:00
282f9b916a Added listener mechanism for socket registration and cancellation 2020-01-03 17:19:40 +02:00
aedb2a9ac0 Create maven.yml
Maven build is done automatically on push
2019-12-30 15:28:51 +01:00
6b9a4d4f1c Merge pull request #1 from informatik-ag-ngl/f/style_adjustment
Converted to Maven project, reformatted source files and added Javadoc headers
2019-12-28 11:50:57 +02:00
10 changed files with 141 additions and 46 deletions

17
.github/workflows/maven.yml vendored Normal file
View File

@ -0,0 +1,17 @@
name: Java CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Build with Maven
run: mvn -B package --file pom.xml

View File

@ -0,0 +1,4 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/test/java=UTF-8
encoding/<project>=UTF-8

View File

@ -4,5 +4,5 @@ org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=enabled org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8 org.eclipse.jdt.core.compiler.source=1.8

41
pom.xml
View File

@ -2,10 +2,21 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>informatik-ag-ngl</groupId> <groupId>informatik-ag-ngl</groupId>
<artifactId>java-nio-server</artifactId> <artifactId>java-nio-server</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<name>java-nio-server</name>
<url>https://github.com/informatik-ag-ngl/java-nio-server</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
@ -16,13 +27,33 @@
</dependencies> </dependencies>
<build> <build>
<finalName>java-nio-server</finalName>
<plugins> <plugins>
<plugin> <plugin>
<artifactId>maven-compiler-plugin</artifactId> <groupId>org.apache.maven.plugins</groupId>
<version>3.8.0</version> <artifactId>maven-source-plugin</artifactId>
<configuration> <version>3.2.1</version>
<release>8</release> <executions>
</configuration> <execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>

View File

@ -0,0 +1,25 @@
package com.jenkov.nioserver;
/**
* Project: <strong>java-nio-server</strong><br>
* File: <strong>ISocketIdListener.java</strong><br>
* Created: <strong>03.01.2020</strong><br>
*
* @author Kai S. K. Engelbart
*/
public interface ISocketIdListener {
/**
* Is invoked when a new {@link Socket} is registered by the server
*
* @param socketId the ID of the newly registered socket
*/
void socketRegistered(long socketId);
/**
* Is invoked when a {@link Socket} is cancelled by the server
*
* @param socketId the ID of the cancelled socket
*/
void socketCancelled(long socketId);
}

View File

@ -68,7 +68,7 @@ public class Message {
if (!this.messageBuffer.expandMessage(this)) return -1; if (!this.messageBuffer.expandMessage(this)) return -1;
int bytesToCopy = Math.min(remaining, capacity - length); int bytesToCopy = Math.min(remaining, capacity - length);
System.arraycopy(byteArray, offset, sharedArray, offset + this.length, bytesToCopy); System.arraycopy(byteArray, offset, sharedArray, this.offset + this.length, bytesToCopy);
this.length += bytesToCopy; this.length += bytesToCopy;
return bytesToCopy; return bytesToCopy;
} }

View File

@ -19,8 +19,10 @@ public class MessageWriter {
private int bytesWritten; private int bytesWritten;
public void enqueue(Message message) { public void enqueue(Message message) {
if (messageInProgress == null) messageInProgress = message; if (messageInProgress == null) {
else writeQueue.add(message); messageInProgress = message;
bytesWritten = 0;
} else writeQueue.add(message);
} }
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException { public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
@ -31,8 +33,10 @@ public class MessageWriter {
byteBuffer.clear(); byteBuffer.clear();
if (bytesWritten >= messageInProgress.length) { if (bytesWritten >= messageInProgress.length) {
if (writeQueue.size() > 0) messageInProgress = writeQueue.remove(0); if (writeQueue.size() > 0) {
else messageInProgress = null; messageInProgress = writeQueue.remove(0);
bytesWritten = 0;
} else messageInProgress = null;
// TODO: unregister from selector // TODO: unregister from selector
} }
} }

View File

@ -13,7 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue;
*/ */
public class Server { public class Server {
private SocketAcceptor socketAccepter; private SocketAcceptor socketAcceptor;
private SocketProcessor socketProcessor; private SocketProcessor socketProcessor;
private int tcpPort; private int tcpPort;
@ -30,17 +30,21 @@ public class Server {
Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig
socketAccepter = new SocketAcceptor(tcpPort, socketQueue); socketAcceptor = new SocketAcceptor(tcpPort, socketQueue);
MessageBuffer readBuffer = new MessageBuffer(); MessageBuffer readBuffer = new MessageBuffer();
MessageBuffer writeBuffer = new MessageBuffer(); MessageBuffer writeBuffer = new MessageBuffer();
socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor); socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
Thread accepterThread = new Thread(socketAccepter); Thread accepterThread = new Thread(socketAcceptor);
Thread processorThread = new Thread(socketProcessor); Thread processorThread = new Thread(socketProcessor);
accepterThread.start(); accepterThread.start();
processorThread.start(); processorThread.start();
} }
public SocketAcceptor getSocketAcceptor() { return socketAcceptor; }
public SocketProcessor getSocketProcessor() { return socketProcessor; }
} }

View File

@ -47,5 +47,4 @@ public class Socket {
return totalBytesWritten; return totalBytesWritten;
} }
} }

View File

@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.HashMap; import java.util.*;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
/** /**
* Project: <strong>java-nio-server</strong><br> * Project: <strong>java-nio-server</strong><br>
@ -51,13 +44,15 @@ public class SocketProcessor implements Runnable {
private Set<Socket> emptyToNonEmptySockets = new HashSet<>(); private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>(); private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
private Set<ISocketIdListener> socketIdListeners = new HashSet<>();
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer,
IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException { IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
this.inboundSocketQueue = inboundSocketQueue; this.inboundSocketQueue = inboundSocketQueue;
this.readMessageBuffer = readMessageBuffer; this.readMessageBuffer = readMessageBuffer;
this.writeMessageBuffer = writeMessageBuffer; this.writeMessageBuffer = writeMessageBuffer;
writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue); writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
this.messageReaderFactory = messageReaderFactory; this.messageReaderFactory = messageReaderFactory;
@ -67,6 +62,7 @@ public class SocketProcessor implements Runnable {
writeSelector = Selector.open(); writeSelector = Selector.open();
} }
@Override
public void run() { public void run() {
while (true) { while (true) {
try { try {
@ -97,6 +93,7 @@ public class SocketProcessor implements Runnable {
newSocket.messageWriter = new MessageWriter(); newSocket.messageWriter = new MessageWriter();
socketMap.put(newSocket.socketId, newSocket); socketMap.put(newSocket.socketId, newSocket);
socketIdListeners.forEach(l -> l.socketRegistered(nextSocketId - 1));
SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ); SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ);
key.attach(newSocket); key.attach(newSocket);
@ -109,7 +106,7 @@ public class SocketProcessor implements Runnable {
int readReady = readSelector.selectNow(); int readReady = readSelector.selectNow();
if (readReady > 0) { if (readReady > 0) {
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys(); Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
@ -121,28 +118,42 @@ public class SocketProcessor implements Runnable {
} }
} }
public void registerSocketIdListener(ISocketIdListener listener) {
socketIdListeners.add(listener);
}
private void readFromSocket(SelectionKey key) throws IOException { private void readFromSocket(SelectionKey key) throws IOException {
Socket socket = (Socket) key.attachment(); Socket socket = (Socket) key.attachment();
socket.messageReader.read(socket, this.readByteBuffer); boolean cancelled = false;
try {
socket.messageReader.read(socket, readByteBuffer);
List<Message> fullMessages = socket.messageReader.getMessages(); List<Message> fullMessages = socket.messageReader.getMessages();
if (fullMessages.size() > 0) { if (fullMessages.size() > 0) {
for (Message message : fullMessages) { for (Message message : fullMessages) {
message.socketId = socket.socketId; message.socketId = socket.socketId;
messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an // the message processor will eventually push outgoing messages into an
// IMessageWriter for this socket. // IMessageWriter for this socket.
messageProcessor.process(message, writeProxy);
} }
fullMessages.clear(); fullMessages.clear();
} }
} catch (IOException e) {
if (socket.endOfStreamReached) { e.printStackTrace();
System.err.println("An exception occurred while reading from a socket. Cancelling socket...");
cancelled = true;
} finally {
if (cancelled || socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId); System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId); socketMap.remove(socket.socketId);
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
key.attach(null); key.attach(null);
key.cancel(); key.cancel();
key.channel().close(); key.channel().close();
} }
} }
}
public void writeToSockets() throws IOException { public void writeToSockets() throws IOException {
@ -156,10 +167,10 @@ public class SocketProcessor implements Runnable {
registerNonEmptySockets(); registerNonEmptySockets();
// Select from the Selector. // Select from the Selector.
int writeReady = this.writeSelector.selectNow(); int writeReady = writeSelector.selectNow();
if (writeReady > 0) { if (writeReady > 0) {
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys(); Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
@ -167,9 +178,9 @@ public class SocketProcessor implements Runnable {
Socket socket = (Socket) key.attachment(); Socket socket = (Socket) key.attachment();
socket.messageWriter.write(socket, this.writeByteBuffer); socket.messageWriter.write(socket, writeByteBuffer);
if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); } if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
keyIterator.remove(); keyIterator.remove();
} }
@ -185,7 +196,7 @@ public class SocketProcessor implements Runnable {
private void cancelEmptySockets() { private void cancelEmptySockets() {
for (Socket socket : nonEmptyToEmptySockets) { for (Socket socket : nonEmptyToEmptySockets) {
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector); SelectionKey key = socket.socketChannel.keyFor(writeSelector);
key.cancel(); key.cancel();
} }
nonEmptyToEmptySockets.clear(); nonEmptyToEmptySockets.clear();