Fixed threading issue when receiving unread messages

This commit is contained in:
Kai S. K. Engelbart 2020-02-08 11:43:02 +01:00
parent f76d0cdeb4
commit e6c4139b4d
2 changed files with 14 additions and 18 deletions

View File

@ -51,16 +51,17 @@ public class Client implements Closeable {
* will block for up to 5 seconds. If the handshake does exceed this time limit, * will block for up to 5 seconds. If the handshake does exceed this time limit,
* an exception is thrown. * an exception is thrown.
* *
* @param credentials the login credentials of the user * @param credentials the login credentials of the user
* @param localDb the local database used to persist the current * @param localDb the local database used to persist the current
* {@link IdGenerator} * {@link IdGenerator}
* @return a message cache containing all unread messages from the server that * @param receivedMessageCache a message cache containing all unread messages
* can be relayed after initialization * from the server that can be relayed after
* initialization
* @throws Exception if the online mode could not be entered or the request * @throws Exception if the online mode could not be entered or the request
* failed for some other reason * failed for some other reason
* @since Envoy v0.2-alpha * @since Envoy v0.2-alpha
*/ */
public Cache<Message> onlineInit(LoginCredentials credentials, LocalDb localDb) throws Exception { public void onlineInit(LoginCredentials credentials, LocalDb localDb, Cache<Message> receivedMessageCache) throws Exception {
// Establish TCP connection // Establish TCP connection
logger.info(String.format("Attempting connection to server %s:%d...", config.getServer(), config.getPort())); logger.info(String.format("Attempting connection to server %s:%d...", config.getServer(), config.getPort()));
socket = new Socket(config.getServer(), config.getPort()); socket = new Socket(config.getServer(), config.getPort());
@ -69,13 +70,10 @@ public class Client implements Closeable {
// Create message receiver // Create message receiver
receiver = new Receiver(socket.getInputStream()); receiver = new Receiver(socket.getInputStream());
// Create cache for unread messages
final Cache<Message> cache = new Cache<>();
// Register user creation processor, contact list processor and message cache // Register user creation processor, contact list processor and message cache
receiver.registerProcessor(User.class, sender -> { logger.info("Acquired user object " + sender); this.sender = sender; }); receiver.registerProcessor(User.class, sender -> { logger.info("Acquired user object " + sender); this.sender = sender; });
receiver.registerProcessor(Contacts.class, contacts -> { logger.info("Acquired contacts object " + contacts); this.contacts = contacts; }); receiver.registerProcessor(Contacts.class, contacts -> { logger.info("Acquired contacts object " + contacts); this.contacts = contacts; });
receiver.registerProcessor(Message.class, cache); receiver.registerProcessor(Message.class, receivedMessageCache);
// Start receiver // Start receiver
new Thread(receiver).start(); new Thread(receiver).start();
@ -104,7 +102,7 @@ public class Client implements Closeable {
receiver.registerProcessor(Message.class, receivedMessageProcessor); receiver.registerProcessor(Message.class, receivedMessageProcessor);
// Relay cached unread messages // Relay cached unread messages
cache.setProcessor(receivedMessageProcessor); receivedMessageCache.setProcessor(receivedMessageProcessor);
// Process message status changes // Process message status changes
receiver.registerProcessor(MessageStatusChangeEvent.class, new MessageStatusChangeEventProcessor()); receiver.registerProcessor(MessageStatusChangeEvent.class, new MessageStatusChangeEventProcessor());
@ -117,8 +115,6 @@ public class Client implements Closeable {
// Request a generator if none is present or the existing one is consumed // Request a generator if none is present or the existing one is consumed
if (!localDb.hasIdGenerator() || !localDb.getIdGenerator().hasNext()) requestIdGenerator(); if (!localDb.hasIdGenerator() || !localDb.getIdGenerator().hasNext()) requestIdGenerator();
return cache;
} }
/** /**

View File

@ -106,11 +106,11 @@ public class Startup {
// Acquire the client user (with ID) either from the server or from the local // Acquire the client user (with ID) either from the server or from the local
// database, which triggers offline mode // database, which triggers offline mode
Client client = new Client(); Client client = new Client();
Cache<Message> cache = null; Cache<Message> cache = new Cache<>();
try { try {
// Try entering online mode first // Try entering online mode first
localDb.loadIdGenerator(); localDb.loadIdGenerator();
cache = client.onlineInit(credentials, localDb); client.onlineInit(credentials, localDb, cache);
} catch (Exception e1) { } catch (Exception e1) {
logger.warning("Could not connect to server. Trying offline mode..."); logger.warning("Could not connect to server. Trying offline mode...");
e1.printStackTrace(); e1.printStackTrace();
@ -162,6 +162,9 @@ public class Startup {
try { try {
chatWindow.initContent(client, localDb, writeProxy); chatWindow.initContent(client, localDb, writeProxy);
// Relay unread messages from cache
if (cache != null) cache.relay();
try { try {
new StatusTrayIcon(chatWindow).show(); new StatusTrayIcon(chatWindow).show();
@ -180,9 +183,6 @@ public class Startup {
} }
}); });
// Relay unread messages from cache
if (cache != null) cache.relay();
// Save Settings and PersistentLocalDb on shutdown // Save Settings and PersistentLocalDb on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try { try {