Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.52.0-beta.0] - Unreleased

### Added
- Pololu QMI driver can now be used also in UART mode. For this, use of special transport line "uart:<address>[:baudrate=...]" is needed.

### Changed

### Fixed
- `psutil._common` does not contain `snicaddr` namedtuple since version 7.2.0. It has been moved to `psutil._ntuples`. Fixed this in `proc.py`.

### Removed

Expand Down
194 changes: 128 additions & 66 deletions qmi/instruments/pololu/maestro.py

Large diffs are not rendered by default.

110 changes: 110 additions & 0 deletions qmi/instruments/pololu/qmi_uart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import time
from queue import Empty

import busio # type: ignore

from qmi.core.exceptions import QMI_InvalidOperationException


class QMI_Uart(busio.UART):
"""Extension of the class to make compatible with QMI_Transport calls.

The UART class opens a serial connection behind the scenes, see class binhoHostAdapter in binhoHostAdapter.py.
The default read and write timeouts are: timeout=0.025, write_timeout=0.05. These are not changeable through the
API, but would need a workaround through the 'serial' module interface.

The docstring of busio.readline says a 'line' is read until a _newline_ character, but in `uart.py` we can see that
``` while out != "\r":``` is used. Thus, the correct docstring should be that a line is read until a
_carriage return_ character.

Attributes:
READ_BYTE_BATCH_SIZE: The default size of the read buffer, based on <LSB><MSB>.
"""
READ_BYTE_BATCH_SIZE = 2

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._is_open = True

def _check_is_open(self) -> None:
"""Verify that the transport is open, otherwise raise exception."""
if not self._is_open:
raise QMI_InvalidOperationException(
f"Operation not allowed on closed transport {type(self).__name__}")

def open(self) -> None:
"""The instrument is opened already at the __init__. Note that if close() -> self.deinit() was called,
we cannot simply 're-open', but need to make a new instance to open the connection again.
"""
pass

def close(self) -> None:
"""Close the transport and de-initialize the device."""
self._check_is_open()
self.deinit()
self._is_open = False

def write(self, data: bytes) -> None:
"""Write a sequence of bytes to the transport.

When this method returns, all bytes are written to the transport
or queued to be written to the transport.

Parameters:
data: Bytes to write.
"""
self._check_is_open()
super().write(data)

def read_until_timeout(self, nbytes: int, timeout: float) -> bytes:
"""Read a sequence of bytes from the transport.

This method blocks until either the specified number of bytes
are available or the timeout (in seconds) expires, whichever occurs
sooner.

If timeout occurs, the partial sequence of available bytes is returned.
This sequence may be empty if timeout occurs before any byte was available.

If the transport has been closed on the remote side, any remaining
input bytes are returned (up to the maximum number of bytes requested).

Parameters:
nbytes: Maximum number of bytes to read.
timeout: Maximum time to wait (in seconds).

Returns:
buffer: Received bytes.
"""
self._check_is_open()
buffer = bytearray()
batch = self.READ_BYTE_BATCH_SIZE if nbytes > self.READ_BYTE_BATCH_SIZE else nbytes
bytes_read = 0
start_time = time.time()
while bytes_read < nbytes:
# Extend buffer, for now try in batches (of 1 or more bytes).
buffer = self.readinto(buffer, nbytes=batch)
bytes_read += batch
if time.time() - start_time > timeout:
break

if bytes_read == nbytes:
break

# Check on remaining buffer and adjust read batch size if necessary.
if bytes_read + batch > nbytes:
buffer = self.readinto(buffer, nbytes=nbytes - bytes_read)
break

return buffer

def discard_read(self) -> None:
"""Discard all bytes that are immediately available for reading. As using the read methods from uart.py use
`Queue.get()`, which means blocking until something is received, we work around this by calling the queue
without blocking.
"""
while True:
try:
self._uart._nova._rxdQueue.get(block=False)
except Empty:
break
8 changes: 7 additions & 1 deletion qmi/tools/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
else:
_logger = logging.getLogger(__name__)

