From 02c6f8ad455ef22e2133ec5bd9c9102b11ba8392 Mon Sep 17 00:00:00 2001 From: kske Date: Fri, 3 Jan 2020 18:11:38 +0200 Subject: [PATCH] Added ConnectionManager, propagating socketId to processors --- .../java/envoy/server/ConnectionManager.java | 79 +++++++++++++++++++ .../server/LoginCredentialProcessor.java | 8 +- .../java/envoy/server/MessageProcessor.java | 12 ++- .../java/envoy/server/ObjectProcessor.java | 5 +- src/main/java/envoy/server/Startup.java | 2 + .../server/net/ObjectMessageProcessor.java | 8 +- 6 files changed, 105 insertions(+), 9 deletions(-) create mode 100644 src/main/java/envoy/server/ConnectionManager.java diff --git a/src/main/java/envoy/server/ConnectionManager.java b/src/main/java/envoy/server/ConnectionManager.java new file mode 100644 index 0000000..ac4b0c0 --- /dev/null +++ b/src/main/java/envoy/server/ConnectionManager.java @@ -0,0 +1,79 @@ +package envoy.server; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.jenkov.nioserver.ISocketIdListener; + +/** + * Project: envoy-server-standalone
+ * File: ConnectionManager.java
+ * Created: 03.01.2020
+ * + * @author Kai S. K. Engelbart + * @since Envoy Server Standalone v0.1-alpha + */ +public class ConnectionManager implements ISocketIdListener { + + /** + * Contains all socket IDs that have not yet performed a handshake / acquired + * their corresponding user ID. + * + * @since Envoy Server Standalone v0.1-alpha + */ + private Set pendingSockets = new HashSet<>(); + + /** + * Contains all socket IDs that have acquired a user ID as keys to these IDs. + * + * @since Envoy Server Standalone v0.1-alpha + */ + private Map sockets = new HashMap<>(); + + private static ConnectionManager connectionManager = new ConnectionManager(); + + private ConnectionManager() {} + + /** + * @return a singleton instance of this object + * @since Envoy Server Standalone v0.1-alpha + */ + public static ConnectionManager getInstance() { return connectionManager; } + + @Override + public void socketCancelled(long socketId) { + if (!pendingSockets.remove(socketId)) + sockets.entrySet().stream().filter(e -> e.getValue() == socketId).forEach(e -> sockets.remove(e.getValue())); + } + + @Override + public void socketRegistered(long socketId) { pendingSockets.add(socketId); } + + /** + * Associates a socket ID with a user ID. + * + * @param socketId the socket ID + * @param userId the user ID + * @since Envoy Server Standalone v0.1-alpha + */ + public void registerUser(long socketId, long userId) { + sockets.put(socketId, userId); + pendingSockets.remove(socketId); + } + + /** + * @param userId the ID of the user registered at the a socket + * @return the ID of the socket + * @since Envoy Server Standalone v0.1-alpha + */ + public long getSocketId(long userId) { return sockets.get(userId); } + + /** + * @param userId the ID of the user to check for + * @return {@code true} if the user is online + * @since Envoy Server Standalone v0.1-alpha + */ + public boolean isOnline(long userId) { return sockets.containsKey(userId); } +} diff --git a/src/main/java/envoy/server/LoginCredentialProcessor.java b/src/main/java/envoy/server/LoginCredentialProcessor.java index f208a3e..0a84edf 100644 --- a/src/main/java/envoy/server/LoginCredentialProcessor.java +++ b/src/main/java/envoy/server/LoginCredentialProcessor.java @@ -22,8 +22,10 @@ public class LoginCredentialProcessor implements ObjectProcessor getInputClass() { return LoginCredentials.class; } @Override - public User process(LoginCredentials input) { - System.out.println("Received login credentials " + input); - return new User(currentUserId++, input.getName()); + public User process(LoginCredentials input, long socketId) { + System.out.println(String.format("Received login credentials %s from socket ID %d", input, socketId)); + User user = new User(currentUserId++, input.getName()); + ConnectionManager.getInstance().registerUser(socketId, user.getId()); + return user; } } diff --git a/src/main/java/envoy/server/MessageProcessor.java b/src/main/java/envoy/server/MessageProcessor.java index ea52c55..8d80e53 100644 --- a/src/main/java/envoy/server/MessageProcessor.java +++ b/src/main/java/envoy/server/MessageProcessor.java @@ -18,5 +18,15 @@ public class MessageProcessor implements ObjectProcessor { public Class getInputClass() { return Message.class; } @Override - public Void process(Message input) { return null; } + public Void process(Message message, long socketId) { + + // TODO: Send message to recipient if online + ConnectionManager connectionManager = ConnectionManager.getInstance(); + if (connectionManager.isOnline(message.getRecipientId())) { + + } + + // TODO: Add message to database + return null; + } } diff --git a/src/main/java/envoy/server/ObjectProcessor.java b/src/main/java/envoy/server/ObjectProcessor.java index 44c07e1..07c0480 100644 --- a/src/main/java/envoy/server/ObjectProcessor.java +++ b/src/main/java/envoy/server/ObjectProcessor.java @@ -22,9 +22,10 @@ public interface ObjectProcessor { Class getInputClass(); /** - * @param input the request object + * @param input the request object + * @param socketId the ID of the socket from which the object was received * @return the response object * @since Envoy Server Standalone v0.1-alpha */ - U process(T input); + U process(T input, long socketId); } \ No newline at end of file diff --git a/src/main/java/envoy/server/Startup.java b/src/main/java/envoy/server/Startup.java index dd295a4..b616946 100644 --- a/src/main/java/envoy/server/Startup.java +++ b/src/main/java/envoy/server/Startup.java @@ -32,7 +32,9 @@ public class Startup { Set> processors = new HashSet<>(); processors.add(new LoginCredentialProcessor()); processors.add(new MessageProcessor()); + Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors)); server.start(); + server.getSocketProcessor().registerSocketIdListener(ConnectionManager.getInstance()); } } \ No newline at end of file diff --git a/src/main/java/envoy/server/net/ObjectMessageProcessor.java b/src/main/java/envoy/server/net/ObjectMessageProcessor.java index 6f652d9..1bf707f 100644 --- a/src/main/java/envoy/server/net/ObjectMessageProcessor.java +++ b/src/main/java/envoy/server/net/ObjectMessageProcessor.java @@ -25,7 +25,7 @@ import envoy.server.ObjectProcessor; */ public class ObjectMessageProcessor implements IMessageProcessor { - private final Set> processors; + private final Set> processors; /** * The constructor to set the {@link ObjectProcessor}s. @@ -33,7 +33,9 @@ public class ObjectMessageProcessor implements IMessageProcessor { * @param processors the {@link ObjectProcessor} to set * @since Envoy Server Standalone v0.1-alpha */ - public ObjectMessageProcessor(Set> processors) { this.processors = processors; } + public ObjectMessageProcessor(Set> processors) { + this.processors = processors; + } @SuppressWarnings("unchecked") @Override @@ -44,7 +46,7 @@ public class ObjectMessageProcessor implements IMessageProcessor { // Process object processors.stream().filter(p -> p.getInputClass().isInstance(obj)).forEach((@SuppressWarnings("rawtypes") ObjectProcessor p) -> { - Object responseObj = p.process(p.getInputClass().cast(obj)); + Object responseObj = p.process(p.getInputClass().cast(obj), message.socketId); if (responseObj != null) { // Create message targeted at the client Message response = writeProxy.getMessage();