Merge pull request #1 from informatik-ag-ngl/f/style_adjustment

Converted to Maven project, reformatted source files and added Javadoc headers
This commit is contained in:
Kai S. K. Engelbart 2019-12-28 11:50:57 +02:00 committed by GitHub
commit 6b9a4d4f1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1096 additions and 1099 deletions

27
.classpath Normal file
View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/bin/
/target/

23
.project Normal file
View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>java-nio-server</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>

View File

@ -0,0 +1,8 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.release=enabled
org.eclipse.jdt.core.compiler.source=1.8

View File

@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1

29
pom.xml Normal file
View File

@ -0,0 +1,29 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>informatik-ag-ngl</groupId>
<artifactId>java-nio-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.0-M1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>8</release>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,10 +1,13 @@
package com.jenkov.nioserver; package com.jenkov.nioserver;
/** /**
* Created by jjenkov on 16-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpUtilTest.java</strong><br>
* Created: <strong>16 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public interface IMessageProcessor { public interface IMessageProcessor {
public void process(Message message, WriteProxy writeProxy); public void process(Message message, WriteProxy writeProxy);
} }

View File

@ -5,7 +5,11 @@ import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
/** /**
* Created by jjenkov on 16-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpUtilTest.java</strong><br>
* Created: <strong>16 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public interface IMessageReader { public interface IMessageReader {
@ -14,7 +18,4 @@ public interface IMessageReader {
public void read(Socket socket, ByteBuffer byteBuffer) throws IOException; public void read(Socket socket, ByteBuffer byteBuffer) throws IOException;
public List<Message> getMessages(); public List<Message> getMessages();
} }

View File

@ -1,10 +1,13 @@
package com.jenkov.nioserver; package com.jenkov.nioserver;
/** /**
* Created by jjenkov on 16-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpUtilTest.java</strong><br>
* Created: <strong>16 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public interface IMessageReaderFactory { public interface IMessageReaderFactory {
public IMessageReader createMessageReader(); public IMessageReader createMessageReader();
} }

View File

@ -3,27 +3,31 @@ package com.jenkov.nioserver;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* Created by jjenkov on 16-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpUtilTest.java</strong><br>
* Created: <strong>16 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class Message { public class Message {
private MessageBuffer messageBuffer = null; private MessageBuffer messageBuffer;
public long socketId = 0; // the id of source socket or destination socket, depending on whether is going in or out. public long socketId; // the id of source socket or destination socket, depending on whether is going
// in or out.
public byte[] sharedArray = null; public byte[] sharedArray;
public int offset = 0; //offset into sharedArray where this message data starts. public int offset; // offset into sharedArray where this message data starts.
public int capacity = 0; //the size of the section in the sharedArray allocated to this message. public int capacity; // the size of the section in the sharedArray allocated to this message.
public int length = 0; //the number of bytes used of the allocated section. public int length; // the number of bytes used of the allocated section.
public Object metaData = null; public Object metaData;
public Message(MessageBuffer messageBuffer) { public Message(MessageBuffer messageBuffer) { this.messageBuffer = messageBuffer; }
this.messageBuffer = messageBuffer;
}
/** /**
* Writes data from the ByteBuffer into this message - meaning into the buffer backing this message. * Writes data from the ByteBuffer into this message - meaning into the buffer
* backing this message.
* *
* @param byteBuffer The ByteBuffer containing the message data to write. * @param byteBuffer The ByteBuffer containing the message data to write.
* @return * @return
@ -31,35 +35,28 @@ public class Message {
public int writeToMessage(ByteBuffer byteBuffer) { public int writeToMessage(ByteBuffer byteBuffer) {
int remaining = byteBuffer.remaining(); int remaining = byteBuffer.remaining();
while(this.length + remaining > capacity){ while (this.length + remaining > capacity)
if(!this.messageBuffer.expandMessage(this)) { if (!this.messageBuffer.expandMessage(this)) return -1;
return -1;
}
}
int bytesToCopy = Math.min(remaining, this.capacity - this.length); int bytesToCopy = Math.min(remaining, capacity - length);
byteBuffer.get(this.sharedArray, this.offset + this.length, bytesToCopy); byteBuffer.get(sharedArray, offset + length, bytesToCopy);
this.length += bytesToCopy; length += bytesToCopy;
return bytesToCopy; return bytesToCopy;
} }
/** /**
* Writes data from the byte array into this message - meaning into the buffer backing this message. * Writes data from the byte array into this message - meaning into the buffer
* backing this message.
* *
* @param byteArray The byte array containing the message data to write. * @param byteArray The byte array containing the message data to write.
* @return * @return
*/ */
public int writeToMessage(byte[] byteArray){ public int writeToMessage(byte[] byteArray) { return writeToMessage(byteArray, 0, byteArray.length); }
return writeToMessage(byteArray, 0, byteArray.length);
}
/** /**
* Writes data from the byte array into this message - meaning into the buffer backing this message. * Writes data from the byte array into this message - meaning into the buffer
* backing this message.
* *
* @param byteArray The byte array containing the message data to write. * @param byteArray The byte array containing the message data to write.
* @return * @return
@ -67,39 +64,31 @@ public class Message {
public int writeToMessage(byte[] byteArray, int offset, int length) { public int writeToMessage(byte[] byteArray, int offset, int length) {
int remaining = length; int remaining = length;
while(this.length + remaining > capacity){ while (this.length + remaining > capacity)
if(!this.messageBuffer.expandMessage(this)) { if (!this.messageBuffer.expandMessage(this)) return -1;
return -1;
}
}
int bytesToCopy = Math.min(remaining, this.capacity - this.length); int bytesToCopy = Math.min(remaining, capacity - length);
System.arraycopy(byteArray, offset, this.sharedArray, this.offset + this.length, bytesToCopy); System.arraycopy(byteArray, offset, sharedArray, offset + this.length, bytesToCopy);
this.length += bytesToCopy; this.length += bytesToCopy;
return bytesToCopy; return bytesToCopy;
} }
/** /**
* In case the buffer backing the nextMessage contains more than one HTTP message, move all data after the first * In case the buffer backing the nextMessage contains more than one HTTP
* message, move all data after the first
* message to a new Message object. * message to a new Message object.
* *
* @param message The message containing the partial message (after the first message). * @param message The message containing the partial message (after the first
* @param endIndex The end index of the first message in the buffer of the message given as parameter. * message).
* @param endIndex The end index of the first message in the buffer of the
* message given as parameter.
*/ */
public void writePartialMessageToMessage(Message message, int endIndex) { public void writePartialMessageToMessage(Message message, int endIndex) {
int startIndexOfPartialMessage = message.offset + endIndex; int startIndexOfPartialMessage = message.offset + endIndex;
int lengthOfPartialMessage = (message.offset + message.length) - endIndex; int lengthOfPartialMessage = message.offset + message.length - endIndex;
System.arraycopy(message.sharedArray, startIndexOfPartialMessage, this.sharedArray, this.offset, lengthOfPartialMessage); System.arraycopy(message.sharedArray, startIndexOfPartialMessage, sharedArray, offset, lengthOfPartialMessage);
} }
public int writeToByteBuffer(ByteBuffer byteBuffer){ public int writeToByteBuffer(ByteBuffer byteBuffer) { return 0; }
return 0;
}
} }

View File

