[common] binary message in ws connection

This commit is contained in:
2019-05-23 11:43:35 +03:00
parent 4b426183e8
commit 864d5f3335
9 changed files with 60 additions and 53 deletions

View File

@@ -0,0 +1,20 @@
package ru.m.connect;
import haxe.io.Bytes;
import haxe.io.BytesOutput;
import protohx.Message;
class PacketUtil {
public static function fromBytes<P:Message>(bytes:Bytes, factory:Class<P>):P {
var packet:P = Type.createInstance(factory, []);
packet.mergeFrom(bytes);
return packet;
}
public static function toBytes<P:Message>(packet:P):Bytes {
var out = new BytesOutput();
packet.writeTo(out);
return out.getBytes();
}
}

View File

@@ -1,7 +1,6 @@
package ru.m.connect.desktop; package ru.m.connect.desktop;
import cpp.vm.Thread; import cpp.vm.Thread;
import haxe.io.BytesOutput;
import haxe.Timer; import haxe.Timer;
import promhx.Deferred; import promhx.Deferred;
import promhx.Promise; import promhx.Promise;
@@ -58,8 +57,7 @@ class DesktopConnection<O:Message, I:Message> extends BaseConnection<O, I> {
socket.waitForRead(); socket.waitForRead();
var size = socket.input.readUInt16(); var size = socket.input.readUInt16();
var data = socket.input.read(size); var data = socket.input.read(size);
var packet:I = Type.createInstance(queue.packetClass, []); var packet:I = PacketUtil.fromBytes(data, queue.packetClass);
packet.mergeFrom(data);
receiveHandler.emit(packet); receiveHandler.emit(packet);
} }
} catch (error:Dynamic) { } catch (error:Dynamic) {
@@ -68,9 +66,7 @@ class DesktopConnection<O:Message, I:Message> extends BaseConnection<O, I> {
} }
private function _send(packet:O):Void { private function _send(packet:O):Void {
var out = new BytesOutput(); var bytes = PacketUtil.toBytes(packet);
packet.writeTo(out);
var bytes = out.getBytes();
socket.output.writeUInt16(bytes.length); socket.output.writeUInt16(bytes.length);
socket.output.write(bytes); socket.output.write(bytes);
socket.output.flush(); socket.output.flush();

View File

@@ -4,7 +4,6 @@ import ru.m.connect.IConnection.ConnectionEvent;
import promhx.Promise; import promhx.Promise;
import protohx.Message; import protohx.Message;
class FakeConnection<O:Message, I:Message> extends BaseConnection<O, I> { class FakeConnection<O:Message, I:Message> extends BaseConnection<O, I> {
override public function connect():Promise<IConnection<O, I>> { override public function connect():Promise<IConnection<O, I>> {

View File

@@ -8,7 +8,6 @@ import flash.events.SecurityErrorEvent;
import flash.net.Socket; import flash.net.Socket;
import flash.utils.Endian; import flash.utils.Endian;
import haxe.io.Bytes; import haxe.io.Bytes;
import haxe.io.BytesOutput;
import promhx.Deferred; import promhx.Deferred;
import promhx.Promise; import promhx.Promise;
import protohx.Message; import protohx.Message;
@@ -76,21 +75,19 @@ class FlashConnection<O:Message, I:Message> extends BaseConnection<O, I> {
private function onSocketData(_):Void { private function onSocketData(_):Void {
try { try {
var b = new flash.utils.ByteArray(); var data = new flash.utils.ByteArray();
socket.readBytes(b); socket.readBytes(data);
var bs = Bytes.ofData(cast b); var bytes = Bytes.ofData(data);
pushData(bs); pushData(bytes);
} catch (error:Dynamic) { } catch (error:Dynamic) {
handler.emit(ConnectionEvent.ERROR(error)); handler.emit(ConnectionEvent.ERROR(error));
} }
} }
private function _send(packet:O):Void { private function _send(packet:O):Void {
var out = new BytesOutput(); var bytes = PacketUtil.toBytes(packet);
packet.writeTo(out);
var bytes = out.getBytes();
socket.writeShort(bytes.length); socket.writeShort(bytes.length);
socket.writeBytes(cast bytes.getData()); socket.writeBytes(bytes.getData());
socket.flush(); socket.flush();
} }
} }

View File

@@ -1,11 +1,12 @@
package ru.m.connect.js; package ru.m.connect.js;
import haxe.io.Bytes;
import js.Browser; import js.Browser;
import js.html.BinaryType;
import js.html.WebSocket; import js.html.WebSocket;
import promhx.Deferred; import promhx.Deferred;
import promhx.Promise; import promhx.Promise;
import protohx.Message; import protohx.Message;
import ru.m.Base64;
import ru.m.connect.IConnection; import ru.m.connect.IConnection;
class JsConnection<O:Message, I:Message> extends BaseConnection<O, I> { class JsConnection<O:Message, I:Message> extends BaseConnection<O, I> {
@@ -32,8 +33,8 @@ class JsConnection<O:Message, I:Message> extends BaseConnection<O, I> {
override public function connect():Promise<IConnection<O, I>> { override public function connect():Promise<IConnection<O, I>> {
var self = this; var self = this;
var decodeBytes = Base64.decodeBase64;
socket = buildSocket(host, port); socket = buildSocket(host, port);
socket.binaryType = BinaryType.ARRAYBUFFER;
socket.onopen = this.onConnect; socket.onopen = this.onConnect;
socket.onclose = this.onClose; socket.onclose = this.onClose;
socket.onerror = this.onError; socket.onerror = this.onError;
@@ -67,8 +68,8 @@ class JsConnection<O:Message, I:Message> extends BaseConnection<O, I> {
private function onSocketData(event:Dynamic):Void { private function onSocketData(event:Dynamic):Void {
try { try {
var data:String = event.data; var bytes = Bytes.ofData(event.data);
var packet:I = WebSocketTools.string2packet(data, queue.packetClass); var packet:I = PacketUtil.fromBytes(bytes, queue.packetClass);
receive(packet); receive(packet);
} catch (error:Dynamic) { } catch (error:Dynamic) {
handler.emit(ConnectionEvent.ERROR(event)); handler.emit(ConnectionEvent.ERROR(event));
@@ -77,6 +78,7 @@ class JsConnection<O:Message, I:Message> extends BaseConnection<O, I> {
override public function send(packet:O):Void { override public function send(packet:O):Void {
super.send(packet); super.send(packet);
socket.send(WebSocketTools.packet2string(packet)); var bytes = PacketUtil.toBytes(packet);
socket.send(bytes.getData());
} }
} }

View File

@@ -1,6 +1,5 @@
package ru.m.connect.neko; package ru.m.connect.neko;
import haxe.io.BytesOutput;
import protohx.Message; import protohx.Message;
import sys.net.Socket; import sys.net.Socket;
@@ -19,9 +18,7 @@ class NekoConnection<O:Message, I:Message> extends BaseConnection<O, I> {
private function _send(packet:O):Void { private function _send(packet:O):Void {
try { try {
var b = new BytesOutput(); var bytes = PacketUtil.toBytes(packet);
packet.writeTo(b);
var bytes = b.getBytes();
socket.output.writeUInt16(bytes.length); socket.output.writeUInt16(bytes.length);
socket.output.write(bytes); socket.output.write(bytes);
socket.output.flush(); socket.output.flush();

View File

@@ -3,10 +3,11 @@ package ru.m.connect.neko;
import haxe.crypto.BaseCode; import haxe.crypto.BaseCode;
import haxe.crypto.Sha1; import haxe.crypto.Sha1;
import haxe.io.Bytes; import haxe.io.Bytes;
import haxe.io.BytesBuffer;
import protohx.Message; import protohx.Message;
import sys.net.Socket; import sys.net.Socket;
class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> { class NekoWSConnection<O:Message, I:Message> extends NekoConnection<O, I> {
private var opened:Bool; private var opened:Bool;
@@ -16,12 +17,8 @@ class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
} }
override private function _send(packet:O):Void { override private function _send(packet:O):Void {
try { var data = PacketUtil.toBytes(packet);
var data = WebSocketTools.packet2string(packet);
writeData(data, socket); writeData(data, socket);
} catch (error:Dynamic) {
L.e('Proto', 'Error send packet: ${packet}', error);
}
} }
override public function pushData(bytes:Bytes):Void { override public function pushData(bytes:Bytes):Void {
@@ -36,13 +33,12 @@ class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
} else { } else {
var data = parseData(bytes); var data = parseData(bytes);
if (data != null) { if (data != null) {
var packet:I = WebSocketTools.string2packet(data, queue.packetClass); var packet:I = PacketUtil.fromBytes(data, queue.packetClass);
receive(packet); receive(packet);
} }
} }
} }
private function encodeBase64(content:String):String { private function encodeBase64(content:String):String {
var suffix = switch (content.length % 3) var suffix = switch (content.length % 3)
{ {
@@ -73,8 +69,8 @@ class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
socket.output.writeString(s); socket.output.writeString(s);
} }
private function writeData(data:String, socket:sys.net.Socket, isServer = true):Void { private function writeData(data:Bytes, socket:sys.net.Socket, isServer = true):Void {
socket.output.writeByte(0x81); socket.output.writeByte(130);
var len = 0; var len = 0;
if (data.length < 126) len = data.length; if (data.length < 126) len = data.length;
@@ -97,7 +93,7 @@ class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
} }
if (isServer) { if (isServer) {
socket.output.writeString(data); socket.output.writeBytes(data, 0, data.length);
} }
else { else {
var mask = [ Std.random(256), Std.random(256), Std.random(256), Std.random(256) ]; var mask = [ Std.random(256), Std.random(256), Std.random(256), Std.random(256) ];
@@ -105,29 +101,29 @@ class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
socket.output.writeByte(mask[1]); socket.output.writeByte(mask[1]);
socket.output.writeByte(mask[2]); socket.output.writeByte(mask[2]);
socket.output.writeByte(mask[3]); socket.output.writeByte(mask[3]);
var maskedData = new StringBuf(); var maskedData = new BytesBuffer();
for (i in 0...data.length) { for (i in 0...data.length) {
maskedData.addChar(data.charCodeAt(i) ^ mask[i % 4]); maskedData.addByte(data.get(i) ^ mask[i % 4]);
} }
socket.output.writeString(maskedData.toString()); socket.output.writeBytes(maskedData.getBytes(), 0, maskedData.length);
} }
} }
private function parseData(bytes:Bytes):String { private function parseData(bytes:Bytes):Bytes {
var p = 0; var p = 0;
var opcode = bytes.get(p++); var opcode = bytes.get(p++);
if (opcode == 0x00) { if (opcode == 0x00) {
var s = ""; var data = new BytesBuffer();
var b:Int; var b:Int;
while ((b = bytes.get(p++)) != 0xFF) { while ((b = bytes.get(p++)) != 0xFF) {
s += String.fromCharCode(b); data.addByte(b);
} }
return s; return data.getBytes();
} }
// 0x81 = fin & text // 130 = binary data
if (opcode == 0x81) { if (opcode == 130) {
var len = bytes.get(p++); var len = bytes.get(p++);
// mask // mask
@@ -158,13 +154,13 @@ class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
//Lib.println("mask = " + mask); //Lib.println("mask = " + mask);
var data = new StringBuf(); var data = new BytesBuffer();
for (i in 0...len) { for (i in 0...len) {
data.addChar(bytes.get(p++) ^ mask[i % 4]); data.addByte(bytes.get(p++) ^ mask[i % 4]);
} }
//Lib.println("readed = " + data.toString()); //Lib.println("readed = " + data.toString());
return data.toString(); return data.getBytes();
} else { } else {
throw "Expected masked data."; throw "Expected masked data.";
} }

View File

@@ -1,10 +1,10 @@
package ru.m.tankz.server.session; package ru.m.tankz.server.session;
import ru.m.connect.neko.NekoWebConnection;
import haxe.io.Bytes; import haxe.io.Bytes;
import protohx.Message; import protohx.Message;
import ru.m.connect.IConnection; import ru.m.connect.IConnection;
import ru.m.connect.neko.NekoConnection; import ru.m.connect.neko.NekoConnection;
import ru.m.connect.neko.NekoWSConnection;
import sys.net.Socket; import sys.net.Socket;
class ProtoSession<O:Message, I:Message> implements ISession { class ProtoSession<O:Message, I:Message> implements ISession {
@@ -37,7 +37,7 @@ class ProtoSession<O:Message, I:Message> implements ISession {
return null; return null;
} }
if (StringTools.startsWith(str, "GET")) { if (StringTools.startsWith(str, "GET")) {
connection = new NekoWebConnection<O, I>(socket, request); connection = new NekoWSConnection<O, I>(socket, request);
} else { } else {
connection = new NekoConnection<O, I>(socket, request); connection = new NekoConnection<O, I>(socket, request);
} }

View File

@@ -7,7 +7,7 @@ import com.hurlant.crypto.prng.Random;
import haxe.io.Bytes; import haxe.io.Bytes;
import ru.m.connect.IConnection; import ru.m.connect.IConnection;
import ru.m.connect.neko.NekoConnection; import ru.m.connect.neko.NekoConnection;
import ru.m.connect.neko.NekoWebConnection; import ru.m.connect.neko.NekoWSConnection;
import ru.m.tankz.proto.core.UserProto; import ru.m.tankz.proto.core.UserProto;
import ru.m.tankz.proto.pack.CreateGameRequest; import ru.m.tankz.proto.pack.CreateGameRequest;
import ru.m.tankz.proto.pack.CreateGameResponse; import ru.m.tankz.proto.pack.CreateGameResponse;
@@ -72,7 +72,7 @@ class _Session {
return; return;
} }
if (StringTools.startsWith(str, "GET")) { if (StringTools.startsWith(str, "GET")) {
connection = new NekoWebConnection<Response, Request>(socket, Request); connection = new NekoWSConnection<Response, Request>(socket, Request);
} else { } else {
connection = new NekoConnection<Response, Request>(socket, Request); connection = new NekoConnection<Response, Request>(socket, Request);
} }