Added ConnectionManager, propagating socketId to processors
This commit is contained in:
		
							
								
								
									
										79
									
								
								src/main/java/envoy/server/ConnectionManager.java
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										79
									
								
								src/main/java/envoy/server/ConnectionManager.java
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,79 @@
 | 
				
			|||||||
 | 
					package envoy.server;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.HashMap;
 | 
				
			||||||
 | 
					import java.util.HashSet;
 | 
				
			||||||
 | 
					import java.util.Map;
 | 
				
			||||||
 | 
					import java.util.Set;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.jenkov.nioserver.ISocketIdListener;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Project: <strong>envoy-server-standalone</strong><br>
 | 
				
			||||||
 | 
					 * File: <strong>ConnectionManager.java</strong><br>
 | 
				
			||||||
 | 
					 * Created: <strong>03.01.2020</strong><br>
 | 
				
			||||||
 | 
					 * 
 | 
				
			||||||
 | 
					 * @author Kai S. K. Engelbart
 | 
				
			||||||
 | 
					 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					public class ConnectionManager implements ISocketIdListener {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/**
 | 
				
			||||||
 | 
						 * Contains all socket IDs that have not yet performed a handshake / acquired
 | 
				
			||||||
 | 
						 * their corresponding user ID.
 | 
				
			||||||
 | 
						 * 
 | 
				
			||||||
 | 
						 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						private Set<Long> pendingSockets = new HashSet<>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/**
 | 
				
			||||||
 | 
						 * Contains all socket IDs that have acquired a user ID as keys to these IDs.
 | 
				
			||||||
 | 
						 * 
 | 
				
			||||||
 | 
						 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						private Map<Long, Long> sockets = new HashMap<>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						private static ConnectionManager connectionManager = new ConnectionManager();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						private ConnectionManager() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/**
 | 
				
			||||||
 | 
						 * @return a singleton instance of this object
 | 
				
			||||||
 | 
						 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						public static ConnectionManager getInstance() { return connectionManager; }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public void socketCancelled(long socketId) {
 | 
				
			||||||
 | 
							if (!pendingSockets.remove(socketId))
 | 
				
			||||||
 | 
								sockets.entrySet().stream().filter(e -> e.getValue() == socketId).forEach(e -> sockets.remove(e.getValue()));
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public void socketRegistered(long socketId) { pendingSockets.add(socketId); }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/**
 | 
				
			||||||
 | 
						 * Associates a socket ID with a user ID.
 | 
				
			||||||
 | 
						 * 
 | 
				
			||||||
 | 
						 * @param socketId the socket ID
 | 
				
			||||||
 | 
						 * @param userId   the user ID
 | 
				
			||||||
 | 
						 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						public void registerUser(long socketId, long userId) {
 | 
				
			||||||
 | 
							sockets.put(socketId, userId);
 | 
				
			||||||
 | 
							pendingSockets.remove(socketId);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/**
 | 
				
			||||||
 | 
						 * @param userId the ID of the user registered at the a socket
 | 
				
			||||||
 | 
						 * @return the ID of the socket
 | 
				
			||||||
 | 
						 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						public long getSocketId(long userId) { return sockets.get(userId); }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/**
 | 
				
			||||||
 | 
						 * @param userId the ID of the user to check for
 | 
				
			||||||
 | 
						 * @return {@code true} if the user is online
 | 
				
			||||||
 | 
						 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						public boolean isOnline(long userId) { return sockets.containsKey(userId); }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -22,8 +22,10 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential
 | 
				
			|||||||
	public Class<LoginCredentials> getInputClass() { return LoginCredentials.class; }
 | 
						public Class<LoginCredentials> getInputClass() { return LoginCredentials.class; }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Override
 | 
						@Override
 | 
				
			||||||
	public User process(LoginCredentials input) {
 | 
						public User process(LoginCredentials input, long socketId) {
 | 
				
			||||||
		System.out.println("Received login credentials " + input);
 | 
							System.out.println(String.format("Received login credentials %s from socket ID %d", input, socketId));
 | 
				
			||||||
		return new User(currentUserId++, input.getName());
 | 
							User user = new User(currentUserId++, input.getName());
 | 
				
			||||||
 | 
							ConnectionManager.getInstance().registerUser(socketId, user.getId());
 | 
				
			||||||
 | 
							return user;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,5 +18,15 @@ public class MessageProcessor implements ObjectProcessor<Message, Void> {
 | 
				
			|||||||
	public Class<Message> getInputClass() { return Message.class; }
 | 
						public Class<Message> getInputClass() { return Message.class; }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Override
 | 
						@Override
 | 
				
			||||||
	public Void process(Message input) { return null; }
 | 
						public Void process(Message message, long socketId) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// TODO: Send message to recipient if online
 | 
				
			||||||
 | 
							ConnectionManager connectionManager = ConnectionManager.getInstance();
 | 
				
			||||||
 | 
							if (connectionManager.isOnline(message.getRecipientId())) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// TODO: Add message to database
 | 
				
			||||||
 | 
							return null;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -22,9 +22,10 @@ public interface ObjectProcessor<T, U> {
 | 
				
			|||||||
	Class<T> getInputClass();
 | 
						Class<T> getInputClass();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/**
 | 
						/**
 | 
				
			||||||
	 * @param input the request object
 | 
						 * @param input    the request object
 | 
				
			||||||
 | 
						 * @param socketId the ID of the socket from which the object was received
 | 
				
			||||||
	 * @return the response object
 | 
						 * @return the response object
 | 
				
			||||||
	 * @since Envoy Server Standalone v0.1-alpha
 | 
						 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
	 */
 | 
						 */
 | 
				
			||||||
	U process(T input);
 | 
						U process(T input, long socketId);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -32,7 +32,9 @@ public class Startup {
 | 
				
			|||||||
		Set<ObjectProcessor<?, ?>> processors = new HashSet<>();
 | 
							Set<ObjectProcessor<?, ?>> processors = new HashSet<>();
 | 
				
			||||||
		processors.add(new LoginCredentialProcessor());
 | 
							processors.add(new LoginCredentialProcessor());
 | 
				
			||||||
		processors.add(new MessageProcessor());
 | 
							processors.add(new MessageProcessor());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors));
 | 
							Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors));
 | 
				
			||||||
		server.start();
 | 
							server.start();
 | 
				
			||||||
 | 
							server.getSocketProcessor().registerSocketIdListener(ConnectionManager.getInstance());
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -25,7 +25,7 @@ import envoy.server.ObjectProcessor;
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
public class ObjectMessageProcessor implements IMessageProcessor {
 | 
					public class ObjectMessageProcessor implements IMessageProcessor {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private final Set<ObjectProcessor<?, ?>> processors;
 | 
						private final Set<ObjectProcessor<?, ?>>	processors;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/**
 | 
						/**
 | 
				
			||||||
	 * The constructor to set the {@link ObjectProcessor}s.
 | 
						 * The constructor to set the {@link ObjectProcessor}s.
 | 
				
			||||||
@@ -33,7 +33,9 @@ public class ObjectMessageProcessor implements IMessageProcessor {
 | 
				
			|||||||
	 * @param processors the {@link ObjectProcessor} to set
 | 
						 * @param processors the {@link ObjectProcessor} to set
 | 
				
			||||||
	 * @since Envoy Server Standalone v0.1-alpha
 | 
						 * @since Envoy Server Standalone v0.1-alpha
 | 
				
			||||||
	 */
 | 
						 */
 | 
				
