140 lines
4.0 KiB
Java
140 lines
4.0 KiB
Java
package envoy.client.net;
|
|
|
|
import java.io.*;
|
|
import java.net.SocketException;
|
|
import java.util.*;
|
|
import java.util.function.Consumer;
|
|
import java.util.logging.*;
|
|
|
|
import dev.kske.eventbus.*;
|
|
|
|
import envoy.util.*;
|
|
|
|
/**
|
|
* Receives objects from the server and passes them to processor objects based on their class.
|
|
*
|
|
* @author Kai S. K. Engelbart
|
|
* @since Envoy Client v0.3-alpha
|
|
*/
|
|
public final class Receiver extends Thread {
|
|
|
|
private boolean isAlive = true;
|
|
|
|
private final InputStream in;
|
|
private final Map<Class<?>, Consumer<?>> processors = new HashMap<>();
|
|
|
|
private static final EventBus eventBus = EventBus.getInstance();
|
|
private static final Logger logger = EnvoyLog.getLogger(Receiver.class);
|
|
|
|
/**
|
|
* Creates an instance of {@link Receiver}.
|
|
*
|
|
* @param in the {@link InputStream} to parse objects from
|
|
* @since Envoy Client v0.3-alpha
|
|
*/
|
|
public Receiver(InputStream in) {
|
|
super("Receiver");
|
|
this.in = in;
|
|
setDaemon(true);
|
|
}
|
|
|
|
/**
|
|
* Starts the receiver loop. When an object is read, it is passed to the appropriate processor.
|
|
*
|
|
* @since Envoy Client v0.3-alpha
|
|
*/
|
|
@Override
|
|
public void run() {
|
|
|
|
while (isAlive)
|
|
try {
|
|
// Read object length
|
|
final byte[] lenBytes = new byte[4];
|
|
in.read(lenBytes);
|
|
final int len = SerializationUtils.bytesToInt(lenBytes, 0);
|
|
logger.log(Level.FINEST, "Expecting object of length " + len + ".");
|
|
|
|
// Read object into byte array
|
|
final byte[] objBytes = new byte[len];
|
|
final int bytesRead = in.read(objBytes);
|
|
logger.log(Level.FINEST, "Read " + bytesRead + " bytes.");
|
|
|
|
// Catch LV encoding errors
|
|
if (len != bytesRead) {
|
|
// Server has stopped sending, i.e. because he went offline
|
|
if (bytesRead == -1) {
|
|
isAlive = false;
|
|
logger.log(Level.INFO,
|
|
"Lost connection to the server. Exiting receiver...");
|
|
continue;
|
|
}
|
|
logger.log(Level.WARNING,
|
|
String.format(
|
|
"LV encoding violated: expected %d bytes, received %d bytes. Discarding object...",
|
|
len, bytesRead));
|
|
continue;
|
|
}
|
|
|
|
try (ObjectInputStream oin =
|
|
new ObjectInputStream(new ByteArrayInputStream(objBytes))) {
|
|
final Object obj = oin.readObject();
|
|
logger.log(Level.FINE, "Received " + obj);
|
|
|
|
// Get appropriate processor
|
|
@SuppressWarnings("rawtypes")
|
|
final Consumer processor = processors.get(obj.getClass());
|
|
|
|
// Dispatch to the processor if present
|
|
if (processor != null)
|
|
processor.accept(obj);
|
|
// Dispatch to the event bus if the object is an event without a processor
|
|
else if (obj instanceof IEvent)
|
|
eventBus.dispatch((IEvent) obj);
|
|
// Notify if no processor could be located
|
|
else
|
|
logger.log(Level.WARNING,
|
|
String.format(
|
|
"The received object has the %s for which no processor is defined.",
|
|
obj.getClass()));
|
|
}
|
|
} catch (final SocketException | EOFException e) {
|
|
// Connection probably closed by client.
|
|
logger.log(Level.INFO, "Exiting receiver...");
|
|
return;
|
|
} catch (final Exception e) {
|
|
logger.log(Level.SEVERE, "Error on receiver thread", e);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Adds an object processor to this {@link Receiver}. It will be called once an object of the
|
|
* accepted class has been received.
|
|
*
|
|
* @param processorClass the object class accepted by the processor
|
|
* @param processor the object processor
|
|
* @since Envoy Client v0.3-alpha
|
|
*/
|
|
public <T> void registerProcessor(Class<T> processorClass, Consumer<T> processor) {
|
|
processors.put(processorClass, processor);
|
|
}
|
|
|
|
/**
|
|
* Adds a map of object processors to this {@link Receiver}.
|
|
*
|
|
* @param processors the processors to add the processors to add
|
|
* @since Envoy Client v0.1-beta
|
|
*/
|
|
public void registerProcessors(Map<Class<?>, ? extends Consumer<?>> processors) {
|
|
this.processors.putAll(processors);
|
|
}
|
|
|
|
/**
|
|
* Removes all object processors registered at this {@link Receiver}.
|
|
*
|
|
* @since Envoy Client v0.3-alpha
|
|
*/
|
|
public void removeAllProcessors() {
|
|
processors.clear();
|
|
}
|
|
}
|