Active Code Review

This commit is contained in:
Kai S. K. Engelbart 2020-07-06 11:40:13 +02:00
parent e33bc726ac
commit 5374296e6c
14 changed files with 195 additions and 230 deletions

View File

@ -1,6 +1,6 @@
package envoy.server.data; package envoy.server.data;
import java.util.Date; import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -25,14 +25,20 @@ import envoy.data.Group;
) )
public class GroupMessage extends Message { public class GroupMessage extends Message {
/**
* Named query retrieving pending group messages sent to a group containing a
* specific user (parameter {@code userId}) that were sent after a certain time
* stamp (parameter {@code :lastSeen}).
*
* @since Envoy Server Standalone v0.1-beta
*/
public static final String getPendingGroupMsg = "GroupMessage.getPendingGroupMsg"; public static final String getPendingGroupMsg = "GroupMessage.getPendingGroupMsg";
@ElementCollection @ElementCollection
private Map<Long, envoy.data.Message.MessageStatus> memberMessageStatus; private Map<Long, envoy.data.Message.MessageStatus> memberMessageStatus;
@Column(name = "last_status_change_date") @Column(name = "last_status_change_date")
@Temporal(TemporalType.TIMESTAMP) protected LocalDateTime lastStatusChangeDate;
protected Date lastStatusChangeDate;
/** /**
* The constructor for a database object. * The constructor for a database object.
@ -47,10 +53,10 @@ public class GroupMessage extends Message {
* @param groupMessage the {@link envoy.data.GroupMessage} to convert * @param groupMessage the {@link envoy.data.GroupMessage} to convert
* into a * into a
* database {@link GroupMessage} * database {@link GroupMessage}
* @param lastStatusChangeDate the {@link Date} to set * @param lastStatusChangeDate the time stamp to set
* @since Envoy Server Standalone v0.1-beta * @since Envoy Server Standalone v0.1-beta
*/ */
public GroupMessage(envoy.data.GroupMessage groupMessage, Date lastStatusChangeDate) { public GroupMessage(envoy.data.GroupMessage groupMessage, LocalDateTime lastStatusChangeDate) {
super(groupMessage); super(groupMessage);
memberMessageStatus = groupMessage.getMemberStatuses(); memberMessageStatus = groupMessage.getMemberStatuses();
this.lastStatusChangeDate = lastStatusChangeDate; this.lastStatusChangeDate = lastStatusChangeDate;
@ -87,11 +93,11 @@ public class GroupMessage extends Message {
* @return the date at which one of the member statuses changed last * @return the date at which one of the member statuses changed last
* @since Envoy Server Standalone v0.1-beta * @since Envoy Server Standalone v0.1-beta
*/ */
public Date getLastStatusChangeDate() { return lastStatusChangeDate; } public LocalDateTime getLastStatusChangeDate() { return lastStatusChangeDate; }
/** /**
* @param date the date to set * @param date the date to set
* @since Envoy Server Standalone v0.1-beta * @since Envoy Server Standalone v0.1-beta
*/ */
public void setLastStatusChangeDate(Date date) { lastStatusChangeDate = date; } public void setLastStatusChangeDate(LocalDateTime date) { lastStatusChangeDate = date; }
} }

View File

@ -50,7 +50,7 @@ public class ConnectionManager implements ISocketIdListener {
public void socketCancelled(long socketID) { public void socketCancelled(long socketID) {
if (!pendingSockets.remove(socketID)) { if (!pendingSockets.remove(socketID)) {
// Notify contacts of this users offline-going // Notify contacts of this users offline-going
envoy.server.data.User user = PersistenceManager.getInstance().getUserByID(getUserIdBySocketID(socketID)); envoy.server.data.User user = PersistenceManager.getInstance().getUserByID(getUserIDBySocketID(socketID));
user.setStatus(UserStatus.OFFLINE); user.setStatus(UserStatus.OFFLINE);
user.setLastSeen(LocalDateTime.now()); user.setLastSeen(LocalDateTime.now());
UserStatusChangeProcessor.updateUserStatus(user); UserStatusChangeProcessor.updateUserStatus(user);
@ -87,7 +87,7 @@ public class ConnectionManager implements ISocketIdListener {
* @return the userId associated with this socketId * @return the userId associated with this socketId
* @since Envoy Server Standalone v0.1-alpha * @since Envoy Server Standalone v0.1-alpha
*/ */
public long getUserIdBySocketID(long socketID) { public long getUserIDBySocketID(long socketID) {
return sockets.entrySet().stream().filter(entry -> entry.getValue().equals(socketID)).findFirst().get().getKey(); return sockets.entrySet().stream().filter(entry -> entry.getValue().equals(socketID)).findFirst().get().getKey();
} }

View File

@ -1,11 +1,14 @@
package envoy.server.net; package envoy.server.net;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.stream.Stream;
import com.jenkov.nioserver.Message; import com.jenkov.nioserver.Message;
import com.jenkov.nioserver.WriteProxy; import com.jenkov.nioserver.WriteProxy;
import envoy.server.data.Contact;
import envoy.util.EnvoyLog; import envoy.util.EnvoyLog;
import envoy.util.SerializationUtils; import envoy.util.SerializationUtils;
@ -23,6 +26,7 @@ public class ObjectWriteProxy {
private final WriteProxy writeProxy; private final WriteProxy writeProxy;
private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(ObjectWriteProxy.class); private static final Logger logger = EnvoyLog.getLogger(ObjectWriteProxy.class);
/** /**
@ -36,15 +40,18 @@ public class ObjectWriteProxy {
/** /**
* @param recipientSocketID the socket id of the recipient * @param recipientSocketID the socket id of the recipient
* @param obj the object to return to the client * @param obj the object to return to the client
* @throws IOException if the serialization of the object failed * @throws RuntimeException if the serialization of the object failed (this is
* highly unlikely)
* @since Envoy Server Standalone v0.1-alpha * @since Envoy Server Standalone v0.1-alpha
*/ */
public void write(long recipientSocketID, Object obj) throws IOException { public void write(long recipientSocketID, Object obj) {
// Create message targeted at the client // Create message targeted at the client
final Message response = writeProxy.getMessage(); final Message response = writeProxy.getMessage();
response.socketId = recipientSocketID; response.socketId = recipientSocketID;
logger.fine("Sending " + obj); logger.fine("Sending " + obj);
try {
// Serialize object to byte array // Serialize object to byte array
final byte[] objBytes = SerializationUtils.writeToByteArray(obj); final byte[] objBytes = SerializationUtils.writeToByteArray(obj);
@ -54,5 +61,28 @@ public class ObjectWriteProxy {
response.writeToMessage(objLen); response.writeToMessage(objLen);
response.writeToMessage(objBytes); response.writeToMessage(objBytes);
writeProxy.enqueue(response); writeProxy.enqueue(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Sends an object to all contact in a set that are online.
*
* @param contacts the contacts to send the object to
* @param message the object to send
* @since Envoy Server Standalone v0.1-beta
*/
public void writeToOnlineContacts(Set<? extends Contact> contacts, Object message) { writeToOnlineContacts(contacts.stream(), message); }
/**
* Sends an object to all contact in a set that are online.
*
* @param contacts the contacts to send the object to
* @param message the object to send
* @since Envoy Server Standalone v0.1-beta
*/
public void writeToOnlineContacts(Stream<? extends Contact> contacts, Object message) {
contacts.map(Contact::getID).filter(connectionManager::isOnline).map(connectionManager::getSocketID).forEach(id -> write(id, message));
} }
} }

View File

@ -1,6 +1,5 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import java.util.logging.Logger; import java.util.logging.Logger;
import envoy.event.ElementOperation; import envoy.event.ElementOperation;
@ -24,10 +23,10 @@ public class ContactOperationProcessor implements ObjectProcessor<ContactOperati
private static final Logger logger = EnvoyLog.getLogger(ContactOperationProcessor.class); private static final Logger logger = EnvoyLog.getLogger(ContactOperationProcessor.class);
@Override @Override
public void process(ContactOperation evt, long socketId, ObjectWriteProxy writeProxy) throws IOException { public void process(ContactOperation evt, long socketId, ObjectWriteProxy writeProxy) {
switch (evt.getOperationType()) { switch (evt.getOperationType()) {
case ADD: case ADD:
final long userID = ConnectionManager.getInstance().getUserIdBySocketID(socketId); final long userID = ConnectionManager.getInstance().getUserIDBySocketID(socketId);
final long contactId = evt.get().getID(); final long contactId = evt.get().getID();
logger.fine(String.format("Adding user %s to the contact list of user %d.%n", evt.get(), userID)); logger.fine(String.format("Adding user %s to the contact list of user %d.%n", evt.get(), userID));
@ -39,7 +38,7 @@ public class ContactOperationProcessor implements ObjectProcessor<ContactOperati
new ContactOperation(PersistenceManager.getInstance().getUserByID(userID).toCommon(), ElementOperation.ADD)); new ContactOperation(PersistenceManager.getInstance().getUserByID(userID).toCommon(), ElementOperation.ADD));
break; break;
default: default:
logger.warning(String.format("Received %s with an unsupported operation.%n", evt)); logger.warning(String.format("Received %s with an unsupported operation.", evt));
} }
} }

View File

@ -33,7 +33,7 @@ public class ContactSearchProcessor implements ObjectProcessor<ContactSearchRequ
public void process(ContactSearchRequest request, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(ContactSearchRequest request, long socketID, ObjectWriteProxy writeProxy) throws IOException {
writeProxy.write(socketID, writeProxy.write(socketID,
new ContactSearchResult(PersistenceManager.getInstance() new ContactSearchResult(PersistenceManager.getInstance()
.searchUsers(request.get(), ConnectionManager.getInstance().getUserIdBySocketID(socketID)) .searchUsers(request.get(), ConnectionManager.getInstance().getUserIDBySocketID(socketID))
.stream() .stream()
.map(User::toCommon) .map(User::toCommon)
.collect(Collectors.toList()))); .collect(Collectors.toList())));

View File

@ -1,6 +1,5 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import envoy.event.ElementOperation; import envoy.event.ElementOperation;
@ -25,27 +24,21 @@ public class GroupCreationProcessor implements ObjectProcessor<GroupCreation> {
private final ConnectionManager connectionManager = ConnectionManager.getInstance(); private final ConnectionManager connectionManager = ConnectionManager.getInstance();
@Override @Override
public void process(GroupCreation groupCreation, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(GroupCreation groupCreation, long socketID, ObjectWriteProxy writeProxy) {
envoy.server.data.Group group = new envoy.server.data.Group(); envoy.server.data.Group group = new envoy.server.data.Group();
group.setName(groupCreation.get()); group.setName(groupCreation.get());
group.setContacts(new HashSet<>()); group.setContacts(new HashSet<>());
groupCreation.getInitialMemberIDs().stream().map(persistenceManager::getUserByID).forEach(group.getContacts()::add); groupCreation.getInitialMemberIDs().stream().map(persistenceManager::getUserByID).forEach(group.getContacts()::add);
group.getContacts().add(persistenceManager.getContactByID(connectionManager.getUserIdBySocketID(socketID))); group.getContacts().add(persistenceManager.getContactByID(connectionManager.getUserIDBySocketID(socketID)));
group.getContacts().forEach(c -> c.getContacts().add(group)); group.getContacts().forEach(c -> c.getContacts().add(group));
group.getContacts().add(persistenceManager.getUserByID(connectionManager.getUserIdBySocketID(socketID))); group.getContacts().add(persistenceManager.getUserByID(connectionManager.getUserIDBySocketID(socketID)));
persistenceManager.addContact(group); persistenceManager.addContact(group);
group.getContacts() group.getContacts()
.stream() .stream()
.map(Contact::getID) .map(Contact::getID)
.filter(connectionManager::isOnline) .filter(connectionManager::isOnline)
.map(connectionManager::getSocketID) .map(connectionManager::getSocketID)
.forEach(memberSocketID -> { .forEach(memberSocketID -> writeProxy.write(memberSocketID, new ContactOperation(group.toCommon(), ElementOperation.ADD)));
try {
writeProxy.write(memberSocketID, new ContactOperation(group.toCommon(), ElementOperation.ADD));
} catch (IOException e) {
e.printStackTrace();
}
});
} }
@Override @Override

View File

@ -1,13 +1,14 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException; import static envoy.data.Message.MessageStatus.*;
import java.util.Date;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.persistence.EntityExistsException; import javax.persistence.EntityExistsException;
import envoy.data.GroupMessage; import envoy.data.GroupMessage;
import envoy.data.Message.MessageStatus;
import envoy.event.MessageStatusChange; import envoy.event.MessageStatusChange;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.net.ConnectionManager; import envoy.server.net.ConnectionManager;
@ -24,60 +25,43 @@ import envoy.util.EnvoyLog;
*/ */
public class GroupMessageProcessor implements ObjectProcessor<GroupMessage> { public class GroupMessageProcessor implements ObjectProcessor<GroupMessage> {
private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
private static final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(GroupCreationProcessor.class); private static final Logger logger = EnvoyLog.getLogger(GroupCreationProcessor.class);
@Override @Override
public void process(GroupMessage groupMessage, long socketID, ObjectWriteProxy writeProxy) { public void process(GroupMessage groupMessage, long socketID, ObjectWriteProxy writeProxy) {
groupMessage.nextStatus(); groupMessage.nextStatus();
ConnectionManager connectionManager = ConnectionManager.getInstance();
final var members = PersistenceManager.getInstance().getGroupByID(groupMessage.getRecipientID()).getContacts(); // Update statuses to SENT / RECEIVED depending on online status
members.forEach(user -> groupMessage.getMemberStatuses().replace(user.getID(), MessageStatus.SENT)); groupMessage.getMemberStatuses().replaceAll((memberID, status) -> connectionManager.isOnline(memberID) ? RECEIVED : SENT);
members.forEach(user -> { setMemberStatus(connectionManager, groupMessage, user.getID()); });
// Setting memberStatus of sender to READ
groupMessage.getMemberStatuses()
.replace(members.stream().filter(sender -> groupMessage.getSenderID() == sender.getID()).findAny().get().getID(), MessageStatus.READ);
// Checks if all memberMessageStatuses are RECEIVED and if so sets the // Set status for sender to READ
// groupMessage Status to RECEIVED and sends a MessageStatusChange to the groupMessage.getMemberStatuses().replace(groupMessage.getSenderID(), READ);
// sender, if he is still online.
if (!groupMessage.getMemberStatuses().containsValue(MessageStatus.SENT)) { // Increment the overall status to RECEIVED if necessary
groupMessage.setStatus(MessageStatus.RECEIVED); if (Collections.min(groupMessage.getMemberStatuses().values()) == RECEIVED) {
if (connectionManager.isOnline(connectionManager.getUserIdBySocketID(socketID))) try { groupMessage.nextStatus();
// Notify the sender of the status change
writeProxy.write(socketID, new MessageStatusChange(groupMessage)); writeProxy.write(socketID, new MessageStatusChange(groupMessage));
} catch (IOException e) {
logger.warning("Sender of the groupMessage online. Failed to send MessageStatusChange");
e.printStackTrace();
}
} }
members.stream() // Deliver the message to the recipients that are online
.filter(m -> groupMessage.getSenderID() != m.getID()) writeProxy.writeToOnlineContacts(
.forEach(user -> { sendToMember(connectionManager, groupMessage, user.getID(), writeProxy); }); persistenceManager.getGroupByID(groupMessage.getRecipientID())
.getContacts()
.stream()
.filter(c -> c.getID() != groupMessage.getSenderID()),
groupMessage);
try { try {
PersistenceManager.getInstance().addMessage(new envoy.server.data.GroupMessage(groupMessage, new Date())); PersistenceManager.getInstance().addMessage(new envoy.server.data.GroupMessage(groupMessage, LocalDateTime.now()));
} catch (EntityExistsException e) { } catch (EntityExistsException e) {
logger.warning("Received a groupMessage with an ID that already exists"); logger.warning("Received a groupMessage with an ID that already exists");
} }
} }
private void sendToMember(ConnectionManager connectionManager, GroupMessage groupMessage, long memberID, ObjectWriteProxy writeProxy) {
if (connectionManager.isOnline(memberID)) try {
// If recipient is online, send the groupMessage directly
writeProxy.write(connectionManager.getSocketID(memberID), groupMessage);
} catch (IOException e) {
logger.warning("Recipient online. Failed to send message" + groupMessage.getID());
e.printStackTrace();
}
}
private void setMemberStatus(ConnectionManager connectionManager, GroupMessage groupMessage, long memberID) {
if (connectionManager.isOnline(memberID))
// Update the message status of the member to RECEIVED
groupMessage.getMemberStatuses().replace(memberID, MessageStatus.RECEIVED);
}
@Override @Override
public Class<GroupMessage> getInputClass() { return GroupMessage.class; } public Class<GroupMessage> getInputClass() { return GroupMessage.class; }
} }

View File

@ -1,15 +1,20 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException; import static envoy.data.Message.MessageStatus.READ;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.logging.Level;
import java.util.logging.Logger;
import envoy.data.Message.MessageStatus; import envoy.data.Message.MessageStatus;
import envoy.event.GroupMessageStatusChange; import envoy.event.GroupMessageStatusChange;
import envoy.event.MessageStatusChange; import envoy.event.MessageStatusChange;
import envoy.exception.EnvoyException;
import envoy.server.data.GroupMessage; import envoy.server.data.GroupMessage;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.net.ConnectionManager; import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy; import envoy.server.net.ObjectWriteProxy;
import envoy.util.EnvoyLog;
/** /**
* Project: <strong>envoy-server-standalone</strong><br> * Project: <strong>envoy-server-standalone</strong><br>
@ -21,46 +26,38 @@ import envoy.server.net.ObjectWriteProxy;
*/ */
public class GroupMessageStatusChangeProcessor implements ObjectProcessor<GroupMessageStatusChange> { public class GroupMessageStatusChangeProcessor implements ObjectProcessor<GroupMessageStatusChange> {
private static final PersistenceManager persistenceManager = PersistenceManager.getInstance(); private final ConnectionManager connectionManager = ConnectionManager.getInstance();
private static final ConnectionManager connectionManager = ConnectionManager.getInstance(); private final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private final Logger logger = EnvoyLog.getLogger(MessageStatusChangeProcessor.class);
@Override @Override
public void process(GroupMessageStatusChange input, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(GroupMessageStatusChange statusChange, long socketID, ObjectWriteProxy writeProxy) {
GroupMessage gmsg = (GroupMessage) persistenceManager.getMessageByID(input.getID()); GroupMessage gmsg = (GroupMessage) persistenceManager.getMessageByID(statusChange.getID());
if (gmsg.getStatus() == input.get())
throw new IOException(new EnvoyException("The groupMessage already has the status " + input.get()));
gmsg.getMemberMessageStatus().replace(input.getMemberID(), input.get()); // Any other status than READ is not supposed to be sent to the server
if (statusChange.get() != MessageStatus.READ) {
// Notifying the other members about memberStatusChange logger.log(Level.WARNING, "Invalid " + statusChange);
gmsg.getMemberMessageStatus().keySet().stream().filter(k -> connectionManager.getUserIdBySocketID(socketID) != k).forEach(k -> { return;
if(connectionManager.isOnline(k)) {
try {
writeProxy.write(connectionManager.getSocketID(k), input);
} catch (IOException e) {
// TODO Maybe Try Catch block is not necessarily needed but if so write proper
// logger statement here.
e.printStackTrace();
} }
}
});
if (!gmsg.getMemberMessageStatus().containsValue(MessageStatus.SENT) // Apply the status change
&& !gmsg.getMemberMessageStatus().containsValue(MessageStatus.RECEIVED)) { gmsg.getMemberMessageStatus().replace(statusChange.getMemberID(), statusChange.get());
// Notifying the other members about the status change
gmsg.getMemberMessageStatus()
.keySet()
.stream()
.filter(k -> connectionManager.getUserIDBySocketID(socketID) != k)
.filter(connectionManager::isOnline)
.forEach(k -> writeProxy.write(connectionManager.getSocketID(k), statusChange));
// Increment overall status to READ if necessary
if (Collections.min(gmsg.getMemberMessageStatus().values()) == READ) {
gmsg.read(); gmsg.read();
// Notifying the other members about messageStatusChange
gmsg.getMemberMessageStatus().keySet().forEach(k -> { // Notify online members about the status change
if (connectionManager.isOnline(k)) { writeProxy.writeToOnlineContacts(gmsg.getRecipient().getContacts(),
try { new MessageStatusChange(gmsg.getID(), gmsg.getStatus(), LocalDateTime.now()));
writeProxy.write(connectionManager.getSocketID(k), new MessageStatusChange(input.getID(), input.get(), input.getDate()));
} catch (IOException e) {
// TODO Maybe Try Catch block is not necessarily needed but if so write proper
// logger statement here.
e.printStackTrace();
}
}
});
} }
persistenceManager.updateMessage(gmsg); persistenceManager.updateMessage(gmsg);
} }

View File

@ -1,7 +1,5 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import envoy.event.GroupResize; import envoy.event.GroupResize;
import envoy.server.data.Contact; import envoy.server.data.Contact;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
@ -22,7 +20,7 @@ public class GroupResizeProcessor implements ObjectProcessor<GroupResize> {
private static final ConnectionManager connectionManager = ConnectionManager.getInstance(); private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
@Override @Override
public void process(GroupResize groupResize, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(GroupResize groupResize, long socketID, ObjectWriteProxy writeProxy) {
// Acquire the group to resize from the database // Acquire the group to resize from the database
var group = persistenceManager.getGroupByID(groupResize.getGroupID()); var group = persistenceManager.getGroupByID(groupResize.getGroupID());
@ -47,13 +45,7 @@ public class GroupResizeProcessor implements ObjectProcessor<GroupResize> {
.map(Contact::getID) .map(Contact::getID)
.filter(connectionManager::isOnline) .filter(connectionManager::isOnline)
.map(connectionManager::getSocketID) .map(connectionManager::getSocketID)
.forEach(memberSocketID -> { .forEach(memberSocketID -> writeProxy.write(memberSocketID, commonGroup));
try {
writeProxy.write(memberSocketID, commonGroup);
} catch (IOException e) {
e.printStackTrace();
}
});
} }
@Override @Override

View File

@ -3,12 +3,8 @@ package envoy.server.processors;
import static envoy.data.User.UserStatus.ONLINE; import static envoy.data.User.UserStatus.ONLINE;
import static envoy.event.HandshakeRejection.*; import static envoy.event.HandshakeRejection.*;
import java.io.IOException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Arrays; import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.persistence.NoResultException; import javax.persistence.NoResultException;
@ -46,7 +42,7 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
private static final Logger logger = EnvoyLog.getLogger(LoginCredentialProcessor.class); private static final Logger logger = EnvoyLog.getLogger(LoginCredentialProcessor.class);
@Override @Override
public void process(LoginCredentials credentials, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(LoginCredentials credentials, long socketID, ObjectWriteProxy writeProxy) {
// Cache this write proxy for user-independant notifications // Cache this write proxy for user-independant notifications
UserStatusChangeProcessor.setWriteProxy(writeProxy); UserStatusChangeProcessor.setWriteProxy(writeProxy);
@ -122,9 +118,6 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
pendingMessages.removeIf(GroupMessage.class::isInstance); pendingMessages.removeIf(GroupMessage.class::isInstance);
logger.fine("Sending " + pendingMessages.size() + " pending messages to " + user + "..."); logger.fine("Sending " + pendingMessages.size() + " pending messages to " + user + "...");
List<GroupMessage> pendingGroupMessages = PersistenceManager.getInstance().getPendingGroupMessages(user);
logger.fine("Sending " + pendingGroupMessages.size() + " pending group messages to " + user + "...");
for (var msg : pendingMessages) { for (var msg : pendingMessages) {
final var msgCommon = msg.toCommon(); final var msgCommon = msg.toCommon();
if (msg.getStatus() == MessageStatus.SENT) { if (msg.getStatus() == MessageStatus.SENT) {
@ -142,39 +135,36 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
} else writeProxy.write(socketID, new MessageStatusChange(msgCommon)); } else writeProxy.write(socketID, new MessageStatusChange(msgCommon));
} }
for (GroupMessage gmsg : pendingGroupMessages) { List<GroupMessage> pendingGroupMessages = PersistenceManager.getInstance().getPendingGroupMessages(user);
logger.fine("Sending " + pendingGroupMessages.size() + " pending group messages to " + user + "...");
for (var gmsg : pendingGroupMessages) {
final var gmsgCommon = gmsg.toCommon();
// Deliver the message to the user if he hasn't received it yet
if (gmsg.getMemberMessageStatus().get(user.getID()) == MessageStatus.SENT) { if (gmsg.getMemberMessageStatus().get(user.getID()) == MessageStatus.SENT) {
gmsg.getMemberMessageStatus().replace(user.getID(), MessageStatus.RECEIVED); gmsg.getMemberMessageStatus().replace(user.getID(), MessageStatus.RECEIVED);
envoy.data.GroupMessage groupMessage = gmsg.toCommon(); writeProxy.write(socketID, gmsgCommon);
writeProxy.write(socketID, groupMessage);
// Sending GroupMessageStatusChanges to all members, that already received the // Notify all online group members about the status change
// groupMessage. writeProxy.writeToOnlineContacts(gmsg.getRecipient().getContacts(),
gmsg.getMemberMessageStatus().keySet().stream() new GroupMessageStatusChange(gmsg.getID(), MessageStatus.RECEIVED, LocalDateTime.now(),
.filter(k -> (gmsg.getMemberMessageStatus().get(k) == MessageStatus.RECEIVED connectionManager.getUserIDBySocketID(socketID)));
|| gmsg.getMemberMessageStatus().get(k) == MessageStatus.READ) && connectionManager.getUserIdBySocketID(socketID) != k)
.forEach(k -> { if (Collections.min(gmsg.getMemberMessageStatus().values()) == MessageStatus.RECEIVED) {
if (connectionManager.isOnline(k)) try {
writeProxy.write(connectionManager.getSocketID(k),
(new GroupMessageStatusChange(gmsg.getID(), MessageStatus.RECEIVED, LocalDateTime.now(),
connectionManager.getUserIdBySocketID(socketID))));
} catch (IOException e) {
logger.log(Level.WARNING, "Could not deliver group message status changes", e);
}
});
if (!gmsg.getMemberMessageStatus().containsValue(MessageStatus.SENT)) {
gmsg.received(); gmsg.received();
// Sending MessageStatusChangeEvent to all other members
for (Map.Entry<Long, MessageStatus> entry : gmsg.getMemberMessageStatus().entrySet()) { // Notify online members about the status change
if (entry.getKey() == connectionManager.getUserIdBySocketID(socketID)) continue; writeProxy.writeToOnlineContacts(gmsg.getRecipient().getContacts(),
if (connectionManager.isOnline(entry.getKey())) { new MessageStatusChange(gmsg.getID(), gmsg.getStatus(), LocalDateTime.now()));
writeProxy.write(connectionManager.getSocketID(entry.getKey()), new MessageStatusChange(gmsg.toCommon()));
}
}
} }
PersistenceManager.getInstance().updateMessage(gmsg); PersistenceManager.getInstance().updateMessage(gmsg);
} else { } else {
// Sending memberStatusEvents and MessageStatusChange Events
writeProxy.write(socketID, new MessageStatusChange(gmsg.toCommon())); // Deliver just a status change instead of the whole message
// TODO: Notify group members about the status change
writeProxy.write(socketID, new MessageStatusChange(gmsgCommon));
} }
} }
} }

View File

@ -1,6 +1,5 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -34,9 +33,8 @@ public class MessageProcessor implements ObjectProcessor<Message> {
public void process(Message message, long socketID, ObjectWriteProxy writeProxy) { public void process(Message message, long socketID, ObjectWriteProxy writeProxy) {
// Makes sure, that there are no groupMessages processed here, because // Makes sure, that there are no groupMessages processed here, because
// groupMessage is a subclass of message. // groupMessage is a subclass of message.
if (message instanceof envoy.data.GroupMessage)) { if (message instanceof envoy.data.GroupMessage) return;
return;
}
message.nextStatus(); message.nextStatus();
// Convert to server message // Convert to server message
@ -62,8 +60,6 @@ public class MessageProcessor implements ObjectProcessor<Message> {
} }
} catch (EntityExistsException e) { } catch (EntityExistsException e) {
logger.log(Level.WARNING, "Received " + message + " with an ID that already exists!"); logger.log(Level.WARNING, "Received " + message + " with an ID that already exists!");
} catch (IOException e) {
logger.log(Level.WARNING, "Failed to deliver " + message + ":", e);
} }
} }

View File

@ -1,14 +1,16 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException; import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import envoy.data.Message.MessageStatus; import envoy.data.Message.MessageStatus;
import envoy.event.GroupMessageStatusChange; import envoy.event.GroupMessageStatusChange;
import envoy.event.MessageStatusChange; import envoy.event.MessageStatusChange;
import envoy.exception.EnvoyException;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.net.ConnectionManager; import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy; import envoy.server.net.ObjectWriteProxy;
import envoy.util.EnvoyLog;
/** /**
* Project: <strong>envoy-server-standalone</strong><br> * Project: <strong>envoy-server-standalone</strong><br>
@ -20,15 +22,20 @@ import envoy.server.net.ObjectWriteProxy;
*/ */
public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStatusChange> { public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStatusChange> {
private final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private final ConnectionManager connectionManager = ConnectionManager.getInstance(); private final ConnectionManager connectionManager = ConnectionManager.getInstance();
private final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private final Logger logger = EnvoyLog.getLogger(MessageStatusChangeProcessor.class);
@Override @Override
public void process(MessageStatusChange statusChange, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(MessageStatusChange statusChange, long socketID, ObjectWriteProxy writeProxy) throws IOException {
// Filtering out subclass objects, which should not be processed here. // Filtering out subclass objects, which should not be processed here.
if (statusChange instanceof GroupMessageStatusChange)) return; if (statusChange instanceof GroupMessageStatusChange) return;
// Any other status than READ is not supposed to be sent to the server // Any other status than READ is not supposed to be sent to the server
if (statusChange.get() != MessageStatus.READ) throw new IOException(new EnvoyException(statusChange + " has an invalid status")); if (statusChange.get() != MessageStatus.READ) {
logger.log(Level.WARNING, "Invalid " + statusChange);
return;
}
final var msg = persistenceManager.getMessageByID(statusChange.getID()); final var msg = persistenceManager.getMessageByID(statusChange.getID());
msg.read(); msg.read();

View File

@ -5,8 +5,6 @@ import java.io.IOException;
import envoy.event.NameChange; import envoy.event.NameChange;
import envoy.server.data.Contact; import envoy.server.data.Contact;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.data.User;
import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy; import envoy.server.net.ObjectWriteProxy;
/** /**
@ -19,22 +17,16 @@ import envoy.server.net.ObjectWriteProxy;
*/ */
public class NameChangeProcessor implements ObjectProcessor<NameChange> { public class NameChangeProcessor implements ObjectProcessor<NameChange> {
private static final PersistenceManager persistenceManager = PersistenceManager.getInstance();
@Override @Override
public void process(NameChange nameChange, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(NameChange nameChange, long socketID, ObjectWriteProxy writeProxy) throws IOException {
PersistenceManager persistenceManager = PersistenceManager.getInstance();
ConnectionManager connectionManager = ConnectionManager.getInstance();
Contact toUpdate = persistenceManager.getContactByID(nameChange.getID()); Contact toUpdate = persistenceManager.getContactByID(nameChange.getID());
toUpdate.setName(nameChange.get()); toUpdate.setName(nameChange.get());
persistenceManager.updateContact(toUpdate); persistenceManager.updateContact(toUpdate);
// Notify online contacts of the name change // Notify online contacts of the name change
toUpdate.getContacts().stream().filter(User.class::isInstance).map(Contact::getID).filter(connectionManager::isOnline).forEach(userID -> { writeProxy.writeToOnlineContacts(toUpdate.getContacts(), nameChange);
try {
writeProxy.write(userID, nameChange);
} catch (IOException e) {
e.printStackTrace();
}
});
} }
@Override @Override

View File

@ -1,19 +1,17 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import java.util.logging.Logger; import java.util.logging.Logger;
import envoy.data.User.UserStatus; import envoy.data.User.UserStatus;
import envoy.event.UserStatusChange; import envoy.event.UserStatusChange;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.data.User; import envoy.server.data.User;
import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy; import envoy.server.net.ObjectWriteProxy;
import envoy.util.EnvoyLog; import envoy.util.EnvoyLog;
/** /**
* This processor handles incoming {@link UserStatusChange}.<br> * This processor handles incoming {@link UserStatusChange}.
* <br> * <p>
* Project: <strong>envoy-server-standalone</strong><br> * Project: <strong>envoy-server-standalone</strong><br>
* File: <strong>UserStatusChangeProcessor.java</strong><br> * File: <strong>UserStatusChangeProcessor.java</strong><br>
* Created: <strong>1 Feb 2020</strong><br> * Created: <strong>1 Feb 2020</strong><br>
@ -24,15 +22,15 @@ import envoy.util.EnvoyLog;
public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChange> { public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChange> {
private static ObjectWriteProxy writeProxy; private static ObjectWriteProxy writeProxy;
private static PersistenceManager persistenceManager = PersistenceManager.getInstance();
private static final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(UserStatusChangeProcessor.class); private static final Logger logger = EnvoyLog.getLogger(UserStatusChangeProcessor.class);
@Override @Override
public Class<UserStatusChange> getInputClass() { return UserStatusChange.class; } public Class<UserStatusChange> getInputClass() { return UserStatusChange.class; }
@Override @Override
public void process(UserStatusChange input, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(UserStatusChange input, long socketID, ObjectWriteProxy writeProxy) {
// new status should not equal old status // new status should not equal old status
if (input.get().equals(persistenceManager.getUserByID(input.getID()).getStatus())) { if (input.get().equals(persistenceManager.getUserByID(input.getID()).getStatus())) {
logger.warning("Received an unnecessary UserStatusChange"); logger.warning("Received an unnecessary UserStatusChange");
@ -55,7 +53,7 @@ public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChan
persistenceManager.updateContact(user); persistenceManager.updateContact(user);
// Handling for contacts that are already online // Handling for contacts that are already online
notifyContacts(user); writeProxy.writeToOnlineContacts(user.getContacts(), new UserStatusChange(user.getID(), user.getStatus()));
} }
/** /**
@ -64,25 +62,6 @@ public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChan
*/ */
public static void updateUserStatus(UserStatusChange evt) { updateUserStatus(persistenceManager.getUserByID(evt.getID())); } public static void updateUserStatus(UserStatusChange evt) { updateUserStatus(persistenceManager.getUserByID(evt.getID())); }
/**
* notifies active contacts of this {@link User} that his {@link UserStatus} has
* changed
*
* @param user the {@link User}
* @since Envoy Server Standalone v0.1-alpha
*/
private static void notifyContacts(User user) {
UserStatusChange evt = new UserStatusChange(user.getID(), user.getStatus());
ConnectionManager connectionManager = ConnectionManager.getInstance();
try {
for (envoy.server.data.Contact contact : user.getContacts())
if (connectionManager.isOnline(contact.getID())) writeProxy.write(connectionManager.getSocketID(contact.getID()), evt);
} catch (IOException e) {
e.printStackTrace();
logger.warning("Could not notify online contacts of user " + evt.getID() + " that his status has been changed");
}
}
/** /**
* This method is only called by the LoginCredentialProcessor because every * This method is only called by the LoginCredentialProcessor because every
* user needs to login (open a socket) before changing his status. * user needs to login (open a socket) before changing his status.