[client] update proto

This commit is contained in:
2018-02-28 22:06:00 +03:00
parent 336896b37e
commit 34e5ac2b9e
10 changed files with 249 additions and 343 deletions

View File

@@ -1,60 +1,45 @@
package ru.m.connect;
import haxework.dispath.Dispatcher;
import haxework.dispath.IDispatcher;
import haxework.provider.Provider;
import promhx.Deferred;
import hxsignal.impl.Signal1;
import haxe.io.Bytes;
import promhx.Promise;
import protohx.Message;
import ru.m.connect.IConnection;
class BaseConnection implements IConnection {
public var handler(default,default):IDispatcher<IConnectionHandler>;
public var packetHandler(default,default):IDispatcher<IPacketHandler>;
public var connected(default, null):Bool;
public var queue(default, null):PacketQueue;
public var builder(default, null):IPacketBuilder;
public function new() {
this.builder = Provider.get(IPacketBuilder);
this.queue = new PacketQueue(builder);
this.handler = new Dispatcher<IConnectionHandler>();
this.packetHandler = new Dispatcher<IPacketHandler>();
}
class BaseConnection<O:Message, I:Message> implements IConnection<O, I> {
public var handler(default, default):Signal1<ConnectionEvent>;
public var packetHandler(default, default):Signal1<I>;
public var connected(default, null):Bool;
public var queue(default, null):PacketQueue<I>;
public function connect():Void {
throw "Not implemented";
}
private var connectDeferred:Deferred<IConnection<O, I>>;
public function disconnect():Void {
throw "Not implemented";
}
public function pushData(bytes:Bytes):Void {
queue.addBytes(bytes);
while (queue.hasMsg()) {
var packet:Message = queue.popMsg();
try {
receive(packet);
} catch (error:Dynamic) {
trace(error);
handler.dispatch(function(h) h.onError(error));
}
public function new(i:Class<I>) {
queue = new PacketQueue<I>(i);
handler = new Signal1<ConnectionEvent>();
packetHandler = new Signal1<I>();
connectDeferred = new Deferred();
}
}
public function send(packet:Message):Void {
#if proto_debug L.d("Send", Type.getClassName(Type.getClass(packet)).split(".").pop()); #end
}
public function receive(packet:Message):Void {
#if proto_debug L.d("Receive", Type.getClassName(Type.getClass(packet)).split(".").pop()); #end
var name = "on" + Type.getClassName(Type.getClass(packet)).split(".").pop();
packetHandler.dispatch(function(h) {
var method = Reflect.field(h, name);
if (method != null && Reflect.isFunction(method)) {
Reflect.callMethod(h, method, [packet]);
} else {
h.onPacket(packet);
}
});
}
public function connect():Promise<IConnection<O, I>> {
throw "Not implemented";
}
public function disconnect():Void {
throw "Not implemented";
}
public function pushData(bytes:Bytes):Void {
queue.addBytes(bytes);
while (queue.hasMsg()) {
var packet:I = queue.popMsg();
packetHandler.emit(packet);
}
}
public function send(packet:O):Void {
#if proto_debug L.d("Send", Type.getClassName(Type.getClass(packet)).split(".").pop()); #end
}
}

View File

@@ -6,37 +6,19 @@ import promhx.Promise;
import protohx.Message;
interface IConnection {
enum ConnectionEvent {
CONNECTED;
DISCONNECTED;
ERROR(error:Dynamic);
}
interface IConnection<O:Message, I:Message> {
public var connected(default, null):Bool;
public var handler(default, default):Signal1<IConnectionHandler>;
public var packetHandler(default, default):Signal1<IPacketHandler>;
public var handler(default, null):Signal1<ConnectionEvent>;
public var packetHandler(default, null):Signal1<I>;
public var builder(default, null):IPacketBuilder;
public function connect():Promise<IConnection>;
public function connect():Promise<IConnection<O, I>>;
public function disconnect():Void;
public function send(packet:Message):Void;
public function send(packet:O):Void;
public function pushData(bytes:Bytes):Void;
public function receive(packet:Message):Void;
}
interface IConnectionHandler {
public function onConnected():Void;
public function onDisconnected():Void;
public function onError(error:Dynamic):Void;
}
interface IPacketHandler {
public function onPacket(packet:Message):Void;
}
typedef PacketMeta = {
var family:Int;
var id:Int;
}
interface IPacketBuilder {
public function packetMeta(packet:Message):PacketMeta;
public function buildPacket(meta:PacketMeta):Message;
}

View File

@@ -1,80 +1,75 @@
package ru.m.connect;
import ru.m.connect.IConnection.IPacketBuilder;
import protohx.Message;
import haxe.io.BytesInput;
import haxe.io.BytesBuffer;
import haxe.io.Bytes;
import haxe.io.BytesBuffer;
import haxe.io.BytesInput;
import protohx.Message;
class PacketQueue {
public static inline var HEADER_SIZE:Int = 4;
class PacketQueue<P:Message> {
private var builder:IPacketBuilder;
private var bytesBuff:Bytes;
private var msgs:List<Message>;
private var packetClass:Class<P>;
private var bytesBuff:Bytes;
private var msgs:List<P>;
public function new(builder:IPacketBuilder) {
this.builder = builder;
msgs = new List<Message>();
}
public inline function hasMsg():Bool {
return !msgs.isEmpty();
}
public inline function popMsg():Message {
return msgs.pop();
}
public inline function addMsg(msg:Message):Void {
msgs.add(msg);
}
public function addBytes(bytes:Bytes) {
if (bytes == null) {
return;
public function new(packetClass:Class<P>) {
this.packetClass = packetClass;
msgs = new List<P>();
}
if (bytesBuff == null) {
bytesBuff = bytes;
} else {
var buffer = new BytesBuffer();
buffer.add(bytesBuff);
buffer.add(bytes);
bytesBuff = buffer.getBytes();
public inline function hasMsg():Bool {
return !msgs.isEmpty();
}
if (bytesBuff == null || bytesBuff.length < HEADER_SIZE) {
return;
public inline function popMsg():P {
return msgs.pop();
}
var available = bytesBuff.length;
var bi = new BytesInput(bytesBuff);
bi.bigEndian = false;
while (available >= HEADER_SIZE) {
var family = bi.readByte();
var id = bi.readByte();
var packetSize = bi.readUInt16();
available -= HEADER_SIZE;
if (packetSize <= available) {
available -= packetSize;
var msgBytes = bi.read(packetSize);
var packet = builder.buildPacket({family:family, id:id});
packet.mergeFrom(msgBytes);
addMsg(packet);
} else {
available += HEADER_SIZE;
break;
}
public inline function addMsg(msg:P):Void {
msgs.add(msg);
}
if (available == 0) {
bytesBuff = null;
} else if (available > 0) {
if (bytesBuff.length != available) {
var pos = bytesBuff.length - available;
bytesBuff = bytesBuff.sub(pos, available);
}
} else {
throw "Wrong available: " + available;
public function addBytes(bytes:Bytes):Void {
if (bytes == null) {
return;
}
if (bytesBuff == null) {
bytesBuff = bytes;
} else {
var buffer = new BytesBuffer();
buffer.add(bytesBuff);
buffer.add(bytes);
bytesBuff = buffer.getBytes();
}
if (bytesBuff == null || bytesBuff.length < 1) {
return;
}
var available = bytesBuff.length;
var bi = new BytesInput(bytesBuff);
bi.bigEndian = false;
while (available > 0) {
var packetSize = bi.readUInt16();
available -= 1;
if (packetSize <= available) {
var msgBytes = bi.read(packetSize);
var packet:P = Type.createInstance(packetClass, []);
packet.mergeFrom(msgBytes);
addMsg(packet);
available -= packetSize;
} else {
break;
}
}
if (available == 0) {
bytesBuff = null;
} else if (available > 0) {
if (bytesBuff.length != available) {
var pos = bytesBuff.length - available;
bytesBuff = bytesBuff.sub(pos, available);
}
} else {
throw "Wrong available: " + available;
}
}
}
}

View File

@@ -1,6 +1,7 @@
package ru.m.connect.flash;
import ru.m.connect.IConnection.IConnectionHandler;
import ru.m.connect.IConnection.ConnectionEvent;
import promhx.Promise;
import flash.utils.Endian;
import haxe.io.BytesOutput;
import protohx.Message;
@@ -12,76 +13,77 @@ import flash.events.SecurityErrorEvent;
import flash.events.IOErrorEvent;
import flash.net.Socket;
class FlashConnection extends BaseConnection {
private var host:String;
private var port:Int;
private var socket:Socket;
class FlashConnection<O:Message, I:Message> extends BaseConnection<O, I> {
public function new(host:String, port:Int) {
super();
this.host = host;
this.port = port;
connected = false;
socket = new Socket();
socket.addEventListener(IOErrorEvent.IO_ERROR, onError);
socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onError);
socket.addEventListener(Event.CLOSE, onClose);
socket.addEventListener(Event.CONNECT, onConnect);
socket.addEventListener(ProgressEvent.SOCKET_DATA, onSocketData);
socket.endian = Endian.LITTLE_ENDIAN;
}
private var host:String;
private var port:Int;
private var socket:Socket;
override public function connect():Void {
socket.connect(host, port);
}
override public function disconnect():Void {
if (socket.connected) {
socket.close();
connected = false;
handler.dispatch(function(h) h.onDisconnected());
public function new(host:String, port:Int, i:Class<I>) {
super(i);
this.host = host;
this.port = port;
connected = false;
socket = new Socket();
socket.addEventListener(IOErrorEvent.IO_ERROR, onError);
socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onError);
socket.addEventListener(Event.CLOSE, onClose);
socket.addEventListener(Event.CONNECT, onConnect);
socket.addEventListener(ProgressEvent.SOCKET_DATA, onSocketData);
socket.endian = Endian.LITTLE_ENDIAN;
}
}
private function onError(event:ErrorEvent):Void {
socket.close();
connected = false;
handler.dispatch(function(h) h.onError(event));
}
private function onConnect(_):Void {
connected = true;
handler.dispatch(function(h) h.onConnected());
}
private function onClose(_):Void {
socket.close();
connected = false;
handler.dispatch(function(h) h.onDisconnected());
}
private function onSocketData(_):Void {
try {
var b = new flash.utils.ByteArray();
socket.readBytes(b);
var bs = Bytes.ofData(cast b);
pushData(bs);
} catch (error:Dynamic) {
handler.dispatch(function(h) h.onError(error));
override public function connect():Promise<IConnection<O, I>> {
socket.connect(host, port);
return connectDeferred.promise();
}
}
override public function send(packet:Message):Void {
super.send(packet);
var meta = builder.packetMeta(packet);
socket.writeByte(meta.family);
socket.writeByte(meta.id);
var out = new BytesOutput();
packet.writeTo(out);
var bytes = out.getBytes();
socket.writeShort(bytes.length);
socket.writeBytes(cast bytes.getData());
socket.flush();
}
override public function disconnect():Void {
if (socket.connected) {
socket.close();
//connected = false;
//handler.emit(ConnectionEvent.DISCONNECTED);
}
}
private function onError(event:ErrorEvent):Void {
socket.close();
connected = false;
handler.emit(ConnectionEvent.ERROR(event));
connectDeferred.throwError(event);
}
private function onConnect(_):Void {
connected = true;
handler.emit(ConnectionEvent.CONNECTED);
connectDeferred.resolve(this);
}
private function onClose(_):Void {
socket.close();
connected = false;
handler.emit(ConnectionEvent.DISCONNECTED);
}
private function onSocketData(_):Void {
try {
var b = new flash.utils.ByteArray();
socket.readBytes(b);
var bs = Bytes.ofData(cast b);
pushData(bs);
} catch (error:Dynamic) {
handler.emit(ConnectionEvent.ERROR(error));
}
}
override public function send(packet:O):Void {
super.send(packet);
var out = new BytesOutput();
packet.writeTo(out);
var bytes = out.getBytes();
socket.writeShort(bytes.length);
socket.writeBytes(cast bytes.getData());
socket.flush();
}
}