Skip to content
This repository was archived by the owner on Aug 15, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions robot/board.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,22 @@
import socket
import time
from pathlib import Path
from typing import Iterable, Mapping, TypeVar, Union
from typing import ( # noqa: F401
Any,
Callable,
Iterable,
Iterator,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
overload,
)

T = TypeVar('T')
_Message = Mapping[str, Any]

LOGGER = logging.getLogger(__name__)

Expand All @@ -16,25 +31,25 @@ class Board:

def __init__(self, socket_path: Union[Path, str]) -> None:
self.socket_path = Path(socket_path)
self.socket = None
self.socket = None # type: Optional[socket.socket]
self.data = b''

self._connect()

@property
def serial(self):
def serial(self) -> str:
"""Serial number for the board."""
return self.socket_path.stem

def _greeting_response(self, data):
def _greeting_response(self, data: _Message) -> None:
"""
Handle the response to the greeting command.

NOTE: This is called on reconnect in addition to first connection
"""
pass

def _connect(self):
def _connect(self) -> None:
"""
Connect or reconnect to a socket.

Expand Down Expand Up @@ -63,7 +78,7 @@ def _get_lc_error(self) -> str:
path=self.socket_path,
)

def _socket_with_single_retry(self, handler):
def _socket_with_single_retry(self, handler: Callable[[], T]) -> T:
retryable_errors = (
socket.timeout,
BrokenPipeError,
Expand Down Expand Up @@ -99,7 +114,7 @@ def _socket_with_single_retry(self, handler):

raise original_exception

def _send(self, message, should_retry=True):
def _send(self, message: _Message, should_retry: bool=True) -> None:
"""
Send a message to robotd.

Expand All @@ -109,21 +124,21 @@ def _send(self, message, should_retry=True):

data = (json.dumps(message) + '\n').encode('utf-8')

def sendall():
def sendall() -> None:
self.socket.sendall(data)

if should_retry:
return self._socket_with_single_retry(sendall)
else:
return sendall()

def _recv_from_socket(self, size):
def _recv_from_socket(self, size: int) -> bytes:
data = self.socket.recv(size)
if data == b'':
raise BrokenPipeError()
return data

def _receive(self, should_retry=True):
def _receive(self, should_retry: bool=True) -> _Message:
"""
Receive a message from robotd.
"""
Expand All @@ -142,21 +157,25 @@ def _receive(self, should_retry=True):

return json.loads(line.decode('utf-8'))

def _send_and_receive(self, message, should_retry=True):
def _send_and_receive(
self,
message: _Message,
should_retry: bool=True,
) -> _Message:
"""
Send a message to robotd and wait for a response.
"""
self._send(message, should_retry)
return self._receive(should_retry)

def close(self):
def close(self) -> None:
"""
Close the the connection to the underlying robotd board.
"""
self.socket.detach()

def __str__(self):
return "{} - {}".format(self.__name__, self.serial)
def __str__(self) -> str:
return "{} - {}".format(self.__class__.__name__, self.serial)

__del__ = close

Expand Down
4 changes: 2 additions & 2 deletions robot/camera.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import socket
import time
from typing import List, overload
from typing import Any, List, Mapping, Sequence, overload

from robot.board import Board
from robot.markers import Marker
Expand Down Expand Up @@ -41,7 +41,7 @@ class Camera(Board):
"""

@staticmethod
def _see_to_results(data) -> ResultList:
def _see_to_results(data: Mapping[str, Sequence[Mapping[str, Any]]]) -> ResultList:
"""
Convert the data from ``robotd`` into a sorted of ``Marker``s.

Expand Down
4 changes: 2 additions & 2 deletions robot/markers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import math
import warnings
from typing import List, NamedTuple, NewType, Tuple
from typing import Any, List, Mapping, NamedTuple, NewType, Tuple

from robot.game_specific import TOKEN, WALL

Expand Down Expand Up @@ -102,7 +102,7 @@ def distance_metres(self) -> Metres:
class Marker:
"""A marker captured from a webcam image."""

def __init__(self, data):
def __init__(self, data: Mapping[str, Any]) -> None:
self._raw_data = data

@property
Expand Down
74 changes: 43 additions & 31 deletions robot/motor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from typing import Union

from robot.board import Board

Power = Union[float, str]
_VoltageString = Union[float, str]


# BRAKE is set to 0 so setting the motors to 0 has exactly the same affect as
# setting the motors to BRAKE
BRAKE = 0
Expand All @@ -10,73 +16,79 @@ class MotorBoard(Board):
"""A motor board with two motor outputs."""

@staticmethod
def _string_to_power(voltage):
def _string_to_power(voltage: _VoltageString) -> Power:
"""
Converts a string to a Voltage value.

:param voltage:
:return:
"""
if voltage == 'coast':
return COAST
elif voltage == 'brake':
return BRAKE
elif -1 <= voltage <= 1:
return voltage
else:
raise ValueError(
"Incorrect voltage value, valid values: between -1 and 1, "
"'coast', or 'brake'",
)
if isinstance(voltage, str):
if voltage == 'coast':
return COAST
elif voltage == 'brake':
return BRAKE

