Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yaqc/yaqc/_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def handshake(
self._write(request)
self._write_metadata()
self._write_method_name("")
self._write_terminator()
# read response
response = self._read(handshake_response)
self._read({"type": "map", "values": "bytes"})
Expand Down
144 changes: 68 additions & 76 deletions yaqd-core/yaqd_core/avrorpc/unpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,72 +10,79 @@


class Unpacker:
def __init__(self, protocol, file_like=None):
def __init__(self, protocol):
self.protocol = protocol
if file_like is None:
self._file = io.BytesIO()
else:
self._file = file_like
self.buf = io.BytesIO()

self._msg_bufs = asyncio.Queue()

self._buf = io.BytesIO()
self._remaining = 0

self.handshake_complete = False
self.handshake_response = None
self.meta = None
self.message_name = None
self.parameters = None
self.remaining = 0
self._handshake_response = None
self.named_types = {t["name"]: t for t in self.protocol.get("types", [])}
self.new_data = asyncio.Event()

def __aiter__(self):
return self

async def __anext__(self):
while True:
try:
self.new_data.clear()
if not self.handshake_complete and self.handshake_response is None:
handshake_request = await self._read_object(
handshake_request_schema
)
self.handshake_response = handle_handshake(
handshake_request, self.protocol
)
if self.handshake_response.match == "BOTH":
self.handshake_complete = True
if self.meta is None:
self.meta = await self._read_object(
{"type": "map", "values": "bytes"}
)
if self.message_name is None:
self.message_name = await self._read_object("string")
if self.message_name != "" and self.protocol["messages"][
self.message_name
].get("request", []):
await self._read_parameters(self.message_name)
msg = await self._msg_bufs.get()
print("Processing a message")
parameters = None
meta = self._read_object(
{"type": "map", "values": "bytes"},
msg,
)
message_name = self._read_object("string", msg)
if message_name != "" and self.protocol["messages"][message_name].get(
"request", []
):
parameters = self._read_parameters(message_name, msg)

ret = (
self.handshake_response,
self.meta,
self.message_name,
self.parameters,
)
self.handshake_response = None
self.meta = None
self.message_name = None
self.parameters = None
return ret
except (ValueError, struct.error):
await self.new_data.wait()
print(f"Identifieed {message_name=} with {parameters=}")
hs = None
if self._handshake_response:
hs = self._handshake_response
self._handshake_response = None
return (
hs,
meta,
message_name,
parameters,
)

def feed(self, data: bytes):
# Must support random access, if it does not, must be fed externally (e.g. TCP)
pos = self._file.tell()
self._file.seek(0, 2)
self._file.write(data)
self._file.seek(pos)
self.new_data.set()
self._buf.seek(0, 2)
while data:
print(data, self._remaining, len(data))
if self._remaining:
written = self._buf.write(data[: self._remaining])
data = data[written:]
self._remaining -= written
if data:
self._remaining = struct.unpack_from(">L", data[:4])[0]
data = data[4:]
if self._remaining == 0:
self._buf.seek(0)
if not self.handshake_complete:
print("Doing handshake")
handshake_request = self._read_object(
handshake_request_schema,
self._buf,
)
print("Read handshake request", handshake_request)
self._handshake_response = handle_handshake(
handshake_request, self.protocol
)
print(self._handshake_response.match)
if self._handshake_response.match == "BOTH":
print("Handshake complete")
self.handshake_complete = True

self._msg_bufs.put_nowait(self._buf)
self._buf = io.BytesIO()

async def _read_object(self, schema):
def _read_object(self, schema, buf):
schema = fastavro.parse_schema(
schema, expand=True, named_schemas=self.named_types
)
Expand All @@ -86,26 +93,11 @@ async def _read_object(self, schema):
)
except fastavro.schema.SchemaParseException:
pass # Must not have needed the second pass...
while True:
try:
self.buf.seek(0)
obj = fastavro.schemaless_reader(self.buf, schema)
self.buf = io.BytesIO()
return obj
except Exception:
self.buf.seek(0)
if not self.remaining:
self.remaining = struct.unpack_from(">L", self._file.read(4))[0]

self.buf.seek(0, 2)
num_read = self.buf.write(self._file.read(self.remaining))
self.remaining -= num_read
await asyncio.sleep(0)
obj = fastavro.schemaless_reader(buf, schema)
return obj

async def _read_parameters(self, name):
if self.parameters is None:
self.parameters = []
for param_schema in self.protocol["messages"][name]["request"][
len(self.parameters) :
]:
self.parameters.append(await self._read_object(param_schema["type"]))
def _read_parameters(self, name, buf):
parameters = []
for param_schema in self.protocol["messages"][name]["request"]:
parameters.append(self._read_object(param_schema["type"], buf))
return parameters