Implemented pendingMessage processing using an extra cache

This commit is contained in:
DieGurke 2020-06-28 13:20:10 +02:00
parent 3446e24043
commit 21ad87d97d
4 changed files with 35 additions and 12 deletions

View File

@ -59,6 +59,12 @@ public class Client implements Closeable {
* from the server that can be relayed * from the server that can be relayed
* after * after
* initialization * initialization
* @param receivedGroupMessageCache a groupMessage cache containing all
* unread
* groupMessages
* from the server that can be relayed
* after
* initialization
* @param receivedMessageStatusChangeCache an event cache containing all * @param receivedMessageStatusChangeCache an event cache containing all
* received messageStatusChangeEvents * received messageStatusChangeEvents
* from the server that can be relayed * from the server that can be relayed
@ -68,7 +74,7 @@ public class Client implements Closeable {
* @throws InterruptedException if the current thread is interrupted while * @throws InterruptedException if the current thread is interrupted while
* waiting for the handshake response * waiting for the handshake response
*/ */
public void performHandshake(LoginCredentials credentials, Cache<Message> receivedMessageCache, public void performHandshake(LoginCredentials credentials, Cache<Message> receivedMessageCache, Cache<GroupMessage> receivedGroupMessageCache,
Cache<MessageStatusChange> receivedMessageStatusChangeCache) throws TimeoutException, IOException, InterruptedException { Cache<MessageStatusChange> receivedMessageStatusChangeCache) throws TimeoutException, IOException, InterruptedException {
if (online) throw new IllegalStateException("Handshake has already been performed successfully"); if (online) throw new IllegalStateException("Handshake has already been performed successfully");
@ -83,6 +89,7 @@ public class Client implements Closeable {
// Register user creation processor, contact list processor and message cache // Register user creation processor, contact list processor and message cache
receiver.registerProcessor(User.class, sender -> this.sender = sender); receiver.registerProcessor(User.class, sender -> this.sender = sender);
receiver.registerProcessor(Message.class, receivedMessageCache); receiver.registerProcessor(Message.class, receivedMessageCache);
receiver.registerProcessor(GroupMessage.class, receivedGroupMessageCache);
receiver.registerProcessor(MessageStatusChange.class, receivedMessageStatusChangeCache); receiver.registerProcessor(MessageStatusChange.class, receivedMessageStatusChangeCache);
receiver.registerProcessor(HandshakeRejection.class, evt -> { rejected = true; eventBus.dispatch(evt); }); receiver.registerProcessor(HandshakeRejection.class, evt -> { rejected = true; eventBus.dispatch(evt); });
@ -130,6 +137,12 @@ public class Client implements Closeable {
* from the server that can be relayed * from the server that can be relayed
* after * after
* initialization * initialization
* @param receivedGroupMessageCache a groupMessage cache containing all
* unread
* groupMessages
* from the server that can be relayed
* after
* initialization
* @param receivedMessageStatusChangeCache an event cache containing all * @param receivedMessageStatusChangeCache an event cache containing all
* received messageStatusChangeEvents * received messageStatusChangeEvents
* from the server that can be relayed * from the server that can be relayed
@ -138,21 +151,24 @@ public class Client implements Closeable {
* requested from the server * requested from the server
* @since Envoy Client v0.2-alpha * @since Envoy Client v0.2-alpha
*/ */
public void initReceiver(LocalDB localDB, Cache<Message> receivedMessageCache, Cache<MessageStatusChange> receivedMessageStatusChangeCache) public void initReceiver(LocalDB localDB, Cache<Message> receivedMessageCache, Cache<GroupMessage> receivedGroupMessageCache,
Cache<MessageStatusChange> receivedMessageStatusChangeCache)
throws IOException { throws IOException {
checkOnline(); checkOnline();
// Process incoming messages // Process incoming messages
final ReceivedMessageProcessor receivedMessageProcessor = new ReceivedMessageProcessor(); final ReceivedMessageProcessor receivedMessageProcessor = new ReceivedMessageProcessor();
final ReceivedGroupMessageProcessor receivedGroupMessageProcessor = new ReceivedGroupMessageProcessor();
final MessageStatusChangeProcessor messageStatusChangeEventProcessor = new MessageStatusChangeProcessor(); final MessageStatusChangeProcessor messageStatusChangeEventProcessor = new MessageStatusChangeProcessor();
// TODO: Define a cache // TODO: Define a cache
receiver.registerProcessor(GroupMessage.class, new ReceivedGroupMessageProcessor()); receiver.registerProcessor(GroupMessage.class, receivedGroupMessageProcessor);
receiver.registerProcessor(Message.class, receivedMessageProcessor); receiver.registerProcessor(Message.class, receivedMessageProcessor);
// Relay cached unread messages // Relay cached unread messages and unread groupMessages
receivedMessageCache.setProcessor(receivedMessageProcessor); receivedMessageCache.setProcessor(receivedMessageProcessor);
receivedGroupMessageCache.setProcessor(receivedGroupMessageProcessor);
// Process message status changes // Process message status changes
receiver.registerProcessor(MessageStatusChange.class, messageStatusChangeEventProcessor); receiver.registerProcessor(MessageStatusChange.class, messageStatusChangeEventProcessor);

View File

@ -71,7 +71,8 @@ public class Receiver extends Thread {
final Consumer processor = processors.get(obj.getClass()); final Consumer processor = processors.get(obj.getClass());
if (processor == null) if (processor == null)
logger.log(Level.WARNING, String.format( logger.log(Level.WARNING, String.format(
"The received object has the class %s for which no processor is defined.", obj.getClass())); "The received object has the %s for which no processor is defined.",
obj.getClass()));
else processor.accept(obj); else processor.accept(obj);
} }
} }

View File

@ -15,6 +15,7 @@ import envoy.client.data.*;
import envoy.client.net.Client; import envoy.client.net.Client;
import envoy.client.ui.SceneContext.SceneInfo; import envoy.client.ui.SceneContext.SceneInfo;
import envoy.client.ui.controller.LoginScene; import envoy.client.ui.controller.LoginScene;
import envoy.data.GroupMessage;
import envoy.data.Message; import envoy.data.Message;
import envoy.event.MessageStatusChange; import envoy.event.MessageStatusChange;
import envoy.exception.EnvoyException; import envoy.exception.EnvoyException;
@ -43,6 +44,7 @@ public final class Startup extends Application {
private LocalDB localDB; private LocalDB localDB;
private Client client; private Client client;
private Cache<Message> messageCache; private Cache<Message> messageCache;
private Cache<GroupMessage> groupMessageCache;
private Cache<MessageStatusChange> messageStatusCache; private Cache<MessageStatusChange> messageStatusCache;
private static final ClientConfig config = ClientConfig.getInstance(); private static final ClientConfig config = ClientConfig.getInstance();
@ -99,6 +101,7 @@ public final class Startup extends Application {
// Initialize client and unread message cache // Initialize client and unread message cache
client = new Client(); client = new Client();
messageCache = new Cache<>(); messageCache = new Cache<>();
groupMessageCache = new Cache<>();
messageStatusCache = new Cache<>(); messageStatusCache = new Cache<>();
stage.setTitle("Envoy"); stage.setTitle("Envoy");
@ -106,7 +109,7 @@ public final class Startup extends Application {
final var sceneContext = new SceneContext(stage); final var sceneContext = new SceneContext(stage);
sceneContext.load(SceneInfo.LOGIN_SCENE); sceneContext.load(SceneInfo.LOGIN_SCENE);
sceneContext.<LoginScene>getController().initializeData(client, localDB, messageCache, messageStatusCache, sceneContext); sceneContext.<LoginScene>getController().initializeData(client, localDB, messageCache, groupMessageCache, messageStatusCache, sceneContext);
} }
/** /**

View File

@ -15,9 +15,7 @@ import envoy.client.data.*;
import envoy.client.net.Client; import envoy.client.net.Client;
import envoy.client.ui.SceneContext; import envoy.client.ui.SceneContext;
import envoy.client.ui.Startup; import envoy.client.ui.Startup;
import envoy.data.LoginCredentials; import envoy.data.*;
import envoy.data.Message;
import envoy.data.User;
import envoy.data.User.UserStatus; import envoy.data.User.UserStatus;
import envoy.event.EventBus; import envoy.event.EventBus;
import envoy.event.HandshakeRejection; import envoy.event.HandshakeRejection;
@ -58,6 +56,7 @@ public final class LoginScene {
private Client client; private Client client;
private LocalDB localDB; private LocalDB localDB;
private Cache<Message> receivedMessageCache; private Cache<Message> receivedMessageCache;
private Cache<GroupMessage> receivedGroupMessageCache;
private Cache<MessageStatusChange> receivedMessageStatusChangeCache; private Cache<MessageStatusChange> receivedMessageStatusChangeCache;
private SceneContext sceneContext; private SceneContext sceneContext;
@ -83,6 +82,8 @@ public final class LoginScene {
* @param receivedMessageCache the cache storing messages received * @param receivedMessageCache the cache storing messages received
* during * during
* the handshake * the handshake
* @param receivedGroupMessageCache the cache storing groupMessages
* received during the handshake
* @param receivedMessageStatusChangeCache the cache storing * @param receivedMessageStatusChangeCache the cache storing
* messageStatusChangeEvents received * messageStatusChangeEvents received
* during handshake * during handshake
@ -91,11 +92,12 @@ public final class LoginScene {
* scene * scene
* @since Envoy Client v0.1-beta * @since Envoy Client v0.1-beta
*/ */
public void initializeData(Client client, LocalDB localDB, Cache<Message> receivedMessageCache, public void initializeData(Client client, LocalDB localDB, Cache<Message> receivedMessageCache, Cache<GroupMessage> receivedGroupMessageCache,
Cache<MessageStatusChange> receivedMessageStatusChangeCache, SceneContext sceneContext) { Cache<MessageStatusChange> receivedMessageStatusChangeCache, SceneContext sceneContext) {
this.client = client; this.client = client;
this.localDB = localDB; this.localDB = localDB;
this.receivedMessageCache = receivedMessageCache; this.receivedMessageCache = receivedMessageCache;
this.receivedGroupMessageCache = receivedGroupMessageCache;
this.receivedMessageStatusChangeCache = receivedMessageStatusChangeCache; this.receivedMessageStatusChangeCache = receivedMessageStatusChangeCache;
this.sceneContext = sceneContext; this.sceneContext = sceneContext;
@ -145,9 +147,9 @@ public final class LoginScene {
private void performHandshake(LoginCredentials credentials) { private void performHandshake(LoginCredentials credentials) {
try { try {
client.performHandshake(credentials, receivedMessageCache, receivedMessageStatusChangeCache); client.performHandshake(credentials, receivedMessageCache, receivedGroupMessageCache, receivedMessageStatusChangeCache);
if (client.isOnline()) { if (client.isOnline()) {
client.initReceiver(localDB, receivedMessageCache, receivedMessageStatusChangeCache); client.initReceiver(localDB, receivedMessageCache, receivedGroupMessageCache, receivedMessageStatusChangeCache);
loadChatScene(); loadChatScene();
} }
} catch (IOException | InterruptedException | TimeoutException e) { } catch (IOException | InterruptedException | TimeoutException e) {
@ -212,6 +214,7 @@ public final class LoginScene {
// Relay unread messages from cache // Relay unread messages from cache
if (receivedMessageCache != null && client.isOnline()) receivedMessageCache.relay(); if (receivedMessageCache != null && client.isOnline()) receivedMessageCache.relay();
if (receivedGroupMessageCache != null && client.isOnline()) receivedGroupMessageCache.relay();
if (receivedMessageStatusChangeCache != null && client.isOnline()) receivedMessageStatusChangeCache.relay(); if (receivedMessageStatusChangeCache != null && client.isOnline()) receivedMessageStatusChangeCache.relay();
} }
} }