-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathss_socket.py
More file actions
46 lines (36 loc) · 1.18 KB
/
Copy pathss_socket.py
File metadata and controls
46 lines (36 loc) · 1.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio
import socket
import struct
# NOTE(review): not referenced by any code visible in this file; presumably a
# read-chunk size in bytes — confirm against other modules before removing.
BUF_SIZE = 4096
class Message():
    """
    One protocol frame: a 3-byte header followed by the payload bytes.

    The header is little-endian: a signed 16-bit payload length followed by
    an unsigned 8-bit message type.

    Attributes:
        length: payload length in bytes, as given at construction or as
            parsed from a header by ``buf_in``.
        type: message type code. The default ``-1`` marks "not set" and
            cannot be serialised (``buf_out`` raises ``struct.error``).
        payload: raw payload bytes.
    """

    # Pre-compiled header layout shared by buf_out() and buf_in().
    _HEADER = struct.Struct('<hB')

    def __init__(self, mtype: int = -1, payload: bytes = b''):
        self.length = len(payload)
        self.type = mtype
        self.payload = payload

    def buf_out(self) -> bytes:
        """
        Serialise the message to its wire form (header + payload).

        The length field is computed from the current payload instead of
        the cached ``self.length``, so a payload assigned after
        construction still produces a header that matches the body.

        Raises:
            struct.error: if the payload is longer than 32767 bytes or the
                type is outside 0..255.
        """
        return self._HEADER.pack(len(self.payload), self.type) + self.payload

    def buf_in(self, data: bytes) -> None:
        """
        Populate this message from ``data``, which must start with a header.

        Only the number of payload bytes declared in the header is kept;
        any bytes after that (for example the start of a following frame)
        are ignored. If ``data`` holds fewer payload bytes than declared,
        the payload is whatever is available.

        Raises:
            struct.error: if ``data`` is shorter than the 3-byte header.
        """
        header_size = self._HEADER.size
        self.length, self.type = self._HEADER.unpack(data[:header_size])
        # The length field is signed; clamp so a negative value from a
        # malformed header yields an empty payload rather than an
        # end-relative slice.
        payload_end = header_size + max(self.length, 0)
        self.payload = data[header_size:payload_end]
class Socket():
    """
    Asyncio stream wrapper for exchanging framed messages with the game server.

    Frames on the wire are a 3-byte little-endian header (signed 16-bit
    payload length, unsigned 8-bit type) followed by the payload.
    ``connect()`` must complete before ``recv()``, ``send()`` or
    ``send_raw()`` are used.
    """

    def __init__(self):
        self.buf = b''
        # Defined up front so close() is safe when connect() was never
        # called or failed part-way (e.g. inside a try/finally cleanup).
        self.__reader = None
        self.__writer = None

    async def connect(self, host, port):
        """Open a TCP connection to ``host``:``port`` and keep its streams."""
        self.__reader, self.__writer = await asyncio.open_connection(host, port)

    async def recv(self):
        """
        Read exactly one frame from the connection.

        Returns:
            A ``Message`` with ``length``, ``type`` and ``payload`` filled in.

        Raises:
            asyncio.IncompleteReadError: if the stream ends before a whole
                header or payload has arrived.
        """
        # Header is length (2 bytes) + type (1 byte); fetch both in one read.
        header = await self.__reader.readexactly(3)
        m = Message()
        m.length, m.type = struct.unpack('<hB', header)
        m.payload = await self.__reader.readexactly(m.length)
        return m

    async def send(self, message):
        """Serialise ``message`` with ``buf_out()`` and write it, waiting for the buffer to drain."""
        self.__writer.write(message.buf_out())
        await self.__writer.drain()

    async def send_raw(self, buf):
        """Write the bytes in ``buf`` unchanged, waiting for the buffer to drain."""
        self.__writer.write(buf)
        await self.__writer.drain()

    def close(self):
        """Close the connection's writer; does nothing if never connected."""
        if self.__writer is not None:
            self.__writer.close()