# psutil version info
V, R, _ = map(int, psutil.__version__.split("."))

# Result from shutdown_context().
ShutdownResult = NamedTuple("ShutdownResult", [
Expand Down Expand Up @@ -253,7 +255,11 @@ def is_local_host(host: str) -> bool:
# Return True if a local IP address matches the specified host.
for addrs in if_addrs.values():
for addr in addrs:
assert isinstance(addr, psutil._common.snicaddr)
if (V == 7 and R >= 2) or V > 7:
assert isinstance(addr, psutil._ntuples.snicaddr)
else:
assert isinstance(addr, psutil._common.snicaddr)

if addr.address in host_ips:
return True

Expand Down
148 changes: 114 additions & 34 deletions tests/instruments/pololu/test_maestro.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,103 @@
from unittest.mock import Mock, patch, call
import logging

import qmi
from qmi.core.transport import QMI_TcpTransport
from qmi.instruments.pololu import Pololu_Maestro
from qmi.core.transport import QMI_SerialTransport
from qmi.core.exceptions import QMI_InstrumentException

with patch("qmi.instruments.pololu.maestro.TYPE_CHECKING", True):
from qmi.instruments.pololu import Pololu_Maestro

class PololuMaestroOpenCloseTestCase(unittest.TestCase):
from tests.patcher import PatcherQmiContext as QMI_Context


class PololuMaestroUartOpenCloseTestCase(unittest.TestCase):
board_mock = Mock()
board_mock.TX = 2
board_mock.RX = 3
busio_mock = Mock()
uart_mock = Mock()
uart_ports_mock = Mock()

def setUp(self) -> None:
qmi.start("pololu_unit_test")
transport = "serial:COM1"
self.instr = qmi.make_instrument("Pololu", Pololu_Maestro, transport)
self.busio_mock.UART = Mock(return_value=None)
self.baudrate = 115200
self._cmd_lead = chr(0xAA) + chr(0x0C)
ctx = QMI_Context("pololu_unit_test")
transport = f"uart:COM1:baudrate={self.baudrate}"
with patch.dict("sys.modules", {
"qmi.instruments.pololu.qmi_uart": self.uart_mock,
"board": self.board_mock, "busio": self.busio_mock,
"machine": Mock(), "microcontroller.pin": self.uart_ports_mock
}) as self.sys_patch:
self.uart_ports_mock.uartPorts = ([[10, 2, 3]])
self.instr = Pololu_Maestro(ctx, "Pololu", transport)

def tearDown(self) -> None:
qmi.stop()
def test_open_close(self):
"""Test opening and closing the instrument"""
self.instr.open()
self.assertTrue(self.instr.is_open())
self.instr.close()
self.assertFalse(self.instr.is_open())

def test_write_with_set_target_value(self):
"""Test writing by setting a target value."""
# Arrange
target, channel = 5000, 1
lsb = target & 0x7F # 7 bits for least significant byte
msb = (target >> 7) & 0x7F # shift 7 and take next 7 bits for msb
cmd = chr(0x04) + chr(channel) + chr(lsb) + chr(msb)
expected_command = bytes(self._cmd_lead + cmd, "latin-1")
self.instr._transport.read_until_timeout.side_effect = [b'\x00\x00'] * 2
expected_write_calls = [
call(expected_command), call(bytes(self._cmd_lead + '!', "latin-1"))
]
expected_read_call = [call(nbytes=2, timeout=Pololu_Maestro.RESPONSE_TIMEOUT)]

# Act
self.instr.open()
self.instr.set_target(channel, target)

# Assert
self.instr._transport.write.assert_has_calls(expected_write_calls)
self.instr._transport.read_until_timeout.assert_has_calls(expected_read_call)

def test_read_with_get_position(self):
"""Test get position returns expected value."""
# Arrange
expected, channel = 5000, 0
cmd = chr(0x10) + chr(channel)
expected_command = bytes(self._cmd_lead + cmd, "latin-1")
low_bit, high_bit = expected - (expected // 256 * 256), expected // 256
self.instr._transport.read_until_timeout.side_effect = [
chr(low_bit),
chr(high_bit),
b'\x00\x00',
]
expected_write_calls = [
call(expected_command), call(bytes(self._cmd_lead + '!', "latin-1"))
]
expected_read_calls = [
call(nbytes=1, timeout=Pololu_Maestro.RESPONSE_TIMEOUT),
call(nbytes=1, timeout=Pololu_Maestro.RESPONSE_TIMEOUT),
call(nbytes=2, timeout=Pololu_Maestro.RESPONSE_TIMEOUT),
]

# Act
self.instr.open()
result = self.instr.get_position(channel)

# Assert
self.instr._transport.write.assert_has_calls(expected_write_calls)
self.instr._transport.read_until_timeout.assert_has_calls(expected_read_calls)
self.assertEqual(expected, result)


class PololuMaestroSerialOpenCloseTestCase(unittest.TestCase):

def setUp(self) -> None:
ctx = QMI_Context("pololu_unit_test")
transport = "serial:COM1"
self.instr = Pololu_Maestro(ctx, "Pololu", transport)

def test_open_close(self):
"""Test opening and closing the instrument"""
Expand Down Expand Up @@ -50,20 +132,19 @@ def setUp(self) -> None:
logging.CRITICAL)
self._cmd_lead = chr(0xAA) + chr(0x0C)
self._error_check_cmd = bytes(self._cmd_lead + chr(0x21), "latin-1")
qmi.start("pololu_unit_test")
ctx = QMI_Context("pololu_unit_test")
patcher = patch(
'qmi.instruments.pololu.maestro.create_transport', spec=QMI_TcpTransport)
'qmi.instruments.pololu.maestro.create_transport', spec=QMI_SerialTransport)
self._transport_mock: Mock = patcher.start().return_value
self.addCleanup(patcher.stop)
self.instr: Pololu_Maestro = qmi.make_instrument(
"Pololu", Pololu_Maestro, self.TRANSPORT_STR,
self.instr: Pololu_Maestro = Pololu_Maestro(
ctx, "Pololu", self.TRANSPORT_STR,
channels_min_max_targets={1: (self.CHANNEL1_MIN, self.CHANNEL1_MAX),
3: (self.CHANNEL3_MIN, self.CHANNEL3_MAX)})
self.instr.open()

def tearDown(self) -> None:
self.instr.close()
qmi.stop()

def test_setting_channel_min_and_max_targets(self):
"""Test setting the min and max targets of channels."""
Expand Down Expand Up @@ -102,19 +183,18 @@ def setUp(self) -> None:
logging.CRITICAL)
self._cmd_lead = chr(0xAA) + chr(0x0C)
self._error_check_cmd = bytes(self._cmd_lead + chr(0x21), "latin-1")
qmi.start("pololu_unit_test")
ctx = QMI_Context("pololu_unit_test")
patcher = patch(
'qmi.instruments.pololu.maestro.create_transport', spec=QMI_TcpTransport)
'qmi.instruments.pololu.maestro.create_transport', spec=QMI_SerialTransport)
self._transport_mock: Mock = patcher.start().return_value
self.addCleanup(patcher.stop)
self.instr: Pololu_Maestro = qmi.make_instrument(
"Pololu", Pololu_Maestro, self.TRANSPORT_STR,
self.instr: Pololu_Maestro = Pololu_Maestro(
ctx, "Pololu", self.TRANSPORT_STR,
channels_min_max_speeds={1: (self.CHANNEL1_MIN, self.CHANNEL1_MAX)})
self.instr.open()

def tearDown(self) -> None:
self.instr.close()
qmi.stop()

def test_setting_channel_min_and_max_speed(self):
"""Test setting the min and max speeds of channels."""
Expand All @@ -138,19 +218,18 @@ def setUp(self) -> None:
logging.CRITICAL)
self._cmd_lead = chr(0xAA) + chr(0x0C)
self._error_check_cmd = bytes(self._cmd_lead + chr(0x21), "latin-1")
qmi.start("pololu_unit_test")
ctx = QMI_Context("pololu_unit_test")
patcher = patch(
'qmi.instruments.pololu.maestro.create_transport', spec=QMI_TcpTransport)
'qmi.instruments.pololu.maestro.create_transport', spec=QMI_SerialTransport)
self._transport_mock: Mock = patcher.start().return_value
self.addCleanup(patcher.stop)
self.instr: Pololu_Maestro = qmi.make_instrument(
"Pololu", Pololu_Maestro, self.TRANSPORT_STR,
self.instr: Pololu_Maestro = Pololu_Maestro(
ctx, "Pololu", self.TRANSPORT_STR,
channels_min_max_accelerations={1: (self.CHANNEL1_MIN, self.CHANNEL1_MAX)})
self.instr.open()

def tearDown(self) -> None:
self.instr.close()
qmi.stop()

def test_setting_channel_min_and_max_acceleration(self):
"""Test setting the min and max accelerations of channels."""
Expand All @@ -172,18 +251,16 @@ def setUp(self) -> None:
logging.CRITICAL)
self._cmd_lead = chr(0xAA) + chr(0x0C)
self._error_check_cmd = bytes(self._cmd_lead + chr(0x21), "latin-1")
qmi.start("pololu_unit_test")
ctx = QMI_Context("pololu_unit_test")
patcher = patch(
'qmi.instruments.pololu.maestro.create_transport', spec=QMI_TcpTransport)
'qmi.instruments.pololu.maestro.create_transport', spec=QMI_SerialTransport)
self._transport_mock: Mock = patcher.start().return_value
self.addCleanup(patcher.stop)
self.instr: Pololu_Maestro = qmi.make_instrument(
"Pololu", Pololu_Maestro, self.TRANSPORT_STR)
self.instr: Pololu_Maestro = Pololu_Maestro(ctx, "Pololu", self.TRANSPORT_STR)
self.instr.open()

