Merge pull request #20 from informatik-ag-ngl/f/user_status_change_event

Added UserStatusChangeProcessor
This commit is contained in:
delvh 2020-02-05 22:20:55 +01:00 committed by GitHub
commit 5ae067a305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 168 additions and 24 deletions

View File

@ -1,5 +1,6 @@
package envoy.server; package envoy.server;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -7,6 +8,10 @@ import java.util.Set;
import com.jenkov.nioserver.ISocketIdListener; import com.jenkov.nioserver.ISocketIdListener;
import envoy.data.User.UserStatus;
import envoy.server.database.PersistenceManager;
import envoy.server.processors.UserStatusChangeProcessor;
/** /**
* Project: <strong>envoy-server-standalone</strong><br> * Project: <strong>envoy-server-standalone</strong><br>
* File: <strong>ConnectionManager.java</strong><br> * File: <strong>ConnectionManager.java</strong><br>
@ -44,9 +49,17 @@ public class ConnectionManager implements ISocketIdListener {
@Override @Override
public void socketCancelled(long socketId) { public void socketCancelled(long socketId) {
if (!pendingSockets.remove(socketId)) if (!pendingSockets.remove(socketId)) {
// notifying contacts of this users offline-going
envoy.server.data.User user = PersistenceManager.getPersistenceManager().getUserById(getUserIdBySocketId(socketId));
user.setStatus(UserStatus.OFFLINE);
user.setLastSeen(new Date());
UserStatusChangeProcessor.updateUserStatus(user);
// removing the socket
sockets.entrySet().stream().filter(e -> e.getValue() == socketId).forEach(e -> sockets.remove(e.getValue())); sockets.entrySet().stream().filter(e -> e.getValue() == socketId).forEach(e -> sockets.remove(e.getValue()));
} }
}
@Override @Override
public void socketRegistered(long socketId) { pendingSockets.add(socketId); } public void socketRegistered(long socketId) { pendingSockets.add(socketId); }
@ -64,12 +77,21 @@ public class ConnectionManager implements ISocketIdListener {
} }
/** /**
* @param userId the ID of the user registered at the a socket * @param userId the ID of the user registered at a socket
* @return the ID of the socket * @return the ID of the socket
* @since Envoy Server Standalone v0.1-alpha * @since Envoy Server Standalone v0.1-alpha
*/ */
public long getSocketId(long userId) { return sockets.get(userId); } public long getSocketId(long userId) { return sockets.get(userId); }
/**
* @param socketId the id of the socket whose User is needed
* @return the userId associated with this socketId
* @since Envoy Server Standalone v0.1-alpha
*/
public long getUserIdBySocketId(long socketId) {
return sockets.entrySet().stream().filter((entry) -> entry.getValue().equals(socketId)).findFirst().get().getKey();
}
/** /**
* @param userId the ID of the user to check for * @param userId the ID of the user to check for
* @return {@code true} if the user is online * @return {@code true} if the user is online

View File

@ -36,6 +36,7 @@ public class Startup {
processors.add(new LoginCredentialProcessor()); processors.add(new LoginCredentialProcessor());
processors.add(new MessageProcessor()); processors.add(new MessageProcessor());
processors.add(new MessageStatusChangeProcessor()); processors.add(new MessageStatusChangeProcessor());
processors.add(new UserStatusChangeProcessor());
processors.add(new IdGeneratorRequestProcessor()); processors.add(new IdGeneratorRequestProcessor());
Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors)); Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors));

View File

@ -108,6 +108,30 @@ public class PersistenceManager {
entityManager.getTransaction().commit(); entityManager.getTransaction().commit();
} }
/**
* Deletes a {@link User} in the database.
*
* @param user the {@link User} to delete
* @since Envoy Server Standalone v0.1-alpha
*/
public void deleteUser(User user) {
entityManager.getTransaction().begin();
entityManager.remove(user);
entityManager.getTransaction().commit();
}
/**
* Deletes a {@link Message} in the database.
*
* @param message the {@link Message} to delete
* @since Envoy Server Standalone v0.1-alpha
*/
public void deleteMessage(Message message) {
entityManager.getTransaction().begin();
entityManager.remove(message);
entityManager.getTransaction().commit();
}
/** /**
* Searches for a {@link User} with a specific id. * Searches for a {@link User} with a specific id.
* *

View File

@ -33,7 +33,6 @@ public class ObjectMessageProcessor implements IMessageProcessor {
*/ */
public ObjectMessageProcessor(Set<ObjectProcessor<?>> processors) { this.processors = processors; } public ObjectMessageProcessor(Set<ObjectProcessor<?>> processors) { this.processors = processors; }
@SuppressWarnings("unchecked")
@Override @Override
public void process(Message message, WriteProxy writeProxy) { public void process(Message message, WriteProxy writeProxy) {
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message.sharedArray, message.offset + 4, message.length - 4))) { try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message.sharedArray, message.offset + 4, message.length - 4))) {

View File

@ -22,7 +22,7 @@ public class ObjectWriteProxy {
private final WriteProxy writeProxy; private final WriteProxy writeProxy;
/** /**
* Creates an instance of @link{ObjectWriteProxy}. * Creates an instance of {@link ObjectWriteProxy}.
* *
* @param writeProxy the {@link WriteProxy} to write objects to another client * @param writeProxy the {@link WriteProxy} to write objects to another client
* @since Envoy Server Standalone v0.1-alpha * @since Envoy Server Standalone v0.1-alpha

View File

@ -10,6 +10,7 @@ import envoy.data.Contacts;
import envoy.data.LoginCredentials; import envoy.data.LoginCredentials;
import envoy.data.Message.MessageStatus; import envoy.data.Message.MessageStatus;
import envoy.data.User; import envoy.data.User;
import envoy.data.User.UserStatus;
import envoy.server.ConnectionManager; import envoy.server.ConnectionManager;
import envoy.server.ObjectProcessor; import envoy.server.ObjectProcessor;
import envoy.server.data.Message; import envoy.server.data.Message;
@ -36,24 +37,23 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential
@Override @Override
public void process(LoginCredentials input, long socketId, ObjectWriteProxy writeProxy) throws IOException { public void process(LoginCredentials input, long socketId, ObjectWriteProxy writeProxy) throws IOException {
UserStatusChangeProcessor.setWriteProxy(writeProxy);
System.out.println(String.format("Received login credentials %s from socket ID %d", input, socketId)); System.out.println(String.format("Received login credentials %s from socket ID %d", input, socketId));
envoy.server.data.User user = getUser(input); envoy.server.data.User user = getUser(input);
// Not logged in successfully // Not logged in successfully
if (user == null) { if (user == null) return;
return;
}
ConnectionManager.getInstance().registerUser(user.getId(), socketId); ConnectionManager.getInstance().registerUser(user.getId(), socketId);
// notifies contacts of this users online-going and updates his status in the
// database
user.setStatus(UserStatus.ONLINE);
UserStatusChangeProcessor.updateUserStatus(user);
// Create contacts // Create contacts
List<User> users = PersistenceManager.getPersistenceManager() Contacts contacts = new Contacts(user.getId(),
.getContacts(user) user.getContacts().stream().map(envoy.server.data.User::toCommonUser).collect(Collectors.toList()));
.stream()
.map(envoy.server.data.User::toCommonUser)
.collect(Collectors.toList());
Contacts contacts = new Contacts(user.getId(), users);
// Complete handshake // Complete handshake
System.out.println("Sending user..."); System.out.println("Sending user...");
@ -63,8 +63,8 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential
System.out.println("Acquiring pending messages for the client..."); System.out.println("Acquiring pending messages for the client...");
List<Message> pendingMessages = PersistenceManager.getPersistenceManager().getUnreadMessages(user); List<Message> pendingMessages = PersistenceManager.getPersistenceManager().getUnreadMessages(user);
for (Message msg : pendingMessages) { for (Message msg : pendingMessages) {
System.out.println("Sending message " + msg.toString()); System.out.println("Sending message " + msg.toCommonMessage().toString());
writeProxy.write(socketId, msg); writeProxy.write(socketId, msg.toCommonMessage());
msg.setReceivedDate(new Date()); msg.setReceivedDate(new Date());
msg.setStatus(MessageStatus.RECEIVED); msg.setStatus(MessageStatus.RECEIVED);
PersistenceManager.getPersistenceManager().updateMessage(msg); PersistenceManager.getPersistenceManager().updateMessage(msg);
@ -80,15 +80,14 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential
user.setLastSeen(new Date()); user.setLastSeen(new Date());
user.setStatus(User.UserStatus.ONLINE); user.setStatus(User.UserStatus.ONLINE);
user.setPasswordHash(credentials.getPasswordHash()); user.setPasswordHash(credentials.getPasswordHash());
user.setContacts(PersistenceManager.getPersistenceManager().getContacts(user));
persistenceManager.addUser(user); persistenceManager.addUser(user);
} else { } else {
user = persistenceManager.getUserByName(credentials.getName()); user = persistenceManager.getUserByName(credentials.getName());
// TODO: Implement error when user does not exist // TODO: Implement error when user does not exist
if (!Arrays.equals(credentials.getPasswordHash(), user.getPasswordHash())) { if (!Arrays.equals(credentials.getPasswordHash(), user.getPasswordHash())) // TODO: Wrong Password Response
// TODO: Wrong Password Response
return null; return null;
} }
}
return user; return user;
} }
} }

View File

@ -4,6 +4,7 @@ import java.io.IOException;
import java.util.Date; import java.util.Date;
import envoy.data.Message; import envoy.data.Message;
import envoy.event.MessageStatusChangeEvent;
import envoy.server.ConnectionManager; import envoy.server.ConnectionManager;
import envoy.server.ObjectProcessor; import envoy.server.ObjectProcessor;
import envoy.server.database.PersistenceManager; import envoy.server.database.PersistenceManager;
@ -36,6 +37,7 @@ public class MessageProcessor implements ObjectProcessor<Message> {
// Update the message status to RECEIVED // Update the message status to RECEIVED
message.setReceivedDate(new Date()); message.setReceivedDate(new Date());
message.nextStatus(); message.nextStatus();
writeProxy.write(connectionManager.getSocketId(message.getSenderId()), new MessageStatusChangeEvent(message));
} catch (IOException e) { } catch (IOException e) {
System.err.println("Recipient online. Failed to send message" + message.getId()); System.err.println("Recipient online. Failed to send message" + message.getId());
e.printStackTrace(); e.printStackTrace();

View File

@ -29,8 +29,7 @@ public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStat
// any other status than read is not supposed to be sent to the server // any other status than read is not supposed to be sent to the server
if (input.get() != MessageStatus.READ) throw new EnvoyException("Message" + input.getId() + "has an invalid status"); if (input.get() != MessageStatus.READ) throw new EnvoyException("Message" + input.getId() + "has an invalid status");
} catch (EnvoyException e) { } catch (EnvoyException e) {
e.printStackTrace(); throw new IOException(e);
return;
} }
ConnectionManager conMan = ConnectionManager.getInstance(); ConnectionManager conMan = ConnectionManager.getInstance();
PersistenceManager perMan = PersistenceManager.getPersistenceManager(); PersistenceManager perMan = PersistenceManager.getPersistenceManager();
@ -40,6 +39,7 @@ public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStat
msg.setReadDate(input.getDate()); msg.setReadDate(input.getDate());
perMan.updateMessage(msg); perMan.updateMessage(msg);
if (conMan.isOnline(msg.getRecipient().getId())) writeProxy.write(conMan.getSocketId(msg.getRecipient().getId()), input); // Notifies the sender of the message about the status-update to READ
if (conMan.isOnline(msg.getSender().getId())) writeProxy.write(conMan.getSocketId(msg.getSender().getId()), input);
} }
} }

View File

@ -0,0 +1,97 @@
package envoy.server.processors;
import java.io.IOException;
import envoy.data.User.UserStatus;
import envoy.event.UserStatusChangeEvent;
import envoy.server.ConnectionManager;
import envoy.server.ObjectProcessor;
import envoy.server.data.User;
import envoy.server.database.PersistenceManager;
import envoy.server.net.ObjectWriteProxy;
/**
* This processor handles incoming {@link UserStatusChangeEvent}.<br>
* <br>
* Project: <strong>envoy-server-standalone</strong><br>
* File: <strong>UserStatusChangeProcessor.java</strong><br>
* Created: <strong>1 Feb 2020</strong><br>
*
* @author Leon Hofmeister
* @since Envoy Server Standalone v0.1-alpha
*/
public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChangeEvent> {
private static ObjectWriteProxy writeProxy;
private static PersistenceManager persistenceManager = PersistenceManager.getPersistenceManager();
@Override
public Class<UserStatusChangeEvent> getInputClass() { return UserStatusChangeEvent.class; }
@Override
public void process(UserStatusChangeEvent input, long socketId, ObjectWriteProxy writeProxy) throws IOException {
// new status should not equal old status
if (input.get().equals(persistenceManager.getUserById(input.getId()).getStatus())) {
System.out.println("Received an unnecessary UserStatusChangeEvent");
return;
}
updateUserStatus(input);
}
/**
* Sets the {@link UserStatus} for a given user. Both offline contacts and
* currently online contacts are notified.
*
* @param user the {@link UserStatusChangeEvent} that signals the change
* @since Envoy Server Standalone v0.1-alpha
*/
public static void updateUserStatus(User user) {
// handling for newly logged in clients
persistenceManager.updateUser(user);
// handling for contacts that are already online
notifyContacts(user);
}
/**
* @param evt the {@link UserStatusChangeEvent}
* @since Envoy Server Standalone v0.1-alpha
*/
public static void updateUserStatus(UserStatusChangeEvent evt) { updateUserStatus(persistenceManager.getUserById(evt.getId())); }
/**
* notifies active contacts of this {@link User} that his {@link UserStatus} has
* changed
*
* @param evt the {@link UserStatusChangeEvent} to send to other clients
* @param user the {@link User}
* @throws IOException if sending this update failed for any contact
* @since Envoy Server Standalone v0.1-alpha
*/
private static void notifyContacts(User user) {
UserStatusChangeEvent evt = new UserStatusChangeEvent(user.getId(), user.getStatus());
ConnectionManager connectionManager = ConnectionManager.getInstance();
try {
for (User contact : user.getContacts())
if (connectionManager.isOnline(contact.getId())) writeProxy.write(connectionManager.getSocketId(contact.getId()), evt);
} catch (IOException e) {
e.printStackTrace();
System.err.println("Could not notify online contacts of user " + evt.getId() + " that his status has been changed");
}
}
/**
* This method is only called by the LoginCredentialProcessor because every
* user needs to login (open a socket) before changing his status.
* Needed to ensure propagation of events because an uninitialized writeProxy
* would cause problems.
*
* @param writeProxy the writeProxy that is used to send objects back to clients
* @since Envoy Server Standalone v0.1-alpha
*/
public static void setWriteProxy(ObjectWriteProxy writeProxy) { UserStatusChangeProcessor.writeProxy = writeProxy; }
// TODO may cause an problem if two clients log in at the same time.
// Change Needed.
}