Some refactorings based on suggestions from @delvh
This commit is contained in:
		| @@ -213,26 +213,26 @@ public class PersistenceManager { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	/** | 	/** | ||||||
| 	 * Adds a user to the contact list of another user and vice versa. | 	 * Adds a contact to the contact list of another contact and vice versa. | ||||||
| 	 * | 	 * | ||||||
| 	 * @param userId1 the ID of the first user | 	 * @param contactID1 the ID of the first contact | ||||||
| 	 * @param userId2 the ID of the second user | 	 * @param contactID2 the ID of the second contact | ||||||
| 	 * @since Envoy Server Standalone v0.1-alpha | 	 * @since Envoy Server Standalone v0.1-alpha | ||||||
| 	 */ | 	 */ | ||||||
| 	public void addUserContact(long userId1, long userId2) { | 	public void addContactBidirectional(long contactID1, long contactID2) { | ||||||
|  |  | ||||||
| 		// Get users by ID | 		// Get users by ID | ||||||
| 		Contact	u1	= getContactByID(userId1); | 		Contact	c1	= getContactByID(contactID1); | ||||||
| 		Contact	u2	= getContactByID(userId2); | 		Contact	c2	= getContactByID(contactID2); | ||||||
|  |  | ||||||
| 		// Add users to each others contact lists | 		// Add users to each others contact lists | ||||||
| 		u1.getContacts().add(u2); | 		c1.getContacts().add(c2); | ||||||
| 		u2.getContacts().add(u1); | 		c2.getContacts().add(c1); | ||||||
|  |  | ||||||
| 		// Synchronize changes with the database | 		// Synchronize changes with the database | ||||||
| 		transaction.begin(); | 		transaction.begin(); | ||||||
| 		entityManager.merge(u1); | 		entityManager.merge(c1); | ||||||
| 		entityManager.merge(u2); | 		entityManager.merge(c2); | ||||||
| 		transaction.commit(); | 		transaction.commit(); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,10 +1,6 @@ | |||||||
| package envoy.server.net; | package envoy.server.net; | ||||||
|  |  | ||||||
| import java.util.Date; | import java.util.*; | ||||||
| import java.util.HashMap; |  | ||||||
| import java.util.HashSet; |  | ||||||
| import java.util.Map; |  | ||||||
| import java.util.Set; |  | ||||||
| import java.util.stream.Collectors; | import java.util.stream.Collectors; | ||||||
|  |  | ||||||
| import com.jenkov.nioserver.ISocketIdListener; | import com.jenkov.nioserver.ISocketIdListener; | ||||||
| @@ -53,7 +49,7 @@ public class ConnectionManager implements ISocketIdListener { | |||||||
| 	public void socketCancelled(long socketID) { | 	public void socketCancelled(long socketID) { | ||||||
| 		if (!pendingSockets.remove(socketID)) { | 		if (!pendingSockets.remove(socketID)) { | ||||||
| 			// Notify contacts of this users offline-going | 			// Notify contacts of this users offline-going | ||||||
| 			envoy.server.data.User user = PersistenceManager.getInstance().getUserByID(getUserIdBySocketId(socketID)); | 			envoy.server.data.User user = PersistenceManager.getInstance().getUserByID(getUserIdBySocketID(socketID)); | ||||||
| 			user.setStatus(UserStatus.OFFLINE); | 			user.setStatus(UserStatus.OFFLINE); | ||||||
| 			user.setLastSeen(new Date()); | 			user.setLastSeen(new Date()); | ||||||
| 			UserStatusChangeProcessor.updateUserStatus(user); | 			UserStatusChangeProcessor.updateUserStatus(user); | ||||||
| @@ -83,14 +79,14 @@ public class ConnectionManager implements ISocketIdListener { | |||||||
| 	 * @return the ID of the socket | 	 * @return the ID of the socket | ||||||
| 	 * @since Envoy Server Standalone v0.1-alpha | 	 * @since Envoy Server Standalone v0.1-alpha | ||||||
| 	 */ | 	 */ | ||||||
| 	public long getSocketId(long userID) { return sockets.get(userID); } | 	public long getSocketID(long userID) { return sockets.get(userID); } | ||||||
|  |  | ||||||
| 	/** | 	/** | ||||||
| 	 * @param socketID the id of the socket whose User is needed | 	 * @param socketID the id of the socket whose User is needed | ||||||
| 	 * @return the userId associated with this socketId | 	 * @return the userId associated with this socketId | ||||||
| 	 * @since Envoy Server Standalone v0.1-alpha | 	 * @since Envoy Server Standalone v0.1-alpha | ||||||
| 	 */ | 	 */ | ||||||
| 	public long getUserIdBySocketId(long socketID) { | 	public long getUserIdBySocketID(long socketID) { | ||||||
| 		return sockets.entrySet().stream().filter(entry -> entry.getValue().equals(socketID)).findFirst().get().getKey(); | 		return sockets.entrySet().stream().filter(entry -> entry.getValue().equals(socketID)).findFirst().get().getKey(); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -108,8 +104,6 @@ public class ConnectionManager implements ISocketIdListener { | |||||||
| 	public Set<Long> getOnlineUsers() { return sockets.keySet(); } | 	public Set<Long> getOnlineUsers() { return sockets.keySet(); } | ||||||
|  |  | ||||||
| 	/** | 	/** | ||||||
| 	 * Returns all members of a group who are currently online. |  | ||||||
| 	 * |  | ||||||
| 	 * @param group the group to search for | 	 * @param group the group to search for | ||||||
| 	 * @return a set of all IDs of currently active members in this group | 	 * @return a set of all IDs of currently active members in this group | ||||||
| 	 * @since Envoy Server Standalone v0.1-beta | 	 * @since Envoy Server Standalone v0.1-beta | ||||||
|   | |||||||
| @@ -27,15 +27,15 @@ public class ContactOperationProcessor implements ObjectProcessor<ContactOperati | |||||||
| 	public void process(ContactOperationEvent evt, long socketId, ObjectWriteProxy writeProxy) throws IOException { | 	public void process(ContactOperationEvent evt, long socketId, ObjectWriteProxy writeProxy) throws IOException { | ||||||
| 		switch (evt.getOperationType()) { | 		switch (evt.getOperationType()) { | ||||||
| 			case ADD: | 			case ADD: | ||||||
| 				final long userID = ConnectionManager.getInstance().getUserIdBySocketId(socketId); | 				final long userID = ConnectionManager.getInstance().getUserIdBySocketID(socketId); | ||||||
| 				final long contactId = evt.get().getID(); | 				final long contactId = evt.get().getID(); | ||||||
|  |  | ||||||
| 				logger.fine(String.format("Adding user %s to the contact list of user %d.%n", evt.get(), userID)); | 				logger.fine(String.format("Adding user %s to the contact list of user %d.%n", evt.get(), userID)); | ||||||
| 				PersistenceManager.getInstance().addUserContact(userID, contactId); | 				PersistenceManager.getInstance().addContactBidirectional(userID, contactId); | ||||||
|  |  | ||||||
| 				// Notify the contact if online | 				// Notify the contact if online | ||||||
| 				if (ConnectionManager.getInstance().isOnline(contactId)) | 				if (ConnectionManager.getInstance().isOnline(contactId)) | ||||||
| 					writeProxy.write(connectionManager.getSocketId(contactId), | 					writeProxy.write(connectionManager.getSocketID(contactId), | ||||||
| 							new ContactOperationEvent(PersistenceManager.getInstance().getUserByID(userID).toCommon(), ElementOperation.ADD)); | 							new ContactOperationEvent(PersistenceManager.getInstance().getUserByID(userID).toCommon(), ElementOperation.ADD)); | ||||||
| 				break; | 				break; | ||||||
| 			default: | 			default: | ||||||
|   | |||||||
| @@ -33,7 +33,7 @@ public class ContactSearchProcessor implements ObjectProcessor<ContactSearchRequ | |||||||
| 	public void process(ContactSearchRequest request, long socketID, ObjectWriteProxy writeProxy) throws IOException { | 	public void process(ContactSearchRequest request, long socketID, ObjectWriteProxy writeProxy) throws IOException { | ||||||
| 		writeProxy.write(socketID, | 		writeProxy.write(socketID, | ||||||
| 				new ContactSearchResult(PersistenceManager.getInstance() | 				new ContactSearchResult(PersistenceManager.getInstance() | ||||||
| 					.searchUsers(request.get(), ConnectionManager.getInstance().getUserIdBySocketId(socketID)) | 					.searchUsers(request.get(), ConnectionManager.getInstance().getUserIdBySocketID(socketID)) | ||||||
| 					.stream() | 					.stream() | ||||||
| 					.map(User::toCommon) | 					.map(User::toCommon) | ||||||
| 					.collect(Collectors.toList()))); | 					.collect(Collectors.toList()))); | ||||||
|   | |||||||
| @@ -30,15 +30,15 @@ public class GroupCreationProcessor implements ObjectProcessor<GroupCreationEven | |||||||
| 		group.setName(input.get()); | 		group.setName(input.get()); | ||||||
| 		group.setContacts(new HashSet<>()); | 		group.setContacts(new HashSet<>()); | ||||||
| 		input.getInitialMemberIDs().stream().map(persistenceManager::getUserByID).forEach(group.getContacts()::add); | 		input.getInitialMemberIDs().stream().map(persistenceManager::getUserByID).forEach(group.getContacts()::add); | ||||||
| 		group.getContacts().add(persistenceManager.getContactByID(connectionManager.getUserIdBySocketId(socketID))); | 		group.getContacts().add(persistenceManager.getContactByID(connectionManager.getUserIdBySocketID(socketID))); | ||||||
| 		group.getContacts().forEach(c -> c.getContacts().add(group)); | 		group.getContacts().forEach(c -> c.getContacts().add(group)); | ||||||
| 		group.getContacts().add(persistenceManager.getUserByID(connectionManager.getUserIdBySocketId(socketID))); | 		group.getContacts().add(persistenceManager.getUserByID(connectionManager.getUserIdBySocketID(socketID))); | ||||||
| 		persistenceManager.addContact(group); | 		persistenceManager.addContact(group); | ||||||
| 		group.getContacts() | 		group.getContacts() | ||||||
| 			.stream() | 			.stream() | ||||||
| 			.map(Contact::getID) | 			.map(Contact::getID) | ||||||
| 			.filter(connectionManager::isOnline) | 			.filter(connectionManager::isOnline) | ||||||
| 			.map(connectionManager::getSocketId) | 			.map(connectionManager::getSocketID) | ||||||
| 			.forEach(memberSocketID -> { | 			.forEach(memberSocketID -> { | ||||||
| 				try { | 				try { | ||||||
| 					writeProxy.write(memberSocketID, new ContactOperationEvent(group.toCommon(), ElementOperation.ADD)); | 					writeProxy.write(memberSocketID, new ContactOperationEvent(group.toCommon(), ElementOperation.ADD)); | ||||||
|   | |||||||
| @@ -31,10 +31,8 @@ public class GroupMessageProcessor implements ObjectProcessor<GroupMessage> { | |||||||
| 		ConnectionManager connectionManager = ConnectionManager.getInstance(); | 		ConnectionManager connectionManager = ConnectionManager.getInstance(); | ||||||
|  |  | ||||||
| 		final var members = PersistenceManager.getInstance().getGroupByID(groupMessage.getRecipientID()).getContacts(); | 		final var members = PersistenceManager.getInstance().getGroupByID(groupMessage.getRecipientID()).getContacts(); | ||||||
| 		for (long i = 0; i < groupMessage.getMemberStatuses().size(); i++) { | 		groupMessage.getMemberStatuses().replaceAll((id, oldStatus) -> MessageStatus.SENT); | ||||||
| 			groupMessage.getMemberStatuses().replace(i, MessageStatus.SENT); | 		members.forEach(user -> setMemberStatus(connectionManager, groupMessage, user.getID())); | ||||||
| 		} |  | ||||||
| 		members.forEach(user -> { setMemberStatus(connectionManager, groupMessage, user.getID()); }); |  | ||||||
|  |  | ||||||
| 		// Checks if all memberMessageStatuses are RECEIVED and if so sets the | 		// Checks if all memberMessageStatuses are RECEIVED and if so sets the | ||||||
| 		// groupMessage Status to RECEIVED. | 		// groupMessage Status to RECEIVED. | ||||||
| @@ -51,7 +49,7 @@ public class GroupMessageProcessor implements ObjectProcessor<GroupMessage> { | |||||||
| 	private void sendToMember(ConnectionManager connectionManager, GroupMessage groupMessage, long memberID, ObjectWriteProxy writeProxy) { | 	private void sendToMember(ConnectionManager connectionManager, GroupMessage groupMessage, long memberID, ObjectWriteProxy writeProxy) { | ||||||
| 		if (connectionManager.isOnline(memberID)) try { | 		if (connectionManager.isOnline(memberID)) try { | ||||||
| 			// If recipient is online, send the groupMessage directly | 			// If recipient is online, send the groupMessage directly | ||||||
| 			writeProxy.write(connectionManager.getSocketId(memberID), groupMessage); | 			writeProxy.write(connectionManager.getSocketID(memberID), groupMessage); | ||||||
| 		} catch (IOException e) { | 		} catch (IOException e) { | ||||||
| 			logger.warning("Recipient online. Failed to send message" + groupMessage.getID()); | 			logger.warning("Recipient online. Failed to send message" + groupMessage.getID()); | ||||||
| 			e.printStackTrace(); | 			e.printStackTrace(); | ||||||
|   | |||||||
| @@ -46,7 +46,7 @@ public class GroupResizeProcessor implements ObjectProcessor<GroupResizeEvent> { | |||||||
| 			.stream() | 			.stream() | ||||||
| 			.map(Contact::getID) | 			.map(Contact::getID) | ||||||
| 			.filter(connectionManager::isOnline) | 			.filter(connectionManager::isOnline) | ||||||
| 			.map(connectionManager::getSocketId) | 			.map(connectionManager::getSocketID) | ||||||
| 			.forEach(memberSocketID -> { | 			.forEach(memberSocketID -> { | ||||||
| 				try { | 				try { | ||||||
| 					writeProxy.write(memberSocketID, commonGroup); | 					writeProxy.write(memberSocketID, commonGroup); | ||||||
|   | |||||||
| @@ -69,7 +69,7 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential | |||||||
| 				if (connectionManager.isOnline(msg.getSender().getID())) { | 				if (connectionManager.isOnline(msg.getSender().getID())) { | ||||||
| 					var evt = new MessageStatusChangeEvent(msg.toCommon()); | 					var evt = new MessageStatusChangeEvent(msg.toCommon()); | ||||||
| 					logger.info("Sending messageStatusChangeEvent to sender " + evt); | 					logger.info("Sending messageStatusChangeEvent to sender " + evt); | ||||||
| 					writeProxy.write(connectionManager.getSocketId(msg.getSender().getID()), evt); | 					writeProxy.write(connectionManager.getSocketID(msg.getSender().getID()), evt); | ||||||
| 				} | 				} | ||||||
| 				PersistenceManager.getInstance().updateMessage(msg); | 				PersistenceManager.getInstance().updateMessage(msg); | ||||||
| 			} else { | 			} else { | ||||||
|   | |||||||
| @@ -8,7 +8,6 @@ import javax.persistence.EntityExistsException; | |||||||
|  |  | ||||||
| import envoy.data.Message; | import envoy.data.Message; | ||||||
| import envoy.data.Message.MessageStatus; | import envoy.data.Message.MessageStatus; | ||||||
| import envoy.event.MessageStatusChangeEvent; |  | ||||||
| import envoy.server.data.PersistenceManager; | import envoy.server.data.PersistenceManager; | ||||||
| import envoy.server.net.ConnectionManager; | import envoy.server.net.ConnectionManager; | ||||||
| import envoy.server.net.ObjectWriteProxy; | import envoy.server.net.ObjectWriteProxy; | ||||||
| @@ -31,7 +30,7 @@ public class MessageProcessor implements ObjectProcessor<Message> { | |||||||
|  |  | ||||||
| 	@Override | 	@Override | ||||||
| 	public void process(Message message, long socketID, ObjectWriteProxy writeProxy) { | 	public void process(Message message, long socketID, ObjectWriteProxy writeProxy) { | ||||||
| 		if (message.getStatus!=MessageStatus.WAITING) { | 		if (message.getStatus() != MessageStatus.WAITING) { | ||||||
| 			logger.warning("Received message with invalid status: " + message); | 			logger.warning("Received message with invalid status: " + message); | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| @@ -49,7 +48,7 @@ public class MessageProcessor implements ObjectProcessor<Message> { | |||||||
| 	private void sendToUser(ConnectionManager connectionManager, Message message, ObjectWriteProxy writeProxy) { | 	private void sendToUser(ConnectionManager connectionManager, Message message, ObjectWriteProxy writeProxy) { | ||||||
| 		if (connectionManager.isOnline(message.getRecipientID())) try { | 		if (connectionManager.isOnline(message.getRecipientID())) try { | ||||||
| 			// If recipient is online, send the message directly | 			// If recipient is online, send the message directly | ||||||
| 			writeProxy.write(connectionManager.getSocketId(message.getRecipientID()), message); | 			writeProxy.write(connectionManager.getSocketID(message.getRecipientID()), message); | ||||||
| 			// Update the message status to RECEIVED | 			// Update the message status to RECEIVED | ||||||
| 			message.setReceivedDate(new Date()); | 			message.setReceivedDate(new Date()); | ||||||
| 			message.nextStatus(); | 			message.nextStatus(); | ||||||
|   | |||||||
| @@ -34,7 +34,7 @@ public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStat | |||||||
|  |  | ||||||
| 		// Notifies the sender of the message about the status-update to READ | 		// Notifies the sender of the message about the status-update to READ | ||||||
| 		final long senderID = msg.getSender().getID(); | 		final long senderID = msg.getSender().getID(); | ||||||
| 		if (connectionManager.isOnline(senderID)) writeProxy.write(connectionManager.getSocketId(senderID), input); | 		if (connectionManager.isOnline(senderID)) writeProxy.write(connectionManager.getSocketID(senderID), input); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	@Override | 	@Override | ||||||
|   | |||||||
| @@ -76,7 +76,7 @@ public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChan | |||||||
| 		ConnectionManager		connectionManager	= ConnectionManager.getInstance(); | 		ConnectionManager		connectionManager	= ConnectionManager.getInstance(); | ||||||
| 		try { | 		try { | ||||||
| 			for (envoy.server.data.Contact contact : user.getContacts()) | 			for (envoy.server.data.Contact contact : user.getContacts()) | ||||||
| 				if (connectionManager.isOnline(contact.getID())) writeProxy.write(connectionManager.getSocketId(contact.getID()), evt); | 				if (connectionManager.isOnline(contact.getID())) writeProxy.write(connectionManager.getSocketID(contact.getID()), evt); | ||||||
| 		} catch (IOException e) { | 		} catch (IOException e) { | ||||||
| 			e.printStackTrace(); | 			e.printStackTrace(); | ||||||
| 			logger.warning("Could not notify online contacts of user " + evt.getID() + " that his status has been changed"); | 			logger.warning("Could not notify online contacts of user " + evt.getID() + " that his status has been changed"); | ||||||
|   | |||||||
| @@ -24,4 +24,4 @@ | |||||||
|  |  | ||||||
| 		</properties> | 		</properties> | ||||||
| 	</persistence-unit> | 	</persistence-unit> | ||||||
| </persistence> | </persistence> | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user