Compare commits
8 Commits
v0.1-alpha
...
master
Author | SHA1 | Date |
---|---|---|
Kai S. K. Engelbart | 5f4f626206 | |
Kai S. K. Engelbart | 16b0a338a7 | |
Kai S. K. Engelbart | 9fa8686c7c | |
Kai S. K. Engelbart | f0b007aa75 | |
Kai S. K. Engelbart | 5b2ffb0f7e | |
Kai S. K. Engelbart | 4fe5d9d5f8 | |
Kai S. K. Engelbart | 08519bb0f3 | |
delvh | dd7d232004 |
|
@ -1,6 +1,6 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<classpath>
|
<classpath>
|
||||||
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
|
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
|
||||||
<attributes>
|
<attributes>
|
||||||
<attribute name="maven.pomderived" value="true"/>
|
<attribute name="maven.pomderived" value="true"/>
|
||||||
</attributes>
|
</attributes>
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
name: Java CI
|
||||||
|
|
||||||
|
on: [push]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v1
|
||||||
|
- name: Set up JDK 11
|
||||||
|
uses: actions/setup-java@v1
|
||||||
|
with:
|
||||||
|
java-version: 11
|
||||||
|
- name: Build with Maven
|
||||||
|
run: mvn -B package --file pom.xml
|
|
@ -0,0 +1,4 @@
|
||||||
|
eclipse.preferences.version=1
|
||||||
|
encoding//src/main/java=UTF-8
|
||||||
|
encoding//src/test/java=UTF-8
|
||||||
|
encoding/<project>=UTF-8
|
|
@ -1,8 +1,8 @@
|
||||||
eclipse.preferences.version=1
|
eclipse.preferences.version=1
|
||||||
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
|
org.eclipse.jdt.core.compiler.codegen.targetPlatform=11
|
||||||
org.eclipse.jdt.core.compiler.compliance=1.8
|
org.eclipse.jdt.core.compiler.compliance=11
|
||||||
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
||||||
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
|
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
|
||||||
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
|
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
|
||||||
org.eclipse.jdt.core.compiler.release=enabled
|
org.eclipse.jdt.core.compiler.release=disabled
|
||||||
org.eclipse.jdt.core.compiler.source=1.8
|
org.eclipse.jdt.core.compiler.source=11
|
||||||
|
|
52
pom.xml
52
pom.xml
|
@ -2,9 +2,20 @@
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
<groupId>informatik-ag-ngl</groupId>
|
<groupId>informatik-ag-ngl</groupId>
|
||||||
<artifactId>java-nio-server</artifactId>
|
<artifactId>java-nio-server</artifactId>
|
||||||
<version>0.0.1-SNAPSHOT</version>
|
<version>0.1-beta</version>
|
||||||
|
|
||||||
|
<name>java-nio-server</name>
|
||||||
|
<url>https://github.com/informatik-ag-ngl/java-nio-server</url>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||||
|
<maven.compiler.source>11</maven.compiler.source>
|
||||||
|
<maven.compiler.target>11</maven.compiler.target>
|
||||||
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -16,13 +27,42 @@
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
<finalName>java-nio-server</finalName>
|
||||||
|
<pluginManagement>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<version>3.8.1</version>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</pluginManagement>
|
||||||
<plugins>
|
<plugins>
|
||||||
<plugin>
|
<plugin>
|
||||||
<artifactId>maven-compiler-plugin</artifactId>
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
<version>3.8.0</version>
|
<artifactId>maven-source-plugin</artifactId>
|
||||||
<configuration>
|
<version>3.2.1</version>
|
||||||
<release>8</release>
|
<executions>
|
||||||
</configuration>
|
<execution>
|
||||||
|
<id>attach-sources</id>
|
||||||
|
<goals>
|
||||||
|
<goal>jar</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-javadoc-plugin</artifactId>
|
||||||
|
<version>3.1.1</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>attach-javadocs</id>
|
||||||
|
<goals>
|
||||||
|
<goal>jar</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
</plugin>
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
</build>
|
</build>
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
package com.jenkov.nioserver;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Project: <strong>java-nio-server</strong><br>
|
||||||
|
* File: <strong>ISocketIdListener.java</strong><br>
|
||||||
|
* Created: <strong>03.01.2020</strong><br>
|
||||||
|
*
|
||||||
|
* @author Kai S. K. Engelbart
|
||||||
|
*/
|
||||||
|
public interface ISocketIdListener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is invoked when a new {@link Socket} is registered by the server
|
||||||
|
*
|
||||||
|
* @param socketId the ID of the newly registered socket
|
||||||
|
*/
|
||||||
|
void socketRegistered(long socketId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is invoked when a {@link Socket} is cancelled by the server
|
||||||
|
*
|
||||||
|
* @param socketId the ID of the cancelled socket
|
||||||
|
*/
|
||||||
|
void socketCancelled(long socketId);
|
||||||
|
}
|
|
@ -67,8 +67,8 @@ public class Message {
|
||||||
while (this.length + remaining > capacity)
|
while (this.length + remaining > capacity)
|
||||||
if (!this.messageBuffer.expandMessage(this)) return -1;
|
if (!this.messageBuffer.expandMessage(this)) return -1;
|
||||||
|
|
||||||
int bytesToCopy = Math.min(remaining, capacity - length);
|
int bytesToCopy = Math.min(remaining, capacity - this.length);
|
||||||
System.arraycopy(byteArray, offset, sharedArray, offset + this.length, bytesToCopy);
|
System.arraycopy(byteArray, offset, sharedArray, this.offset + this.length, bytesToCopy);
|
||||||
this.length += bytesToCopy;
|
this.length += bytesToCopy;
|
||||||
return bytesToCopy;
|
return bytesToCopy;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,10 @@ public class MessageWriter {
|
||||||
private int bytesWritten;
|
private int bytesWritten;
|
||||||
|
|
||||||
public void enqueue(Message message) {
|
public void enqueue(Message message) {
|
||||||
if (messageInProgress == null) messageInProgress = message;
|
if (messageInProgress == null) {
|
||||||
else writeQueue.add(message);
|
messageInProgress = message;
|
||||||
|
bytesWritten = 0;
|
||||||
|
} else writeQueue.add(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
|
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
|
||||||
|
@ -31,8 +33,10 @@ public class MessageWriter {
|
||||||
byteBuffer.clear();
|
byteBuffer.clear();
|
||||||
|
|
||||||
if (bytesWritten >= messageInProgress.length) {
|
if (bytesWritten >= messageInProgress.length) {
|
||||||
if (writeQueue.size() > 0) messageInProgress = writeQueue.remove(0);
|
if (writeQueue.size() > 0) {
|
||||||
else messageInProgress = null;
|
messageInProgress = writeQueue.remove(0);
|
||||||
|
bytesWritten = 0;
|
||||||
|
} else messageInProgress = null;
|
||||||
// TODO: unregister from selector
|
// TODO: unregister from selector
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue;
|
||||||
*/
|
*/
|
||||||
public class Server {
|
public class Server {
|
||||||
|
|
||||||
private SocketAcceptor socketAccepter;
|
private SocketAcceptor socketAcceptor;
|
||||||
private SocketProcessor socketProcessor;
|
private SocketProcessor socketProcessor;
|
||||||
|
|
||||||
private int tcpPort;
|
private int tcpPort;
|
||||||
|
@ -30,17 +30,21 @@ public class Server {
|
||||||
|
|
||||||
Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig
|
Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig
|
||||||
|
|
||||||
socketAccepter = new SocketAcceptor(tcpPort, socketQueue);
|
socketAcceptor = new SocketAcceptor(tcpPort, socketQueue);
|
||||||
|
|
||||||
MessageBuffer readBuffer = new MessageBuffer();
|
MessageBuffer readBuffer = new MessageBuffer();
|
||||||
MessageBuffer writeBuffer = new MessageBuffer();
|
MessageBuffer writeBuffer = new MessageBuffer();
|
||||||
|
|
||||||
socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
|
socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
|
||||||
|
|
||||||
Thread accepterThread = new Thread(socketAccepter);
|
Thread accepterThread = new Thread(socketAcceptor);
|
||||||
Thread processorThread = new Thread(socketProcessor);
|
Thread processorThread = new Thread(socketProcessor);
|
||||||
|
|
||||||
accepterThread.start();
|
accepterThread.start();
|
||||||
processorThread.start();
|
processorThread.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SocketAcceptor getSocketAcceptor() { return socketAcceptor; }
|
||||||
|
|
||||||
|
public SocketProcessor getSocketProcessor() { return socketProcessor; }
|
||||||
}
|
}
|
|
@ -47,5 +47,4 @@ public class Socket {
|
||||||
|
|
||||||
return totalBytesWritten;
|
return totalBytesWritten;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.SelectionKey;
|
import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.Selector;
|
import java.nio.channels.Selector;
|
||||||
import java.util.HashMap;
|
import java.util.*;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Project: <strong>java-nio-server</strong><br>
|
* Project: <strong>java-nio-server</strong><br>
|
||||||
|
@ -51,13 +44,15 @@ public class SocketProcessor implements Runnable {
|
||||||
private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
|
private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
|
||||||
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
|
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
|
||||||
|
|
||||||
|
private Set<ISocketIdListener> socketIdListeners = new HashSet<>();
|
||||||
|
|
||||||
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer,
|
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer,
|
||||||
IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
|
IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
|
||||||
this.inboundSocketQueue = inboundSocketQueue;
|
this.inboundSocketQueue = inboundSocketQueue;
|
||||||
|
|
||||||
this.readMessageBuffer = readMessageBuffer;
|
this.readMessageBuffer = readMessageBuffer;
|
||||||
this.writeMessageBuffer = writeMessageBuffer;
|
this.writeMessageBuffer = writeMessageBuffer;
|
||||||
writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue);
|
writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
|
||||||
|
|
||||||
this.messageReaderFactory = messageReaderFactory;
|
this.messageReaderFactory = messageReaderFactory;
|
||||||
|
|
||||||
|
@ -67,6 +62,7 @@ public class SocketProcessor implements Runnable {
|
||||||
writeSelector = Selector.open();
|
writeSelector = Selector.open();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
|
@ -97,6 +93,7 @@ public class SocketProcessor implements Runnable {
|
||||||
newSocket.messageWriter = new MessageWriter();
|
newSocket.messageWriter = new MessageWriter();
|
||||||
|
|
||||||
socketMap.put(newSocket.socketId, newSocket);
|
socketMap.put(newSocket.socketId, newSocket);
|
||||||
|
socketIdListeners.forEach(l -> l.socketRegistered(nextSocketId - 1));
|
||||||
|
|
||||||
SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ);
|
SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ);
|
||||||
key.attach(newSocket);
|
key.attach(newSocket);
|
||||||
|
@ -109,7 +106,7 @@ public class SocketProcessor implements Runnable {
|
||||||
int readReady = readSelector.selectNow();
|
int readReady = readSelector.selectNow();
|
||||||
|
|
||||||
if (readReady > 0) {
|
if (readReady > 0) {
|
||||||
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys();
|
Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
|
||||||
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
|
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
|
||||||
|
|
||||||
while (keyIterator.hasNext()) {
|
while (keyIterator.hasNext()) {
|
||||||
|
@ -121,26 +118,40 @@ public class SocketProcessor implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void registerSocketIdListener(ISocketIdListener listener) {
|
||||||
|
socketIdListeners.add(listener);
|
||||||
|
}
|
||||||
|
|
||||||
private void readFromSocket(SelectionKey key) throws IOException {
|
private void readFromSocket(SelectionKey key) throws IOException {
|
||||||
Socket socket = (Socket) key.attachment();
|
Socket socket = (Socket) key.attachment();
|
||||||
socket.messageReader.read(socket, this.readByteBuffer);
|
boolean cancelled = false;
|
||||||
|
|
||||||
List<Message> fullMessages = socket.messageReader.getMessages();
|
try {
|
||||||
if (fullMessages.size() > 0) {
|
socket.messageReader.read(socket, readByteBuffer);
|
||||||
for (Message message : fullMessages) {
|
|
||||||
message.socketId = socket.socketId;
|
List<Message> fullMessages = socket.messageReader.getMessages();
|
||||||
messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an
|
if (fullMessages.size() > 0) {
|
||||||
// IMessageWriter for this socket.
|
for (Message message : fullMessages) {
|
||||||
|
message.socketId = socket.socketId;
|
||||||
|
// the message processor will eventually push outgoing messages into an
|
||||||
|
// IMessageWriter for this socket.
|
||||||
|
messageProcessor.process(message, writeProxy);
|
||||||
|
}
|
||||||
|
fullMessages.clear();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
System.err.println("An exception occurred while reading from a socket. Cancelling socket...");
|
||||||
|
cancelled = true;
|
||||||
|
} finally {
|
||||||
|
if (cancelled || socket.endOfStreamReached) {
|
||||||
|
System.out.println("Socket closed: " + socket.socketId);
|
||||||
|
socketMap.remove(socket.socketId);
|
||||||
|
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
|
||||||
|
key.attach(null);
|
||||||
|
key.cancel();
|
||||||
|
key.channel().close();
|
||||||
}
|
}
|
||||||
fullMessages.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (socket.endOfStreamReached) {
|
|
||||||
System.out.println("Socket closed: " + socket.socketId);
|
|
||||||
socketMap.remove(socket.socketId);
|
|
||||||
key.attach(null);
|
|
||||||
key.cancel();
|
|
||||||
key.channel().close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,10 +167,10 @@ public class SocketProcessor implements Runnable {
|
||||||
registerNonEmptySockets();
|
registerNonEmptySockets();
|
||||||
|
|
||||||
// Select from the Selector.
|
// Select from the Selector.
|
||||||
int writeReady = this.writeSelector.selectNow();
|
int writeReady = writeSelector.selectNow();
|
||||||
|
|
||||||
if (writeReady > 0) {
|
if (writeReady > 0) {
|
||||||
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys();
|
Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
|
||||||
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
|
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
|
||||||
|
|
||||||
while (keyIterator.hasNext()) {
|
while (keyIterator.hasNext()) {
|
||||||
|
@ -167,9 +178,9 @@ public class SocketProcessor implements Runnable {
|
||||||
|
|
||||||
Socket socket = (Socket) key.attachment();
|
Socket socket = (Socket) key.attachment();
|
||||||
|
|
||||||
socket.messageWriter.write(socket, this.writeByteBuffer);
|
socket.messageWriter.write(socket, writeByteBuffer);
|
||||||
|
|
||||||
if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); }
|
if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
|
||||||
|
|
||||||
keyIterator.remove();
|
keyIterator.remove();
|
||||||
}
|
}
|
||||||
|
@ -185,7 +196,7 @@ public class SocketProcessor implements Runnable {
|
||||||
|
|
||||||
private void cancelEmptySockets() {
|
private void cancelEmptySockets() {
|
||||||
for (Socket socket : nonEmptyToEmptySockets) {
|
for (Socket socket : nonEmptyToEmptySockets) {
|
||||||
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector);
|
SelectionKey key = socket.socketChannel.keyFor(writeSelector);
|
||||||
key.cancel();
|
key.cancel();
|
||||||
}
|
}
|
||||||
nonEmptyToEmptySockets.clear();
|
nonEmptyToEmptySockets.clear();
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
/**
|
||||||
|
* Contains the entire server API.
|
||||||
|
*
|
||||||
|
* @author Kai S. K. Engelbart
|
||||||
|
* @since java-nio-server v0.1-beta
|
||||||
|
*/
|
||||||
|
module java.nio.server {
|
||||||
|
|
||||||
|
exports com.jenkov.nioserver;
|
||||||
|
}
|
Reference in New Issue