def tearDown(self) -> None:
self.instr.close()
qmi.stop()

def test_get_idn(self):
"""Test getting the QMI instrument ID."""
Expand Down Expand Up @@ -278,9 +355,12 @@ def test_get_position(self):
cmd = chr(0x10) + chr(channel)
expected_command = bytes(self._cmd_lead + cmd, "latin-1")
low_bit, high_bit = expected - (expected // 256 * 256), expected // 256
self._transport_mock.read.side_effect = [chr(low_bit),
chr(high_bit), b'\x00\x00']
expected_calls = [
self._transport_mock.read_until_timeout.side_effect = [
chr(low_bit),
chr(high_bit),
b'\x00\x00'
]
expected_write_calls = [
call(expected_command),
call(self._error_check_cmd)
]
Expand All @@ -289,7 +369,7 @@ def test_get_position(self):
result = self.instr.get_position(channel)

# Assert
self._transport_mock.write.assert_has_calls(expected_calls)
self._transport_mock.write.assert_has_calls(expected_write_calls)
self.assertEqual(result, expected)

def test_set_speed(self):
Expand Down Expand Up @@ -413,7 +493,7 @@ def test_command_error_check_excepts(self):
# Arrange
cmd = chr(0x22)
expected_command = bytes(self._cmd_lead + cmd, "latin-1")
self._transport_mock.read.return_value = b'\x00\xF0'
self._transport_mock.read_until_timeout.return_value = b'\x00\xF0'
expected_calls = [
call(expected_command),
call(self._error_check_cmd)
Expand Down
Loading
Loading