Implemented receiving unread messages using a message cache

Fixes #98
This commit is contained in:
Kai S. K. Engelbart 2020-02-04 19:13:31 +01:00
parent 7e0ae2e831
commit 44541936d3
4 changed files with 84 additions and 10 deletions

View File

@ -24,8 +24,8 @@ public class Chat implements Serializable {
private static final long serialVersionUID = -7751248474547242056L; private static final long serialVersionUID = -7751248474547242056L;
private User recipient; private final User recipient;
private ComponentListModel<Message> model = new ComponentListModel<>(); private final ComponentListModel<Message> model = new ComponentListModel<>();
/** /**
* Provides the list of messages that the recipient receives.<br> * Provides the list of messages that the recipient receives.<br>

View File

@ -10,6 +10,7 @@ import java.util.logging.Logger;
import javax.naming.TimeLimitExceededException; import javax.naming.TimeLimitExceededException;
import envoy.client.database.LocalDb; import envoy.client.database.LocalDb;
import envoy.client.net.MessageCache;
import envoy.client.net.ReceivedMessageProcessor; import envoy.client.net.ReceivedMessageProcessor;
import envoy.client.net.Receiver; import envoy.client.net.Receiver;
import envoy.client.util.EnvoyLog; import envoy.client.util.EnvoyLog;
@ -48,14 +49,16 @@ public class Client implements Closeable {
* will block for up to 5 seconds. If the handshake does exceed this time limit, * will block for up to 5 seconds. If the handshake does exceed this time limit,
* an exception is thrown. * an exception is thrown.
* *
* @param credentials the login credentials of the user * @param credentials the login credentials of the user
* @param localDb the local database used to persist the current * @param localDb the local database used to persist the current
* {@link IdGenerator} * {@link IdGenerator}
* @return a message cache containing all unread messages from the server that
* can be relayed after initialization
* @throws Exception if the online mode could not be entered or the request * @throws Exception if the online mode could not be entered or the request
* failed for some other reason * failed for some other reason
* @since Envoy v0.2-alpha * @since Envoy v0.2-alpha
*/ */
public void onlineInit(LoginCredentials credentials, LocalDb localDb) throws Exception { public MessageCache onlineInit(LoginCredentials credentials, LocalDb localDb) throws Exception {
// Establish TCP connection // Establish TCP connection
logger.info(String.format("Attempting connection to server %s:%d...", config.getServer(), config.getPort())); logger.info(String.format("Attempting connection to server %s:%d...", config.getServer(), config.getPort()));
socket = new Socket(config.getServer(), config.getPort()); socket = new Socket(config.getServer(), config.getPort());
@ -64,9 +67,13 @@ public class Client implements Closeable {
// Create message receiver // Create message receiver
receiver = new Receiver(socket.getInputStream()); receiver = new Receiver(socket.getInputStream());
// Register user creation processor // Create cache for unread messages
final MessageCache cache = new MessageCache();
// Register user creation processor, contact list processor and message cache
receiver.registerProcessor(User.class, sender -> { logger.info("Acquired user object " + sender); this.sender = sender; }); receiver.registerProcessor(User.class, sender -> { logger.info("Acquired user object " + sender); this.sender = sender; });
receiver.registerProcessor(Contacts.class, contacts -> { logger.info("Acquired contacts object " + contacts); this.contacts = contacts; }); receiver.registerProcessor(Contacts.class, contacts -> { logger.info("Acquired contacts object " + contacts); this.contacts = contacts; });
receiver.registerProcessor(Message.class, cache);
// Start receiver // Start receiver
new Thread(receiver).start(); new Thread(receiver).start();
@ -89,7 +96,12 @@ public class Client implements Closeable {
receiver.removeAllProcessors(); receiver.removeAllProcessors();
// Register processors for message and status handling // Register processors for message and status handling
receiver.registerProcessor(Message.class, new ReceivedMessageProcessor()); final ReceivedMessageProcessor receivedMessageProcessor = new ReceivedMessageProcessor();
receiver.registerProcessor(Message.class, receivedMessageProcessor);
// Relay cached unread messages
cache.setProcessor(receivedMessageProcessor);
// TODO: Status handling // TODO: Status handling
// Process message ID generation // Process message ID generation
@ -97,6 +109,8 @@ public class Client implements Closeable {
// Request a generator if none is present // Request a generator if none is present
if (!localDb.hasIdGenerator() || !localDb.getIdGenerator().hasNext()) requestIdGenerator(); if (!localDb.hasIdGenerator() || !localDb.getIdGenerator().hasNext()) requestIdGenerator();
return cache;
} }
/** /**

View File

@ -0,0 +1,54 @@
package envoy.client.net;
import java.util.LinkedList;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.logging.Logger;
import envoy.client.util.EnvoyLog;
import envoy.data.Message;
/**
* Stores messages in a queue until the application initialization is complete.
* The messages can then be relayed to a processor.<br>
* <br>
* Project: <strong>envoy-client</strong><br>
* File: <strong>MessageCache.java</strong><br>
* Created: <strong>4 Feb 2020</strong><br>
*
* @author Kai S. K. Engelbart
* @since Envoy v0.3-alpha
*/
public class MessageCache implements Consumer<Message> {
private final Queue<Message> messages = new LinkedList<>();
private Consumer<Message> processor;
private static final Logger logger = EnvoyLog.getLogger(MessageCache.class.getSimpleName());
/**
* Adds a message to the cache.
*
* @since Envoy v0.3-alpha
*/
@Override
public void accept(Message message) {
logger.info(String.format("Adding message %s to cache", message));
messages.add(message);
}
/**
* Sets the processor to which messages are relayed.
*
* @param processor the processor to set
* @since Envoy v0.3-alpha
*/
public void setProcessor(Consumer<Message> processor) { this.processor = processor; }
/**
* Relays all cached messages to the processor.
*
* @since Envoy v0.3-alpha
*/
public void relayMessages() { messages.forEach(processor::accept); }
}

View File

@ -17,6 +17,7 @@ import envoy.client.Settings;
import envoy.client.database.LocalDb; import envoy.client.database.LocalDb;
import envoy.client.database.PersistentLocalDb; import envoy.client.database.PersistentLocalDb;
import envoy.client.database.TransientLocalDb; import envoy.client.database.TransientLocalDb;
import envoy.client.net.MessageCache;
import envoy.client.util.EnvoyLog; import envoy.client.util.EnvoyLog;
import envoy.data.LoginCredentials; import envoy.data.LoginCredentials;
import envoy.data.User; import envoy.data.User;
@ -106,11 +107,12 @@ public class Startup {
// Acquire the client user (with ID) either from the server or from the local // Acquire the client user (with ID) either from the server or from the local
// database, which triggers offline mode // database, which triggers offline mode
Client client = new Client(); Client client = new Client();
MessageCache cache = null;
try { try {
// Try entering online mode first // Try entering online mode first
localDb.loadIdGenerator(); localDb.loadIdGenerator();
client.onlineInit(credentials, localDb); cache = client.onlineInit(credentials, localDb);
} catch (Exception e1) { } catch (Exception e1) {
logger.warning("Could not connect to server. Trying offline mode..."); logger.warning("Could not connect to server. Trying offline mode...");
e1.printStackTrace(); e1.printStackTrace();
@ -151,6 +153,7 @@ public class Startup {
// Save all users to the local database // Save all users to the local database
if (client.isOnline()) localDb.setUsers(client.getUsers()); if (client.isOnline()) localDb.setUsers(client.getUsers());
// Display ChatWindow and StatusTrayIcon
EventQueue.invokeLater(() -> { EventQueue.invokeLater(() -> {
try { try {
chatWindow.setClient(client); chatWindow.setClient(client);
@ -174,6 +177,9 @@ public class Startup {
} }
}); });
// Relay unread messages from cache
if (cache != null) cache.relayMessages();
// Save Settings and PersistentLocalDb on shutdown // Save Settings and PersistentLocalDb on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try { try {