Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/infuse_iot/rpc_wrappers/coap_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ class request(ctypes.LittleEndianStructure):
self.resource,
)

def request_json(self):
UINT32_MAX = 2**32 - 1

return {
"server_address": self.server.decode("utf-8"),
"server_port": str(self.port),
"block_timeout_ms": "2000",
"action": self.action.name,
"resource_len": str(UINT32_MAX),
"resource_crc": str(UINT32_MAX),
"resource": self.resource.decode("utf-8"),
}

def handle_response(self, return_code, response):
if return_code != 0:
print(f"Failed to download file ({os.strerror(-return_code)})")
Expand Down
2 changes: 1 addition & 1 deletion src/infuse_iot/rpc_wrappers/data_logger_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def request_struct(self):
return self.request(self.logger)

def request_json(self):
return {"logger": str(self.logger.value)}
return {"logger": self.logger.name}

def handle_response(self, return_code, response):
if return_code != 0:
Expand Down
52 changes: 35 additions & 17 deletions src/infuse_iot/tools/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from infuse_iot import rpc
from infuse_iot.commands import InfuseCommand, InfuseRpcCommand
from infuse_iot.common import InfuseID, InfuseType
from infuse_iot.epacket.packet import PacketOutput
from infuse_iot.epacket.packet import PacketOutput, PacketReceived
from infuse_iot.socket_comms import (
ClientNotificationConnectionDropped,
ClientNotificationEpacketReceived,
Expand Down Expand Up @@ -64,21 +64,40 @@ def __init__(self, args: argparse.Namespace):
else:
self._id = args.id

def _wait_data_ack(self):
while rsp := self._client.receive():
def _finalise_command(self, rpc_rsp: PacketReceived):
# Convert response bytes back to struct form
rsp_header = rpc.ResponseHeader.from_buffer_copy(rpc_rsp.payload)
rsp_payload = rpc_rsp.payload[ctypes.sizeof(rpc.ResponseHeader) :]
rsp_data = self._command.response.from_buffer_copy(rsp_payload) # type: ignore
# Handle the response
print(f"INFUSE ID: {rpc_rsp.route[0].infuse_id:016x}")
self._command.handle_response(rsp_header.return_code, rsp_data)

def _wait_data_ack(self) -> PacketReceived:
while True:
rsp = self._client.receive()
if rsp is None:
continue
if not isinstance(rsp, ClientNotificationEpacketReceived):
continue
if rsp.epacket.ptype != InfuseType.RPC_DATA_ACK:
if rsp.epacket.ptype == InfuseType.RPC_RSP:
rsp_header = rpc.ResponseHeader.from_buffer_copy(rsp.epacket.payload)
if rsp_header.request_id == self._request_id:
return rsp.epacket
elif rsp.epacket.ptype != InfuseType.RPC_DATA_ACK:
continue
data_ack = rpc.DataAck.from_buffer_copy(rsp.epacket.payload)
# Response to the request we sent
if data_ack.request_id != self._request_id:
continue
break
return rsp.epacket

def _wait_rpc_rsp(self):
def _wait_rpc_rsp(self) -> PacketReceived:
# Wait for responses
while rsp := self._client.receive():
while True:
rsp = self._client.receive()
if rsp is None:
continue
if not isinstance(rsp, ClientNotificationEpacketReceived):
continue
# RPC response packet
Expand All @@ -88,13 +107,7 @@ def _wait_rpc_rsp(self):
# Response to the request we sent
if rsp_header.request_id != self._request_id:
continue
# Convert response bytes back to struct form
rsp_payload = rsp.epacket.payload[ctypes.sizeof(rpc.ResponseHeader) :]
rsp_data = self._command.response.from_buffer_copy(rsp_payload) # type: ignore
# Handle the response
print(f"INFUSE ID: {rsp.epacket.route[0].infuse_id:016x}")
self._command.handle_response(rsp_header.return_code, rsp_data)
break
return rsp.epacket

def _run_data_send_cmd(self):
ack_period = 1
Expand All @@ -114,7 +127,10 @@ def _run_data_send_cmd(self):
self._client.send(req)

# Wait for initial ACK
self._wait_data_ack()
recv = self._wait_data_ack()
if recv.ptype == InfuseType.RPC_RSP:
self._finalise_command(recv)
return

# Send data payloads with maximum interface size
ack_cnt = -ack_period
Expand Down Expand Up @@ -146,7 +162,8 @@ def _run_data_send_cmd(self):
data = data[size:]
self._command.data_progress_cb(offset)

self._wait_rpc_rsp()
recv = self._wait_rpc_rsp()
self._finalise_command(recv)

def _run_data_recv_cmd(self):
header = rpc.RequestHeader(self._request_id, self._command.COMMAND_ID) # type: ignore
Expand Down Expand Up @@ -202,7 +219,8 @@ def _run_standard_cmd(self):
)
req = GatewayRequestEpacketSend(pkt)
self._client.send(req)
self._wait_rpc_rsp()
recv = self._wait_rpc_rsp()
self._finalise_command(recv)

def run(self):
try:
Expand Down
10 changes: 6 additions & 4 deletions src/infuse_iot/tools/rpc_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def add_parser(cls, parser):
subparser = parser.add_subparsers(title="commands", metavar="<command>", required=True)

parser_queue = subparser.add_parser("queue", help="Queue a RPC to be sent")
parser_queue.set_defaults(action="queue")
parser_queue.set_defaults(_tool_action="queue")
parser_queue.add_argument("--id", required=True, type=lambda x: int(x, 0), help="Infuse ID to run command on")
parser_queue.add_argument("--timeout", type=int, default=600, help="Timeout to send command in seconds")
command_list_parser = parser_queue.add_subparsers(title="commands", metavar="<command>", required=True)
Expand All @@ -52,7 +52,7 @@ def add_parser(cls, parser):
cmd_cls.add_parser(cmd_parser)

parser_query = subparser.add_parser("query", help="Query the state of a previously queued RPC")
parser_query.set_defaults(action="query")
parser_query.set_defaults(_tool_action="query")
parser_query.add_argument("--id", required=True, type=str, help="RPC ID from `infuse rpc_cloud queue`")

def __init__(self, args: argparse.Namespace):
Expand Down Expand Up @@ -92,7 +92,9 @@ def run(self):
with Client(base_url="https://api.infuse-iot.com").with_headers(
{"x-api-key": f"Bearer {get_api_key()}"}
) as client:
if self._args.action == "queue":
if self._args._tool_action == "queue":
self.queue(client)
elif self._args.action == "query":
elif self._args._tool_action == "query":
self.query(client)
else:
raise NotImplementedError(f"Unknown action {self._args._tool_action}")
Loading