Compare commits
11 Commits
master
...
v0.1-alpha
Author | SHA1 | Date | |
---|---|---|---|
9d430dbb75 | |||
6c3ceaae36 | |||
028399a3e6 | |||
5cf2959e1a | |||
27ac6b43c0 | |||
0044e5c200 | |||
35bce6dae7 | |||
ef44963f20 | |||
282f9b916a | |||
|
aedb2a9ac0 | ||
6b9a4d4f1c |
17
.github/workflows/maven.yml
vendored
Normal file
17
.github/workflows/maven.yml
vendored
Normal file
@ -0,0 +1,17 @@
|
||||
name: Java CI
|
||||
|
||||
on: [push]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- name: Set up JDK 1.8
|
||||
uses: actions/setup-java@v1
|
||||
with:
|
||||
java-version: 1.8
|
||||
- name: Build with Maven
|
||||
run: mvn -B package --file pom.xml
|
4
.settings/org.eclipse.core.resources.prefs
Normal file
4
.settings/org.eclipse.core.resources.prefs
Normal file
@ -0,0 +1,4 @@
|
||||
eclipse.preferences.version=1
|
||||
encoding//src/main/java=UTF-8
|
||||
encoding//src/test/java=UTF-8
|
||||
encoding/<project>=UTF-8
|
@ -4,5 +4,5 @@ org.eclipse.jdt.core.compiler.compliance=1.8
|
||||
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
||||
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
|
||||
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
|
||||
org.eclipse.jdt.core.compiler.release=enabled
|
||||
org.eclipse.jdt.core.compiler.release=disabled
|
||||
org.eclipse.jdt.core.compiler.source=1.8
|
||||
|
41
pom.xml
41
pom.xml
@ -2,10 +2,21 @@
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>informatik-ag-ngl</groupId>
|
||||
<artifactId>java-nio-server</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
|
||||
<name>java-nio-server</name>
|
||||
<url>https://github.com/informatik-ag-ngl/java-nio-server</url>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<maven.compiler.source>1.8</maven.compiler.source>
|
||||
<maven.compiler.target>1.8</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
@ -16,13 +27,33 @@
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<finalName>java-nio-server</finalName>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.0</version>
|
||||
<configuration>
|
||||
<release>8</release>
|
||||
</configuration>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>attach-sources</id>
|
||||
<goals>
|
||||
<goal>jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<version>3.1.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>attach-javadocs</id>
|
||||
<goals>
|
||||
<goal>jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
25
src/main/java/com/jenkov/nioserver/ISocketIdListener.java
Normal file
25
src/main/java/com/jenkov/nioserver/ISocketIdListener.java
Normal file
@ -0,0 +1,25 @@
|
||||
package com.jenkov.nioserver;
|
||||
|
||||
/**
|
||||
* Project: <strong>java-nio-server</strong><br>
|
||||
* File: <strong>ISocketIdListener.java</strong><br>
|
||||
* Created: <strong>03.01.2020</strong><br>
|
||||
*
|
||||
* @author Kai S. K. Engelbart
|
||||
*/
|
||||
public interface ISocketIdListener {
|
||||
|
||||
/**
|
||||
* Is invoked when a new {@link Socket} is registered by the server
|
||||
*
|
||||
* @param socketId the ID of the newly registered socket
|
||||
*/
|
||||
void socketRegistered(long socketId);
|
||||
|
||||
/**
|
||||
* Is invoked when a {@link Socket} is cancelled by the server
|
||||
*
|
||||
* @param socketId the ID of the cancelled socket
|
||||
*/
|
||||
void socketCancelled(long socketId);
|
||||
}
|
@ -68,7 +68,7 @@ public class Message {
|
||||
if (!this.messageBuffer.expandMessage(this)) return -1;
|
||||
|
||||
int bytesToCopy = Math.min(remaining, capacity - length);
|
||||
System.arraycopy(byteArray, offset, sharedArray, offset + this.length, bytesToCopy);
|
||||
System.arraycopy(byteArray, offset, sharedArray, this.offset + this.length, bytesToCopy);
|
||||
this.length += bytesToCopy;
|
||||
return bytesToCopy;
|
||||
}
|
||||
|
@ -19,8 +19,10 @@ public class MessageWriter {
|
||||
private int bytesWritten;
|
||||
|
||||
public void enqueue(Message message) {
|
||||
if (messageInProgress == null) messageInProgress = message;
|
||||
else writeQueue.add(message);
|
||||
if (messageInProgress == null) {
|
||||
messageInProgress = message;
|
||||
bytesWritten = 0;
|
||||
} else writeQueue.add(message);
|
||||
}
|
||||
|
||||
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
|
||||
@ -31,8 +33,10 @@ public class MessageWriter {
|
||||
byteBuffer.clear();
|
||||
|
||||
if (bytesWritten >= messageInProgress.length) {
|
||||
if (writeQueue.size() > 0) messageInProgress = writeQueue.remove(0);
|
||||
else messageInProgress = null;
|
||||
if (writeQueue.size() > 0) {
|
||||
messageInProgress = writeQueue.remove(0);
|
||||
bytesWritten = 0;
|
||||
} else messageInProgress = null;
|
||||
// TODO: unregister from selector
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue;
|
||||
*/
|
||||
public class Server {
|
||||
|
||||
private SocketAcceptor socketAccepter;
|
||||
private SocketAcceptor socketAcceptor;
|
||||
private SocketProcessor socketProcessor;
|
||||
|
||||
private int tcpPort;
|
||||
@ -30,17 +30,21 @@ public class Server {
|
||||
|
||||
Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig
|
||||
|
||||
socketAccepter = new SocketAcceptor(tcpPort, socketQueue);
|
||||
socketAcceptor = new SocketAcceptor(tcpPort, socketQueue);
|
||||
|
||||
MessageBuffer readBuffer = new MessageBuffer();
|
||||
MessageBuffer writeBuffer = new MessageBuffer();
|
||||
|
||||
socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
|
||||
|
||||
Thread accepterThread = new Thread(socketAccepter);
|
||||
Thread accepterThread = new Thread(socketAcceptor);
|
||||
Thread processorThread = new Thread(socketProcessor);
|
||||
|
||||
accepterThread.start();
|
||||
processorThread.start();
|
||||
}
|
||||
|
||||
public SocketAcceptor getSocketAcceptor() { return socketAcceptor; }
|
||||
|
||||
public SocketProcessor getSocketProcessor() { return socketProcessor; }
|
||||
}
|
@ -47,5 +47,4 @@ public class Socket {
|
||||
|
||||
return totalBytesWritten;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Project: <strong>java-nio-server</strong><br>
|
||||
@ -51,13 +44,15 @@ public class SocketProcessor implements Runnable {
|
||||
private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
|
||||
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
|
||||
|
||||
private Set<ISocketIdListener> socketIdListeners = new HashSet<>();
|
||||
|
||||
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer,
|
||||
IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
|
||||
this.inboundSocketQueue = inboundSocketQueue;
|
||||
|
||||
this.readMessageBuffer = readMessageBuffer;
|
||||
this.writeMessageBuffer = writeMessageBuffer;
|
||||
writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue);
|
||||
writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
|
||||
|
||||
this.messageReaderFactory = messageReaderFactory;
|
||||
|
||||
@ -67,6 +62,7 @@ public class SocketProcessor implements Runnable {
|
||||
writeSelector = Selector.open();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
@ -97,6 +93,7 @@ public class SocketProcessor implements Runnable {
|
||||
newSocket.messageWriter = new MessageWriter();
|
||||
|
||||
socketMap.put(newSocket.socketId, newSocket);
|
||||
socketIdListeners.forEach(l -> l.socketRegistered(nextSocketId - 1));
|
||||
|
||||
SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ);
|
||||
key.attach(newSocket);
|
||||
@ -109,7 +106,7 @@ public class SocketProcessor implements Runnable {
|
||||
int readReady = readSelector.selectNow();
|
||||
|
||||
if (readReady > 0) {
|
||||
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys();
|
||||
Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
|
||||
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
|
||||
|
||||
while (keyIterator.hasNext()) {
|
||||
@ -121,26 +118,40 @@ public class SocketProcessor implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
public void registerSocketIdListener(ISocketIdListener listener) {
|
||||
socketIdListeners.add(listener);
|
||||
}
|
||||
|
||||
private void readFromSocket(SelectionKey key) throws IOException {
|
||||
Socket socket = (Socket) key.attachment();
|
||||
socket.messageReader.read(socket, this.readByteBuffer);
|
||||
boolean cancelled = false;
|
||||
|
||||
List<Message> fullMessages = socket.messageReader.getMessages();
|
||||
if (fullMessages.size() > 0) {
|
||||
for (Message message : fullMessages) {
|
||||
message.socketId = socket.socketId;
|
||||
messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an
|
||||
// IMessageWriter for this socket.
|
||||
try {
|
||||
socket.messageReader.read(socket, readByteBuffer);
|
||||
|
||||
List<Message> fullMessages = socket.messageReader.getMessages();
|
||||
if (fullMessages.size() > 0) {
|
||||
for (Message message : fullMessages) {
|
||||
message.socketId = socket.socketId;
|
||||
// the message processor will eventually push outgoing messages into an
|
||||
// IMessageWriter for this socket.
|
||||
messageProcessor.process(message, writeProxy);
|
||||
}
|
||||
fullMessages.clear();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
System.err.println("An exception occurred while reading from a socket. Cancelling socket...");
|
||||
cancelled = true;
|
||||
} finally {
|
||||
if (cancelled || socket.endOfStreamReached) {
|
||||
System.out.println("Socket closed: " + socket.socketId);
|
||||
socketMap.remove(socket.socketId);
|
||||
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
|
||||
key.attach(null);
|
||||
key.cancel();
|
||||
key.channel().close();
|
||||
}
|
||||
fullMessages.clear();
|
||||
}
|
||||
|
||||
if (socket.endOfStreamReached) {
|
||||
System.out.println("Socket closed: " + socket.socketId);
|
||||
socketMap.remove(socket.socketId);
|
||||
key.attach(null);
|
||||
key.cancel();
|
||||
key.channel().close();
|
||||
}
|
||||
}
|
||||
|
||||
@ -156,10 +167,10 @@ public class SocketProcessor implements Runnable {
|
||||
registerNonEmptySockets();
|
||||
|
||||
// Select from the Selector.
|
||||
int writeReady = this.writeSelector.selectNow();
|
||||
int writeReady = writeSelector.selectNow();
|
||||
|
||||
if (writeReady > 0) {
|
||||
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys();
|
||||
Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
|
||||
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
|
||||
|
||||
while (keyIterator.hasNext()) {
|
||||
@ -167,9 +178,9 @@ public class SocketProcessor implements Runnable {
|
||||
|
||||
Socket socket = (Socket) key.attachment();
|
||||
|
||||
socket.messageWriter.write(socket, this.writeByteBuffer);
|
||||
socket.messageWriter.write(socket, writeByteBuffer);
|
||||
|
||||
if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); }
|
||||
if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
|
||||
|
||||
keyIterator.remove();
|
||||
}
|
||||
@ -185,7 +196,7 @@ public class SocketProcessor implements Runnable {
|
||||
|
||||
private void cancelEmptySockets() {
|
||||
for (Socket socket : nonEmptyToEmptySockets) {
|
||||
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector);
|
||||
SelectionKey key = socket.socketChannel.keyFor(writeSelector);
|
||||
key.cancel();
|
||||
}
|
||||
nonEmptyToEmptySockets.clear();
|
||||
|
Reference in New Issue
Block a user