[server] update proto

This commit is contained in:
2018-02-28 22:25:29 +03:00
parent 34e5ac2b9e
commit 634f5ad2d0
7 changed files with 238 additions and 272 deletions

View File

@@ -24,7 +24,8 @@ const build = () => function build() {
'protohx', 'protohx',
'orm', 'orm',
'haxework:git', 'haxework:git',
'haxe-crypto' 'haxe-crypto',
'hxsignal'
], ],
cp: [ cp: [
'src/common/haxe', 'src/common/haxe',

View File

@@ -8,7 +8,8 @@ import protohx.Message;
class PacketQueue<P:Message> { class PacketQueue<P:Message> {
private var packetClass:Class<P>; public var packetClass(default, null):Class<P>;
private var bytesBuff:Bytes; private var bytesBuff:Bytes;
private var msgs:List<P>; private var msgs:List<P>;

View File

@@ -1,31 +1,27 @@
package ru.m.connect; package ru.m.connect;
import haxe.io.Bytes;
import haxe.io.BytesOutput; import haxe.io.BytesOutput;
import ru.m.connect.IConnection;
import protohx.Message; import protohx.Message;
class WebSocketTools { class WebSocketTools {
public static function packet2string(packet:Message, builder:IPacketBuilder):String { public static function packet2string(packet:Message):String {
var meta = builder.packetMeta(packet); var b = new BytesOutput();
var b = new BytesOutput(); packet.writeTo(b);
packet.writeTo(b); var data = b.getBytes();
var data = b.getBytes(); var res = new BytesOutput();
var res = new BytesOutput(); //res.writeUInt16(data.length);
res.writeByte(meta.family); res.write(data);
res.writeByte(meta.id); return Base64.encodeBase64(res.getBytes());
//res.writeUInt16(data.length); }
res.write(data);
return Base64.encodeBase64(res.getBytes());
}
public static function string2packet(data:String, builder:IPacketBuilder):Message { public static function string2packet<P:Message>(data:String, packetClass:Class<P>):P {
var bytes = Base64.decodeBase64(data); var bytes = Base64.decodeBase64(data);
var family = bytes.get(0); var family = bytes.get(0);
var id = bytes.get(1); var id = bytes.get(1);
var packet = builder.buildPacket({family:family, id:id}); var packet:P = Type.createInstance(packetClass, []);
packet.mergeFrom(bytes.sub(2, bytes.length - 2)); packet.mergeFrom(bytes.sub(2, bytes.length - 2));
return packet; return packet;
} }
} }

View File

@@ -2,37 +2,33 @@ package ru.m.connect.neko;
import haxe.io.BytesOutput; import haxe.io.BytesOutput;
import protohx.Message; import protohx.Message;
import haxe.io.Bytes;
import sys.net.Socket; import sys.net.Socket;
import ru.m.connect.IConnection;
class NekoConnection extends BaseConnection {
public var socket(default, null):Socket; class NekoConnection<O:Message, I:Message> extends BaseConnection<O, I> {
public function new(socket:Socket) { public var socket(default, null):Socket;
super();
this.socket = socket;
socket.setFastSend(true);
socket.output.bigEndian = false;
socket.input.bigEndian = false;
}
override public function send(packet:Message):Void { public function new(socket:Socket, i:Class<I>) {
super.send(packet); super(i);
try { this.socket = socket;
var meta = builder.packetMeta(packet); socket.setFastSend(true);
var b = new BytesOutput(); socket.output.bigEndian = false;
packet.writeTo(b); socket.input.bigEndian = false;
var bytes = b.getBytes(); }
socket.output.writeByte(meta.family);
socket.output.writeByte(meta.id); override public function send(packet:O):Void {
socket.output.writeUInt16(bytes.length); super.send(packet);
socket.output.write(bytes); try {
socket.output.flush(); var b = new BytesOutput();
} catch (e:Dynamic) { packet.writeTo(b);
trace("Error send packet: " + Type.getClassName(Type.getClass(packet))); var bytes = b.getBytes();
trace(e); socket.output.writeUInt16(bytes.length);
socket.output.write(bytes);
socket.output.flush();
} catch (e:Dynamic) {
trace("Error send packet: " + Type.getClassName(Type.getClass(packet)));
trace(e);
}
} }
}
} }

View File

@@ -5,197 +5,180 @@ import haxe.crypto.Sha1;
import protohx.Message; import protohx.Message;
import haxe.io.Bytes; import haxe.io.Bytes;
import sys.net.Socket; import sys.net.Socket;
import ru.m.connect.IConnection;
class NekoWebConnection extends NekoConnection {
private var opened:Bool;
public function new(socket:Socket) {
super(socket);
opened = false;
}
override public function send(packet:Message):Void {
#if proto_debug L.d("Send", Type.getClassName(Type.getClass(packet)).split(".").pop()); #end
try {
var data = WebSocketTools.packet2string(packet, builder);
writeData(data, socket);
} catch (e:Dynamic) {
trace(e);
}
}
override public function pushData(bytes:Bytes):Void {
if (!opened) {
var str:String = bytes.getString(0, bytes.length);
if (StringTools.startsWith(str, "GET")) {
var r = ~/Sec-WebSocket-Key:\s*([A-z0-9=+\/]+)/;
r.match(str);
opened = true;
sendServerHandShake(socket, r.matched(1));
}
} else {
var data = parseData(bytes);
if (data != null) {
var packet = WebSocketTools.string2packet(data, builder);
receive(packet);
}
}
}
private function encodeBase64(content:String):String { class NekoWebConnection<O:Message, I:Message> extends NekoConnection<O, I> {
var suffix = switch (content.length % 3)
{
case 2: "=";
case 1: "==";
default: "";
};
return BaseCode.encode(content, "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/") + suffix;
}
private function hex2data(hex:String):String { private var opened:Bool;
var data = "";
for (i in 0...Std.int(hex.length / 2)) {
data += String.fromCharCode(Std.parseInt("0x" + hex.substr(i * 2, 2)));
}
return data;
}
private function sendServerHandShake(socket:sys.net.Socket, inpKey:String) { public function new(socket:Socket, i:Class<I>) {
var outKey = encodeBase64(hex2data(Sha1.encode(StringTools.trim(inpKey) + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))); super(socket, i);
opened = false;
var s = "HTTP/1.1 101 Switching Protocols\r\n"
+ "Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Sec-WebSocket-Accept: " + outKey + "\r\n"
+ "\r\n";
socket.output.writeString(s);
}
private function writeData(data:String, socket:sys.net.Socket, isServer = true):Void {
socket.output.writeByte(0x81);
var len = 0;
if (data.length < 126) len = data.length;
else if (data.length < 65536) len = 126;
else len = 127;
socket.output.writeByte(len | (!isServer ? 0x80 : 0x00));
if (data.length >= 126)
{
if (data.length < 65536)
{
socket.output.writeByte((data.length >> 8) & 0xFF);
socket.output.writeByte(data.length & 0xFF);
}
else
{
socket.output.writeByte((data.length >> 24) & 0xFF);
socket.output.writeByte((data.length >> 16) & 0xFF);
socket.output.writeByte((data.length >> 8) & 0xFF);
socket.output.writeByte(data.length & 0xFF);
}
} }
if (isServer) override public function send(packet:O):Void {
{ #if proto_debug L.d("Send", Type.getClassName(Type.getClass(packet)).split(".").pop()); #end
socket.output.writeString(data); try {
} var data = WebSocketTools.packet2string(packet);
else writeData(data, socket);
{ } catch (e:Dynamic) {
var mask = [ Std.random(256), Std.random(256), Std.random(256), Std.random(256) ]; trace(e);
socket.output.writeByte(mask[0]);
socket.output.writeByte(mask[1]);
socket.output.writeByte(mask[2]);
socket.output.writeByte(mask[3]);
var maskedData = new StringBuf();
for (i in 0...data.length)
{
maskedData.addChar(data.charCodeAt(i) ^ mask[i % 4]);
}
socket.output.writeString(maskedData.toString());
}
}
private function parseData(bytes:Bytes):String {
var p = 0;
var opcode = bytes.get(p++);
if (opcode == 0x00)
{
var s = "";
var b : Int;
while ((b = bytes.get(p++)) != 0xFF)
{
s += String.fromCharCode(b);
}
return s;
}
if (opcode == 0x81) // 0x81 = fin & text
{
var len = bytes.get(p++);
if (len & 0x80 != 0) // mask
{
len &= 0x7F;
if (len == 126)
{
var b2 = bytes.get(p++);
var b3 = bytes.get(p++);
len = (b2 << 8) + b3;
}
else
if (len == 127)
{
var b2 = bytes.get(p++);
var b3 = bytes.get(p++);
var b4 = bytes.get(p++);
var b5 = bytes.get(p++);
len = (b2 << 24) + (b3 << 16) + (b4 << 8) + b5;
}
//Lib.println("len = " + len);
// direct array init not work corectly!
var mask = [];
mask.push(bytes.get(p++));
mask.push(bytes.get(p++));
mask.push(bytes.get(p++));
mask.push(bytes.get(p++));
//Lib.println("mask = " + mask);
var data = new StringBuf();
for (i in 0...len)
{
data.addChar(bytes.get(p++) ^ mask[i % 4]);
}
//Lib.println("readed = " + data.toString());
return data.toString();
}
else
{
throw "Expected masked data.";
} }
} }
if (opcode == 136) { override public function pushData(bytes:Bytes):Void {
//socket.close(); if (!opened) {
opened = false; var str:String = bytes.getString(0, bytes.length);
return null; if (StringTools.startsWith(str, "GET")) {
var r = ~/Sec-WebSocket-Key:\s*([A-z0-9=+\/]+)/;
r.match(str);
opened = true;
sendServerHandShake(socket, r.matched(1));
}
} else {
var data = parseData(bytes);
if (data != null) {
var packet:I = WebSocketTools.string2packet(data, queue.packetClass);
packetHandler.emit(packet);
}
}
} }
else
{ private function encodeBase64(content:String):String {
throw "Unsupported websocket opcode: " + opcode; var suffix = switch (content.length % 3)
{
case 2: "=";
case 1: "==";
default: "";
};
return BaseCode.encode(content, "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/") + suffix;
}
private function hex2data(hex:String):String {
var data = "";
for (i in 0...Std.int(hex.length / 2)) {
data += String.fromCharCode(Std.parseInt("0x" + hex.substr(i * 2, 2)));
}
return data;
}
private function sendServerHandShake(socket:sys.net.Socket, inpKey:String) {
var outKey = encodeBase64(hex2data(Sha1.encode(StringTools.trim(inpKey) + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")));
var s = "HTTP/1.1 101 Switching Protocols\r\n"
+ "Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Sec-WebSocket-Accept: " + outKey + "\r\n"
+ "\r\n";
socket.output.writeString(s);
}
private function writeData(data:String, socket:sys.net.Socket, isServer = true):Void {
socket.output.writeByte(0x81);
var len = 0;
if (data.length < 126) len = data.length;
else if (data.length < 65536) len = 126;
else len = 127;
socket.output.writeByte(len | (!isServer ? 0x80 : 0x00));
if (data.length >= 126) {
if (data.length < 65536) {
socket.output.writeByte((data.length >> 8) & 0xFF);
socket.output.writeByte(data.length & 0xFF);
}
else {
socket.output.writeByte((data.length >> 24) & 0xFF);
socket.output.writeByte((data.length >> 16) & 0xFF);
socket.output.writeByte((data.length >> 8) & 0xFF);
socket.output.writeByte(data.length & 0xFF);
}
}
if (isServer) {
socket.output.writeString(data);
}
else {
var mask = [ Std.random(256), Std.random(256), Std.random(256), Std.random(256) ];
socket.output.writeByte(mask[0]);
socket.output.writeByte(mask[1]);
socket.output.writeByte(mask[2]);
socket.output.writeByte(mask[3]);
var maskedData = new StringBuf();
for (i in 0...data.length) {
maskedData.addChar(data.charCodeAt(i) ^ mask[i % 4]);
}
socket.output.writeString(maskedData.toString());
}
}
private function parseData(bytes:Bytes):String {
var p = 0;
var opcode = bytes.get(p++);
if (opcode == 0x00) {
var s = "";
var b:Int;
while ((b = bytes.get(p++)) != 0xFF) {
s += String.fromCharCode(b);
}
return s;
}
// 0x81 = fin & text
if (opcode == 0x81) {
var len = bytes.get(p++);
// mask
if (len & 0x80 != 0) {
len &= 0x7F;
if (len == 126) {
var b2 = bytes.get(p++);
var b3 = bytes.get(p++);
len = (b2 << 8) + b3;
}
else if (len == 127) {
var b2 = bytes.get(p++);
var b3 = bytes.get(p++);
var b4 = bytes.get(p++);
var b5 = bytes.get(p++);
len = (b2 << 24) + (b3 << 16) + (b4 << 8) + b5;
}
//Lib.println("len = " + len);
// direct array init not work corectly!
var mask = [];
mask.push(bytes.get(p++));
mask.push(bytes.get(p++));
mask.push(bytes.get(p++));
mask.push(bytes.get(p++));
//Lib.println("mask = " + mask);
var data = new StringBuf();
for (i in 0...len) {
data.addChar(bytes.get(p++) ^ mask[i % 4]);
}
//Lib.println("readed = " + data.toString());
return data.toString();
} else {
throw "Expected masked data.";
}
}
if (opcode == 136) {
//socket.close();
opened = false;
return null;
} else {
throw "Unsupported websocket opcode: " + opcode;
}
return null;
} }
return null;
}
} }

View File

@@ -1,13 +1,13 @@
package ru.m.tankz.server; package ru.m.tankz.server;
import haxework.log.SocketLogger; import ru.m.connect.IConnection.ConnectionEvent;
import ru.m.connect.IConnection.IPacketBuilder;
import haxework.provider.Provider;
import haxework.log.TraceLogger; import haxework.log.TraceLogger;
import ru.m.tankz.server.session.Session; import ru.m.tankz.server.session.Session;
import haxe.io.Bytes; import haxe.io.Bytes;
import sys.net.Socket; import sys.net.Socket;
import neko.net.ThreadServer; import neko.net.ThreadServer;
#if debug import haxework.log.SocketLogger; #end
class Server extends ThreadServer<Session, Bytes> { class Server extends ThreadServer<Session, Bytes> {
@@ -25,7 +25,7 @@ class Server extends ThreadServer<Session, Bytes> {
override public function clientDisconnected(session:Session) { override public function clientDisconnected(session:Session) {
L.d(TAG, "Client disconnected"); L.d(TAG, "Client disconnected");
session.onDisconnected(); session.connection.handler.emit(ConnectionEvent.DISCONNECTED);
} }
override public function readClientMessage(session:Session, buf:Bytes, pos:Int, len:Int) { override public function readClientMessage(session:Session, buf:Bytes, pos:Int, len:Int) {
@@ -43,7 +43,6 @@ class Server extends ThreadServer<Session, Bytes> {
#end #end
L.d(TAG, "Running"); L.d(TAG, "Running");
L.i(TAG, "Build: " + CompilationOption.get("build")); L.i(TAG, "Build: " + CompilationOption.get("build"));
Provider.set(IPacketBuilder, new PacketBuilder());
var wserver = new Server(); var wserver = new Server();
wserver.run("localhost", 5001); wserver.run("localhost", 5001);
} }

View File

@@ -1,32 +1,37 @@
package ru.m.tankz.server.session; package ru.m.tankz.server.session;
import ru.m.tankz.proto.pack.LoginRequest;
import ru.m.tankz.proto.pack.LoginResponse;
import haxe.io.Bytes; import haxe.io.Bytes;
import protohx.Message; import protohx.Message;
import ru.m.connect.IConnection; import ru.m.connect.IConnection;
import ru.m.connect.neko.NekoConnection; import ru.m.connect.neko.NekoConnection;
import ru.m.connect.neko.NekoWebConnection; import ru.m.connect.neko.NekoWebConnection;
import ru.m.tankz.proto.core.User; import ru.m.tankz.proto.core.User;
import ru.m.tankz.proto.pack.LoginRequest; import ru.m.tankz.proto.pack.Request;
import ru.m.tankz.proto.pack.LoginResponse; import ru.m.tankz.proto.pack.Response;
import sys.net.Socket; import sys.net.Socket;
import Type; import Type;
class Session implements IConnectionHandler implements IPacketHandler { typedef ServerConnection = IConnection<Response, Request>;
class Session {
public static var sessions:Map<Int, Session> = new Map<Int, Session>(); public static var sessions:Map<Int, Session> = new Map<Int, Session>();
public var user(default, null):User; public var user(default, null):User;
public var gameId(default, null):Int = -1; public var gameId(default, null):Int = -1;
private var connection(default, null):IConnection; public var connection(default, null):ServerConnection;
private var socket:Socket; private var socket:Socket;
public function new(socket:Socket) { public function new(socket:Socket) {
this.socket = socket; this.socket = socket;
} }
public function send(packet:Message):Void { public function send(packet:Response):Void {
connection.send(packet); connection.send(packet);
} }
@@ -36,47 +41,32 @@ class Session implements IConnectionHandler implements IPacketHandler {
} else { } else {
var str:String = bytes.getString(0, bytes.length); var str:String = bytes.getString(0, bytes.length);
if (StringTools.startsWith(str, "GET")) { if (StringTools.startsWith(str, "GET")) {
connection = new NekoWebConnection(socket); connection = new NekoWebConnection<Response, Request>(socket, Request);
} else { } else {
connection = new NekoConnection(socket); connection = new NekoConnection<Response, Request>(socket, Request);
} }
connection.handler.addListener(this); connection.handler.connect(onConnectionEvent);
connection.packetHandler.addListener(this); connection.packetHandler.connect(onRequest);
connection.pushData(bytes); connection.pushData(bytes);
} }
} }
/** private function onConnectionEvent(event:ConnectionEvent):Void {
* Connection handlers L.d('Session', '${this}, ${event}');
**/
public function onConnected():Void {
} }
public function onDisconnected():Void { public function onRequest(request:Request):Void {
/*if (person != null) { if (request.hasLogin()) {
var game = GameManager.byPersonId.get(person.id); connection.send(new Response().setLogin(login(request.login)));
if (game != null) game.leave(person); }
}*/
} }
public function onError(error:Dynamic):Void { private function login(request:LoginRequest):LoginResponse {
/*connection.send( return new LoginResponse().setUser(
new ErrorResponse()
.setCode(0)
.setMessage(Std.string(error))
);*/
}
/**
* Packets handlers
**/
public function onLoginRequest(packet:LoginRequest):Void {
connection.send(new LoginResponse().setUser(
new User() new User()
.setUuid(packet.uuid != null ? packet.uuid : 'xxx') .setUuid(request.uuid != null ? request.uuid : 'xxx')
.setName(packet.name) .setName(request.name)
)); );
} }
/*public function onPersonSelectRequest(packet:PersonSelectRequest):Void { /*public function onPersonSelectRequest(packet:PersonSelectRequest):Void {