Implemented pendingMessage processing using an extra cache
This commit is contained in:
parent
e1e8315ace
commit
7723db672b
@ -59,6 +59,12 @@ public class Client implements Closeable {
|
|||||||
* from the server that can be relayed
|
* from the server that can be relayed
|
||||||
* after
|
* after
|
||||||
* initialization
|
* initialization
|
||||||
|
* @param receivedGroupMessageCache a groupMessage cache containing all
|
||||||
|
* unread
|
||||||
|
* groupMessages
|
||||||
|
* from the server that can be relayed
|
||||||
|
* after
|
||||||
|
* initialization
|
||||||
* @param receivedMessageStatusChangeCache an event cache containing all
|
* @param receivedMessageStatusChangeCache an event cache containing all
|
||||||
* received messageStatusChangeEvents
|
* received messageStatusChangeEvents
|
||||||
* from the server that can be relayed
|
* from the server that can be relayed
|
||||||
@ -68,7 +74,7 @@ public class Client implements Closeable {
|
|||||||
* @throws InterruptedException if the current thread is interrupted while
|
* @throws InterruptedException if the current thread is interrupted while
|
||||||
* waiting for the handshake response
|
* waiting for the handshake response
|
||||||
*/
|
*/
|
||||||
public void performHandshake(LoginCredentials credentials, Cache<Message> receivedMessageCache,
|
public void performHandshake(LoginCredentials credentials, Cache<Message> receivedMessageCache, Cache<GroupMessage> receivedGroupMessageCache,
|
||||||
Cache<MessageStatusChange> receivedMessageStatusChangeCache) throws TimeoutException, IOException, InterruptedException {
|
Cache<MessageStatusChange> receivedMessageStatusChangeCache) throws TimeoutException, IOException, InterruptedException {
|
||||||
if (online) throw new IllegalStateException("Handshake has already been performed successfully");
|
if (online) throw new IllegalStateException("Handshake has already been performed successfully");
|
||||||
|
|
||||||
@ -83,6 +89,7 @@ public class Client implements Closeable {
|
|||||||
// Register user creation processor, contact list processor and message cache
|
// Register user creation processor, contact list processor and message cache
|
||||||
receiver.registerProcessor(User.class, sender -> this.sender = sender);
|
receiver.registerProcessor(User.class, sender -> this.sender = sender);
|
||||||
receiver.registerProcessor(Message.class, receivedMessageCache);
|
receiver.registerProcessor(Message.class, receivedMessageCache);
|
||||||
|
receiver.registerProcessor(GroupMessage.class, receivedGroupMessageCache);
|
||||||
receiver.registerProcessor(MessageStatusChange.class, receivedMessageStatusChangeCache);
|
receiver.registerProcessor(MessageStatusChange.class, receivedMessageStatusChangeCache);
|
||||||
receiver.registerProcessor(HandshakeRejection.class, evt -> { rejected = true; eventBus.dispatch(evt); });
|
receiver.registerProcessor(HandshakeRejection.class, evt -> { rejected = true; eventBus.dispatch(evt); });
|
||||||
|
|
||||||
@ -130,6 +137,12 @@ public class Client implements Closeable {
|
|||||||
* from the server that can be relayed
|
* from the server that can be relayed
|
||||||
* after
|
* after
|
||||||
* initialization
|
* initialization
|
||||||
|
* @param receivedGroupMessageCache a groupMessage cache containing all
|
||||||
|
* unread
|
||||||
|
* groupMessages
|
||||||
|
* from the server that can be relayed
|
||||||
|
* after
|
||||||
|
* initialization
|
||||||
* @param receivedMessageStatusChangeCache an event cache containing all
|
* @param receivedMessageStatusChangeCache an event cache containing all
|
||||||
* received messageStatusChangeEvents
|
* received messageStatusChangeEvents
|
||||||
* from the server that can be relayed
|
* from the server that can be relayed
|
||||||
@ -138,21 +151,24 @@ public class Client implements Closeable {
|
|||||||
* requested from the server
|
* requested from the server
|
||||||
* @since Envoy Client v0.2-alpha
|
* @since Envoy Client v0.2-alpha
|
||||||
*/
|
*/
|
||||||
public void initReceiver(LocalDB localDB, Cache<Message> receivedMessageCache, Cache<MessageStatusChange> receivedMessageStatusChangeCache)
|
public void initReceiver(LocalDB localDB, Cache<Message> receivedMessageCache, Cache<GroupMessage> receivedGroupMessageCache,
|
||||||
|
Cache<MessageStatusChange> receivedMessageStatusChangeCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOnline();
|
checkOnline();
|
||||||
|
|
||||||
// Process incoming messages
|
// Process incoming messages
|
||||||
final ReceivedMessageProcessor receivedMessageProcessor = new ReceivedMessageProcessor();
|
final ReceivedMessageProcessor receivedMessageProcessor = new ReceivedMessageProcessor();
|
||||||
|
final ReceivedGroupMessageProcessor receivedGroupMessageProcessor = new ReceivedGroupMessageProcessor();
|
||||||
final MessageStatusChangeProcessor messageStatusChangeEventProcessor = new MessageStatusChangeProcessor();
|
final MessageStatusChangeProcessor messageStatusChangeEventProcessor = new MessageStatusChangeProcessor();
|
||||||
|
|
||||||
// TODO: Define a cache
|
// TODO: Define a cache
|
||||||
receiver.registerProcessor(GroupMessage.class, new ReceivedGroupMessageProcessor());
|
receiver.registerProcessor(GroupMessage.class, receivedGroupMessageProcessor);
|
||||||
|
|
||||||
receiver.registerProcessor(Message.class, receivedMessageProcessor);
|
receiver.registerProcessor(Message.class, receivedMessageProcessor);
|
||||||
|
|
||||||
// Relay cached unread messages
|
// Relay cached unread messages and unread groupMessages
|
||||||
receivedMessageCache.setProcessor(receivedMessageProcessor);
|
receivedMessageCache.setProcessor(receivedMessageProcessor);
|
||||||
|
receivedGroupMessageCache.setProcessor(receivedGroupMessageProcessor);
|
||||||
|
|
||||||
// Process message status changes
|
// Process message status changes
|
||||||
receiver.registerProcessor(MessageStatusChange.class, messageStatusChangeEventProcessor);
|
receiver.registerProcessor(MessageStatusChange.class, messageStatusChangeEventProcessor);
|
||||||
|
@ -71,7 +71,8 @@ public class Receiver extends Thread {
|
|||||||
final Consumer processor = processors.get(obj.getClass());
|
final Consumer processor = processors.get(obj.getClass());
|
||||||
if (processor == null)
|
if (processor == null)
|
||||||
logger.log(Level.WARNING, String.format(
|
logger.log(Level.WARNING, String.format(
|
||||||
"The received object has the class %s for which no processor is defined.", obj.getClass()));
|
"The received object has the %s for which no processor is defined.",
|
||||||
|
obj.getClass()));
|
||||||
else processor.accept(obj);
|
else processor.accept(obj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,7 @@ import envoy.client.data.*;
|
|||||||
import envoy.client.net.Client;
|
import envoy.client.net.Client;
|
||||||
import envoy.client.ui.SceneContext.SceneInfo;
|
import envoy.client.ui.SceneContext.SceneInfo;
|
||||||
import envoy.client.ui.controller.LoginScene;
|
import envoy.client.ui.controller.LoginScene;
|
||||||
|
import envoy.data.GroupMessage;
|
||||||
import envoy.data.Message;
|
import envoy.data.Message;
|
||||||
import envoy.event.MessageStatusChange;
|
import envoy.event.MessageStatusChange;
|
||||||
import envoy.exception.EnvoyException;
|
import envoy.exception.EnvoyException;
|
||||||
@ -43,6 +44,7 @@ public final class Startup extends Application {
|
|||||||
private LocalDB localDB;
|
private LocalDB localDB;
|
||||||
private Client client;
|
private Client client;
|
||||||
private Cache<Message> messageCache;
|
private Cache<Message> messageCache;
|
||||||
|
private Cache<GroupMessage> groupMessageCache;
|
||||||
private Cache<MessageStatusChange> messageStatusCache;
|
private Cache<MessageStatusChange> messageStatusCache;
|
||||||
|
|
||||||
private static final ClientConfig config = ClientConfig.getInstance();
|
private static final ClientConfig config = ClientConfig.getInstance();
|
||||||
@ -99,6 +101,7 @@ public final class Startup extends Application {
|
|||||||
// Initialize client and unread message cache
|
// Initialize client and unread message cache
|
||||||
client = new Client();
|
client = new Client();
|
||||||
messageCache = new Cache<>();
|
messageCache = new Cache<>();
|
||||||
|
groupMessageCache = new Cache<>();
|
||||||
messageStatusCache = new Cache<>();
|
messageStatusCache = new Cache<>();
|
||||||
|
|
||||||
stage.setTitle("Envoy");
|
stage.setTitle("Envoy");
|
||||||
@ -106,7 +109,7 @@ public final class Startup extends Application {
|
|||||||
|
|
||||||
final var sceneContext = new SceneContext(stage);
|
final var sceneContext = new SceneContext(stage);
|
||||||
sceneContext.load(SceneInfo.LOGIN_SCENE);
|
sceneContext.load(SceneInfo.LOGIN_SCENE);
|
||||||
sceneContext.<LoginScene>getController().initializeData(client, localDB, messageCache, messageStatusCache, sceneContext);
|
sceneContext.<LoginScene>getController().initializeData(client, localDB, messageCache, groupMessageCache, messageStatusCache, sceneContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -15,9 +15,7 @@ import envoy.client.data.*;
|
|||||||
import envoy.client.net.Client;
|
import envoy.client.net.Client;
|
||||||
import envoy.client.ui.SceneContext;
|
import envoy.client.ui.SceneContext;
|
||||||
import envoy.client.ui.Startup;
|
import envoy.client.ui.Startup;
|
||||||
import envoy.data.LoginCredentials;
|
import envoy.data.*;
|
||||||
import envoy.data.Message;
|
|
||||||
import envoy.data.User;
|
|
||||||
import envoy.data.User.UserStatus;
|
import envoy.data.User.UserStatus;
|
||||||
import envoy.event.EventBus;
|
import envoy.event.EventBus;
|
||||||
import envoy.event.HandshakeRejection;
|
import envoy.event.HandshakeRejection;
|
||||||
@ -58,6 +56,7 @@ public final class LoginScene {
|
|||||||
private Client client;
|
private Client client;
|
||||||
private LocalDB localDB;
|
private LocalDB localDB;
|
||||||
private Cache<Message> receivedMessageCache;
|
private Cache<Message> receivedMessageCache;
|
||||||
|
private Cache<GroupMessage> receivedGroupMessageCache;
|
||||||
private Cache<MessageStatusChange> receivedMessageStatusChangeCache;
|
private Cache<MessageStatusChange> receivedMessageStatusChangeCache;
|
||||||
private SceneContext sceneContext;
|
private SceneContext sceneContext;
|
||||||
|
|
||||||
@ -83,6 +82,8 @@ public final class LoginScene {
|
|||||||
* @param receivedMessageCache the cache storing messages received
|
* @param receivedMessageCache the cache storing messages received
|
||||||
* during
|
* during
|
||||||
* the handshake
|
* the handshake
|
||||||
|
* @param receivedGroupMessageCache the cache storing groupMessages
|
||||||
|
* received during the handshake
|
||||||
* @param receivedMessageStatusChangeCache the cache storing
|
* @param receivedMessageStatusChangeCache the cache storing
|
||||||
* messageStatusChangeEvents received
|
* messageStatusChangeEvents received
|
||||||
* during handshake
|
* during handshake
|
||||||
@ -91,11 +92,12 @@ public final class LoginScene {
|
|||||||
* scene
|
* scene
|
||||||
* @since Envoy Client v0.1-beta
|
* @since Envoy Client v0.1-beta
|
||||||
*/
|
*/
|
||||||
public void initializeData(Client client, LocalDB localDB, Cache<Message> receivedMessageCache,
|
public void initializeData(Client client, LocalDB localDB, Cache<Message> receivedMessageCache, Cache<GroupMessage> receivedGroupMessageCache,
|
||||||
Cache<MessageStatusChange> receivedMessageStatusChangeCache, SceneContext sceneContext) {
|
Cache<MessageStatusChange> receivedMessageStatusChangeCache, SceneContext sceneContext) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.localDB = localDB;
|
this.localDB = localDB;
|
||||||
this.receivedMessageCache = receivedMessageCache;
|
this.receivedMessageCache = receivedMessageCache;
|
||||||
|
this.receivedGroupMessageCache = receivedGroupMessageCache;
|
||||||
this.receivedMessageStatusChangeCache = receivedMessageStatusChangeCache;
|
this.receivedMessageStatusChangeCache = receivedMessageStatusChangeCache;
|
||||||
this.sceneContext = sceneContext;
|
this.sceneContext = sceneContext;
|
||||||
|
|
||||||
@ -145,9 +147,9 @@ public final class LoginScene {
|
|||||||
|
|
||||||
private void performHandshake(LoginCredentials credentials) {
|
private void performHandshake(LoginCredentials credentials) {
|
||||||
try {
|
try {
|
||||||
client.performHandshake(credentials, receivedMessageCache, receivedMessageStatusChangeCache);
|
client.performHandshake(credentials, receivedMessageCache, receivedGroupMessageCache, receivedMessageStatusChangeCache);
|
||||||
if (client.isOnline()) {
|
if (client.isOnline()) {
|
||||||
client.initReceiver(localDB, receivedMessageCache, receivedMessageStatusChangeCache);
|
client.initReceiver(localDB, receivedMessageCache, receivedGroupMessageCache, receivedMessageStatusChangeCache);
|
||||||
loadChatScene();
|
loadChatScene();
|
||||||
}
|
}
|
||||||
} catch (IOException | InterruptedException | TimeoutException e) {
|
} catch (IOException | InterruptedException | TimeoutException e) {
|
||||||
@ -212,6 +214,7 @@ public final class LoginScene {
|
|||||||
|
|
||||||
// Relay unread messages from cache
|
// Relay unread messages from cache
|
||||||
if (receivedMessageCache != null && client.isOnline()) receivedMessageCache.relay();
|
if (receivedMessageCache != null && client.isOnline()) receivedMessageCache.relay();
|
||||||
|
if (receivedGroupMessageCache != null && client.isOnline()) receivedGroupMessageCache.relay();
|
||||||
if (receivedMessageStatusChangeCache != null && client.isOnline()) receivedMessageStatusChangeCache.relay();
|
if (receivedMessageStatusChangeCache != null && client.isOnline()) receivedMessageStatusChangeCache.relay();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user