Added ConnectionManager, propagating socketId to processors
This commit is contained in:
parent
10a387beea
commit
02c6f8ad45
79
src/main/java/envoy/server/ConnectionManager.java
Normal file
79
src/main/java/envoy/server/ConnectionManager.java
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package envoy.server;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import com.jenkov.nioserver.ISocketIdListener;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Project: <strong>envoy-server-standalone</strong><br>
|
||||||
|
* File: <strong>ConnectionManager.java</strong><br>
|
||||||
|
* Created: <strong>03.01.2020</strong><br>
|
||||||
|
*
|
||||||
|
* @author Kai S. K. Engelbart
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public class ConnectionManager implements ISocketIdListener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Contains all socket IDs that have not yet performed a handshake / acquired
|
||||||
|
* their corresponding user ID.
|
||||||
|
*
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
private Set<Long> pendingSockets = new HashSet<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Contains all socket IDs that have acquired a user ID as keys to these IDs.
|
||||||
|
*
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
private Map<Long, Long> sockets = new HashMap<>();
|
||||||
|
|
||||||
|
private static ConnectionManager connectionManager = new ConnectionManager();
|
||||||
|
|
||||||
|
private ConnectionManager() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a singleton instance of this object
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public static ConnectionManager getInstance() { return connectionManager; }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void socketCancelled(long socketId) {
|
||||||
|
if (!pendingSockets.remove(socketId))
|
||||||
|
sockets.entrySet().stream().filter(e -> e.getValue() == socketId).forEach(e -> sockets.remove(e.getValue()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void socketRegistered(long socketId) { pendingSockets.add(socketId); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Associates a socket ID with a user ID.
|
||||||
|
*
|
||||||
|
* @param socketId the socket ID
|
||||||
|
* @param userId the user ID
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public void registerUser(long socketId, long userId) {
|
||||||
|
sockets.put(socketId, userId);
|
||||||
|
pendingSockets.remove(socketId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param userId the ID of the user registered at the a socket
|
||||||
|
* @return the ID of the socket
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public long getSocketId(long userId) { return sockets.get(userId); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param userId the ID of the user to check for
|
||||||
|
* @return {@code true} if the user is online
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public boolean isOnline(long userId) { return sockets.containsKey(userId); }
|
||||||
|
}
|
@ -22,8 +22,10 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential
|
|||||||
public Class<LoginCredentials> getInputClass() { return LoginCredentials.class; }
|
public Class<LoginCredentials> getInputClass() { return LoginCredentials.class; }
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public User process(LoginCredentials input) {
|
public User process(LoginCredentials input, long socketId) {
|
||||||
System.out.println("Received login credentials " + input);
|
System.out.println(String.format("Received login credentials %s from socket ID %d", input, socketId));
|
||||||
return new User(currentUserId++, input.getName());
|
User user = new User(currentUserId++, input.getName());
|
||||||
|
ConnectionManager.getInstance().registerUser(socketId, user.getId());
|
||||||
|
return user;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,5 +18,15 @@ public class MessageProcessor implements ObjectProcessor<Message, Void> {
|
|||||||
public Class<Message> getInputClass() { return Message.class; }
|
public Class<Message> getInputClass() { return Message.class; }
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void process(Message input) { return null; }
|
public Void process(Message message, long socketId) {
|
||||||
|
|
||||||
|
// TODO: Send message to recipient if online
|
||||||
|
ConnectionManager connectionManager = ConnectionManager.getInstance();
|
||||||
|
if (connectionManager.isOnline(message.getRecipientId())) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Add message to database
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,9 +22,10 @@ public interface ObjectProcessor<T, U> {
|
|||||||
Class<T> getInputClass();
|
Class<T> getInputClass();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param input the request object
|
* @param input the request object
|
||||||
|
* @param socketId the ID of the socket from which the object was received
|
||||||
* @return the response object
|
* @return the response object
|
||||||
* @since Envoy Server Standalone v0.1-alpha
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
*/
|
*/
|
||||||
U process(T input);
|
U process(T input, long socketId);
|
||||||
}
|
}
|
@ -32,7 +32,9 @@ public class Startup {
|
|||||||
Set<ObjectProcessor<?, ?>> processors = new HashSet<>();
|
Set<ObjectProcessor<?, ?>> processors = new HashSet<>();
|
||||||
processors.add(new LoginCredentialProcessor());
|
processors.add(new LoginCredentialProcessor());
|
||||||
processors.add(new MessageProcessor());
|
processors.add(new MessageProcessor());
|
||||||
|
|
||||||
Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors));
|
Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors));
|
||||||
server.start();
|
server.start();
|
||||||
|
server.getSocketProcessor().registerSocketIdListener(ConnectionManager.getInstance());
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -25,7 +25,7 @@ import envoy.server.ObjectProcessor;
|
|||||||
*/
|
*/
|
||||||
public class ObjectMessageProcessor implements IMessageProcessor {
|
public class ObjectMessageProcessor implements IMessageProcessor {
|
||||||
|
|
||||||
private final Set<ObjectProcessor<?, ?>> processors;
|
private final Set<ObjectProcessor<?, ?>> processors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The constructor to set the {@link ObjectProcessor}s.
|
* The constructor to set the {@link ObjectProcessor}s.
|
||||||
@ -33,7 +33,9 @@ public class ObjectMessageProcessor implements IMessageProcessor {
|
|||||||
* @param processors the {@link ObjectProcessor} to set
|
* @param processors the {@link ObjectProcessor} to set
|
||||||
* @since Envoy Server Standalone v0.1-alpha
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
*/
|
*/
|
||||||
public ObjectMessageProcessor(Set<ObjectProcessor<?, ?>> processors) { this.processors = processors; }
|
public ObjectMessageProcessor(Set<ObjectProcessor<?, ?>> processors) {
|
||||||
|
this.processors = processors;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
@ -44,7 +46,7 @@ public class ObjectMessageProcessor implements IMessageProcessor {
|
|||||||
|
|
||||||
// Process object
|
// Process object
|
||||||
processors.stream().filter(p -> p.getInputClass().isInstance(obj)).forEach((@SuppressWarnings("rawtypes") ObjectProcessor p) -> {
|
processors.stream().filter(p -> p.getInputClass().isInstance(obj)).forEach((@SuppressWarnings("rawtypes") ObjectProcessor p) -> {
|
||||||
Object responseObj = p.process(p.getInputClass().cast(obj));
|
Object responseObj = p.process(p.getInputClass().cast(obj), message.socketId);
|
||||||
if (responseObj != null) {
|
if (responseObj != null) {
|
||||||
// Create message targeted at the client
|
// Create message targeted at the client
|
||||||
Message response = writeProxy.getMessage();
|
Message response = writeProxy.getMessage();
|
||||||
|
Reference in New Issue
Block a user