			||||||
	public ObjectMessageProcessor(Set<ObjectProcessor<?, ?>> processors) { this.processors = processors; }
 | 
						public ObjectMessageProcessor(Set<ObjectProcessor<?, ?>> processors) {
 | 
				
			||||||
 | 
							this.processors			= processors;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@SuppressWarnings("unchecked")
 | 
						@SuppressWarnings("unchecked")
 | 
				
			||||||
	@Override
 | 
						@Override
 | 
				
			||||||
@@ -44,7 +46,7 @@ public class ObjectMessageProcessor implements IMessageProcessor {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
			// Process object
 | 
								// Process object
 | 
				
			||||||
			processors.stream().filter(p -> p.getInputClass().isInstance(obj)).forEach((@SuppressWarnings("rawtypes") ObjectProcessor p) -> {
 | 
								processors.stream().filter(p -> p.getInputClass().isInstance(obj)).forEach((@SuppressWarnings("rawtypes") ObjectProcessor p) -> {
 | 
				
			||||||
				Object responseObj = p.process(p.getInputClass().cast(obj));
 | 
									Object responseObj = p.process(p.getInputClass().cast(obj), message.socketId);
 | 
				
			||||||
				if (responseObj != null) {
 | 
									if (responseObj != null) {
 | 
				
			||||||
					// Create message targeted at the client
 | 
										// Create message targeted at the client
 | 
				
			||||||
					Message response = writeProxy.getMessage();
 | 
										Message response = writeProxy.getMessage();
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user