@ -1,12 +1,16 @@
package com.jenkov.nioserver; package com.jenkov.nioserver;
/** /**
* A shared buffer which can contain many messages inside. A message gets a section of the buffer to use. If the * A shared buffer which can contain many messages inside. A message gets a
* message outgrows the section in size, the message requests a larger section and the message is copied to that * section of the buffer to use. If the message outgrows the section in size,
* larger section. The smaller section is then freed again. * the message requests a larger section and the message is copied to that
* larger section. The smaller section is then freed again.<br>
* <br>
* Project: <strong>java-nio-server</strong><br>
* File: <strong>MessageBuffer.java</strong><br>
* Created: <strong>18 Oct 2015</strong><br>
* *
* * @author jjenkov
* Created by jjenkov on 18-10-2015.
*/ */
public class MessageBuffer { public class MessageBuffer {
@ -15,7 +19,7 @@ public class MessageBuffer {
private static final int CAPACITY_SMALL = 4 * KB; private static final int CAPACITY_SMALL = 4 * KB;
private static final int CAPACITY_MEDIUM = 128 * KB; private static final int CAPACITY_MEDIUM = 128 * KB;
private static final int CAPACITY_LARGE = 1024 * KB; private static final int CAPACITY_LARGE = 1 * MB;
// package scope (default) - so they can be accessed from unit tests. // package scope (default) - so they can be accessed from unit tests.
byte[] smallMessageBuffer = new byte[1024 * 4 * KB]; // 1024 x 4KB messages = 4MB. byte[] smallMessageBuffer = new byte[1024 * 4 * KB]; // 1024 x 4KB messages = 4MB.
@ -26,20 +30,18 @@ public class MessageBuffer {
QueueIntFlip mediumMessageBufferFreeBlocks = new QueueIntFlip(128); // 128 free sections QueueIntFlip mediumMessageBufferFreeBlocks = new QueueIntFlip(128); // 128 free sections
QueueIntFlip largeMessageBufferFreeBlocks = new QueueIntFlip(16); // 16 free sections QueueIntFlip largeMessageBufferFreeBlocks = new QueueIntFlip(16); // 16 free sections
//todo make all message buffer capacities and block sizes configurable // TODO: make all message buffer capacities and block sizes configurable
//todo calculate free block queue sizes based on capacity and block size of buffers. // TODO: calculate free block queue sizes based on capacity and block size of
// buffers.
public MessageBuffer() { public MessageBuffer() {
// add all free sections to all free section queues. // add all free sections to all free section queues.
for(int i=0; i<smallMessageBuffer.length; i+= CAPACITY_SMALL){ for (int i = 0; i < smallMessageBuffer.length; i += CAPACITY_SMALL)
this.smallMessageBufferFreeBlocks.put(i); smallMessageBufferFreeBlocks.put(i);
} for (int i = 0; i < mediumMessageBuffer.length; i += CAPACITY_MEDIUM)
for(int i=0; i<mediumMessageBuffer.length; i+= CAPACITY_MEDIUM){ mediumMessageBufferFreeBlocks.put(i);
this.mediumMessageBufferFreeBlocks.put(i); for (int i = 0; i < largeMessageBuffer.length; i += CAPACITY_LARGE)
} largeMessageBufferFreeBlocks.put(i);
for(int i=0; i<largeMessageBuffer.length; i+= CAPACITY_LARGE){
this.largeMessageBufferFreeBlocks.put(i);
}
} }
public Message getMessage() { public Message getMessage() {
@ -47,7 +49,7 @@ public class MessageBuffer {
if (nextFreeSmallBlock == -1) return null; if (nextFreeSmallBlock == -1) return null;
Message message = new Message(this); //todo get from Message pool - caps memory usage. Message message = new Message(this); // TODO: get from Message pool - caps memory usage.
message.sharedArray = this.smallMessageBuffer; message.sharedArray = this.smallMessageBuffer;
message.capacity = CAPACITY_SMALL; message.capacity = CAPACITY_SMALL;
@ -58,13 +60,11 @@ public class MessageBuffer {
} }
public boolean expandMessage(Message message) { public boolean expandMessage(Message message) {
if(message.capacity == CAPACITY_SMALL){ if (message.capacity == CAPACITY_SMALL)
return moveMessage(message, this.smallMessageBufferFreeBlocks, this.mediumMessageBufferFreeBlocks, this.mediumMessageBuffer, CAPACITY_MEDIUM); return moveMessage(message, smallMessageBufferFreeBlocks, mediumMessageBufferFreeBlocks, mediumMessageBuffer, CAPACITY_MEDIUM);
} else if(message.capacity == CAPACITY_MEDIUM){ else if (message.capacity == CAPACITY_MEDIUM)
return moveMessage(message, this.mediumMessageBufferFreeBlocks, this.largeMessageBufferFreeBlocks, this.largeMessageBuffer, CAPACITY_LARGE); return moveMessage(message, mediumMessageBufferFreeBlocks, largeMessageBufferFreeBlocks, largeMessageBuffer, CAPACITY_LARGE);
} else { else return false;
return false;
}
} }
private boolean moveMessage(Message message, QueueIntFlip srcBlockQueue, QueueIntFlip destBlockQueue, byte[] dest, int newCapacity) { private boolean moveMessage(Message message, QueueIntFlip srcBlockQueue, QueueIntFlip destBlockQueue, byte[] dest, int newCapacity) {
@ -80,9 +80,4 @@ public class MessageBuffer {
message.capacity = newCapacity; message.capacity = newCapacity;
return true; return true;
} }
} }

View File

@ -6,44 +6,36 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* Created by jjenkov on 21-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>MessageWriter.java</strong><br>
* Created: <strong>21 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class MessageWriter { public class MessageWriter {
private List<Message> writeQueue = new ArrayList<>(); private List<Message> writeQueue = new ArrayList<>();
private Message messageInProgress = null; private Message messageInProgress;
private int bytesWritten = 0; private int bytesWritten;
public MessageWriter() {
}
public void enqueue(Message message) { public void enqueue(Message message) {
if(this.messageInProgress == null){ if (messageInProgress == null) messageInProgress = message;
this.messageInProgress = message; else writeQueue.add(message);
} else {
this.writeQueue.add(message);
}
} }
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException { public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
byteBuffer.put(this.messageInProgress.sharedArray, this.messageInProgress.offset + this.bytesWritten, this.messageInProgress.length - this.bytesWritten); byteBuffer.put(messageInProgress.sharedArray, messageInProgress.offset + bytesWritten, messageInProgress.length - bytesWritten);
byteBuffer.flip(); byteBuffer.flip();
this.bytesWritten += socket.write(byteBuffer); bytesWritten += socket.write(byteBuffer);
byteBuffer.clear(); byteBuffer.clear();
if(bytesWritten >= this.messageInProgress.length){ if (bytesWritten >= messageInProgress.length) {
if(this.writeQueue.size() > 0){ if (writeQueue.size() > 0) messageInProgress = writeQueue.remove(0);
this.messageInProgress = this.writeQueue.remove(0); else messageInProgress = null;
} else { // TODO: unregister from selector
this.messageInProgress = null;
//todo unregister from selector
}
} }
} }
public boolean isEmpty() { public boolean isEmpty() { return writeQueue.isEmpty() && messageInProgress == null; }
return this.writeQueue.isEmpty() && this.messageInProgress == null;
}
} }

View File

@ -1,47 +1,43 @@
package com.jenkov.nioserver; package com.jenkov.nioserver;
/** /**
* Same as QueueFillCount, except that QueueFlip uses a flip flag to keep track of when the internal writePos has * Same as QueueFillCount, except that QueueFlip uses a flip flag to keep track
* "overflowed" (meaning it goes back to 0). Other than that, the two implementations are very similar in functionality. * of when the internal writePos has "overflowed" (meaning it goes back to 0).
* Other than that, the two implementations are very similar in
* functionality.<br>
* <br>
* One additional difference is that QueueFlip has an available() method, where
* this is a public variable in QueueFillCount.<br>
* <br>
* Project: <strong>java-nio-server</strong><br>
* File: <strong>QueueIntFlip.java</strong><br>
* Created: <strong>18 Oct 2015</strong><br>
* *
* One additional difference is that QueueFlip has an available() method, where this is a public variable in * @author jjenkov
* QueueFillCount.
*
* Created by jjenkov on 18-09-2015.
*/ */
public class QueueIntFlip { public class QueueIntFlip {
public int[] elements = null; public int[] elements;
public int capacity = 0; public int capacity;
public int writePos = 0; public int writePos;
public int readPos = 0; public int readPos;
public boolean flipped = false; public boolean flipped;
public QueueIntFlip(int capacity) { public QueueIntFlip(int capacity) {
this.capacity = capacity; this.capacity = capacity;
this.elements = new int[capacity]; //todo get from TypeAllocator ? elements = new int[capacity]; // TODO: get from TypeAllocator ?
} }
public void reset() { public void reset() {
this.writePos = 0; writePos = 0;
this.readPos = 0; readPos = 0;
this.flipped = false; flipped = false;
} }
public int available() { public int available() { return flipped ? capacity - readPos + writePos : writePos - readPos; }
if(!flipped){
return writePos - readPos;
}
return capacity - readPos + writePos;
}
public int remainingCapacity() { public int remainingCapacity() { return flipped ? readPos - writePos : capacity - writePos; }
if(!flipped){
return capacity - writePos;
}
return readPos - writePos;
}
public boolean put(int element) { public boolean put(int element) {
if (!flipped) { if (!flipped) {
@ -52,9 +48,7 @@ public class QueueIntFlip {
if (writePos < readPos) { if (writePos < readPos) {
elements[writePos++] = element; elements[writePos++] = element;
return true; return true;
} else { } else return false;
return false;
}
} else { } else {
elements[writePos++] = element; elements[writePos++] = element;
return true; return true;
@ -63,9 +57,7 @@ public class QueueIntFlip {
if (writePos < readPos) { if (writePos < readPos) {
elements[writePos++] = element; elements[writePos++] = element;
return true; return true;
} else { } else return false;
return false;
}
} }
} }
@ -78,27 +70,23 @@ public class QueueIntFlip {
if (length <= capacity - writePos) { if (length <= capacity - writePos) {
// new elements fit into top of elements array - copy directly // new elements fit into top of elements array - copy directly
for(; newElementsReadPos < length; newElementsReadPos++){ for (; newElementsReadPos < length; newElementsReadPos++)
this.elements[this.writePos++] = newElements[newElementsReadPos]; elements[writePos++] = newElements[newElementsReadPos];
}
return newElementsReadPos; return newElementsReadPos;
} else { } else {
// new elements must be divided between top and bottom of elements array // new elements must be divided between top and bottom of elements array
// writing to top // writing to top
for(;this.writePos < capacity; this.writePos++){ for (; writePos < capacity; writePos++)
this.elements[this.writePos] = newElements[newElementsReadPos++]; elements[writePos] = newElements[newElementsReadPos++];
}
// writing to bottom // writing to bottom
this.writePos = 0; this.writePos = 0;
this.flipped = true; this.flipped = true;
int endPos = Math.min(this.readPos, length - newElementsReadPos); int endPos = Math.min(readPos, length - newElementsReadPos);
for(; this.writePos < endPos; this.writePos++){ for (; writePos < endPos; writePos++)
this.elements[writePos] = newElements[newElementsReadPos++]; this.elements[writePos] = newElements[newElementsReadPos++];
}
return newElementsReadPos; return newElementsReadPos;
} }
@ -107,37 +95,24 @@ public class QueueIntFlip {
// readPos higher than writePos - free sections are: // readPos higher than writePos - free sections are:
// 1) from writePos to readPos // 1) from writePos to readPos
int endPos = Math.min(this.readPos, this.writePos + length); int endPos = Math.min(readPos, writePos + length);
for(; this.writePos < endPos; this.writePos++){ for (; writePos < endPos; writePos++)
this.elements[this.writePos] = newElements[newElementsReadPos++]; elements[writePos] = newElements[newElementsReadPos++];
}
return newElementsReadPos; return newElementsReadPos;
} }
} }
public int take() { public int take() {
if(!flipped){ if (!flipped) return readPos < writePos ? elements[readPos++] : -1;
if(readPos < writePos){ else {
return elements[readPos++];
} else {
return -1;
}
} else {
if (readPos == capacity) { if (readPos == capacity) {
readPos = 0; readPos = 0;
flipped = false; flipped = false;
if(readPos < writePos){ return readPos < writePos ? elements[readPos++] : -1;
return elements[readPos++]; } else return elements[readPos++];
} else {
return -1;
}
} else {
return elements[readPos++];
}
} }
} }
@ -146,19 +121,19 @@ public class QueueIntFlip {
if (!flipped) { if (!flipped) {
// writePos higher than readPos - available section is writePos - readPos // writePos higher than readPos - available section is writePos - readPos
int endPos = Math.min(this.writePos, this.readPos + length); int endPos = Math.min(writePos, readPos + length);
for(; this.readPos < endPos; this.readPos++){ for (; readPos < endPos; readPos++)
into[intoWritePos++] = this.elements[this.readPos]; into[intoWritePos++] = elements[readPos];
}
return intoWritePos; return intoWritePos;
} else { } else {
//readPos higher than writePos - available sections are top + bottom of elements array // readPos higher than writePos - available sections are top + bottom of
// elements array
if (length <= capacity - readPos) { if (length <= capacity - readPos) {
//length is lower than the elements available at the top of the elements array - copy directly // length is lower than the elements available at the top of the elements array
for(; intoWritePos < length; intoWritePos++){ // - copy directly
into[intoWritePos] = this.elements[this.readPos++]; for (; intoWritePos < length; intoWritePos++)
} into[intoWritePos] = elements[readPos++];
return intoWritePos; return intoWritePos;
} else { } else {
@ -166,21 +141,18 @@ public class QueueIntFlip {
// split copy into a copy from both top and bottom of elements array. // split copy into a copy from both top and bottom of elements array.
// copy from top // copy from top
for(; this.readPos < capacity; this.readPos++){ for (; readPos < capacity; readPos++)
into[intoWritePos++] = this.elements[this.readPos]; into[intoWritePos++] = elements[readPos];
}
// copy from bottom // copy from bottom
this.readPos = 0; readPos = 0;
this.flipped = false; flipped = false;
int endPos = Math.min(this.writePos, length - intoWritePos); int endPos = Math.min(writePos, length - intoWritePos);
for(; this.readPos < endPos; this.readPos++){ for (; readPos < endPos; readPos++)
into[intoWritePos++] = this.elements[this.readPos]; into[intoWritePos++] = elements[readPos];
}
return intoWritePos; return intoWritePos;
} }
} }
} }
} }

View File

@ -3,19 +3,22 @@ package com.jenkov.nioserver;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/** /**
* Created by jjenkov on 24-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>Server.java</strong><br>
* Created: <strong>24 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class Server { public class Server {
private SocketAccepter socketAccepter = null; private SocketAcceptor socketAccepter;
private SocketProcessor socketProcessor = null; private SocketProcessor socketProcessor;
private int tcpPort = 0; private int tcpPort;
private IMessageReaderFactory messageReaderFactory = null; private IMessageReaderFactory messageReaderFactory;
private IMessageProcessor messageProcessor = null; private IMessageProcessor messageProcessor;
public Server(int tcpPort, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) { public Server(int tcpPort, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) {
this.tcpPort = tcpPort; this.tcpPort = tcpPort;
@ -25,22 +28,19 @@ public class Server {
public void start() throws IOException { public void start() throws IOException {
Queue socketQueue = new ArrayBlockingQueue(1024); //move 1024 to ServerConfig Queue<Socket> socketQueue = new ArrayBlockingQueue<>(1024); // TODO: move 1024 to ServerConfig
this.socketAccepter = new SocketAccepter(tcpPort, socketQueue);
socketAccepter = new SocketAcceptor(tcpPort, socketQueue);
MessageBuffer readBuffer = new MessageBuffer(); MessageBuffer readBuffer = new MessageBuffer();
MessageBuffer writeBuffer = new MessageBuffer(); MessageBuffer writeBuffer = new MessageBuffer();
this.socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor); socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
Thread accepterThread = new Thread(this.socketAccepter); Thread accepterThread = new Thread(socketAccepter);
Thread processorThread = new Thread(this.socketProcessor); Thread processorThread = new Thread(socketProcessor);
accepterThread.start(); accepterThread.start();
processorThread.start(); processorThread.start();
} }
} }

View File

@ -5,51 +5,47 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
/** /**
* Created by jjenkov on 16-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>Socket.java</strong><br>
* Created: <strong>16 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class Socket { public class Socket {
public long socketId; public long socketId;
public SocketChannel socketChannel = null; public SocketChannel socketChannel;
public IMessageReader messageReader = null; public IMessageReader messageReader;
public MessageWriter messageWriter = null; public MessageWriter messageWriter;
public boolean endOfStreamReached = false; public boolean endOfStreamReached;
public Socket() { public Socket(SocketChannel socketChannel) { this.socketChannel = socketChannel; }
}
public Socket(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public int read(ByteBuffer byteBuffer) throws IOException { public int read(ByteBuffer byteBuffer) throws IOException {
int bytesRead = this.socketChannel.read(byteBuffer); int bytesRead = socketChannel.read(byteBuffer);
int totalBytesRead = bytesRead; int totalBytesRead = bytesRead;
while (bytesRead > 0) { while (bytesRead > 0) {
bytesRead = this.socketChannel.read(byteBuffer); bytesRead = socketChannel.read(byteBuffer);
totalBytesRead += bytesRead; totalBytesRead += bytesRead;
} }
if(bytesRead == -1){ if (bytesRead == -1) endOfStreamReached = true;
this.endOfStreamReached = true;
}
return totalBytesRead; return totalBytesRead;
} }
public int write(ByteBuffer byteBuffer) throws IOException { public int write(ByteBuffer byteBuffer) throws IOException {
int bytesWritten = this.socketChannel.write(byteBuffer); int bytesWritten = socketChannel.write(byteBuffer);
int totalBytesWritten = bytesWritten; int totalBytesWritten = bytesWritten;
while (bytesWritten > 0 && byteBuffer.hasRemaining()) { while (bytesWritten > 0 && byteBuffer.hasRemaining()) {
bytesWritten = this.socketChannel.write(byteBuffer); bytesWritten = socketChannel.write(byteBuffer);
totalBytesWritten += bytesWritten; totalBytesWritten += bytesWritten;
} }
return totalBytesWritten; return totalBytesWritten;
} }
} }

View File

@ -1,53 +0,0 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
/**
* Created by jjenkov on 19-10-2015.
*/
public class SocketAccepter implements Runnable{
private int tcpPort = 0;
private ServerSocketChannel serverSocket = null;
private Queue socketQueue = null;
public SocketAccepter(int tcpPort, Queue socketQueue) {
this.tcpPort = tcpPort;
this.socketQueue = socketQueue;
}
public void run() {
try{
this.serverSocket = ServerSocketChannel.open();
this.serverSocket.bind(new InetSocketAddress(tcpPort));
} catch(IOException e){
e.printStackTrace();
return;
}
while(true){
try{
SocketChannel socketChannel = this.serverSocket.accept();
System.out.println("Socket accepted: " + socketChannel);
//todo check if the queue can even accept more sockets.
this.socketQueue.add(new Socket(socketChannel));
} catch(IOException e){
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,51 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
/**
* Project: <strong>java-nio-server</strong><br>
* File: <strong>SocketAcceptor.java</strong><br>
* Created: <strong>19 Oct 2015</strong><br>
*
* @author jjenkov
*/
public class SocketAcceptor implements Runnable {
private int tcpPort;
private ServerSocketChannel serverSocket;
private Queue<Socket> socketQueue;
public SocketAcceptor(int tcpPort, Queue<Socket> socketQueue) {
this.tcpPort = tcpPort;
this.socketQueue = socketQueue;
}
public void run() {
try {
serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(tcpPort));
} catch (IOException e) {
e.printStackTrace();
return;
}
while (true) {
try {
SocketChannel socketChannel = serverSocket.accept();
System.out.println("Socket accepted: " + socketChannel);
// TODO: check if the queue can even accept more sockets.
this.socketQueue.add(new Socket(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

View File

@ -5,101 +5,108 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.*; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
/** /**
* Created by jjenkov on 16-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>SocketProcessor.java</strong><br>
* Created: <strong>16 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class SocketProcessor implements Runnable { public class SocketProcessor implements Runnable {
private Queue<Socket> inboundSocketQueue = null; private Queue<Socket> inboundSocketQueue;
private MessageBuffer readMessageBuffer = null; //todo Not used now - but perhaps will be later - to check for space in the buffer before reading from sockets private MessageBuffer readMessageBuffer; // TODO: Not used now - but perhaps will be later - to check for space in the
private MessageBuffer writeMessageBuffer = null; //todo Not used now - but perhaps will be later - to check for space in the buffer before reading from sockets (space for more to write?) // buffer before reading from sockets
@SuppressWarnings("unused")
private MessageBuffer writeMessageBuffer; // TODO: Not used now - but perhaps will be later - to check for space in the
// buffer before reading from sockets (space for more to write?)
private IMessageReaderFactory messageReaderFactory = null; private IMessageReaderFactory messageReaderFactory;
private Queue<Message> outboundMessageQueue = new LinkedList<>(); //todo use a better / faster queue. private Queue<Message> outboundMessageQueue = new LinkedList<>(); // TODO: use a better / faster queue.
private Map<Long, Socket> socketMap = new HashMap<>(); private Map<Long, Socket> socketMap = new HashMap<>();
private ByteBuffer readByteBuffer = ByteBuffer.allocate(1024 * 1024); private ByteBuffer readByteBuffer = ByteBuffer.allocate(1024 * 1024);
private ByteBuffer writeByteBuffer = ByteBuffer.allocate(1024 * 1024); private ByteBuffer writeByteBuffer = ByteBuffer.allocate(1024 * 1024);
private Selector readSelector = null; private Selector readSelector;
private Selector writeSelector = null; private Selector writeSelector;
private IMessageProcessor messageProcessor = null; private IMessageProcessor messageProcessor;
private WriteProxy writeProxy = null; private WriteProxy writeProxy;
private long nextSocketId = 16 * 1024; //start incoming socket ids from 16K - reserve bottom ids for pre-defined sockets (servers). private long nextSocketId = 16 * 1024; // start incoming socket ids from 16K - reserve bottom ids for pre-defined
// sockets (servers).
private Set<Socket> emptyToNonEmptySockets = new HashSet<>(); private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>(); private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer,
public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException { IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
this.inboundSocketQueue = inboundSocketQueue; this.inboundSocketQueue = inboundSocketQueue;
this.readMessageBuffer = readMessageBuffer; this.readMessageBuffer = readMessageBuffer;
this.writeMessageBuffer = writeMessageBuffer; this.writeMessageBuffer = writeMessageBuffer;
this.writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue); writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue);
this.messageReaderFactory = messageReaderFactory; this.messageReaderFactory = messageReaderFactory;
this.messageProcessor = messageProcessor; this.messageProcessor = messageProcessor;
this.readSelector = Selector.open(); readSelector = Selector.open();
this.writeSelector = Selector.open(); writeSelector = Selector.open();
} }
public void run() { public void run() {
while (true) { while (true) {
try { try {
executeCycle(); executeCycle();
} catch(IOException e){
e.printStackTrace();
}
try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (IOException | InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
public void executeCycle() throws IOException { public void executeCycle() throws IOException {
takeNewSockets(); takeNewSockets();
readFromSockets(); readFromSockets();
writeToSockets(); writeToSockets();
} }
public void takeNewSockets() throws IOException { public void takeNewSockets() throws IOException {
Socket newSocket = this.inboundSocketQueue.poll(); Socket newSocket = inboundSocketQueue.poll();
while (newSocket != null) { while (newSocket != null) {
newSocket.socketId = this.nextSocketId++; newSocket.socketId = nextSocketId++;
newSocket.socketChannel.configureBlocking(false); newSocket.socketChannel.configureBlocking(false);
newSocket.messageReader = this.messageReaderFactory.createMessageReader(); newSocket.messageReader = messageReaderFactory.createMessageReader();
newSocket.messageReader.init(this.readMessageBuffer); newSocket.messageReader.init(readMessageBuffer);
newSocket.messageWriter = new MessageWriter(); newSocket.messageWriter = new MessageWriter();
this.socketMap.put(newSocket.socketId, newSocket); socketMap.put(newSocket.socketId, newSocket);
SelectionKey key = newSocket.socketChannel.register(this.readSelector, SelectionKey.OP_READ); SelectionKey key = newSocket.socketChannel.register(readSelector, SelectionKey.OP_READ);
key.attach(newSocket); key.attach(newSocket);
newSocket = this.inboundSocketQueue.poll(); newSocket = inboundSocketQueue.poll();
} }
} }
public void readFromSockets() throws IOException { public void readFromSockets() throws IOException {
int readReady = this.readSelector.selectNow(); int readReady = readSelector.selectNow();
if (readReady > 0) { if (readReady > 0) {
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys(); Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys();
@ -107,9 +114,7 @@ public class SocketProcessor implements Runnable {
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next(); SelectionKey key = keyIterator.next();
readFromSocket(key); readFromSocket(key);
keyIterator.remove(); keyIterator.remove();
} }
selectedKeys.clear(); selectedKeys.clear();
@ -124,21 +129,21 @@ public class SocketProcessor implements Runnable {
if (fullMessages.size() > 0) { if (fullMessages.size() > 0) {
for (Message message : fullMessages) { for (Message message : fullMessages) {
message.socketId = socket.socketId; message.socketId = socket.socketId;
this.messageProcessor.process(message, this.writeProxy); //the message processor will eventually push outgoing messages into an IMessageWriter for this socket. messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an
// IMessageWriter for this socket.
} }
fullMessages.clear(); fullMessages.clear();
} }
if (socket.endOfStreamReached) { if (socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId); System.out.println("Socket closed: " + socket.socketId);
this.socketMap.remove(socket.socketId); socketMap.remove(socket.socketId);
key.attach(null); key.attach(null);
key.cancel(); key.cancel();
key.channel().close(); key.channel().close();
} }
} }
public void writeToSockets() throws IOException { public void writeToSockets() throws IOException {
// Take all new messages from outboundMessageQueue // Take all new messages from outboundMessageQueue
@ -164,38 +169,32 @@ public class SocketProcessor implements Runnable {
socket.messageWriter.write(socket, this.writeByteBuffer); socket.messageWriter.write(socket, this.writeByteBuffer);
if(socket.messageWriter.isEmpty()){ if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); }
this.nonEmptyToEmptySockets.add(socket);
}
keyIterator.remove(); keyIterator.remove();
} }
selectionKeys.clear(); selectionKeys.clear();
} }
} }
private void registerNonEmptySockets() throws ClosedChannelException { private void registerNonEmptySockets() throws ClosedChannelException {
for(Socket socket : emptyToNonEmptySockets){ for (Socket socket : emptyToNonEmptySockets)
socket.socketChannel.register(this.writeSelector, SelectionKey.OP_WRITE, socket); socket.socketChannel.register(writeSelector, SelectionKey.OP_WRITE, socket);
}
emptyToNonEmptySockets.clear(); emptyToNonEmptySockets.clear();
} }
private void cancelEmptySockets() { private void cancelEmptySockets() {
for (Socket socket : nonEmptyToEmptySockets) { for (Socket socket : nonEmptyToEmptySockets) {
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector); SelectionKey key = socket.socketChannel.keyFor(this.writeSelector);
key.cancel(); key.cancel();
} }
nonEmptyToEmptySockets.clear(); nonEmptyToEmptySockets.clear();
} }
private void takeNewOutboundMessages() { private void takeNewOutboundMessages() {
Message outMessage = this.outboundMessageQueue.poll(); Message outMessage = outboundMessageQueue.poll();
while (outMessage != null) { while (outMessage != null) {
Socket socket = this.socketMap.get(outMessage.socketId); Socket socket = socketMap.get(outMessage.socketId);
if (socket != null) { if (socket != null) {
MessageWriter messageWriter = socket.messageWriter; MessageWriter messageWriter = socket.messageWriter;
@ -203,13 +202,10 @@ public class SocketProcessor implements Runnable {
messageWriter.enqueue(outMessage); messageWriter.enqueue(outMessage);
nonEmptyToEmptySockets.remove(socket); nonEmptyToEmptySockets.remove(socket);
emptyToNonEmptySockets.add(socket); // not necessary if removed from nonEmptyToEmptySockets in prev. statement. emptyToNonEmptySockets.add(socket); // not necessary if removed from nonEmptyToEmptySockets in prev. statement.
} else{ } else messageWriter.enqueue(outMessage);
messageWriter.enqueue(outMessage);
}
} }
outMessage = this.outboundMessageQueue.poll(); outMessage = outboundMessageQueue.poll();
} }
} }
} }

View File

@ -3,24 +3,23 @@ package com.jenkov.nioserver;
import java.util.Queue; import java.util.Queue;
/** /**
* Created by jjenkov on 22-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>WriteProxy.java</strong><br>
* Created: <strong>22 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class WriteProxy { public class WriteProxy {
private MessageBuffer messageBuffer = null; private MessageBuffer messageBuffer;
private Queue writeQueue = null; private Queue<Message> writeQueue;
public WriteProxy(MessageBuffer messageBuffer, Queue writeQueue) { public WriteProxy(MessageBuffer messageBuffer, Queue<Message> writeQueue) {
this.messageBuffer = messageBuffer; this.messageBuffer = messageBuffer;
this.writeQueue = writeQueue; this.writeQueue = writeQueue;
} }
public Message getMessage(){ public Message getMessage() { return messageBuffer.getMessage(); }
return this.messageBuffer.getMessage();
}
public boolean enqueue(Message message){
return this.writeQueue.offer(message);
}
public boolean enqueue(Message message) { return writeQueue.offer(message); }
} }

View File

@ -1,24 +1,25 @@
package com.jenkov.nioserver.example; package com.jenkov.nioserver.example;
import com.jenkov.nioserver.*; import java.io.IOException;
import com.jenkov.nioserver.IMessageProcessor;
import com.jenkov.nioserver.Message;
import com.jenkov.nioserver.Server;
import com.jenkov.nioserver.http.HttpMessageReaderFactory; import com.jenkov.nioserver.http.HttpMessageReaderFactory;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/** /**
* Created by jjenkov on 19-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>Main.java</strong><br>
* Created: <strong>19 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class Main { public class Main {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
String httpResponse = "HTTP/1.1 200 OK\r\n" + String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + "Content-Type: text/html\r\n" + "\r\n"
"Content-Length: 38\r\n" + + "<html><body>Hello World!</body></html>";
"Content-Type: text/html\r\n" +
"\r\n" +
"<html><body>Hello World!</body></html>";
byte[] httpResponseBytes = httpResponse.getBytes("UTF-8"); byte[] httpResponseBytes = httpResponse.getBytes("UTF-8");
@ -35,8 +36,5 @@ public class Main {
Server server = new Server(9999, new HttpMessageReaderFactory(), messageProcessor); Server server = new Server(9999, new HttpMessageReaderFactory(), messageProcessor);
server.start(); server.start();
} }
} }

View File

@ -1,7 +1,11 @@
package com.jenkov.nioserver.http; package com.jenkov.nioserver.http;
/** /**
* Created by jjenkov on 19-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpHeaders.java</strong><br>
* Created: <strong>19 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class HttpHeaders { public class HttpHeaders {
@ -20,8 +24,4 @@ public class HttpHeaders {
public int bodyStartIndex = 0; public int bodyStartIndex = 0;
public int bodyEndIndex = 0; public int bodyEndIndex = 0;
} }

View File

@ -1,38 +1,39 @@
package com.jenkov.nioserver.http; package com.jenkov.nioserver.http;
import com.jenkov.nioserver.IMessageReader;
import com.jenkov.nioserver.Message;
import com.jenkov.nioserver.MessageBuffer;
import com.jenkov.nioserver.Socket;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.jenkov.nioserver.IMessageReader;
import com.jenkov.nioserver.Message;
import com.jenkov.nioserver.MessageBuffer;
import com.jenkov.nioserver.Socket;
/** /**
* Created by jjenkov on 18-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpMessageReader.java</strong><br>
* Created: <strong>18 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class HttpMessageReader implements IMessageReader { public class HttpMessageReader implements IMessageReader {
private MessageBuffer messageBuffer = null; private MessageBuffer messageBuffer;
private List<Message> completeMessages = new ArrayList<Message>(); private List<Message> completeMessages = new ArrayList<>();
private Message nextMessage = null; private Message nextMessage;
public HttpMessageReader() {
}
@Override @Override
public void init(MessageBuffer readMessageBuffer) { public void init(MessageBuffer readMessageBuffer) {
this.messageBuffer = readMessageBuffer; messageBuffer = readMessageBuffer;
this.nextMessage = messageBuffer.getMessage(); nextMessage = messageBuffer.getMessage();
this.nextMessage.metaData = new HttpHeaders(); nextMessage.metaData = new HttpHeaders();
} }
@Override @Override
public void read(Socket socket, ByteBuffer byteBuffer) throws IOException { public void read(Socket socket, ByteBuffer byteBuffer) throws IOException {
int bytesRead = socket.read(byteBuffer); socket.read(byteBuffer);
byteBuffer.flip(); byteBuffer.flip();
if (byteBuffer.remaining() == 0) { if (byteBuffer.remaining() == 0) {
@ -40,11 +41,14 @@ public class HttpMessageReader implements IMessageReader {
return; return;
} }
this.nextMessage.writeToMessage(byteBuffer); nextMessage.writeToMessage(byteBuffer);
int endIndex = HttpUtil.parseHttpRequest(this.nextMessage.sharedArray, this.nextMessage.offset, this.nextMessage.offset + this.nextMessage.length, (HttpHeaders) this.nextMessage.metaData); int endIndex = HttpUtil.parseHttpRequest(nextMessage.sharedArray,
nextMessage.offset,
nextMessage.offset + nextMessage.length,
(HttpHeaders) nextMessage.metaData);
if (endIndex != -1) { if (endIndex != -1) {
Message message = this.messageBuffer.getMessage(); Message message = messageBuffer.getMessage();
message.metaData = new HttpHeaders(); message.metaData = new HttpHeaders();
message.writePartialMessageToMessage(nextMessage, endIndex); message.writePartialMessageToMessage(nextMessage, endIndex);
@ -55,10 +59,6 @@ public class HttpMessageReader implements IMessageReader {
byteBuffer.clear(); byteBuffer.clear();
} }
@Override @Override
public List<Message> getMessages() { public List<Message> getMessages() { return completeMessages; }
return this.completeMessages;
}
} }

View File

@ -2,18 +2,16 @@ package com.jenkov.nioserver.http;
import com.jenkov.nioserver.IMessageReader; import com.jenkov.nioserver.IMessageReader;
import com.jenkov.nioserver.IMessageReaderFactory; import com.jenkov.nioserver.IMessageReaderFactory;
import com.jenkov.nioserver.MessageBuffer;
/** /**
* Created by jjenkov on 18-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpMessageReaderFactory.java</strong><br>
* Created: <strong>18 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class HttpMessageReaderFactory implements IMessageReaderFactory { public class HttpMessageReaderFactory implements IMessageReaderFactory {
public HttpMessageReaderFactory() {
}
@Override @Override
public IMessageReader createMessageReader() { public IMessageReader createMessageReader() { return new HttpMessageReader(); }
return new HttpMessageReader();
}
} }

View File

@ -3,7 +3,11 @@ package com.jenkov.nioserver.http;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
/** /**
* Created by jjenkov on 19-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpUtil.java</strong><br>
* Created: <strong>19 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class HttpUtil { public class HttpUtil {
@ -13,23 +17,22 @@ public class HttpUtil {
private static final byte[] HEAD = new byte[] { 'H', 'E', 'A', 'D' }; private static final byte[] HEAD = new byte[] { 'H', 'E', 'A', 'D' };
private static final byte[] DELETE = new byte[] { 'D', 'E', 'L', 'E', 'T', 'E' }; private static final byte[] DELETE = new byte[] { 'D', 'E', 'L', 'E', 'T', 'E' };
@SuppressWarnings("unused")
private static final byte[] HOST = new byte[] { 'H', 'o', 's', 't' }; private static final byte[] HOST = new byte[] { 'H', 'o', 's', 't' };
private static final byte[] CONTENT_LENGTH = new byte[] { 'C', 'o', 'n', 't', 'e', 'n', 't', '-', 'L', 'e', 'n', 'g', 't', 'h' }; private static final byte[] CONTENT_LENGTH = new byte[] { 'C', 'o', 'n', 't', 'e', 'n', 't', '-', 'L', 'e', 'n', 'g', 't', 'h' };
public static int parseHttpRequest(byte[] src, int startIndex, int endIndex, HttpHeaders httpHeaders) { public static int parseHttpRequest(byte[] src, int startIndex, int endIndex, HttpHeaders httpHeaders) {
/* /*
int endOfHttpMethod = findNext(src, startIndex, endIndex, (byte) ' '); * int endOfHttpMethod = findNext(src, startIndex, endIndex, (byte) ' ');
if(endOfHttpMethod == -1) return false; * if(endOfHttpMethod == -1) return false;
resolveHttpMethod(src, startIndex, httpHeaders); * resolveHttpMethod(src, startIndex, httpHeaders);
*/ */
// parse HTTP request line // parse HTTP request line
int endOfFirstLine = findNextLineBreak(src, startIndex, endIndex); int endOfFirstLine = findNextLineBreak(src, startIndex, endIndex);
if (endOfFirstLine == -1) return -1; if (endOfFirstLine == -1) return -1;
// parse HTTP headers // parse HTTP headers
int prevEndOfHeader = endOfFirstLine + 1; int prevEndOfHeader = endOfFirstLine + 1;
int endOfHeader = findNextLineBreak(src, prevEndOfHeader, endIndex); int endOfHeader = findNextLineBreak(src, prevEndOfHeader, endIndex);
@ -48,9 +51,7 @@ public class HttpUtil {
endOfHeader = findNextLineBreak(src, prevEndOfHeader, endIndex); endOfHeader = findNextLineBreak(src, prevEndOfHeader, endIndex);
} }
if(endOfHeader == -1){ if (endOfHeader == -1) { return -1; }
return -1;
}
// check that byte array contains full HTTP message. // check that byte array contains full HTTP message.
int bodyStartIndex = endOfHeader + 1; int bodyStartIndex = endOfHeader + 1;
@ -63,7 +64,6 @@ public class HttpUtil {
return bodyEndIndex; return bodyEndIndex;
} }
return -1; return -1;
} }
@ -82,44 +82,35 @@ public class HttpUtil {
while (index < endIndex && !endOfValueFound) { while (index < endIndex && !endOfValueFound) {
switch (src[index]) { switch (src[index]) {
case '0' : ; case '0':
case '1' : ; case '1':
case '2' : ; case '2':
case '3' : ; case '3':
case '4' : ; case '4':
case '5' : ; case '5':
case '6' : ; case '6':
case '7' : ; case '7':
case '8' : ; case '8':
case '9' : { index++; break; } case '9':
index++;
default: { break;
default:
endOfValueFound = true; endOfValueFound = true;
valueEndIndex = index; valueEndIndex = index;
} }
} }
}
httpHeaders.contentLength = Integer.parseInt(new String(src, valueStartIndex, valueEndIndex - valueStartIndex, "UTF-8")); httpHeaders.contentLength = Integer.parseInt(new String(src, valueStartIndex, valueEndIndex - valueStartIndex, "UTF-8"));
} }
public static int findNext(byte[] src, int startIndex, int endIndex, byte value) { public static int findNext(byte[] src, int startIndex, int endIndex, byte value) {
for(int index = startIndex; index < endIndex; index++){ for (int index = startIndex; index < endIndex; index++)
if (src[index] == value) return index; if (src[index] == value) return index;
}
return -1; return -1;
} }
public static int findNextLineBreak(byte[] src, int startIndex, int endIndex) { public static int findNextLineBreak(byte[] src, int startIndex, int endIndex) {
for(int index = startIndex; index < endIndex; index++){ for (int index = startIndex; index < endIndex; index++)
if(src[index] == '\n'){ if (src[index] == '\n') if (src[index - 1] == '\r') return index;
if(src[index - 1] == '\r'){
return index;
}
};
}
return -1; return -1;
} }
@ -147,9 +138,8 @@ public class HttpUtil {
} }
public static boolean matches(byte[] src, int offset, byte[] value) { public static boolean matches(byte[] src, int offset, byte[] value) {
for(int i=offset, n=0; n < value.length; i++, n++){ for (int i = offset, n = 0; n < value.length; i++, n++)
if (src[i] != value[n]) return false; if (src[i] != value[n]) return false;
}
return true; return true;
} }
} }

View File

@ -1,80 +0,0 @@
package com.jenkov.nioserver;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
/**
* Created by jjenkov on 18-10-2015.
*/
public class MessageBufferTest {
@Test
public void testGetMessage() {
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
assertNotNull(message);
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(4 * 1024, message.capacity);
Message message2 = messageBuffer.getMessage();
assertNotNull(message2);
assertEquals(4096 , message2.offset);
assertEquals(0 , message2.length);
assertEquals(4 * 1024, message2.capacity);
//todo test what happens if the small buffer space is depleted of messages.
}
@Test
public void testExpandMessage(){
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
byte[] smallSharedArray = message.sharedArray;
assertNotNull(message);
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(4 * 1024, message.capacity);
messageBuffer.expandMessage(message);
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(128 * 1024, message.capacity);
byte[] mediumSharedArray = message.sharedArray;
assertNotSame(smallSharedArray, mediumSharedArray);
messageBuffer.expandMessage(message);
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(1024 * 1024, message.capacity);
byte[] largeSharedArray = message.sharedArray;
assertNotSame(smallSharedArray, largeSharedArray);
assertNotSame(mediumSharedArray, largeSharedArray);
//next expansion should not be possible.
assertFalse(messageBuffer.expandMessage(message));
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(1024 * 1024, message.capacity);
assertSame(message.sharedArray, largeSharedArray);
}
}

View File

@ -1,59 +0,0 @@
package com.jenkov.nioserver;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import java.nio.ByteBuffer;
/**
* Created by jjenkov on 18-10-2015.
*/
public class MessageTest {
@Test
public void testWriteToMessage() {
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
fill(byteBuffer, 4096);
int written = message.writeToMessage(byteBuffer);
assertEquals(4096, written);
assertEquals(4096, message.length);
assertSame(messageBuffer.smallMessageBuffer, message.sharedArray);
fill(byteBuffer, 124 * 1024);
written = message.writeToMessage(byteBuffer);
assertEquals(124 * 1024, written);
assertEquals(128 * 1024, message.length);
assertSame(messageBuffer.mediumMessageBuffer, message.sharedArray);
fill(byteBuffer, (1024-128) * 1024);
written = message.writeToMessage(byteBuffer);
assertEquals(896 * 1024, written);
assertEquals(1024 * 1024, message.length);
assertSame(messageBuffer.largeMessageBuffer, message.sharedArray);
fill(byteBuffer, 1);
written = message.writeToMessage(byteBuffer);
assertEquals(-1, written);
}
private void fill(ByteBuffer byteBuffer, int length){
byteBuffer.clear();
for(int i=0; i<length; i++){
byteBuffer.put((byte) (i%128));
}
byteBuffer.flip();
}
}

View File

@ -1,88 +0,0 @@
package com.jenkov.nioserver.http;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
/**
* Created by jjenkov on 19-10-2015.
*/
public class HttpUtilTest {
@Test
public void testResolveHttpMethod() throws UnsupportedEncodingException {
assertHttpMethod("GET / HTTP/1.1\r\n" , HttpHeaders.HTTP_METHOD_GET);
assertHttpMethod("POST / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_POST);
assertHttpMethod("PUT / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_PUT);
assertHttpMethod("HEAD / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_HEAD);
assertHttpMethod("DELETE / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_DELETE);
}
private void assertHttpMethod(String httpRequest, int httpMethod) throws UnsupportedEncodingException {
byte[] source = httpRequest.getBytes("UTF-8");
HttpHeaders httpHeaders = new HttpHeaders();
HttpUtil.resolveHttpMethod(source, 0, httpHeaders);
assertEquals(httpMethod, httpHeaders.httpMethod);
}
@Test
public void testParseHttpRequest() throws UnsupportedEncodingException {
String httpRequest =
"GET / HTTP/1.1\r\n\r\n";
byte[] source = httpRequest.getBytes("UTF-8");
HttpHeaders httpHeaders = new HttpHeaders();
HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders);
assertEquals(0, httpHeaders.contentLength);
httpRequest =
"GET / HTTP/1.1\r\n" +
"Content-Length: 5\r\n" +
"\r\n1234";
source = httpRequest.getBytes("UTF-8");
assertEquals(-1, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
httpRequest =
"GET / HTTP/1.1\r\n" +
"Content-Length: 5\r\n" +
"\r\n12345";
source = httpRequest.getBytes("UTF-8");
assertEquals(42, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
httpRequest =
"GET / HTTP/1.1\r\n" +
"Content-Length: 5\r\n" +
"\r\n12345" +
"GET / HTTP/1.1\r\n" +
"Content-Length: 5\r\n" +
"\r\n12345";
source = httpRequest.getBytes("UTF-8");
assertEquals(42, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
assertEquals(37, httpHeaders.bodyStartIndex);
assertEquals(42, httpHeaders.bodyEndIndex);
}
}

View File

@ -0,0 +1,79 @@
package com.jenkov.nioserver;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import org.junit.jupiter.api.Test;
/**
* Project: <strong>java-nio-server</strong><br>
* File: <strong>MessageBufferTest.java</strong><br>
* Created: <strong>18 Oct 2015</strong><br>
*
* @author jjenkov
*/
public class MessageBufferTest {
@Test
public void testGetMessage() {
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
assertNotNull(message);
assertEquals(0, message.offset);
assertEquals(0, message.length);
assertEquals(4 * 1024, message.capacity);
Message message2 = messageBuffer.getMessage();
assertNotNull(message2);
assertEquals(4096, message2.offset);
assertEquals(0, message2.length);
assertEquals(4 * 1024, message2.capacity);
// TODO: test what happens if the small buffer space is depleted of messages.
}
@Test
public void testExpandMessage() {
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
byte[] smallSharedArray = message.sharedArray;
assertNotNull(message);
assertEquals(0, message.offset);
assertEquals(0, message.length);
assertEquals(4 * 1024, message.capacity);
messageBuffer.expandMessage(message);
assertEquals(0, message.offset);
assertEquals(0, message.length);
assertEquals(128 * 1024, message.capacity);
byte[] mediumSharedArray = message.sharedArray;
assertNotSame(smallSharedArray, mediumSharedArray);
messageBuffer.expandMessage(message);
assertEquals(0, message.offset);
assertEquals(0, message.length);
assertEquals(1024 * 1024, message.capacity);
byte[] largeSharedArray = message.sharedArray;
assertNotSame(smallSharedArray, largeSharedArray);
assertNotSame(mediumSharedArray, largeSharedArray);
// next expansion should not be possible.
assertFalse(messageBuffer.expandMessage(message));
assertEquals(0, message.offset);
assertEquals(0, message.length);
assertEquals(1024 * 1024, message.capacity);
assertSame(message.sharedArray, largeSharedArray);
}
}

View File

@ -0,0 +1,56 @@
package com.jenkov.nioserver;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import java.nio.ByteBuffer;
import org.junit.jupiter.api.Test;
/**
* Project: <strong>java-nio-server</strong><br>
* File: <strong>MessageTest.java</strong><br>
* Created: <strong>18 Oct 2015</strong><br>
*
* @author jjenkov
*/
public class MessageTest {
@Test
public void testWriteToMessage() {
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
fill(byteBuffer, 4096);
int written = message.writeToMessage(byteBuffer);
assertEquals(4096, written);
assertEquals(4096, message.length);
assertSame(messageBuffer.smallMessageBuffer, message.sharedArray);
fill(byteBuffer, 124 * 1024);
written = message.writeToMessage(byteBuffer);
assertEquals(124 * 1024, written);
assertEquals(128 * 1024, message.length);
assertSame(messageBuffer.mediumMessageBuffer, message.sharedArray);
fill(byteBuffer, (1024 - 128) * 1024);
written = message.writeToMessage(byteBuffer);
assertEquals(896 * 1024, written);
assertEquals(1024 * 1024, message.length);
assertSame(messageBuffer.largeMessageBuffer, message.sharedArray);
fill(byteBuffer, 1);
written = message.writeToMessage(byteBuffer);
assertEquals(-1, written);
}
private void fill(ByteBuffer byteBuffer, int length) {
byteBuffer.clear();
for (int i = 0; i < length; i++)
byteBuffer.put((byte) (i % 128));
byteBuffer.flip();
}
}

View File

@ -1,15 +1,19 @@
package com.jenkov.nioserver; package com.jenkov.nioserver;
import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.junit.jupiter.api.Test;
/** /**
* Created by jjenkov on 21-10-2015. * Project: <strong>java-nio-server</strong><br>
* File: <strong>SelectorTest.java</strong><br>
* Created: <strong>21 Oct 2015</strong><br>
*
* @author jjenkov
*/ */
public class SelectorTest { public class SelectorTest {
@ -27,10 +31,5 @@ public class SelectorTest {
SelectionKey key2 = socketChannel.register(selector, SelectionKey.OP_WRITE); SelectionKey key2 = socketChannel.register(selector, SelectionKey.OP_WRITE);
key2.cancel(); key2.cancel();
} }
} }

View File

@ -0,0 +1,67 @@
package com.jenkov.nioserver.http;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.UnsupportedEncodingException;
import org.junit.jupiter.api.Test;
/**
* Project: <strong>java-nio-server</strong><br>
* File: <strong>HttpUtilTest.java</strong><br>
* Created: <strong>19 Oct 2015</strong><br>
*
* @author jjenkov
*/
public class HttpUtilTest {
@Test
public void testResolveHttpMethod() throws UnsupportedEncodingException {
assertHttpMethod("GET / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_GET);
assertHttpMethod("POST / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_POST);
assertHttpMethod("PUT / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_PUT);
assertHttpMethod("HEAD / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_HEAD);
assertHttpMethod("DELETE / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_DELETE);
}
private void assertHttpMethod(String httpRequest, int httpMethod) throws UnsupportedEncodingException {
byte[] source = httpRequest.getBytes("UTF-8");
HttpHeaders httpHeaders = new HttpHeaders();
HttpUtil.resolveHttpMethod(source, 0, httpHeaders);
assertEquals(httpMethod, httpHeaders.httpMethod);
}
@Test
public void testParseHttpRequest() throws UnsupportedEncodingException {
String httpRequest = "GET / HTTP/1.1\r\n\r\n";
byte[] source = httpRequest.getBytes("UTF-8");
HttpHeaders httpHeaders = new HttpHeaders();
HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders);
assertEquals(0, httpHeaders.contentLength);
httpRequest = "GET / HTTP/1.1\r\n" + "Content-Length: 5\r\n" + "\r\n1234";
source = httpRequest.getBytes("UTF-8");
assertEquals(-1, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
httpRequest = "GET / HTTP/1.1\r\n" + "Content-Length: 5\r\n" + "\r\n12345";
source = httpRequest.getBytes("UTF-8");
assertEquals(42, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
httpRequest = "GET / HTTP/1.1\r\n" + "Content-Length: 5\r\n" + "\r\n12345" + "GET / HTTP/1.1\r\n" + "Content-Length: 5\r\n" + "\r\n12345";
source = httpRequest.getBytes("UTF-8");
assertEquals(42, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
assertEquals(37, httpHeaders.bodyStartIndex);
assertEquals(42, httpHeaders.bodyEndIndex);
}
}