Merge pull request #20 from informatik-ag-ngl/f/user_status_change_event
Added UserStatusChangeProcessor
This commit is contained in:
commit
f2ac9bbf8e
@ -1,5 +1,6 @@
|
|||||||
package envoy.server;
|
package envoy.server;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -7,6 +8,10 @@ import java.util.Set;
|
|||||||
|
|
||||||
import com.jenkov.nioserver.ISocketIdListener;
|
import com.jenkov.nioserver.ISocketIdListener;
|
||||||
|
|
||||||
|
import envoy.data.User.UserStatus;
|
||||||
|
import envoy.server.database.PersistenceManager;
|
||||||
|
import envoy.server.processors.UserStatusChangeProcessor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Project: <strong>envoy-server-standalone</strong><br>
|
* Project: <strong>envoy-server-standalone</strong><br>
|
||||||
* File: <strong>ConnectionManager.java</strong><br>
|
* File: <strong>ConnectionManager.java</strong><br>
|
||||||
@ -44,8 +49,16 @@ public class ConnectionManager implements ISocketIdListener {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void socketCancelled(long socketId) {
|
public void socketCancelled(long socketId) {
|
||||||
if (!pendingSockets.remove(socketId))
|
if (!pendingSockets.remove(socketId)) {
|
||||||
|
// notifying contacts of this users offline-going
|
||||||
|
envoy.server.data.User user = PersistenceManager.getPersistenceManager().getUserById(getUserIdBySocketId(socketId));
|
||||||
|
user.setStatus(UserStatus.OFFLINE);
|
||||||
|
user.setLastSeen(new Date());
|
||||||
|
UserStatusChangeProcessor.updateUserStatus(user);
|
||||||
|
|
||||||
|
// removing the socket
|
||||||
sockets.entrySet().stream().filter(e -> e.getValue() == socketId).forEach(e -> sockets.remove(e.getValue()));
|
sockets.entrySet().stream().filter(e -> e.getValue() == socketId).forEach(e -> sockets.remove(e.getValue()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -64,12 +77,21 @@ public class ConnectionManager implements ISocketIdListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param userId the ID of the user registered at the a socket
|
* @param userId the ID of the user registered at a socket
|
||||||
* @return the ID of the socket
|
* @return the ID of the socket
|
||||||
* @since Envoy Server Standalone v0.1-alpha
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
*/
|
*/
|
||||||
public long getSocketId(long userId) { return sockets.get(userId); }
|
public long getSocketId(long userId) { return sockets.get(userId); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param socketId the id of the socket whose User is needed
|
||||||
|
* @return the userId associated with this socketId
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public long getUserIdBySocketId(long socketId) {
|
||||||
|
return sockets.entrySet().stream().filter((entry) -> entry.getValue().equals(socketId)).findFirst().get().getKey();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param userId the ID of the user to check for
|
* @param userId the ID of the user to check for
|
||||||
* @return {@code true} if the user is online
|
* @return {@code true} if the user is online
|
||||||
|
@ -36,6 +36,7 @@ public class Startup {
|
|||||||
processors.add(new LoginCredentialProcessor());
|
processors.add(new LoginCredentialProcessor());
|
||||||
processors.add(new MessageProcessor());
|
processors.add(new MessageProcessor());
|
||||||
processors.add(new MessageStatusChangeProcessor());
|
processors.add(new MessageStatusChangeProcessor());
|
||||||
|
processors.add(new UserStatusChangeProcessor());
|
||||||
processors.add(new IdGeneratorRequestProcessor());
|
processors.add(new IdGeneratorRequestProcessor());
|
||||||
Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors));
|
Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors));
|
||||||
|
|
||||||
|
@ -108,6 +108,30 @@ public class PersistenceManager {
|
|||||||
entityManager.getTransaction().commit();
|
entityManager.getTransaction().commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes a {@link User} in the database.
|
||||||
|
*
|
||||||
|
* @param user the {@link User} to delete
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public void deleteUser(User user) {
|
||||||
|
entityManager.getTransaction().begin();
|
||||||
|
entityManager.remove(user);
|
||||||
|
entityManager.getTransaction().commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes a {@link Message} in the database.
|
||||||
|
*
|
||||||
|
* @param message the {@link Message} to delete
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public void deleteMessage(Message message) {
|
||||||
|
entityManager.getTransaction().begin();
|
||||||
|
entityManager.remove(message);
|
||||||
|
entityManager.getTransaction().commit();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Searches for a {@link User} with a specific id.
|
* Searches for a {@link User} with a specific id.
|
||||||
*
|
*
|
||||||
|
@ -33,7 +33,6 @@ public class ObjectMessageProcessor implements IMessageProcessor {
|
|||||||
*/
|
*/
|
||||||
public ObjectMessageProcessor(Set<ObjectProcessor<?>> processors) { this.processors = processors; }
|
public ObjectMessageProcessor(Set<ObjectProcessor<?>> processors) { this.processors = processors; }
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
@Override
|
||||||
public void process(Message message, WriteProxy writeProxy) {
|
public void process(Message message, WriteProxy writeProxy) {
|
||||||
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message.sharedArray, message.offset + 4, message.length - 4))) {
|
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message.sharedArray, message.offset + 4, message.length - 4))) {
|
||||||
|
@ -22,7 +22,7 @@ public class ObjectWriteProxy {
|
|||||||
private final WriteProxy writeProxy;
|
private final WriteProxy writeProxy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an instance of @link{ObjectWriteProxy}.
|
* Creates an instance of {@link ObjectWriteProxy}.
|
||||||
*
|
*
|
||||||
* @param writeProxy the {@link WriteProxy} to write objects to another client
|
* @param writeProxy the {@link WriteProxy} to write objects to another client
|
||||||
* @since Envoy Server Standalone v0.1-alpha
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
@ -10,6 +10,7 @@ import envoy.data.Contacts;
|
|||||||
import envoy.data.LoginCredentials;
|
import envoy.data.LoginCredentials;
|
||||||
import envoy.data.Message.MessageStatus;
|
import envoy.data.Message.MessageStatus;
|
||||||
import envoy.data.User;
|
import envoy.data.User;
|
||||||
|
import envoy.data.User.UserStatus;
|
||||||
import envoy.server.ConnectionManager;
|
import envoy.server.ConnectionManager;
|
||||||
import envoy.server.ObjectProcessor;
|
import envoy.server.ObjectProcessor;
|
||||||
import envoy.server.data.Message;
|
import envoy.server.data.Message;
|
||||||
@ -29,31 +30,30 @@ import envoy.server.net.ObjectWriteProxy;
|
|||||||
*/
|
*/
|
||||||
public class LoginCredentialProcessor implements ObjectProcessor<LoginCredentials> {
|
public class LoginCredentialProcessor implements ObjectProcessor<LoginCredentials> {
|
||||||
|
|
||||||
private PersistenceManager persistenceManager = PersistenceManager.getPersistenceManager();
|
private PersistenceManager persistenceManager = PersistenceManager.getPersistenceManager();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Class<LoginCredentials> getInputClass() { return LoginCredentials.class; }
|
public Class<LoginCredentials> getInputClass() { return LoginCredentials.class; }
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(LoginCredentials input, long socketId, ObjectWriteProxy writeProxy) throws IOException {
|
public void process(LoginCredentials input, long socketId, ObjectWriteProxy writeProxy) throws IOException {
|
||||||
|
UserStatusChangeProcessor.setWriteProxy(writeProxy);
|
||||||
System.out.println(String.format("Received login credentials %s from socket ID %d", input, socketId));
|
System.out.println(String.format("Received login credentials %s from socket ID %d", input, socketId));
|
||||||
|
|
||||||
envoy.server.data.User user = getUser(input);
|
envoy.server.data.User user = getUser(input);
|
||||||
|
|
||||||
// Not logged in successfully
|
// Not logged in successfully
|
||||||
if (user == null) {
|
if (user == null) return;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ConnectionManager.getInstance().registerUser(user.getId(), socketId);
|
ConnectionManager.getInstance().registerUser(user.getId(), socketId);
|
||||||
|
|
||||||
|
// notifies contacts of this users online-going and updates his status in the
|
||||||
|
// database
|
||||||
|
user.setStatus(UserStatus.ONLINE);
|
||||||
|
UserStatusChangeProcessor.updateUserStatus(user);
|
||||||
|
|
||||||
// Create contacts
|
// Create contacts
|
||||||
List<User> users = PersistenceManager.getPersistenceManager()
|
Contacts contacts = new Contacts(user.getId(),
|
||||||
.getContacts(user)
|
user.getContacts().stream().map(envoy.server.data.User::toCommonUser).collect(Collectors.toList()));
|
||||||
.stream()
|
|
||||||
.map(envoy.server.data.User::toCommonUser)
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
Contacts contacts = new Contacts(user.getId(), users);
|
|
||||||
|
|
||||||
// Complete handshake
|
// Complete handshake
|
||||||
System.out.println("Sending user...");
|
System.out.println("Sending user...");
|
||||||
@ -63,8 +63,8 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential
|
|||||||
System.out.println("Acquiring pending messages for the client...");
|
System.out.println("Acquiring pending messages for the client...");
|
||||||
List<Message> pendingMessages = PersistenceManager.getPersistenceManager().getUnreadMessages(user);
|
List<Message> pendingMessages = PersistenceManager.getPersistenceManager().getUnreadMessages(user);
|
||||||
for (Message msg : pendingMessages) {
|
for (Message msg : pendingMessages) {
|
||||||
System.out.println("Sending message " + msg.toString());
|
System.out.println("Sending message " + msg.toCommonMessage().toString());
|
||||||
writeProxy.write(socketId, msg);
|
writeProxy.write(socketId, msg.toCommonMessage());
|
||||||
msg.setReceivedDate(new Date());
|
msg.setReceivedDate(new Date());
|
||||||
msg.setStatus(MessageStatus.RECEIVED);
|
msg.setStatus(MessageStatus.RECEIVED);
|
||||||
PersistenceManager.getPersistenceManager().updateMessage(msg);
|
PersistenceManager.getPersistenceManager().updateMessage(msg);
|
||||||
@ -73,21 +73,20 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential
|
|||||||
|
|
||||||
private envoy.server.data.User getUser(LoginCredentials credentials) {
|
private envoy.server.data.User getUser(LoginCredentials credentials) {
|
||||||
envoy.server.data.User user;
|
envoy.server.data.User user;
|
||||||
|
|
||||||
if (credentials.isRegistration()) {
|
if (credentials.isRegistration()) {
|
||||||
user = new envoy.server.data.User();
|
user = new envoy.server.data.User();
|
||||||
user.setName(credentials.getName());
|
user.setName(credentials.getName());
|
||||||
user.setLastSeen(new Date());
|
user.setLastSeen(new Date());
|
||||||
user.setStatus(User.UserStatus.ONLINE);
|
user.setStatus(User.UserStatus.ONLINE);
|
||||||
user.setPasswordHash(credentials.getPasswordHash());
|
user.setPasswordHash(credentials.getPasswordHash());
|
||||||
|
user.setContacts(PersistenceManager.getPersistenceManager().getContacts(user));
|
||||||
persistenceManager.addUser(user);
|
persistenceManager.addUser(user);
|
||||||
} else {
|
} else {
|
||||||
user = persistenceManager.getUserByName(credentials.getName());
|
user = persistenceManager.getUserByName(credentials.getName());
|
||||||
// TODO: Implement error when user does not exist
|
// TODO: Implement error when user does not exist
|
||||||
if (!Arrays.equals(credentials.getPasswordHash(), user.getPasswordHash())) {
|
if (!Arrays.equals(credentials.getPasswordHash(), user.getPasswordHash())) // TODO: Wrong Password Response
|
||||||
// TODO: Wrong Password Response
|
|
||||||
return null;
|
return null;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return user;
|
return user;
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import java.io.IOException;
|
|||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
||||||
import envoy.data.Message;
|
import envoy.data.Message;
|
||||||
|
import envoy.event.MessageStatusChangeEvent;
|
||||||
import envoy.server.ConnectionManager;
|
import envoy.server.ConnectionManager;
|
||||||
import envoy.server.ObjectProcessor;
|
import envoy.server.ObjectProcessor;
|
||||||
import envoy.server.database.PersistenceManager;
|
import envoy.server.database.PersistenceManager;
|
||||||
@ -36,6 +37,7 @@ public class MessageProcessor implements ObjectProcessor<Message> {
|
|||||||
// Update the message status to RECEIVED
|
// Update the message status to RECEIVED
|
||||||
message.setReceivedDate(new Date());
|
message.setReceivedDate(new Date());
|
||||||
message.nextStatus();
|
message.nextStatus();
|
||||||
|
writeProxy.write(connectionManager.getSocketId(message.getSenderId()), new MessageStatusChangeEvent(message));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.err.println("Recipient online. Failed to send message" + message.getId());
|
System.err.println("Recipient online. Failed to send message" + message.getId());
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
@ -29,8 +29,7 @@ public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStat
|
|||||||
// any other status than read is not supposed to be sent to the server
|
// any other status than read is not supposed to be sent to the server
|
||||||
if (input.get() != MessageStatus.READ) throw new EnvoyException("Message" + input.getId() + "has an invalid status");
|
if (input.get() != MessageStatus.READ) throw new EnvoyException("Message" + input.getId() + "has an invalid status");
|
||||||
} catch (EnvoyException e) {
|
} catch (EnvoyException e) {
|
||||||
e.printStackTrace();
|
throw new IOException(e);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
ConnectionManager conMan = ConnectionManager.getInstance();
|
ConnectionManager conMan = ConnectionManager.getInstance();
|
||||||
PersistenceManager perMan = PersistenceManager.getPersistenceManager();
|
PersistenceManager perMan = PersistenceManager.getPersistenceManager();
|
||||||
@ -40,6 +39,7 @@ public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStat
|
|||||||
msg.setReadDate(input.getDate());
|
msg.setReadDate(input.getDate());
|
||||||
perMan.updateMessage(msg);
|
perMan.updateMessage(msg);
|
||||||
|
|
||||||
if (conMan.isOnline(msg.getRecipient().getId())) writeProxy.write(conMan.getSocketId(msg.getRecipient().getId()), input);
|
// Notifies the sender of the message about the status-update to READ
|
||||||
|
if (conMan.isOnline(msg.getSender().getId())) writeProxy.write(conMan.getSocketId(msg.getSender().getId()), input);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,97 @@
|
|||||||
|
package envoy.server.processors;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import envoy.data.User.UserStatus;
|
||||||
|
import envoy.event.UserStatusChangeEvent;
|
||||||
|
import envoy.server.ConnectionManager;
|
||||||
|
import envoy.server.ObjectProcessor;
|
||||||
|
import envoy.server.data.User;
|
||||||
|
import envoy.server.database.PersistenceManager;
|
||||||
|
import envoy.server.net.ObjectWriteProxy;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This processor handles incoming {@link UserStatusChangeEvent}.<br>
|
||||||
|
* <br>
|
||||||
|
* Project: <strong>envoy-server-standalone</strong><br>
|
||||||
|
* File: <strong>UserStatusChangeProcessor.java</strong><br>
|
||||||
|
* Created: <strong>1 Feb 2020</strong><br>
|
||||||
|
*
|
||||||
|
* @author Leon Hofmeister
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChangeEvent> {
|
||||||
|
|
||||||
|
private static ObjectWriteProxy writeProxy;
|
||||||
|
private static PersistenceManager persistenceManager = PersistenceManager.getPersistenceManager();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Class<UserStatusChangeEvent> getInputClass() { return UserStatusChangeEvent.class; }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(UserStatusChangeEvent input, long socketId, ObjectWriteProxy writeProxy) throws IOException {
|
||||||
|
// new status should not equal old status
|
||||||
|
if (input.get().equals(persistenceManager.getUserById(input.getId()).getStatus())) {
|
||||||
|
System.out.println("Received an unnecessary UserStatusChangeEvent");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
updateUserStatus(input);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the {@link UserStatus} for a given user. Both offline contacts and
|
||||||
|
* currently online contacts are notified.
|
||||||
|
*
|
||||||
|
* @param user the {@link UserStatusChangeEvent} that signals the change
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
|
||||||
|
public static void updateUserStatus(User user) {
|
||||||
|
// handling for newly logged in clients
|
||||||
|
persistenceManager.updateUser(user);
|
||||||
|
|
||||||
|
// handling for contacts that are already online
|
||||||
|
notifyContacts(user);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param evt the {@link UserStatusChangeEvent}
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public static void updateUserStatus(UserStatusChangeEvent evt) { updateUserStatus(persistenceManager.getUserById(evt.getId())); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* notifies active contacts of this {@link User} that his {@link UserStatus} has
|
||||||
|
* changed
|
||||||
|
*
|
||||||
|
* @param evt the {@link UserStatusChangeEvent} to send to other clients
|
||||||
|
* @param user the {@link User}
|
||||||
|
* @throws IOException if sending this update failed for any contact
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
private static void notifyContacts(User user) {
|
||||||
|
UserStatusChangeEvent evt = new UserStatusChangeEvent(user.getId(), user.getStatus());
|
||||||
|
ConnectionManager connectionManager = ConnectionManager.getInstance();
|
||||||
|
try {
|
||||||
|
for (User contact : user.getContacts())
|
||||||
|
if (connectionManager.isOnline(contact.getId())) writeProxy.write(connectionManager.getSocketId(contact.getId()), evt);
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
System.err.println("Could not notify online contacts of user " + evt.getId() + " that his status has been changed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is only called by the LoginCredentialProcessor because every
|
||||||
|
* user needs to login (open a socket) before changing his status.
|
||||||
|
* Needed to ensure propagation of events because an uninitialized writeProxy
|
||||||
|
* would cause problems.
|
||||||
|
*
|
||||||
|
* @param writeProxy the writeProxy that is used to send objects back to clients
|
||||||
|
* @since Envoy Server Standalone v0.1-alpha
|
||||||
|
*/
|
||||||
|
public static void setWriteProxy(ObjectWriteProxy writeProxy) { UserStatusChangeProcessor.writeProxy = writeProxy; }
|
||||||
|
// TODO may cause an problem if two clients log in at the same time.
|
||||||
|
// Change Needed.
|
||||||
|
}
|
Reference in New Issue
Block a user