Added EventProcessor and methods to handle MessageStatus changes
additionally cleaned up whole project, fixed some Javadoc errors and added a few database and connection options. Sorry for the huge commit, there was almost no time inbetween where a commit would have been possible, as to solve every problem, a new problem arose. However, as of now, f/message_handling should be ready to be merged into develop, besides that it could not be tested yet.
This commit is contained in:
parent
3fc7ee7f3f
commit
dc6199806f
@ -1,7 +1,11 @@
|
||||
eclipse.preferences.version=1
|
||||
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
|
||||
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
|
||||
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
|
||||
org.eclipse.jdt.core.compiler.compliance=1.8
|
||||
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
|
||||
org.eclipse.jdt.core.compiler.debug.localVariable=generate
|
||||
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
|
||||
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
|
||||
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
|
||||
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
|
||||
|
@ -54,13 +54,13 @@ public class ConnectionManager implements ISocketIdListener {
|
||||
/**
|
||||
* Associates a socket ID with a user ID.
|
||||
*
|
||||
* @param socketId the socket ID
|
||||
* @param userId the user ID
|
||||
* @param socketId the socket ID
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public void registerUser(long socketId, long userId) {
|
||||
sockets.put(socketId, userId);
|
||||
pendingSockets.remove(socketId);
|
||||
public void registerUser(long userId, long socketId) {
|
||||
sockets.put(userId, socketId);
|
||||
pendingSockets.remove(userId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,32 +0,0 @@
|
||||
package envoy.server;
|
||||
|
||||
import envoy.data.Message;
|
||||
import envoy.server.net.ObjectWriteProxy;
|
||||
|
||||
/**
|
||||
* This {@link ObjectProcessor} handles incoming {@link Message}s.<br>
|
||||
* <br>
|
||||
* Project: <strong>envoy-server-standalone</strong><br>
|
||||
* File: <strong>MessageProcessor.java</strong><br>
|
||||
* Created: <strong>30.12.2019</strong><br>
|
||||
*
|
||||
* @author Kai S. K. Engelbart
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public class MessageProcessor implements ObjectProcessor<Message> {
|
||||
|
||||
@Override
|
||||
public Class<Message> getInputClass() { return Message.class; }
|
||||
|
||||
@Override
|
||||
public void process(Message message, long socketId, ObjectWriteProxy writeProxy) {
|
||||
|
||||
// TODO: Send message to recipient if online
|
||||
ConnectionManager connectionManager = ConnectionManager.getInstance();
|
||||
if (connectionManager.isOnline(message.getRecipientId())) {
|
||||
|
||||
}
|
||||
|
||||
// TODO: Add message to database
|
||||
}
|
||||
}
|
@ -27,7 +27,8 @@ public interface ObjectProcessor<T> {
|
||||
/**
|
||||
* @param input the request object
|
||||
* @param socketId the ID of the socket from which the object was received
|
||||
* @return the response object
|
||||
* @param writeProxy the object that allows writing to a client
|
||||
* @throws IOException if something went wrong during processing
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
void process(T input, long socketId, ObjectWriteProxy writeProxy) throws IOException;
|
||||
|
@ -8,6 +8,9 @@ import com.jenkov.nioserver.Server;
|
||||
|
||||
import envoy.server.net.ObjectMessageProcessor;
|
||||
import envoy.server.net.ObjectMessageReader;
|
||||
import envoy.server.processors.EventProcessor;
|
||||
import envoy.server.processors.LoginCredentialProcessor;
|
||||
import envoy.server.processors.MessageProcessor;
|
||||
|
||||
/**
|
||||
* Starts the server.<br>
|
||||
@ -32,6 +35,7 @@ public class Startup {
|
||||
Set<ObjectProcessor<?>> processors = new HashSet<>();
|
||||
processors.add(new LoginCredentialProcessor());
|
||||
processors.add(new MessageProcessor());
|
||||
processors.add(new EventProcessor());
|
||||
// new PersistenceManager();
|
||||
Server server = new Server(8080, () -> new ObjectMessageReader(), new ObjectMessageProcessor(processors));
|
||||
server.start();
|
||||
|
@ -12,6 +12,7 @@ import javax.persistence.Temporal;
|
||||
import javax.persistence.TemporalType;
|
||||
|
||||
import envoy.data.MessageBuilder;
|
||||
import envoy.server.database.PersistenceManager;
|
||||
|
||||
/**
|
||||
* This class serves as a way to let Hibernate communicate with the server
|
||||
@ -32,11 +33,10 @@ import envoy.data.MessageBuilder;
|
||||
{ @NamedQuery(
|
||||
query = "SELECT m FROM Message m WHERE m.recipient =:recipient AND m.status = envoy.data.Message$MessageStatus.SENT",
|
||||
name = "getUnreadMessages"
|
||||
),
|
||||
@NamedQuery(
|
||||
), @NamedQuery(
|
||||
query = "SELECT m FROM Message m WHERE m.sender =:sender AND m.status = :status",
|
||||
name = "find read messages"//TODO do we need this namedQuery?
|
||||
), @NamedQuery(query = "SELECT m FROM Message m WHERE m.id = :messageId", name = "get message") }//TODO do we need this namedQuery?
|
||||
name = "find read messages"// TODO do we need this namedQuery?
|
||||
), @NamedQuery(query = "SELECT m FROM Message m WHERE m.id = :messageId", name = "getMessageById") }
|
||||
)
|
||||
public class Message {
|
||||
|
||||
@ -76,9 +76,14 @@ public class Message {
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public Message(envoy.data.Message message) {
|
||||
PersistenceManager persMan = PersistenceManager.getPersistenceManager();
|
||||
id = message.getId();
|
||||
status = message.getStatus();
|
||||
text = message.getText();
|
||||
creationDate = message.getCreationDate();
|
||||
sender = persMan.getUserById(message.getSenderId());
|
||||
recipient = persMan.getUserById(message.getRecipientId());
|
||||
// attachment = message.getAttachment().toByteArray();DOES NOT WORK YET
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -44,6 +44,32 @@ public class User {
|
||||
@ElementCollection
|
||||
private List<User> contacts;
|
||||
|
||||
/**
|
||||
* Creates an instance of @link{User}.
|
||||
* Solely used for JPA/ Hibernate
|
||||
*
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public User() {}
|
||||
|
||||
/**
|
||||
* Creates an instance of @link{User}.
|
||||
*
|
||||
* @param user the {@link envoy.data.User} to convert
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public User(envoy.data.User user) {
|
||||
id = user.getId();
|
||||
name = user.getName();
|
||||
status = user.getStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a database {@link User} converted into an {@link envoy.data.User}
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public envoy.data.User toCommonUser() { return new envoy.data.User(this.id, this.name); }
|
||||
|
||||
/**
|
||||
* @return the id of a {link envoy.data.User}
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
|
@ -20,6 +20,21 @@ import envoy.server.data.User;
|
||||
*/
|
||||
public class PersistenceManager {
|
||||
|
||||
private static final PersistenceManager persistenceManager = new PersistenceManager();
|
||||
|
||||
/**
|
||||
* Creates the singleton instance of the @link{PersistenceManager}.
|
||||
*
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
private PersistenceManager() {}
|
||||
|
||||
/**
|
||||
* @return the {@link PersistenceManager} singleton
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public static PersistenceManager getPersistenceManager() { return persistenceManager; }
|
||||
|
||||
private EntityManager entityManager = Persistence.createEntityManagerFactory("envoy").createEntityManager();
|
||||
|
||||
/**
|
||||
@ -55,7 +70,7 @@ public class PersistenceManager {
|
||||
public void updateMessage(Message message) { entityManager.unwrap(Session.class).merge(message); }
|
||||
|
||||
/**
|
||||
* Searches for a user with a specific id.
|
||||
* Searches for a {@link User} with a specific id.
|
||||
*
|
||||
* @param id - the id to search for
|
||||
* @return the user with the specified id
|
||||
@ -63,6 +78,17 @@ public class PersistenceManager {
|
||||
*/
|
||||
public User getUserById(long id) { return (User) entityManager.createNamedQuery("getUserById").setParameter("id", id).getSingleResult(); }
|
||||
|
||||
/**
|
||||
* Searches for a {@link Message} with a specific id.
|
||||
*
|
||||
* @param id - the id to search for
|
||||
* @return the message with the specified id
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public Message getMessageById(long id) {
|
||||
return (Message) entityManager.createNamedQuery("getMessageById").setParameter("id", id).getSingleResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all messages received while being offline.
|
||||
*
|
||||
|
@ -41,11 +41,7 @@ public class ObjectMessageProcessor implements IMessageProcessor {
|
||||
System.out.println("Read object: " + obj.toString());
|
||||
|
||||
// Process object
|
||||
processors.stream()
|
||||
.filter(p -> p.getInputClass().isInstance(obj))
|
||||
.forEach((@SuppressWarnings(
|
||||
"rawtypes"
|
||||
) ObjectProcessor p) -> {
|
||||
processors.stream().filter(p -> p.getInputClass().isInstance(obj)).forEach((@SuppressWarnings("rawtypes") ObjectProcessor p) -> {
|
||||
try {
|
||||
p.process(p.getInputClass().cast(obj), message.socketId, new ObjectWriteProxy(writeProxy));
|
||||
} catch (IOException e) {
|
||||
|
@ -8,6 +8,8 @@ import com.jenkov.nioserver.WriteProxy;
|
||||
import envoy.util.SerializationUtils;
|
||||
|
||||
/**
|
||||
* This class defines methods to send an object to a client.<br>
|
||||
* <br>
|
||||
* Project: <strong>envoy-server-standalone</strong><br>
|
||||
* File: <strong>ObjectWriteProxy.java</strong><br>
|
||||
* Created: <strong>04.01.2020</strong><br>
|
||||
@ -19,8 +21,20 @@ public class ObjectWriteProxy {
|
||||
|
||||
private final WriteProxy writeProxy;
|
||||
|
||||
/**
|
||||
* Creates an instance of @link{ObjectWriteProxy}.
|
||||
*
|
||||
* @param writeProxy the {@link WriteProxy} to write objects to another client
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public ObjectWriteProxy(WriteProxy writeProxy) { this.writeProxy = writeProxy; }
|
||||
|
||||
/**
|
||||
* @param recipientSocketId the socket id of the recipient
|
||||
* @param obj the object to return to the client
|
||||
* @throws IOException if the serialization of the object failed
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public void write(long recipientSocketId, Object obj) throws IOException {
|
||||
// Create message targeted at the client
|
||||
Message response = writeProxy.getMessage();
|
||||
|
77
src/main/java/envoy/server/processors/EventProcessor.java
Normal file
77
src/main/java/envoy/server/processors/EventProcessor.java
Normal file
@ -0,0 +1,77 @@
|
||||
package envoy.server.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import envoy.data.Message;
|
||||
import envoy.data.Message.MessageStatus;
|
||||
import envoy.event.Event;
|
||||
import envoy.event.MessageStatusChangeEvent;
|
||||
import envoy.exception.EnvoyException;
|
||||
import envoy.server.ConnectionManager;
|
||||
import envoy.server.ObjectProcessor;
|
||||
import envoy.server.database.PersistenceManager;
|
||||
import envoy.server.net.ObjectWriteProxy;
|
||||
|
||||
/**
|
||||
* Project: <strong>envoy-server-standalone</strong><br>
|
||||
* File: <strong>EventProcessor.java</strong><br>
|
||||
* Created: <strong>10 Jan 2020</strong><br>
|
||||
*
|
||||
* @author Leon Hofmeister
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public class EventProcessor implements ObjectProcessor<Event<?>> {
|
||||
|
||||
private Event<?> event;
|
||||
|
||||
/**
|
||||
* Creates an instance of @link{EventProcessor}.
|
||||
*
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public EventProcessor() {}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Class<Event<?>> getInputClass() { return (Class<Event<?>>) event.getClass(); }
|
||||
|
||||
@Override
|
||||
public void process(Event<?> input, long socketId, ObjectWriteProxy writeProxy) throws IOException {
|
||||
event = input;
|
||||
if (event instanceof MessageStatusChangeEvent) try {
|
||||
applyMessageStatusChange((MessageStatusChangeEvent) event, writeProxy);
|
||||
} catch (EnvoyException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Redirects messageStatus changes to the database and to the recipient of the
|
||||
* {@link Message}.
|
||||
*
|
||||
* @param event the {@link MessageStatusChangeEvent} to adjust
|
||||
* @throws EnvoyException if the {@link Message} has an invalid state
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
private void applyMessageStatusChange(MessageStatusChangeEvent event, ObjectWriteProxy writeProxy) throws EnvoyException {
|
||||
if (!(event.get() == MessageStatus.READ))// check that no invalid MessageStatuses are sent
|
||||
throw new EnvoyException("Message" + event.getId() + "has an invalid status");
|
||||
|
||||
ConnectionManager conMan = ConnectionManager.getInstance();
|
||||
PersistenceManager perMan = PersistenceManager.getPersistenceManager();
|
||||
envoy.server.data.Message msg = perMan.getMessageById(event.getId());
|
||||
|
||||
msg.setStatus(event.get());
|
||||
msg.setReadDate(event.getDate());
|
||||
|
||||
if (conMan.isOnline(msg.getRecipient().getId())) try {
|
||||
writeProxy.write(conMan.getSocketId(msg.getRecipient().getId()), event);
|
||||
} catch (IOException e) {
|
||||
System.err.println("Recipient online. Failed to send MessageStatusChangedEvent at message" + event.getId());
|
||||
e.printStackTrace();
|
||||
}
|
||||
perMan.updateMessage(msg);
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,11 +1,18 @@
|
||||
package envoy.server;
|
||||
package envoy.server.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import envoy.data.Contacts;
|
||||
import envoy.data.LoginCredentials;
|
||||
import envoy.data.Message.MessageStatus;
|
||||
import envoy.data.User;
|
||||
import envoy.server.ConnectionManager;
|
||||
import envoy.server.ObjectProcessor;
|
||||
import envoy.server.data.Message;
|
||||
import envoy.server.database.PersistenceManager;
|
||||
import envoy.server.net.ObjectWriteProxy;
|
||||
|
||||
/**
|
||||
@ -42,5 +49,13 @@ public class LoginCredentialProcessor implements ObjectProcessor<LoginCredential
|
||||
writeProxy.write(socketId, user);
|
||||
System.out.println("Sending contacts...");
|
||||
writeProxy.write(socketId, contacts);
|
||||
System.out.println("Sending unread messages and updating them in the database...");
|
||||
List<Message> pendingMessages = PersistenceManager.getPersistenceManager().getUnreadMessages(new envoy.server.data.User(user));
|
||||
pendingMessages.forEach((msg) -> {
|
||||
msg.setReceivedDate(new Date());
|
||||
msg.setStatus(MessageStatus.RECEIVED);
|
||||
PersistenceManager.getPersistenceManager().updateMessage(msg);
|
||||
});
|
||||
writeProxy.write(socketId, pendingMessages);
|
||||
}
|
||||
}
|
47
src/main/java/envoy/server/processors/MessageProcessor.java
Normal file
47
src/main/java/envoy/server/processors/MessageProcessor.java
Normal file
@ -0,0 +1,47 @@
|
||||
package envoy.server.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
|
||||
import envoy.data.Message;
|
||||
import envoy.event.MessageStatusChangeEvent;
|
||||
import envoy.server.ConnectionManager;
|
||||
import envoy.server.ObjectProcessor;
|
||||
import envoy.server.database.PersistenceManager;
|
||||
import envoy.server.net.ObjectWriteProxy;
|
||||
|
||||
/**
|
||||
* This {@link ObjectProcessor} handles incoming {@link Message}s.<br>
|
||||
* <br>
|
||||
* Project: <strong>envoy-server-standalone</strong><br>
|
||||
* File: <strong>MessageProcessor.java</strong><br>
|
||||
* Created: <strong>30.12.2019</strong><br>
|
||||
*
|
||||
* @author Kai S. K. Engelbart
|
||||
* @since Envoy Server Standalone v0.1-alpha
|
||||
*/
|
||||
public class MessageProcessor implements ObjectProcessor<Message> {
|
||||
|
||||
@Override
|
||||
public Class<Message> getInputClass() { return Message.class; }
|
||||
|
||||
@Override
|
||||
public void process(Message message, long socketId, ObjectWriteProxy writeProxy) {
|
||||
|
||||
ConnectionManager connectionManager = ConnectionManager.getInstance();
|
||||
message.nextStatus();
|
||||
if (connectionManager.isOnline(message.getRecipientId())) try {// if recipient is online, he receives the message directly
|
||||
writeProxy.write(connectionManager.getSocketId(message.getRecipientId()), message);
|
||||
} catch (IOException e) {
|
||||
System.err.println("Recipient online. Failed to send message" + message.getId());
|
||||
e.printStackTrace();
|
||||
}
|
||||
try {// sender receives confirmation that the server received the message
|
||||
writeProxy.write(connectionManager.getSocketId(message.getSenderId()),
|
||||
new MessageStatusChangeEvent(message.getId(), message.getStatus(), new Date()));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
PersistenceManager.getPersistenceManager().addMessage(new envoy.server.data.Message(message));
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user