From 655ac58f3293ef6f6b6f79975bbe7a6d7aa8eed9 Mon Sep 17 00:00:00 2001 From: CyB3RC0nN0R Date: Thu, 2 Jan 2020 17:11:41 +0200 Subject: [PATCH] Added contact initialization to handshake and ReceivedMessageProcessor --- src/main/java/envoy/client/Client.java | 29 +++- src/main/java/envoy/client/LocalDB.java | 160 ------------------ .../client/ReceivedMessageProcessor.java | 32 ++++ src/main/java/envoy/client/Receiver.java | 12 +- 4 files changed, 63 insertions(+), 170 deletions(-) create mode 100644 src/main/java/envoy/client/ReceivedMessageProcessor.java diff --git a/src/main/java/envoy/client/Client.java b/src/main/java/envoy/client/Client.java index 37fc456..4885a92 100644 --- a/src/main/java/envoy/client/Client.java +++ b/src/main/java/envoy/client/Client.java @@ -7,11 +7,10 @@ import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; +import javax.naming.TimeLimitExceededException; + import envoy.client.util.EnvoyLog; -import envoy.data.LoginCredentials; -import envoy.data.Message; -import envoy.data.User; -import envoy.exception.EnvoyException; +import envoy.data.*; import envoy.util.SerializationUtils; /** @@ -33,12 +32,17 @@ public class Client implements Closeable { private volatile User sender; private User recipient; + private volatile Contacts contacts; + private Config config = Config.getInstance(); private static final Logger logger = EnvoyLog.getLogger(Client.class.getSimpleName()); /** - * Enters the online mode by acquiring a user ID from the server. + * Enters the online mode by acquiring a user ID from the server. As a + * connection has to be established and a handshake has to be made, this method + * will block for up to 5 seconds. If the handshake does exceed this time limit, + * an exception is thrown. * * @param credentials the login credentials of the user * @throws Exception if the online mode could not be entered or the request @@ -56,6 +60,7 @@ public class Client implements Closeable { // Register user creation processor receiver.registerProcessor(User.class, sender -> { logger.info("Acquired user object " + sender); this.sender = sender; }); + receiver.registerProcessor(Contacts.class, contacts -> { logger.info("Acquired contacts object " + contacts); this.contacts = contacts; }); // Start receiver new Thread(receiver).start(); @@ -66,12 +71,19 @@ public class Client implements Closeable { // Wait for a maximum of five seconds to acquire the sender object long start = System.currentTimeMillis(); - while (sender == null) { - if (System.currentTimeMillis() - start > 5000) throw new EnvoyException("Did not log in after 5 seconds"); + while (sender == null || contacts == null) { + if (System.currentTimeMillis() - start > 5000) throw new TimeLimitExceededException("Did not log in after 5 seconds"); Thread.sleep(500); } + logger.info("Handshake completed."); online = true; + + // Remove user creation processor + receiver.removeAllProcessors(); + + // Register processors for message and status handling + receiver.registerProcessor(Message.class, new ReceivedMessageProcessor()); } /** @@ -84,6 +96,7 @@ public class Client implements Closeable { public void sendMessage(Message message) throws IOException { checkOnline(); SerializationUtils.writeBytesWithLength(message, socket.getOutputStream()); + message.nextStatus(); } /** @@ -94,7 +107,7 @@ public class Client implements Closeable { public Map getUsers() { checkOnline(); Map users = new HashMap<>(); - sender.getContacts().forEach(u -> users.put(u.getName(), u)); + contacts.getContacts().forEach(u -> users.put(u.getName(), u)); return users; } diff --git a/src/main/java/envoy/client/LocalDB.java b/src/main/java/envoy/client/LocalDB.java index 5bc5421..2c40e65 100644 --- a/src/main/java/envoy/client/LocalDB.java +++ b/src/main/java/envoy/client/LocalDB.java @@ -84,166 +84,6 @@ public class LocalDB { */ public void loadChats() throws ClassNotFoundException, IOException { chats = SerializationUtils.read(localDBFile, ArrayList.class); } - // /** - // * Creates a {@link Sync} object filled with the changes that occurred to the - // * local database since the last synchronization. - // * - // * @param userId the ID of the user that is synchronized by this client - // * @return {@link Sync} object filled with the current changes - // * @since Envoy v0.1-alpha - // */ - // public Sync fillSync(long userId) { - // addWaitingMessagesToSync(); - // - // sync.getMessages().addAll(readMessages.getMessages()); - // readMessages.getMessages().clear(); - // - // logger.finest(String.format("Filled sync with %d messages.", - // sync.getMessages().size())); - // return sync; - // } - // - // /** - // * Applies the changes carried by a {@link Sync} object to the local database - // * - // * @param returnSync the {@link Sync} object to apply - // * @since Envoy v0.1-alpha - // */ - // public void applySync(Sync returnSync) { - // for (int i = 0; i < returnSync.getMessages().size(); i++) { - // - // // The message has an ID - // if (returnSync.getMessages().get(i).getMetadata().getMessageId() != 0) { - // - // // Messages are processes differently corresponding to their state - // switch (returnSync.getMessages().get(i).getMetadata().getState()) { - // case SENT: - // // Update previously waiting and now sent messages that were assigned an ID - // by - // // the server - // sync.getMessages().get(i).getMetadata().setMessageId(returnSync.getMessages().get(i).getMetadata().getMessageId()); - // sync.getMessages().get(i).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState()); - // break; - // case RECEIVED: - // if (returnSync.getMessages().get(i).getMetadata().getSender() != 0) { - // // these are the unread Messages from the server - // unreadMessagesSync.getMessages().add(returnSync.getMessages().get(i)); - // - // // Create and dispatch message creation event - // EventBus.getInstance().dispatch(new - // MessageCreationEvent(returnSync.getMessages().get(i))); - // } else { - // // Update Messages in localDB to state RECEIVED - // for (Chat chat : getChats()) - // if (chat.getRecipient().getId() == - // returnSync.getMessages().get(i).getMetadata().getRecipient()) - // for (int j = 0; j < chat.getModel().getSize(); j++) - // if (chat.getModel().get(j).getMetadata().getMessageId() == - // returnSync.getMessages() - // .get(i) - // .getMetadata() - // .getMessageId()) - // chat.getModel().get(j).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState()); - // } - // break; - // case READ: - // // Update local Messages to state READ - // logger.info("Message with ID: " + - // returnSync.getMessages().get(i).getMetadata().getMessageId() - // + "was initialized to be set to READ in localDB."); - // for (Chat chat : getChats()) - // if (chat.getRecipient().getId() == - // returnSync.getMessages().get(i).getMetadata().getRecipient()) { - // logger.info("Chat with: " + chat.getRecipient().getId() + "was selected."); - // for (int k = 0; k < chat.getModel().getSize(); k++) - // if (chat.getModel().get(k).getMetadata().getMessageId() == - // returnSync.getMessages() - // .get(i) - // .getMetadata() - // .getMessageId()) { - // logger.info("Message with ID: " + - // chat.getModel().get(k).getMetadata().getMessageId() + "was selected."); - // chat.getModel().get(k).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState()); - // logger.info("Message State is now: " + - // chat.getModel().get(k).getMetadata().getState()); - // } - // } - // break; - // } - // } - // } - // - // // Updating UserStatus of all users in LocalDB - // for (User user : returnSync.getUsers()) - // for (Chat chat : getChats()) - // if (user.getId() == chat.getRecipient().getId()) - // chat.getRecipient().setStatus(user.getStatus()); - // - // sync.getMessages().clear(); - // sync.getUsers().clear(); - // } - // - // /** - // * Adds the unread messages returned from the server in the latest sync to the - // * right chats in the LocalDB. - // * - // * @since Envoy v0.1-alpha - // */ - // public void addUnreadMessagesToLocalDB() { - // for (Message message : unreadMessagesSync.getMessages()) - // for (Chat chat : getChats()) - // if (message.getMetadata().getSender() == chat.getRecipient().getId()) { - // chat.appendMessage(message); - // break; - // } - // } - // - // /** - // * Changes all messages with state {@code RECEIVED} of a specific chat to - // state - // * {@code READ}. - // *
- // * Adds these messages to the {@code readMessages} {@link Sync} object. - // * - // * @param currentChat the {@link Chat} that was just opened - // * @since Envoy v0.1-alpha - // */ - // public void setMessagesToRead(Chat currentChat) { - // for (int i = currentChat.getModel().size() - 1; i >= 0; --i) - // if (currentChat.getModel().get(i).getMetadata().getRecipient() != - // currentChat.getRecipient().getId()) - // if (currentChat.getModel().get(i).getMetadata().getState() == - // MessageState.RECEIVED) { - // currentChat.getModel().get(i).getMetadata().setState(MessageState.READ); - // readMessages.getMessages().add(currentChat.getModel().get(i)); - // } else break; - // } - // - // /** - // * Adds all messages with state {@code WAITING} from the {@link LocalDB} to - // the - // * {@link Sync} object. - // * - // * @since Envoy v0.1-alpha - // */ - // private void addWaitingMessagesToSync() { - // for (Chat chat : getChats()) - // for (int i = 0; i < chat.getModel().size(); i++) - // if (chat.getModel().get(i).getMetadata().getState() == MessageState.WAITING) - // { - // logger.info("Got Waiting Message"); - // sync.getMessages().add(chat.getModel().get(i)); - // } - // } - // - // /** - // * Clears the {@code unreadMessagesSync} {@link Sync} object. - // * - // * @since Envoy v0.1-alpha - // */ - // public void clearUnreadMessagesSync() { - // unreadMessagesSync.getMessages().clear(); } - /** * @return a {@code Map} of all users stored locally with their * user names as keys diff --git a/src/main/java/envoy/client/ReceivedMessageProcessor.java b/src/main/java/envoy/client/ReceivedMessageProcessor.java new file mode 100644 index 0000000..d408993 --- /dev/null +++ b/src/main/java/envoy/client/ReceivedMessageProcessor.java @@ -0,0 +1,32 @@ +package envoy.client; + +import java.util.function.Consumer; +import java.util.logging.Logger; + +import envoy.client.event.MessageCreationEvent; +import envoy.client.util.EnvoyLog; +import envoy.data.Message; +import envoy.data.Message.MessageStatus; +import envoy.event.EventBus; + +/** + * Project: envoy-client
+ * File: ReceivedMessageProcessor.java
+ * Created: 31.12.2019
+ * + * @author Kai S. K. Engelbart + * @since Envoy v0.3-alpha + */ +public class ReceivedMessageProcessor implements Consumer { + + private static final Logger logger = EnvoyLog.getLogger(ReceivedMessageProcessor.class.getSimpleName()); + + @Override + public void accept(Message message) { + logger.info("Received message object " + message); + if (message.getStatus() != MessageStatus.SENT) logger.warning("The message has the unexpected status " + message.getStatus()); + else + // Dispatch event + EventBus.getInstance().dispatch(new MessageCreationEvent(message)); + } +} diff --git a/src/main/java/envoy/client/Receiver.java b/src/main/java/envoy/client/Receiver.java index f70d6f9..abe4687 100644 --- a/src/main/java/envoy/client/Receiver.java +++ b/src/main/java/envoy/client/Receiver.java @@ -2,6 +2,7 @@ package envoy.client; import java.io.InputStream; import java.io.ObjectInputStream; +import java.net.SocketException; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; @@ -20,8 +21,8 @@ import envoy.client.util.EnvoyLog; */ public class Receiver implements Runnable { - private InputStream in; - private Map, Consumer> processors = new HashMap<>(); + private final InputStream in; + private final Map, Consumer> processors = new HashMap<>(); private static final Logger logger = EnvoyLog.getLogger(Receiver.class.getSimpleName()); @@ -47,6 +48,8 @@ public class Receiver implements Runnable { logger.severe(String.format("The received object has the class %s for which no processor is defined.", obj.getClass())); else processor.accept(obj); } + } catch (SocketException e) { + logger.info("Connection probably closed by client. Exiting receiver thread..."); } catch (Exception e) { logger.log(Level.SEVERE, "Error on receiver thread", e); } @@ -60,4 +63,9 @@ public class Receiver implements Runnable { * @param processor the object processor */ public void registerProcessor(Class processorClass, Consumer processor) { processors.put(processorClass, processor); } + + /** + * Removes all object processors registered at this {@link Receiver}. + */ + public void removeAllProcessors() { processors.clear(); } } \ No newline at end of file