Send Pending Messages After Successful Handshake #111
@ -12,7 +12,7 @@ import envoy.data.*;
|
|||||||
import envoy.event.*;
|
import envoy.event.*;
|
||||||
import envoy.util.*;
|
import envoy.util.*;
|
||||||
|
|
||||||
import envoy.client.data.*;
|
import envoy.client.data.ClientConfig;
|
||||||
import envoy.client.event.EnvoyCloseEvent;
|
import envoy.client.event.EnvoyCloseEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -55,13 +55,12 @@ public final class Client implements EventListener, Closeable {
|
|||||||
* the handshake does exceed this time limit, an exception is thrown.
|
* the handshake does exceed this time limit, an exception is thrown.
|
||||||
*
|
*
|
||||||
* @param credentials the login credentials of the user
|
* @param credentials the login credentials of the user
|
||||||
* @param cacheMap the map of all caches needed
|
|
||||||
* @throws TimeoutException if the server could not be reached
|
* @throws TimeoutException if the server could not be reached
|
||||||
* @throws IOException if the login credentials could not be written
|
* @throws IOException if the login credentials could not be written
|
||||||
* @throws InterruptedException if the current thread is interrupted while waiting for the
|
* @throws InterruptedException if the current thread is interrupted while waiting for the
|
||||||
* handshake response
|
* handshake response
|
||||||
*/
|
*/
|
||||||
public void performHandshake(LoginCredentials credentials, CacheMap cacheMap)
|
public void performHandshake(LoginCredentials credentials)
|
||||||
throws TimeoutException, IOException, InterruptedException {
|
throws TimeoutException, IOException, InterruptedException {
|
||||||
if (online)
|
if (online)
|
||||||
throw new IllegalStateException("Handshake has already been performed successfully");
|
throw new IllegalStateException("Handshake has already been performed successfully");
|
||||||
@ -79,7 +78,6 @@ public final class Client implements EventListener, Closeable {
|
|||||||
// Register user creation processor, contact list processor, message cache and
|
// Register user creation processor, contact list processor, message cache and
|
||||||
// authentication token
|
// authentication token
|
||||||
receiver.registerProcessor(User.class, sender -> this.sender = sender);
|
receiver.registerProcessor(User.class, sender -> this.sender = sender);
|
||||||
receiver.registerProcessors(cacheMap.getMap());
|
|
||||||
|
|
||||||
// Start receiver
|
// Start receiver
|
||||||
receiver.start();
|
receiver.start();
|
||||||
@ -101,42 +99,18 @@ public final class Client implements EventListener, Closeable {
|
|||||||
|
|
||||||
if (System.currentTimeMillis() - start > 5000) {
|
if (System.currentTimeMillis() - start > 5000) {
|
||||||
rejected = true;
|
rejected = true;
|
||||||
|
socket.close();
|
||||||
|
receiver.removeAllProcessors();
|
||||||
throw new TimeoutException("Did not log in after 5 seconds");
|
throw new TimeoutException("Did not log in after 5 seconds");
|
||||||
}
|
}
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
}
|
}
|
||||||
|
|
||||||
online = true;
|
// Remove handshake specific processors
|
||||||
logger.log(Level.INFO, "Handshake completed.");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initializes the {@link Receiver} used to process data sent from the server to this client.
|
|
||||||
*
|
|
||||||
* @param localDB the local database used to persist the current {@link IDGenerator}
|
|
||||||
* @param cacheMap the map of all caches needed
|
|
||||||
* @throws IOException if no {@link IDGenerator} is present and none could be requested from the
|
|
||||||
* server
|
|
||||||
* @since Envoy Client v0.2-alpha
|
|
||||||
*/
|
|
||||||
public void initReceiver(LocalDB localDB, CacheMap cacheMap) throws IOException {
|
|
||||||
checkOnline();
|
|
||||||
|
|
||||||
// Remove all processors as they are only used during the handshake
|
|
||||||
receiver.removeAllProcessors();
|
receiver.removeAllProcessors();
|
||||||
|
|
||||||
// Relay cached messages and message status changes
|
online = true;
|
||||||
cacheMap.get(Message.class).setProcessor(eventBus::dispatch);
|
logger.log(Level.INFO, "Handshake completed.");
|
||||||
cacheMap.get(GroupMessage.class).setProcessor(eventBus::dispatch);
|
|
||||||
cacheMap.get(MessageStatusChange.class).setProcessor(eventBus::dispatch);
|
|
||||||
cacheMap.get(GroupMessageStatusChange.class).setProcessor(eventBus::dispatch);
|
|
||||||
|
|
||||||
// Request a generator if none is present or the existing one is consumed
|
|
||||||
if (!localDB.hasIDGenerator() || !localDB.getIDGenerator().hasNext())
|
|
||||||
requestIDGenerator();
|
|
||||||
|
|
||||||
// Relay caches
|
|
||||||
cacheMap.getMap().values().forEach(Cache::relay);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -12,7 +12,7 @@ import javafx.stage.Stage;
|
|||||||
|
|
||||||
import envoy.data.*;
|
import envoy.data.*;
|
||||||
import envoy.data.User.UserStatus;
|
import envoy.data.User.UserStatus;
|
||||||
import envoy.event.*;
|
import envoy.event.UserStatusChange;
|
||||||
import envoy.exception.EnvoyException;
|
import envoy.exception.EnvoyException;
|
||||||
import envoy.util.EnvoyLog;
|
import envoy.util.EnvoyLog;
|
||||||
|
|
||||||
@ -115,21 +115,20 @@ public final class Startup extends Application {
|
|||||||
* @since Envoy Client v0.2-beta
|
* @since Envoy Client v0.2-beta
|
||||||
*/
|
*/
|
||||||
public static boolean performHandshake(LoginCredentials credentials) {
|
public static boolean performHandshake(LoginCredentials credentials) {
|
||||||
final var cacheMap = new CacheMap();
|
|
||||||
cacheMap.put(Message.class, new Cache<Message>());
|
|
||||||
cacheMap.put(GroupMessage.class, new Cache<GroupMessage>());
|
|
||||||
cacheMap.put(MessageStatusChange.class, new Cache<MessageStatusChange>());
|
|
||||||
cacheMap.put(GroupMessageStatusChange.class, new Cache<GroupMessageStatusChange>());
|
|
||||||
final var originalStatus =
|
final var originalStatus =
|
||||||
localDB.getUser() == null ? UserStatus.ONLINE : localDB.getUser().getStatus();
|
localDB.getUser() == null ? UserStatus.ONLINE : localDB.getUser().getStatus();
|
||||||
try {
|
try {
|
||||||
client.performHandshake(credentials, cacheMap);
|
client.performHandshake(credentials);
|
||||||
if (client.isOnline()) {
|
if (client.isOnline()) {
|
||||||
|
|
||||||
// Restore the original status as the server automatically returns status ONLINE
|
// Restore the original status as the server automatically returns status ONLINE
|
||||||
client.getSender().setStatus(originalStatus);
|
client.getSender().setStatus(originalStatus);
|
||||||
loadChatScene();
|
loadChatScene();
|
||||||
client.initReceiver(localDB, cacheMap);
|
|
||||||
|
// Request an ID generator if none is present or the existing one is consumed
|
||||||
|
if (!localDB.hasIDGenerator() || !localDB.getIDGenerator().hasNext())
|
||||||
|
client.requestIDGenerator();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} else
|
} else
|
||||||
return false;
|
return false;
|
||||||
|
@ -40,6 +40,7 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
|
|||||||
// Cache this write proxy for user-independent notifications
|
// Cache this write proxy for user-independent notifications
|
||||||
UserStatusChangeProcessor.setWriteProxy(writeProxy);
|
UserStatusChangeProcessor.setWriteProxy(writeProxy);
|
||||||
|
|
||||||
|
// Check for compatible versions
|
||||||
if (!VersionUtil.verifyCompatibility(credentials.getClientVersion())) {
|
if (!VersionUtil.verifyCompatibility(credentials.getClientVersion())) {
|
||||||
logger.info("The client has the wrong version.");
|
logger.info("The client has the wrong version.");
|
||||||
writeProxy.write(socketID, new HandshakeRejection(WRONG_VERSION));
|
writeProxy.write(socketID, new HandshakeRejection(WRONG_VERSION));
|
||||||
@ -70,10 +71,10 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
|
|||||||
writeProxy.write(socketID, new HandshakeRejection(INVALID_TOKEN));
|
writeProxy.write(socketID, new HandshakeRejection(INVALID_TOKEN));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else
|
} else if (!PasswordUtil.validate(credentials.getPassword(),
|
||||||
|
user.getPasswordHash())) {
|
||||||
|
|
||||||
// Check the password hash
|
// Check the password hash
|
||||||
if (!PasswordUtil.validate(credentials.getPassword(), user.getPasswordHash())) {
|
|
||||||
logger.info(user + " has entered the wrong password.");
|
logger.info(user + " has entered the wrong password.");
|
||||||
writeProxy.write(socketID, new HandshakeRejection(WRONG_PASSWORD_OR_USER));
|
writeProxy.write(socketID, new HandshakeRejection(WRONG_PASSWORD_OR_USER));
|
||||||
return;
|
return;
|
||||||
@ -101,7 +102,8 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
|
|||||||
writeProxy.write(socketID, new HandshakeRejection(USERNAME_TAKEN));
|
writeProxy.write(socketID, new HandshakeRejection(USERNAME_TAKEN));
|
||||||
return;
|
return;
|
||||||
} catch (final NoResultException e) {
|
} catch (final NoResultException e) {
|
||||||
// Creation of a new user
|
|
||||||
|
// Create a new user
|
||||||
user = new User();
|
user = new User();
|
||||||
user.setName(credentials.getIdentifier());
|
user.setName(credentials.getIdentifier());
|
||||||
user.setLastSeen(Instant.now());
|
user.setLastSeen(Instant.now());
|
||||||
@ -138,6 +140,15 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
|
|||||||
persistenceManager.updateContact(user);
|
persistenceManager.updateContact(user);
|
||||||
writeProxy.write(socketID, new NewAuthToken(token));
|
writeProxy.write(socketID, new NewAuthToken(token));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notify the user if a contact deletion has happened since he last logged in
|
||||||
|
if (user.getLatestContactDeletion().isAfter(user.getLastSeen()))
|
||||||
|
writeProxy.write(socketID, new ContactsChangedSinceLastLogin());
|
||||||
|
|
||||||
|
// Complete the handshake
|
||||||
|
writeProxy.write(socketID, user.toCommon());
|
||||||
|
|
||||||
|
// Send pending (group) messages and status changes
|
||||||
final var pendingMessages =
|
final var pendingMessages =
|
||||||
PersistenceManager.getInstance().getPendingMessages(user, credentials.getLastSync());
|
PersistenceManager.getInstance().getPendingMessages(user, credentials.getLastSync());
|
||||||
pendingMessages.removeIf(GroupMessage.class::isInstance);
|
pendingMessages.removeIf(GroupMessage.class::isInstance);
|
||||||
@ -162,9 +173,10 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
|
|||||||
writeProxy.write(connectionManager.getSocketID(msg.getSender().getID()),
|
writeProxy.write(connectionManager.getSocketID(msg.getSender().getID()),
|
||||||
new MessageStatusChange(msgCommon));
|
new MessageStatusChange(msgCommon));
|
||||||
}
|
}
|
||||||
} else
|
} else {
|
||||||
writeProxy.write(socketID, new MessageStatusChange(msgCommon));
|
writeProxy.write(socketID, new MessageStatusChange(msgCommon));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final List<GroupMessage> pendingGroupMessages = PersistenceManager.getInstance()
|
final List<GroupMessage> pendingGroupMessages = PersistenceManager.getInstance()
|
||||||
.getPendingGroupMessages(user, credentials.getLastSync());
|
.getPendingGroupMessages(user, credentials.getLastSync());
|
||||||
@ -197,10 +209,11 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
|
|||||||
}
|
}
|
||||||
|
|
||||||
PersistenceManager.getInstance().updateMessage(gmsg);
|
PersistenceManager.getInstance().updateMessage(gmsg);
|
||||||
} else
|
} else {
|
||||||
|
|
||||||
// Just send the message without updating if it was received in the past
|
// Just send the message without updating if it was received in the past
|
||||||
writeProxy.write(socketID, gmsgCommon);
|
writeProxy.write(socketID, gmsgCommon);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
// Sending group message status changes
|
// Sending group message status changes
|
||||||
@ -220,11 +233,5 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
|
|||||||
writeProxy.write(socketID, new MessageStatusChange(gmsgCommon));
|
writeProxy.write(socketID, new MessageStatusChange(gmsgCommon));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Notify the user if a contact deletion has happened since he last logged in
|
|
||||||
if (user.getLatestContactDeletion().isAfter(user.getLastSeen()))
|
|
||||||
writeProxy.write(socketID, new ContactsChangedSinceLastLogin());
|
|
||||||
|
|
||||||
// Complete the handshake
|
|
||||||
writeProxy.write(socketID, user.toCommon());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user