Compare commits
8 Commits
v0.1-alpha
...
master
Author | SHA1 | Date | |
---|---|---|---|
5f4f626206 | |||
16b0a338a7 | |||
9fa8686c7c | |||
f0b007aa75 | |||
5b2ffb0f7e | |||
4fe5d9d5f8 | |||
08519bb0f3 | |||
|
dd7d232004 |
@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<classpath>
|
||||
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
|
||||
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
|
||||
<attributes>
|
||||
<attribute name="maven.pomderived" value="true"/>
|
||||
</attributes>
|
||||
|
17
.github/workflows/maven.yml
vendored
Normal file
17
.github/workflows/maven.yml
vendored
Normal file
@ -0,0 +1,17 @@
|
||||
name: Java CI
|
||||
|
||||
on: [push]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- name: Set up JDK 11
|
||||
uses: actions/setup-java@v1
|
||||
with:
|
||||
java-version: 11
|
||||
- name: Build with Maven
|
||||
run: mvn -B package --file pom.xml
|
4
.settings/org.eclipse.core.resources.prefs
Normal file
4
.settings/org.eclipse.core.resources.prefs
Normal file
@ -0,0 +1,4 @@
|
||||
eclipse.preferences.version=1
|
||||
encoding//src/main/java=UTF-8
|
||||
encoding//src/test/java=UTF-8
|
||||
encoding/<project>=UTF-8
|
@ -1,8 +1,8 @@
|
||||
eclipse.preferences.version=1
|
||||
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
|
||||
org.eclipse.jdt.core.compiler.compliance=1.8
|
||||
org.eclipse.jdt.core.compiler.codegen.targetPlatform=11
|
||||
org.eclipse.jdt.core.compiler.compliance=11
|
||||
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
||||
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
|
||||
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
|
||||
org.eclipse.jdt.core.compiler.release=enabled
|
||||
org.eclipse.jdt.core.compiler.source=1.8
|
||||
org.eclipse.jdt.core.compiler.release=disabled
|
||||
org.eclipse.jdt.core.compiler.source=11
|
||||
|
52
pom.xml
52
pom.xml
@ -2,9 +2,20 @@
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>informatik-ag-ngl</groupId>
|
||||
<artifactId>java-nio-server</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<version>0.1-beta</version>
|
||||
|
||||
<name>java-nio-server</name>
|
||||
<url>https://github.com/informatik-ag-ngl/java-nio-server</url>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<maven.compiler.source>11</maven.compiler.source>
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@ -16,13 +27,42 @@
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<finalName>java-nio-server</finalName>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.1</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.0</version>
|
||||
<configuration>
|
||||
<release>8</release>
|
||||
</configuration>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>attach-sources</id>
|
||||
<goals>
|
||||
<goal>jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<version>3.1.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>attach-javadocs</id>
|
||||
<goals>
|
||||
<goal>jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
25
src/main/java/com/jenkov/nioserver/ISocketIdListener.java
Normal file
25
src/main/java/com/jenkov/nioserver/ISocketIdListener.java
Normal file
@ -0,0 +1,25 @@
|
||||
package com.jenkov.nioserver;
|
||||
|
||||
/**
|
||||
* Project: <strong>java-nio-server</strong><br>
|
||||
* File: <strong>ISocketIdListener.java</strong><br>
|
||||
* Created: <strong>03.01.2020</strong><br>
|
||||
*
|
||||
* @author Kai S. K. Engelbart
|
||||
*/
|
||||
public interface ISocketIdListener {
|
||||
|
||||
/**
|
||||
* Is invoked when a new {@link Socket} is registered by the server
|
||||
*
|
||||
* @param socketId the ID of the newly registered socket
|
||||
*/
|
||||
void socketRegistered(long socketId);
|
||||
|
||||
/**
|
||||
* Is invoked when a {@link Socket} is cancelled by the server
|
||||
*
|
||||
* @param socketId the ID of the cancelled socket
|
||||
*/
|
||||
void socketCancelled(long socketId);
|
||||
}
|
@ -67,8 +67,8 @@ public class Message {
|
||||
while (this.length + remaining > capacity)
|
||||
if (!this.messageBuffer.expandMessage(this)) return -1;
|
||||
|
||||
int bytesToCopy = Math.min(remaining, capacity - length);
|
||||
System.arraycopy(byteArray, offset, sharedArray, offset + this.length, bytesToCopy);
|
||||
int bytesToCopy = Math.min(remaining, capacity - this.length);
|
||||
System.arraycopy(byteArray, offset, sharedArray, this.offset + this.length, bytesToCopy);
|
||||
this.length += bytesToCopy;
|
||||
return bytesToCopy;
|
||||
}
|
||||
|
@ -19,8 +19,10 @@ public class MessageWriter {
|
||||
private int bytesWritten;
|
||||
|
||||
public void enqueue(Message message) {
|
||||
if (messageInProgress == null) messageInProgress = message;
|
||||
else writeQueue.add(message);
|
||||
if (messageInProgress == null) {
|
||||
messageInProgress = message;
|
||||
bytesWritten = 0;
|
||||
} else writeQueue.add(message);
|
||||
}
|
||||
|
||||
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
|
||||
@ -31,8 +33,10 @@ public class MessageWriter {
|
||||
byteBuffer.clear();
|
||||
|
||||
if (bytesWritten >= messageInProgress.length) {
|
||||
if (writeQueue.size() > 0) messageInProgress = writeQueue.remove(0);
|
||||
else messageInProgress = null;
|
||||
if (writeQueue.size() > 0) {
|
||||
messageInProgress = writeQueue.remove(0);
|
||||
bytesWritten = 0;
|
||||
} else messageInProgress = null;
|
||||
// TODO: unregister from selector
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue;
|
||||
*/
|
||||
public class Server {
|
||||
|
||||
private SocketAcceptor socketAccepter;
|
||||
private SocketAcceptor socketAcceptor;
|
||||
private SocketProcessor socketProcessor;
|
||||
|
||||
private int tcpPort;
|
||||
@ -30,17 +30,21 @@ public class Server {
|
||||
|
||||
Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig
|
||||
|
||||
socketAccepter = new SocketAcceptor(tcpPort, socketQueue);
|
||||
socketAcceptor = new SocketAcceptor(tcpPort, socketQueue);
|
||||
|
||||
MessageBuffer readBuffer = new MessageBuffer();
|
||||
MessageBuffer writeBuffer = new MessageBuffer();
|
||||
|
||||
socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
|
||||
|
||||
Thread accepterThread = new Thread(socketAccepter);
|
||||
Thread accepterThread = new Thread(socketAcceptor);
|
||||
Thread processorThread = new Thread(socketProcessor);
|
||||
|
||||
accepterThread.start();
|
||||
processorThread.start();
|
||||
}
|
||||
|
||||
public SocketAcceptor getSocketAcceptor() { return socketAcceptor; }
|
||||
|
||||
public SocketProcessor getSocketProcessor() { return socketProcessor; }
|
||||
}
|
@ -47,5 +47,4 @@ public class Socket {
|
||||
|
||||
return totalBytesWritten;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Project: <strong>java-nio-server</strong><br>
|
||||
@ -51,13 +44,15 @@ public class SocketProcessor implements Runnable {
|
||||
private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
|
||||
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
|
||||
|
||||
private Set<ISocketIdListener> socketIdListeners = new HashSet<>();
|
||||
|
||||
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer,
|
||||
IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
|
||||
this.inboundSocketQueue = inboundSocketQueue;
|
||||
|
||||
this.readMessageBuffer = readMessageBuffer;
|
||||
this.writeMessageBuffer = writeMessageBuffer;
|
||||
writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue);
|
||||
writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
|
||||
|
||||
this.messageReaderFactory = messageReaderFactory;
|
||||
|
||||
@ -67,6 +62,7 @@ public class SocketProcessor implements Runnable {
|
||||
writeSelector = Selector.open();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
@ -97,6 +93,7 @@ public class SocketProcessor implements Runnable {
|
||||
newSocket.messageWriter = new MessageWriter();
|
||||
|
||||
socketMap.put(newSocket.socketId, newSocket);
|
||||
socketIdListeners.forEach(l -> l.socketRegistered(nextSocketId - 1));
|
||||
|
||||
SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ);
|
||||
key.attach(newSocket);
|
||||
@ -109,7 +106,7 @@ public class SocketProcessor implements Runnable {
|
||||
int readReady = readSelector.selectNow();
|
||||
|
||||
if (readReady > 0) {
|
||||
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys();
|
||||
Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
|
||||
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
|
||||
|
||||
while (keyIterator.hasNext()) {
|
||||
@ -121,26 +118,40 @@ public class SocketProcessor implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
public void registerSocketIdListener(ISocketIdListener listener) {
|
||||
socketIdListeners.add(listener);
|
||||
}
|
||||
|
||||
private void readFromSocket(SelectionKey key) throws IOException {
|
||||
Socket socket = (Socket) key.attachment();
|
||||
socket.messageReader.read(socket, this.readByteBuffer);
|
||||
boolean cancelled = false;
|
||||
|
||||
List<Message> fullMessages = socket.messageReader.getMessages();
|
||||
if (fullMessages.size() > 0) {
|
||||
for (Message message : fullMessages) {
|
||||
message.socketId = socket.socketId;
|
||||
messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an
|
||||
// IMessageWriter for this socket.
|
||||
try {
|
||||
socket.messageReader.read(socket, readByteBuffer);
|
||||
|
||||
List<Message> fullMessages = socket.messageReader.getMessages();
|
||||
if (fullMessages.size() > 0) {
|
||||
for (Message message : fullMessages) {
|
||||
message.socketId = socket.socketId;
|
||||
// the message processor will eventually push outgoing messages into an
|
||||
// IMessageWriter for this socket.
|
||||
messageProcessor.process(message, writeProxy);
|
||||
}
|
||||
fullMessages.clear();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
System.err.println("An exception occurred while reading from a socket. Cancelling socket...");
|
||||
cancelled = true;
|
||||
} finally {
|
||||
if (cancelled || socket.endOfStreamReached) {
|
||||
System.out.println("Socket closed: " + socket.socketId);
|
||||
socketMap.remove(socket.socketId);
|
||||
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
|
||||
key.attach(null);
|
||||
key.cancel();
|
||||
key.channel().close();
|
||||
}
|
||||
fullMessages.clear();
|
||||
}
|
||||
|
||||
if (socket.endOfStreamReached) {
|
||||
System.out.println("Socket closed: " + socket.socketId);
|
||||
socketMap.remove(socket.socketId);
|
||||
key.attach(null);
|
||||
key.cancel();
|
||||
key.channel().close();
|
||||
}
|
||||
}
|
||||
|
||||
@ -156,10 +167,10 @@ public class SocketProcessor implements Runnable {
|
||||
registerNonEmptySockets();
|
||||
|
||||
// Select from the Selector.
|
||||
int writeReady = this.writeSelector.selectNow();
|
||||
int writeReady = writeSelector.selectNow();
|
||||
|
||||
if (writeReady > 0) {
|
||||
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys();
|
||||
Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
|
||||
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
|
||||
|
||||
while (keyIterator.hasNext()) {
|
||||
@ -167,9 +178,9 @@ public class SocketProcessor implements Runnable {
|
||||
|
||||
Socket socket = (Socket) key.attachment();
|
||||
|
||||
socket.messageWriter.write(socket, this.writeByteBuffer);
|
||||
socket.messageWriter.write(socket, writeByteBuffer);
|
||||
|
||||
if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); }
|
||||
if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
|
||||
|
||||
keyIterator.remove();
|
||||
}
|
||||
@ -185,7 +196,7 @@ public class SocketProcessor implements Runnable {
|
||||
|
||||
private void cancelEmptySockets() {
|
||||
for (Socket socket : nonEmptyToEmptySockets) {
|
||||
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector);
|
||||
SelectionKey key = socket.socketChannel.keyFor(writeSelector);
|
||||
key.cancel();
|
||||
}
|
||||
nonEmptyToEmptySockets.clear();
|
||||
|
10
src/main/java/module-info.java
Normal file
10
src/main/java/module-info.java
Normal file
@ -0,0 +1,10 @@
|
||||
/**
|
||||
* Contains the entire server API.
|
||||
*
|
||||
* @author Kai S. K. Engelbart
|
||||
* @since java-nio-server v0.1-beta
|
||||
*/
|
||||
module java.nio.server {
|
||||
|
||||
exports com.jenkov.nioserver;
|
||||
}
|
Reference in New Issue
Block a user