Added contact initialization to handshake and ReceivedMessageProcessor
This commit is contained in:
parent
34785dc7f5
commit
655ac58f32
@ -7,11 +7,10 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import javax.naming.TimeLimitExceededException;
|
||||
|
||||
import envoy.client.util.EnvoyLog;
|
||||
import envoy.data.LoginCredentials;
|
||||
import envoy.data.Message;
|
||||
import envoy.data.User;
|
||||
import envoy.exception.EnvoyException;
|
||||
import envoy.data.*;
|
||||
import envoy.util.SerializationUtils;
|
||||
|
||||
/**
|
||||
@ -33,12 +32,17 @@ public class Client implements Closeable {
|
||||
private volatile User sender;
|
||||
private User recipient;
|
||||
|
||||
private volatile Contacts contacts;
|
||||
|
||||
private Config config = Config.getInstance();
|
||||
|
||||
private static final Logger logger = EnvoyLog.getLogger(Client.class.getSimpleName());
|
||||
|
||||
/**
|
||||
* Enters the online mode by acquiring a user ID from the server.
|
||||
* Enters the online mode by acquiring a user ID from the server. As a
|
||||
* connection has to be established and a handshake has to be made, this method
|
||||
* will block for up to 5 seconds. If the handshake does exceed this time limit,
|
||||
* an exception is thrown.
|
||||
*
|
||||
* @param credentials the login credentials of the user
|
||||
* @throws Exception if the online mode could not be entered or the request
|
||||
@ -56,6 +60,7 @@ public class Client implements Closeable {
|
||||
|
||||
// Register user creation processor
|
||||
receiver.registerProcessor(User.class, sender -> { logger.info("Acquired user object " + sender); this.sender = sender; });
|
||||
receiver.registerProcessor(Contacts.class, contacts -> { logger.info("Acquired contacts object " + contacts); this.contacts = contacts; });
|
||||
|
||||
// Start receiver
|
||||
new Thread(receiver).start();
|
||||
@ -66,12 +71,19 @@ public class Client implements Closeable {
|
||||
|
||||
// Wait for a maximum of five seconds to acquire the sender object
|
||||
long start = System.currentTimeMillis();
|
||||
while (sender == null) {
|
||||
if (System.currentTimeMillis() - start > 5000) throw new EnvoyException("Did not log in after 5 seconds");
|
||||
while (sender == null || contacts == null) {
|
||||
if (System.currentTimeMillis() - start > 5000) throw new TimeLimitExceededException("Did not log in after 5 seconds");
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
logger.info("Handshake completed.");
|
||||
online = true;
|
||||
|
||||
// Remove user creation processor
|
||||
receiver.removeAllProcessors();
|
||||
|
||||
// Register processors for message and status handling
|
||||
receiver.registerProcessor(Message.class, new ReceivedMessageProcessor());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,6 +96,7 @@ public class Client implements Closeable {
|
||||
public void sendMessage(Message message) throws IOException {
|
||||
checkOnline();
|
||||
SerializationUtils.writeBytesWithLength(message, socket.getOutputStream());
|
||||
message.nextStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -94,7 +107,7 @@ public class Client implements Closeable {
|
||||
public Map<String, User> getUsers() {
|
||||
checkOnline();
|
||||
Map<String, User> users = new HashMap<>();
|
||||
sender.getContacts().forEach(u -> users.put(u.getName(), u));
|
||||
contacts.getContacts().forEach(u -> users.put(u.getName(), u));
|
||||
return users;
|
||||
}
|
||||
|
||||
|
@ -84,166 +84,6 @@ public class LocalDB {
|
||||
*/
|
||||
public void loadChats() throws ClassNotFoundException, IOException { chats = SerializationUtils.read(localDBFile, ArrayList.class); }
|
||||
|
||||
// /**
|
||||
// * Creates a {@link Sync} object filled with the changes that occurred to the
|
||||
// * local database since the last synchronization.
|
||||
// *
|
||||
// * @param userId the ID of the user that is synchronized by this client
|
||||
// * @return {@link Sync} object filled with the current changes
|
||||
// * @since Envoy v0.1-alpha
|
||||
// */
|
||||
// public Sync fillSync(long userId) {
|
||||
// addWaitingMessagesToSync();
|
||||
//
|
||||
// sync.getMessages().addAll(readMessages.getMessages());
|
||||
// readMessages.getMessages().clear();
|
||||
//
|
||||
// logger.finest(String.format("Filled sync with %d messages.",
|
||||
// sync.getMessages().size()));
|
||||
// return sync;
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * Applies the changes carried by a {@link Sync} object to the local database
|
||||
// *
|
||||
// * @param returnSync the {@link Sync} object to apply
|
||||
// * @since Envoy v0.1-alpha
|
||||
// */
|
||||
// public void applySync(Sync returnSync) {
|
||||
// for (int i = 0; i < returnSync.getMessages().size(); i++) {
|
||||
//
|
||||
// // The message has an ID
|
||||
// if (returnSync.getMessages().get(i).getMetadata().getMessageId() != 0) {
|
||||
//
|
||||
// // Messages are processes differently corresponding to their state
|
||||
// switch (returnSync.getMessages().get(i).getMetadata().getState()) {
|
||||
// case SENT:
|
||||
// // Update previously waiting and now sent messages that were assigned an ID
|
||||
// by
|
||||
// // the server
|
||||
// sync.getMessages().get(i).getMetadata().setMessageId(returnSync.getMessages().get(i).getMetadata().getMessageId());
|
||||
// sync.getMessages().get(i).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState());
|
||||
// break;
|
||||
// case RECEIVED:
|
||||
// if (returnSync.getMessages().get(i).getMetadata().getSender() != 0) {
|
||||
// // these are the unread Messages from the server
|
||||
// unreadMessagesSync.getMessages().add(returnSync.getMessages().get(i));
|
||||
//
|
||||
// // Create and dispatch message creation event
|
||||
// EventBus.getInstance().dispatch(new
|
||||
// MessageCreationEvent(returnSync.getMessages().get(i)));
|
||||
// } else {
|
||||
// // Update Messages in localDB to state RECEIVED
|
||||
// for (Chat chat : getChats())
|
||||
// if (chat.getRecipient().getId() ==
|
||||
// returnSync.getMessages().get(i).getMetadata().getRecipient())
|
||||
// for (int j = 0; j < chat.getModel().getSize(); j++)
|
||||
// if (chat.getModel().get(j).getMetadata().getMessageId() ==
|
||||
// returnSync.getMessages()
|
||||
// .get(i)
|
||||
// .getMetadata()
|
||||
// .getMessageId())
|
||||
// chat.getModel().get(j).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState());
|
||||
// }
|
||||
// break;
|
||||
// case READ:
|
||||
// // Update local Messages to state READ
|
||||
// logger.info("Message with ID: " +
|
||||
// returnSync.getMessages().get(i).getMetadata().getMessageId()
|
||||
// + "was initialized to be set to READ in localDB.");
|
||||
// for (Chat chat : getChats())
|
||||
// if (chat.getRecipient().getId() ==
|
||||
// returnSync.getMessages().get(i).getMetadata().getRecipient()) {
|
||||
// logger.info("Chat with: " + chat.getRecipient().getId() + "was selected.");
|
||||
// for (int k = 0; k < chat.getModel().getSize(); k++)
|
||||
// if (chat.getModel().get(k).getMetadata().getMessageId() ==
|
||||
// returnSync.getMessages()
|
||||
// .get(i)
|
||||
// .getMetadata()
|
||||
// .getMessageId()) {
|
||||
// logger.info("Message with ID: " +
|
||||
// chat.getModel().get(k).getMetadata().getMessageId() + "was selected.");
|
||||
// chat.getModel().get(k).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState());
|
||||
// logger.info("Message State is now: " +
|
||||
// chat.getModel().get(k).getMetadata().getState());
|
||||
// }
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Updating UserStatus of all users in LocalDB
|
||||
// for (User user : returnSync.getUsers())
|
||||
// for (Chat chat : getChats())
|
||||
// if (user.getId() == chat.getRecipient().getId())
|
||||
// chat.getRecipient().setStatus(user.getStatus());
|
||||
//
|
||||
// sync.getMessages().clear();
|
||||
// sync.getUsers().clear();
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * Adds the unread messages returned from the server in the latest sync to the
|
||||
// * right chats in the LocalDB.
|
||||
// *
|
||||
// * @since Envoy v0.1-alpha
|
||||
// */
|
||||
// public void addUnreadMessagesToLocalDB() {
|
||||
// for (Message message : unreadMessagesSync.getMessages())
|
||||
// for (Chat chat : getChats())
|
||||
// if (message.getMetadata().getSender() == chat.getRecipient().getId()) {
|
||||
// chat.appendMessage(message);
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * Changes all messages with state {@code RECEIVED} of a specific chat to
|
||||
// state
|
||||
// * {@code READ}.
|
||||
// * <br>
|
||||
// * Adds these messages to the {@code readMessages} {@link Sync} object.
|
||||
// *
|
||||
// * @param currentChat the {@link Chat} that was just opened
|
||||
// * @since Envoy v0.1-alpha
|
||||
// */
|
||||
// public void setMessagesToRead(Chat currentChat) {
|
||||
// for (int i = currentChat.getModel().size() - 1; i >= 0; --i)
|
||||
// if (currentChat.getModel().get(i).getMetadata().getRecipient() !=
|
||||
// currentChat.getRecipient().getId())
|
||||
// if (currentChat.getModel().get(i).getMetadata().getState() ==
|
||||
// MessageState.RECEIVED) {
|
||||
// currentChat.getModel().get(i).getMetadata().setState(MessageState.READ);
|
||||
// readMessages.getMessages().add(currentChat.getModel().get(i));
|
||||
// } else break;
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * Adds all messages with state {@code WAITING} from the {@link LocalDB} to
|
||||
// the
|
||||
// * {@link Sync} object.
|
||||
// *
|
||||
// * @since Envoy v0.1-alpha
|
||||
// */
|
||||
// private void addWaitingMessagesToSync() {
|
||||
// for (Chat chat : getChats())
|
||||
// for (int i = 0; i < chat.getModel().size(); i++)
|
||||
// if (chat.getModel().get(i).getMetadata().getState() == MessageState.WAITING)
|
||||
// {
|
||||
// logger.info("Got Waiting Message");
|
||||
// sync.getMessages().add(chat.getModel().get(i));
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * Clears the {@code unreadMessagesSync} {@link Sync} object.
|
||||
// *
|
||||
// * @since Envoy v0.1-alpha
|
||||
// */
|
||||
// public void clearUnreadMessagesSync() {
|
||||
// unreadMessagesSync.getMessages().clear(); }
|
||||
|
||||
/**
|
||||
* @return a {@code Map<String, User>} of all users stored locally with their
|
||||
* user names as keys
|
||||
|
32
src/main/java/envoy/client/ReceivedMessageProcessor.java
Normal file
32
src/main/java/envoy/client/ReceivedMessageProcessor.java
Normal file
@ -0,0 +1,32 @@
|
||||
package envoy.client;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import envoy.client.event.MessageCreationEvent;
|
||||
import envoy.client.util.EnvoyLog;
|
||||
import envoy.data.Message;
|
||||
import envoy.data.Message.MessageStatus;
|
||||
import envoy.event.EventBus;
|
||||
|
||||
/**
|
||||
* Project: <strong>envoy-client</strong><br>
|
||||
* File: <strong>ReceivedMessageProcessor.java</strong><br>
|
||||
* Created: <strong>31.12.2019</strong><br>
|
||||
*
|
||||
* @author Kai S. K. Engelbart
|
||||
* @since Envoy v0.3-alpha
|
||||
*/
|
||||
public class ReceivedMessageProcessor implements Consumer<Message> {
|
||||
|
||||
private static final Logger logger = EnvoyLog.getLogger(ReceivedMessageProcessor.class.getSimpleName());
|
||||
|
||||
@Override
|
||||
public void accept(Message message) {
|
||||
logger.info("Received message object " + message);
|
||||
if (message.getStatus() != MessageStatus.SENT) logger.warning("The message has the unexpected status " + message.getStatus());
|
||||
else
|
||||
// Dispatch event
|
||||
EventBus.getInstance().dispatch(new MessageCreationEvent(message));
|
||||
}
|
||||
}
|
@ -2,6 +2,7 @@ package envoy.client;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.net.SocketException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
@ -20,8 +21,8 @@ import envoy.client.util.EnvoyLog;
|
||||
*/
|
||||
public class Receiver implements Runnable {
|
||||
|
||||
private InputStream in;
|
||||
private Map<Class<?>, Consumer<?>> processors = new HashMap<>();
|
||||
private final InputStream in;
|
||||
private final Map<Class<?>, Consumer<?>> processors = new HashMap<>();
|
||||
|
||||
private static final Logger logger = EnvoyLog.getLogger(Receiver.class.getSimpleName());
|
||||
|
||||
@ -47,6 +48,8 @@ public class Receiver implements Runnable {
|
||||
logger.severe(String.format("The received object has the class %s for which no processor is defined.", obj.getClass()));
|
||||
else processor.accept(obj);
|
||||
}
|
||||
} catch (SocketException e) {
|
||||
logger.info("Connection probably closed by client. Exiting receiver thread...");
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "Error on receiver thread", e);
|
||||
}
|
||||
@ -60,4 +63,9 @@ public class Receiver implements Runnable {
|
||||
* @param processor the object processor
|
||||
*/
|
||||
public <T> void registerProcessor(Class<T> processorClass, Consumer<T> processor) { processors.put(processorClass, processor); }
|
||||
|
||||
/**
|
||||
* Removes all object processors registered at this {@link Receiver}.
|
||||
*/
|
||||
public void removeAllProcessors() { processors.clear(); }
|
||||
}
|
Reference in New Issue
Block a user