Send Pending Messages After Successful Handshake #111

Merged
kske merged 1 commits from b/slow-handshake into develop 2020-12-02 21:21:02 +01:00
3 changed files with 33 additions and 53 deletions
Showing only changes of commit c2bbb1a224 - Show all commits

View File

@ -12,7 +12,7 @@ import envoy.data.*;
import envoy.event.*;
import envoy.util.*;
import envoy.client.data.*;
import envoy.client.data.ClientConfig;
import envoy.client.event.EnvoyCloseEvent;
/**
@ -55,13 +55,12 @@ public final class Client implements EventListener, Closeable {
* the handshake does exceed this time limit, an exception is thrown.
*
* @param credentials the login credentials of the user
* @param cacheMap the map of all caches needed
* @throws TimeoutException if the server could not be reached
* @throws IOException if the login credentials could not be written
* @throws InterruptedException if the current thread is interrupted while waiting for the
* handshake response
*/
public void performHandshake(LoginCredentials credentials, CacheMap cacheMap)
public void performHandshake(LoginCredentials credentials)
throws TimeoutException, IOException, InterruptedException {
if (online)
throw new IllegalStateException("Handshake has already been performed successfully");
@ -79,7 +78,6 @@ public final class Client implements EventListener, Closeable {
// Register user creation processor, contact list processor, message cache and
// authentication token
receiver.registerProcessor(User.class, sender -> this.sender = sender);
receiver.registerProcessors(cacheMap.getMap());
// Start receiver
receiver.start();
@ -101,42 +99,18 @@ public final class Client implements EventListener, Closeable {
if (System.currentTimeMillis() - start > 5000) {
rejected = true;
socket.close();
receiver.removeAllProcessors();
throw new TimeoutException("Did not log in after 5 seconds");
}
Thread.sleep(500);
}
online = true;
logger.log(Level.INFO, "Handshake completed.");
}
/**
* Initializes the {@link Receiver} used to process data sent from the server to this client.
*
* @param localDB the local database used to persist the current {@link IDGenerator}
* @param cacheMap the map of all caches needed
* @throws IOException if no {@link IDGenerator} is present and none could be requested from the
* server
* @since Envoy Client v0.2-alpha
*/
public void initReceiver(LocalDB localDB, CacheMap cacheMap) throws IOException {
checkOnline();
// Remove all processors as they are only used during the handshake
// Remove handshake specific processors
receiver.removeAllProcessors();
// Relay cached messages and message status changes
cacheMap.get(Message.class).setProcessor(eventBus::dispatch);
cacheMap.get(GroupMessage.class).setProcessor(eventBus::dispatch);
cacheMap.get(MessageStatusChange.class).setProcessor(eventBus::dispatch);
cacheMap.get(GroupMessageStatusChange.class).setProcessor(eventBus::dispatch);
// Request a generator if none is present or the existing one is consumed
if (!localDB.hasIDGenerator() || !localDB.getIDGenerator().hasNext())
requestIDGenerator();
// Relay caches
cacheMap.getMap().values().forEach(Cache::relay);
online = true;
logger.log(Level.INFO, "Handshake completed.");
}
/**

View File

@ -12,7 +12,7 @@ import javafx.stage.Stage;
import envoy.data.*;
import envoy.data.User.UserStatus;
import envoy.event.*;
import envoy.event.UserStatusChange;
import envoy.exception.EnvoyException;
import envoy.util.EnvoyLog;
@ -115,21 +115,20 @@ public final class Startup extends Application {
* @since Envoy Client v0.2-beta
*/
public static boolean performHandshake(LoginCredentials credentials) {
final var cacheMap = new CacheMap();
cacheMap.put(Message.class, new Cache<Message>());
cacheMap.put(GroupMessage.class, new Cache<GroupMessage>());
cacheMap.put(MessageStatusChange.class, new Cache<MessageStatusChange>());
cacheMap.put(GroupMessageStatusChange.class, new Cache<GroupMessageStatusChange>());
final var originalStatus =
localDB.getUser() == null ? UserStatus.ONLINE : localDB.getUser().getStatus();
try {
client.performHandshake(credentials, cacheMap);
client.performHandshake(credentials);
if (client.isOnline()) {
// Restore the original status as the server automatically returns status ONLINE
client.getSender().setStatus(originalStatus);
loadChatScene();
client.initReceiver(localDB, cacheMap);
// Request an ID generator if none is present or the existing one is consumed
if (!localDB.hasIDGenerator() || !localDB.getIDGenerator().hasNext())
client.requestIDGenerator();
return true;
} else
return false;

View File

@ -40,6 +40,7 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
// Cache this write proxy for user-independent notifications
UserStatusChangeProcessor.setWriteProxy(writeProxy);
// Check for compatible versions
if (!VersionUtil.verifyCompatibility(credentials.getClientVersion())) {
logger.info("The client has the wrong version.");
writeProxy.write(socketID, new HandshakeRejection(WRONG_VERSION));
@ -70,10 +71,10 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
writeProxy.write(socketID, new HandshakeRejection(INVALID_TOKEN));
return;
}
} else
} else if (!PasswordUtil.validate(credentials.getPassword(),
user.getPasswordHash())) {
// Check the password hash
if (!PasswordUtil.validate(credentials.getPassword(), user.getPasswordHash())) {
logger.info(user + " has entered the wrong password.");
writeProxy.write(socketID, new HandshakeRejection(WRONG_PASSWORD_OR_USER));
return;
@ -101,7 +102,8 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
writeProxy.write(socketID, new HandshakeRejection(USERNAME_TAKEN));
return;
} catch (final NoResultException e) {
// Creation of a new user
// Create a new user
user = new User();
user.setName(credentials.getIdentifier());
user.setLastSeen(Instant.now());
@ -138,6 +140,15 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
persistenceManager.updateContact(user);
writeProxy.write(socketID, new NewAuthToken(token));
}
// Notify the user if a contact deletion has happened since he last logged in
if (user.getLatestContactDeletion().isAfter(user.getLastSeen()))
writeProxy.write(socketID, new ContactsChangedSinceLastLogin());
// Complete the handshake
writeProxy.write(socketID, user.toCommon());
// Send pending (group) messages and status changes
final var pendingMessages =
PersistenceManager.getInstance().getPendingMessages(user, credentials.getLastSync());
pendingMessages.removeIf(GroupMessage.class::isInstance);
@ -162,9 +173,10 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
writeProxy.write(connectionManager.getSocketID(msg.getSender().getID()),
new MessageStatusChange(msgCommon));
}
} else
} else {
writeProxy.write(socketID, new MessageStatusChange(msgCommon));
}
}
final List<GroupMessage> pendingGroupMessages = PersistenceManager.getInstance()
.getPendingGroupMessages(user, credentials.getLastSync());
@ -197,10 +209,11 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
}
PersistenceManager.getInstance().updateMessage(gmsg);
} else
} else {
// Just send the message without updating if it was received in the past
writeProxy.write(socketID, gmsgCommon);
}
} else {
// Sending group message status changes
@ -220,11 +233,5 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
writeProxy.write(socketID, new MessageStatusChange(gmsgCommon));
}
}
// Notify the user if a contact deletion has happened since he last logged in
if (user.getLatestContactDeletion().isAfter(user.getLastSeen()))
writeProxy.write(socketID, new ContactsChangedSinceLastLogin());
// Complete the handshake
writeProxy.write(socketID, user.toCommon());
}
}