Compare commits

...

8 Commits

12 changed files with 166 additions and 52 deletions

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>

17
.github/workflows/maven.yml vendored Normal file
View File

@ -0,0 +1,17 @@
name: Java CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11
- name: Build with Maven
run: mvn -B package --file pom.xml

View File

@ -0,0 +1,4 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/test/java=UTF-8
encoding/<project>=UTF-8

View File

@ -1,8 +1,8 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.codegen.targetPlatform=11
org.eclipse.jdt.core.compiler.compliance=11
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=enabled
org.eclipse.jdt.core.compiler.source=1.8
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=11

52
pom.xml
View File

@ -2,9 +2,20 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>informatik-ag-ngl</groupId>
<artifactId>java-nio-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<version>0.1-beta</version>
<name>java-nio-server</name>
<url>https://github.com/informatik-ag-ngl/java-nio-server</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
@ -16,13 +27,42 @@
</dependencies>
<build>
<finalName>java-nio-server</finalName>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>8</release>
</configuration>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View File

@ -0,0 +1,25 @@
package com.jenkov.nioserver;
/**
* Project: <strong>java-nio-server</strong><br>
* File: <strong>ISocketIdListener.java</strong><br>
* Created: <strong>03.01.2020</strong><br>
*
* @author Kai S. K. Engelbart
*/
public interface ISocketIdListener {
/**
* Is invoked when a new {@link Socket} is registered by the server
*
* @param socketId the ID of the newly registered socket
*/
void socketRegistered(long socketId);
/**
* Is invoked when a {@link Socket} is cancelled by the server
*
* @param socketId the ID of the cancelled socket
*/
void socketCancelled(long socketId);
}

View File

@ -67,8 +67,8 @@ public class Message {
while (this.length + remaining > capacity)
if (!this.messageBuffer.expandMessage(this)) return -1;
int bytesToCopy = Math.min(remaining, capacity - length);
System.arraycopy(byteArray, offset, sharedArray, offset + this.length, bytesToCopy);
int bytesToCopy = Math.min(remaining, capacity - this.length);
System.arraycopy(byteArray, offset, sharedArray, this.offset + this.length, bytesToCopy);
this.length += bytesToCopy;
return bytesToCopy;
}

View File

@ -19,8 +19,10 @@ public class MessageWriter {
private int bytesWritten;
public void enqueue(Message message) {
if (messageInProgress == null) messageInProgress = message;
else writeQueue.add(message);
if (messageInProgress == null) {
messageInProgress = message;
bytesWritten = 0;
} else writeQueue.add(message);
}
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
@ -31,8 +33,10 @@ public class MessageWriter {
byteBuffer.clear();
if (bytesWritten >= messageInProgress.length) {
if (writeQueue.size() > 0) messageInProgress = writeQueue.remove(0);
else messageInProgress = null;
if (writeQueue.size() > 0) {
messageInProgress = writeQueue.remove(0);
bytesWritten = 0;
} else messageInProgress = null;
// TODO: unregister from selector
}
}

View File

@ -13,7 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue;
*/
public class Server {
private SocketAcceptor socketAccepter;
private SocketAcceptor socketAcceptor;
private SocketProcessor socketProcessor;
private int tcpPort;
@ -30,17 +30,21 @@ public class Server {
Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig
socketAccepter = new SocketAcceptor(tcpPort, socketQueue);
socketAcceptor = new SocketAcceptor(tcpPort, socketQueue);
MessageBuffer readBuffer = new MessageBuffer();
MessageBuffer writeBuffer = new MessageBuffer();
socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
Thread accepterThread = new Thread(socketAccepter);
Thread accepterThread = new Thread(socketAcceptor);
Thread processorThread = new Thread(socketProcessor);
accepterThread.start();
processorThread.start();
}
public SocketAcceptor getSocketAcceptor() { return socketAcceptor; }
public SocketProcessor getSocketProcessor() { return socketProcessor; }
}

View File

@ -47,5 +47,4 @@ public class Socket {
return totalBytesWritten;
}
}

View File

@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.*;
/**
* Project: <strong>java-nio-server</strong><br>
@ -51,13 +44,15 @@ public class SocketProcessor implements Runnable {
private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
private Set<ISocketIdListener> socketIdListeners = new HashSet<>();
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer,
IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
this.inboundSocketQueue = inboundSocketQueue;
this.readMessageBuffer = readMessageBuffer;
this.writeMessageBuffer = writeMessageBuffer;
writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue);
writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
this.messageReaderFactory = messageReaderFactory;
@ -67,6 +62,7 @@ public class SocketProcessor implements Runnable {
writeSelector = Selector.open();
}
@Override
public void run() {
while (true) {
try {
@ -97,6 +93,7 @@ public class SocketProcessor implements Runnable {
newSocket.messageWriter = new MessageWriter();
socketMap.put(newSocket.socketId, newSocket);
socketIdListeners.forEach(l -> l.socketRegistered(nextSocketId - 1));
SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ);
key.attach(newSocket);
@ -109,7 +106,7 @@ public class SocketProcessor implements Runnable {
int readReady = readSelector.selectNow();
if (readReady > 0) {
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys();
Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
@ -121,26 +118,40 @@ public class SocketProcessor implements Runnable {
}
}
public void registerSocketIdListener(ISocketIdListener listener) {
socketIdListeners.add(listener);
}
private void readFromSocket(SelectionKey key) throws IOException {
Socket socket = (Socket) key.attachment();
socket.messageReader.read(socket, this.readByteBuffer);
boolean cancelled = false;
List<Message> fullMessages = socket.messageReader.getMessages();
if (fullMessages.size() > 0) {
for (Message message : fullMessages) {
message.socketId = socket.socketId;
messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an
// IMessageWriter for this socket.
try {
socket.messageReader.read(socket, readByteBuffer);
List<Message> fullMessages = socket.messageReader.getMessages();
if (fullMessages.size() > 0) {
for (Message message : fullMessages) {
message.socketId = socket.socketId;
// the message processor will eventually push outgoing messages into an
// IMessageWriter for this socket.
messageProcessor.process(message, writeProxy);
}
fullMessages.clear();
}
} catch (IOException e) {
e.printStackTrace();
System.err.println("An exception occurred while reading from a socket. Cancelling socket...");
cancelled = true;
} finally {
if (cancelled || socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId);
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
key.attach(null);
key.cancel();
key.channel().close();
}
fullMessages.clear();
}
if (socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId);
key.attach(null);
key.cancel();
key.channel().close();
}
}
@ -156,10 +167,10 @@ public class SocketProcessor implements Runnable {
registerNonEmptySockets();
// Select from the Selector.
int writeReady = this.writeSelector.selectNow();
int writeReady = writeSelector.selectNow();
if (writeReady > 0) {
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys();
Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
@ -167,9 +178,9 @@ public class SocketProcessor implements Runnable {
Socket socket = (Socket) key.attachment();
socket.messageWriter.write(socket, this.writeByteBuffer);
socket.messageWriter.write(socket, writeByteBuffer);
if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); }
if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
keyIterator.remove();
}
@ -185,7 +196,7 @@ public class SocketProcessor implements Runnable {
private void cancelEmptySockets() {
for (Socket socket : nonEmptyToEmptySockets) {
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector);
SelectionKey key = socket.socketChannel.keyFor(writeSelector);
key.cancel();
}
nonEmptyToEmptySockets.clear();

View File

@ -0,0 +1,10 @@
/**
* Contains the entire server API.
*
* @author Kai S. K. Engelbart
* @since java-nio-server v0.1-beta
*/
module java.nio.server {
exports com.jenkov.nioserver;
}