diff --git a/src/main/java/envoy/server/ConnectionManager.java b/src/main/java/envoy/server/ConnectionManager.java index 202334f..9dfd56b 100644 --- a/src/main/java/envoy/server/ConnectionManager.java +++ b/src/main/java/envoy/server/ConnectionManager.java @@ -1,5 +1,6 @@ package envoy.server; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -7,6 +8,10 @@ import java.util.Set; import com.jenkov.nioserver.ISocketIdListener; +import envoy.data.User.UserStatus; +import envoy.server.database.PersistenceManager; +import envoy.server.processors.UserStatusChangeProcessor; + /** * Project: envoy-server-standalone
* File: ConnectionManager.java
@@ -44,8 +49,16 @@ public class ConnectionManager implements ISocketIdListener { @Override public void socketCancelled(long socketId) { - if (!pendingSockets.remove(socketId)) + if (!pendingSockets.remove(socketId)) { + // notifying contacts of this users offline-going + envoy.server.data.User user = PersistenceManager.getPersistenceManager().getUserById(getUserIdBySocketId(socketId)); + user.setStatus(UserStatus.OFFLINE); + user.setLastSeen(new Date()); + UserStatusChangeProcessor.updateUserStatus(user); + + // removing the socket sockets.entrySet().stream().filter(e -> e.getValue() == socketId).forEach(e -> sockets.remove(e.getValue())); + } } @Override @@ -64,12 +77,21 @@ public class ConnectionManager implements ISocketIdListener { } /** - * @param userId the ID of the user registered at the a socket + * @param userId the ID of the user registered at a socket * @return the ID of the socket * @since Envoy Server Standalone v0.1-alpha */ public long getSocketId(long userId) { return sockets.get(userId); } + /** + * @param socketId the id of the socket whose User is needed + * @return the userId associated with this socketId + * @since Envoy Server Standalone v0.1-alpha + */ + public long getUserIdBySocketId(long socketId) { + return sockets.entrySet().stream().filter((entry) -> entry.getValue().equals(socketId)).findFirst().get().getKey(); + } + /** * @param userId the ID of the user to check for * @return {@code true} if the user is online diff --git a/src/main/java/envoy/server/Startup.java b/src/main/java/envoy/server/Startup.java index 097399e..82d9915 100644 --- a/src/main/java/envoy/server/Startup.java +++ b/src/main/java/envoy/server/Startup.java @@ -36,6 +36,7 @@ public class Startup { processors.add(new LoginCredentialProcessor()); processors.add(new MessageProcessor()); processors.add(new MessageStatusChangeProcessor()); + processors.add(new UserStatusChangeProcessor()); processors.add(new IdGeneratorRequestProcessor()); Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors)); diff --git a/src/main/java/envoy/server/database/PersistenceManager.java b/src/main/java/envoy/server/database/PersistenceManager.java index 3ec686a..803f177 100644 --- a/src/main/java/envoy/server/database/PersistenceManager.java +++ b/src/main/java/envoy/server/database/PersistenceManager.java @@ -108,6 +108,30 @@ public class PersistenceManager { entityManager.getTransaction().commit(); } + /** + * Deletes a {@link User} in the database. + * + * @param user the {@link User} to delete + * @since Envoy Server Standalone v0.1-alpha + */ + public void deleteUser(User user) { + entityManager.getTransaction().begin(); + entityManager.remove(user); + entityManager.getTransaction().commit(); + } + + /** + * Deletes a {@link Message} in the database. + * + * @param message the {@link Message} to delete + * @since Envoy Server Standalone v0.1-alpha + */ + public void deleteMessage(Message message) { + entityManager.getTransaction().begin(); + entityManager.remove(message); + entityManager.getTransaction().commit(); + } + /** * Searches for a {@link User} with a specific id. * diff --git a/src/main/java/envoy/server/net/ObjectMessageProcessor.java b/src/main/java/envoy/server/net/ObjectMessageProcessor.java index 226fb69..3bbe43e 100644 --- a/src/main/java/envoy/server/net/ObjectMessageProcessor.java +++ b/src/main/java/envoy/server/net/ObjectMessageProcessor.java @@ -33,7 +33,6 @@ public class ObjectMessageProcessor implements IMessageProcessor { */ public ObjectMessageProcessor(Set> processors) { this.processors = processors; } - @SuppressWarnings("unchecked") @Override public void process(Message message, WriteProxy writeProxy) { try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message.sharedArray, message.offset + 4, message.length - 4))) { diff --git a/src/main/java/envoy/server/net/ObjectWriteProxy.java b/src/main/java/envoy/server/net/ObjectWriteProxy.java index 2ec9223..825daeb 100644 --- a/src/main/java/envoy/server/net/ObjectWriteProxy.java +++ b/src/main/java/envoy/server/net/ObjectWriteProxy.java @@ -22,7 +22,7 @@ public class ObjectWriteProxy { private final WriteProxy writeProxy; /** - * Creates an instance of @link{ObjectWriteProxy}. + * Creates an instance of {@link ObjectWriteProxy}. * * @param writeProxy the {@link WriteProxy} to write objects to another client * @since Envoy Server Standalone v0.1-alpha diff --git a/src/main/java/envoy/server/processors/LoginCredentialProcessor.java b/src/main/java/envoy/server/processors/LoginCredentialProcessor.java index 376989a..acd2b2e 100644 --- a/src/main/java/envoy/server/processors/LoginCredentialProcessor.java +++ b/src/main/java/envoy/server/processors/LoginCredentialProcessor.java @@ -10,6 +10,7 @@ import envoy.data.Contacts; import envoy.data.LoginCredentials; import envoy.data.Message.MessageStatus; import envoy.data.User; +import envoy.data.User.UserStatus; import envoy.server.ConnectionManager; import envoy.server.ObjectProcessor; import envoy.server.data.Message; @@ -29,31 +30,30 @@ import envoy.server.net.ObjectWriteProxy; */ public class LoginCredentialProcessor implements ObjectProcessor { - private PersistenceManager persistenceManager = PersistenceManager.getPersistenceManager(); + private PersistenceManager persistenceManager = PersistenceManager.getPersistenceManager(); @Override public Class getInputClass() { return LoginCredentials.class; } @Override public void process(LoginCredentials input, long socketId, ObjectWriteProxy writeProxy) throws IOException { + UserStatusChangeProcessor.setWriteProxy(writeProxy); System.out.println(String.format("Received login credentials %s from socket ID %d", input, socketId)); envoy.server.data.User user = getUser(input); // Not logged in successfully - if (user == null) { - return; - } - + if (user == null) return; ConnectionManager.getInstance().registerUser(user.getId(), socketId); + // notifies contacts of this users online-going and updates his status in the + // database + user.setStatus(UserStatus.ONLINE); + UserStatusChangeProcessor.updateUserStatus(user); + // Create contacts - List users = PersistenceManager.getPersistenceManager() - .getContacts(user) - .stream() - .map(envoy.server.data.User::toCommonUser) - .collect(Collectors.toList()); - Contacts contacts = new Contacts(user.getId(), users); + Contacts contacts = new Contacts(user.getId(), + user.getContacts().stream().map(envoy.server.data.User::toCommonUser).collect(Collectors.toList())); // Complete handshake System.out.println("Sending user..."); @@ -63,8 +63,8 @@ public class LoginCredentialProcessor implements ObjectProcessor pendingMessages = PersistenceManager.getPersistenceManager().getUnreadMessages(user); for (Message msg : pendingMessages) { - System.out.println("Sending message " + msg.toString()); - writeProxy.write(socketId, msg); + System.out.println("Sending message " + msg.toCommonMessage().toString()); + writeProxy.write(socketId, msg.toCommonMessage()); msg.setReceivedDate(new Date()); msg.setStatus(MessageStatus.RECEIVED); PersistenceManager.getPersistenceManager().updateMessage(msg); @@ -73,21 +73,20 @@ public class LoginCredentialProcessor implements ObjectProcessor { // Update the message status to RECEIVED message.setReceivedDate(new Date()); message.nextStatus(); + writeProxy.write(connectionManager.getSocketId(message.getSenderId()), new MessageStatusChangeEvent(message)); } catch (IOException e) { System.err.println("Recipient online. Failed to send message" + message.getId()); e.printStackTrace(); diff --git a/src/main/java/envoy/server/processors/MessageStatusChangeProcessor.java b/src/main/java/envoy/server/processors/MessageStatusChangeProcessor.java index 4eec2bc..216e028 100644 --- a/src/main/java/envoy/server/processors/MessageStatusChangeProcessor.java +++ b/src/main/java/envoy/server/processors/MessageStatusChangeProcessor.java @@ -29,8 +29,7 @@ public class MessageStatusChangeProcessor implements ObjectProcessor + *
+ * Project: envoy-server-standalone
+ * File: UserStatusChangeProcessor.java
+ * Created: 1 Feb 2020
+ * + * @author Leon Hofmeister + * @since Envoy Server Standalone v0.1-alpha + */ +public class UserStatusChangeProcessor implements ObjectProcessor { + + private static ObjectWriteProxy writeProxy; + private static PersistenceManager persistenceManager = PersistenceManager.getPersistenceManager(); + + @Override + public Class getInputClass() { return UserStatusChangeEvent.class; } + + @Override + public void process(UserStatusChangeEvent input, long socketId, ObjectWriteProxy writeProxy) throws IOException { + // new status should not equal old status + if (input.get().equals(persistenceManager.getUserById(input.getId()).getStatus())) { + System.out.println("Received an unnecessary UserStatusChangeEvent"); + return; + } + updateUserStatus(input); + + } + + /** + * Sets the {@link UserStatus} for a given user. Both offline contacts and + * currently online contacts are notified. + * + * @param user the {@link UserStatusChangeEvent} that signals the change + * @since Envoy Server Standalone v0.1-alpha + */ + + public static void updateUserStatus(User user) { + // handling for newly logged in clients + persistenceManager.updateUser(user); + + // handling for contacts that are already online + notifyContacts(user); + } + + /** + * @param evt the {@link UserStatusChangeEvent} + * @since Envoy Server Standalone v0.1-alpha + */ + public static void updateUserStatus(UserStatusChangeEvent evt) { updateUserStatus(persistenceManager.getUserById(evt.getId())); } + + /** + * notifies active contacts of this {@link User} that his {@link UserStatus} has + * changed + * + * @param evt the {@link UserStatusChangeEvent} to send to other clients + * @param user the {@link User} + * @throws IOException if sending this update failed for any contact + * @since Envoy Server Standalone v0.1-alpha + */ + private static void notifyContacts(User user) { + UserStatusChangeEvent evt = new UserStatusChangeEvent(user.getId(), user.getStatus()); + ConnectionManager connectionManager = ConnectionManager.getInstance(); + try { + for (User contact : user.getContacts()) + if (connectionManager.isOnline(contact.getId())) writeProxy.write(connectionManager.getSocketId(contact.getId()), evt); + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Could not notify online contacts of user " + evt.getId() + " that his status has been changed"); + } + } + + /** + * This method is only called by the LoginCredentialProcessor because every + * user needs to login (open a socket) before changing his status. + * Needed to ensure propagation of events because an uninitialized writeProxy + * would cause problems. + * + * @param writeProxy the writeProxy that is used to send objects back to clients + * @since Envoy Server Standalone v0.1-alpha + */ + public static void setWriteProxy(ObjectWriteProxy writeProxy) { UserStatusChangeProcessor.writeProxy = writeProxy; } + // TODO may cause an problem if two clients log in at the same time. + // Change Needed. +}