if isinstance(voltage, float):
if -1 <= voltage <= 1:
return voltage

raise ValueError(
"Incorrect voltage value, valid values: between -1 and 1, "
"'coast', or 'brake'",
)

@staticmethod
def _power_to_string(voltage):
def _power_to_string(voltage: Power) -> _VoltageString:
"""
Converts a voltage value to the equivalent API value.

This is the reverse of inverse of ``MotorBoard.string_to_voltage``.
"""
if voltage is COAST:
return 'coast'
# Explicit or implicit stopping
elif voltage is BRAKE or voltage == 0:
return 'brake'
elif -1 <= voltage <= 1:
return voltage
else:
raise ValueError(
"Incorrect voltage value, valid values: between -1 and 1, "
"robot.COAST, or robot.BRAKE",
)
if isinstance(voltage, str):
if voltage is COAST:
return 'coast'

if isinstance(voltage, float):
# Explicit or implicit stopping
if voltage is BRAKE or voltage == 0:
return 'brake'
elif -1 <= voltage <= 1:
return voltage

raise ValueError(
"Incorrect voltage value, valid values: between -1 and 1, "
"robot.COAST, or robot.BRAKE",
)

@property
def m0(self) -> float:
def m0(self) -> Power:
"""
:return: The value of motor output 0.
"""
return self._get_status("m0")

@m0.setter
def m0(self, power: float):
def m0(self, power: Power) -> None:
self._update_motor("m0", power)

@property
def m1(self) -> float:
def m1(self) -> Power:
"""
:return: The value of motor output 1.
"""
return self._get_status("m1")

@m1.setter
def m1(self, power: float):
def m1(self, power: Power) -> None:
self._update_motor("m1", power)

def _get_status(self, motor_id: str):
def _get_status(self, motor_id: str) -> Power:
return self._string_to_power(
self._send_and_receive({})[motor_id],
)

def _update_motor(self, motor_id: str, voltage: float):
def _update_motor(self, motor_id: str, voltage: Power) -> None:
"""
Set the value of a motor output.

Expand Down
10 changes: 5 additions & 5 deletions robot/power.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ class PowerBoard(Board):
'uc': 523,
}

def power_on(self):
def power_on(self) -> None:
"""
Turn on power to all power board outputs.
"""

self._send_and_receive({'power': True})

def power_off(self):
def power_off(self) -> None:
"""
Turn off power to all power board outputs.
"""
self._send_and_receive({'power': False})

def set_start_led(self, value: bool):
def set_start_led(self, value: bool) -> None:
"""Set the state of the start LED."""
self._send_and_receive({'start-led': value})

Expand All @@ -45,7 +45,7 @@ def start_button_pressed(self) -> bool:
status = self._send_and_receive({})
return status["start-button"]

def wait_start(self):
def wait_start(self) -> None:
"""
Block until the start button is pressed.
"""
Expand All @@ -60,7 +60,7 @@ def wait_start(self):
self.set_start_led(False)
LOGGER.info("Starting user code.")

def buzz(self, duration, *, note=None, frequency=None):
def buzz(self, duration: float, *, note: str=None, frequency: int=None) -> None:
"""Enqueue a note to be played by the buzzer on the power board."""
if note is None and frequency is None:
raise ValueError("Either note or frequency must be provided")
Expand Down
10 changes: 5 additions & 5 deletions robot/robot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from pathlib import Path
from typing import List, Set, Type, Union # noqa: F401
from typing import Any, List, Set, Type, Union # noqa: F401

from robot.board import BoardList, TBoard
from robot.camera import Camera
Expand Down Expand Up @@ -59,7 +59,7 @@ def __init__(
if wait_for_start_button:
self.power_board.wait_start()

def _assert_has_power_board(self):
def _assert_has_power_board(self) -> None:
power_boards = self.power_boards
if not power_boards:
raise RuntimeError('Cannot find Power Board!')
Expand Down Expand Up @@ -139,7 +139,7 @@ def _games(self) -> BoardList[GameState]:
return self._update_boards(self.known_gamestates, GameState, 'game')

@staticmethod
def _single_index(name, list_of_boards: BoardList[TBoard]) -> TBoard:
def _single_index(name: str, list_of_boards: BoardList[TBoard]) -> TBoard:
if list_of_boards:
return list_of_boards[0]
else:
Expand Down Expand Up @@ -211,7 +211,7 @@ def mode(self) -> GameMode:
"""
return self._game.mode

def close(self):
def close(self) -> None:
"""
Cleanup robot instance.
"""
Expand All @@ -230,5 +230,5 @@ def close(self):
# reanimate the boards (which isn't supported).
del board_group[:]

def __del__(self):
def __del__(self) -> None:
self.close()
Loading