[common] udpate proto

This commit is contained in:
2018-03-01 12:05:06 +03:00
parent 634f5ad2d0
commit 45fab2c0b5
15 changed files with 96 additions and 63 deletions

View File

@@ -1,16 +1,17 @@
package ru.m.connect;
import promhx.Deferred;
import hxsignal.impl.Signal1;
import haxe.io.Bytes;
import promhx.Deferred;
import promhx.Promise;
import protohx.Message;
import ru.m.connect.IConnection;
import ru.m.signal.Signal;
class BaseConnection<O:Message, I:Message> implements IConnection<O, I> {
public var handler(default, default):Signal1<ConnectionEvent>;
public var packetHandler(default, default):Signal1<I>;
public var handler(default, null):Signal<ConnectionEvent>;
public var sendHandler(default, null):Signal<O>;
public var receiveHandler(default, null):Signal<I>;
public var connected(default, null):Bool;
public var queue(default, null):PacketQueue<I>;
@@ -18,8 +19,9 @@ class BaseConnection<O:Message, I:Message> implements IConnection<O, I> {
public function new(i:Class<I>) {
queue = new PacketQueue<I>(i);
handler = new Signal1<ConnectionEvent>();
packetHandler = new Signal1<I>();
handler = new Signal<ConnectionEvent>();
sendHandler = new Signal<O>();
receiveHandler = new Signal<I>();
connectDeferred = new Deferred();
}
@@ -32,14 +34,21 @@ class BaseConnection<O:Message, I:Message> implements IConnection<O, I> {
}
public function pushData(bytes:Bytes):Void {
#if proto_debug L.d('Proto', 'pushData: ${bytes.length}'); #end
queue.addBytes(bytes);
while (queue.hasMsg()) {
var packet:I = queue.popMsg();
packetHandler.emit(packet);
receive(packet);
}
}
public function send(packet:O):Void {
#if proto_debug L.d("Send", Type.getClassName(Type.getClass(packet)).split(".").pop()); #end
#if proto_debug L.d('Proto', 'send: ${packet}'); #end
sendHandler.emit(packet);
}
public function receive(packet:I):Void {
#if proto_debug L.d('Proto', 'receive: ${packet}'); #end
receiveHandler.emit(packet);
}
}

View File

@@ -1,9 +1,9 @@
package ru.m.connect;
import haxe.io.Bytes;
import hxsignal.impl.Signal1;
import promhx.Promise;
import protohx.Message;
import ru.m.signal.Signal;
enum ConnectionEvent {
@@ -14,8 +14,9 @@ enum ConnectionEvent {
interface IConnection<O:Message, I:Message> {
public var connected(default, null):Bool;
public var handler(default, null):Signal1<ConnectionEvent>;
public var packetHandler(default, null):Signal1<I>;
public var handler(default, null):Signal<ConnectionEvent>;
public var sendHandler(default, null):Signal<O>;
public var receiveHandler(default, null):Signal<I>;
public function connect():Promise<IConnection<O, I>>;
public function disconnect():Void;

View File

@@ -50,7 +50,7 @@ class PacketQueue<P:Message> {
bi.bigEndian = false;
while (available > 0) {
var packetSize = bi.readUInt16();
available -= 1;
available -= 2;
if (packetSize <= available) {
var msgBytes = bi.read(packetSize);
var packet:P = Type.createInstance(packetClass, []);

View File

@@ -32,6 +32,7 @@ class FlashConnection<O:Message, I:Message> extends BaseConnection<O, I> {
socket.addEventListener(Event.CONNECT, onConnect);
socket.addEventListener(ProgressEvent.SOCKET_DATA, onSocketData);
socket.endian = Endian.LITTLE_ENDIAN;
sendHandler.connect(_send);
}
override public function connect():Promise<IConnection<O, I>> {
@@ -77,8 +78,7 @@ class FlashConnection<O:Message, I:Message> extends BaseConnection<O, I> {
}
}
override public function send(packet:O):Void {
super.send(packet);
private function _send(packet:O):Void {
var out = new BytesOutput();
packet.writeTo(out);
var bytes = out.getBytes();

View File

@@ -12,13 +12,13 @@ class NekoConnection<O:Message, I:Message> extends BaseConnection<O, I> {
public function new(socket:Socket, i:Class<I>) {
super(i);
this.socket = socket;
socket.setFastSend(true);
socket.output.bigEndian = false;
socket.input.bigEndian = false;
//socket.setFastSend(true);
//socket.output.bigEndian = false;
//socket.input.bigEndian = false;
sendHandler.connect(_send);
}
override public function send(packet:O):Void {
super.send(packet);
private function _send(packet:O):Void {
try {
var b = new BytesOutput();
packet.writeTo(b);
@@ -26,9 +26,8 @@ class NekoConnection<O:Message, I:Message> extends BaseConnection<O, I> {
socket.output.writeUInt16(bytes.length);
socket.output.write(bytes);
socket.output.flush();
} catch (e:Dynamic) {
trace("Error send packet: " + Type.getClassName(Type.getClass(packet)));
trace(e);
} catch (error:Dynamic) {
L.e('Proto', 'Error send packet: ${packet}', error);
}
}
}

View File

@@ -16,13 +16,12 @@ class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
opened = false;
}
override public function send(packet:O):Void {
#if proto_debug L.d("Send", Type.getClassName(Type.getClass(packet)).split(".").pop()); #end
override private function _send(packet:O):Void {
try {
var data = WebSocketTools.packet2string(packet);
writeData(data, socket);
} catch (e:Dynamic) {
trace(e);
} catch (error:Dynamic) {
L.e('Proto', 'Error send packet: ${packet}', error);
}
}
@@ -39,7 +38,7 @@ class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
var data = parseData(bytes);
if (data != null) {
var packet:I = WebSocketTools.string2packet(data, queue.packetClass);
packetHandler.emit(packet);
receive(packet);
}
}
}

View File

@@ -0,0 +1,27 @@
package ru.m.signal;
typedef Receiver<T> = T -> Void;
class Signal<T> {
private var receivers:Array<Receiver<T>>;
public function new() {
receivers = [];
}
public function connect(receiver:Receiver<T>):Void {
receivers.push(receiver);
}
public function disconnect(receiver:Receiver<T>):Void {
receivers.remove(receiver);
}
public function emit(value:T):Void {
for (receiver in receivers) {
receiver(value);
}
}
}