Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions alarmdecoder/devices/serial_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
"""

import threading
import serial
import serial.tools.list_ports
import serialx
import serialx.tools.list_ports
import select
import sys
from .base_device import Device
Expand Down Expand Up @@ -41,11 +41,11 @@ def find_all(pattern=None):

try:
if pattern:
devices = serial.tools.list_ports.grep(pattern)
devices = serialx.tools.list_ports.grep(pattern)
else:
devices = serial.tools.list_ports.comports()
devices = serialx.tools.list_ports.comports()

except serial.SerialException as err:
except OSError as err:
raise CommError('Error enumerating serial devices: {0}'.format(str(err)), err)

return devices
Expand Down Expand Up @@ -80,8 +80,6 @@ def __init__(self, interface=None):

self._port = interface
self._id = interface
# Timeout = non-blocking to match pyftdi.
self._device = serial.Serial(timeout=0, writeTimeout=0)

def open(self, baudrate=BAUDRATE, no_reader_thread=False):
"""
Expand All @@ -106,7 +104,7 @@ def open(self, baudrate=BAUDRATE, no_reader_thread=False):

# Open the device and start up the reader thread.
try:
self._device.port = self._port
self._device = serialx.Serial(self._port, read_timeout=0, write_timeout=0)
self._device.open()
# NOTE: Setting the baudrate before opening the
# port caused issues with Moschip 7840/7820
Expand All @@ -116,7 +114,7 @@ def open(self, baudrate=BAUDRATE, no_reader_thread=False):
# all issues with it.
self._device.baudrate = baudrate

except (serial.SerialException, ValueError, OSError) as err:
except (OSError, ValueError) as err:
raise NoDeviceError('Error opening device on {0}.'.format(self._port), err)

else:
Expand Down Expand Up @@ -162,10 +160,10 @@ def write(self, data):

self._device.write(data)

except serial.SerialTimeoutException:
except TimeoutError:
pass

except serial.SerialException as err:
except OSError as err:
raise CommError('Error writing to device.', err)

else:
Expand All @@ -186,7 +184,7 @@ def read(self):
if len(read_ready) != 0:
data = filter_ad2prot_byte(self._device.read(1))

except serial.SerialException as err:
except OSError as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err)

return data.decode('utf-8')
Expand Down Expand Up @@ -237,7 +235,7 @@ def timeout_event():
if len(self._buffer) > 0:
got_line = True
break
except (OSError, serial.SerialException) as err:
except OSError as err:
raise CommError('Error reading from device: {0}'.format(str(err)), err)

else:
Expand All @@ -258,5 +256,5 @@ def purge(self):
"""
Purges read/write buffers.
"""
self._device.flushInput()
self._device.flushOutput()
self._device.reset_read_buffer()
self._device.reset_write_buffer()
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
future>=0.14.3
pyserial>=2.7
serialx
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def readme():
license='MIT',
packages=['alarmdecoder', 'alarmdecoder.devices', 'alarmdecoder.event', 'alarmdecoder.messages', 'alarmdecoder.messages.lrr'],
install_requires=[
'pyserial>=2.7',
'serialx',
] + extra_requirements,
test_suite='nose.collector',
tests_require=['nose', 'mock'],
Expand Down
22 changes: 11 additions & 11 deletions test/test_devices.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from unittest import TestCase
from mock import Mock, MagicMock, patch
from serial import Serial, SerialException
import serialx
import sys
import socket
import time
Expand Down Expand Up @@ -32,7 +32,7 @@
class TestSerialDevice(TestCase):
def setUp(self):
self._device = SerialDevice()
self._device._device = Mock(spec=Serial)
self._device._device = Mock(spec=serialx.Serial)
self._device._device.open = Mock()

def tearDown(self):
Expand All @@ -56,7 +56,7 @@ def test_open_no_interface(self):
def test_open_failed(self):
self._device.interface = '/dev/ttyS0'

with patch.object(self._device._device, 'open', side_effect=[SerialException, ValueError]):
with patch.object(self._device._device, 'open', side_effect=[OSError, ValueError]):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

Expand All @@ -73,7 +73,7 @@ def test_write(self):
mock.assert_called_with(b'test')

def test_write_exception(self):
with patch.object(self._device._device, 'write', side_effect=SerialException):
with patch.object(self._device._device, 'write', side_effect=OSError):
with self.assertRaises(CommError):
self._device.write(b'test')

Expand All @@ -85,15 +85,15 @@ def test_read(self):
side_effect = ["t".encode('utf-8')]

with patch.object(self._device._device, 'read', side_effect=side_effect) as mock:
with patch('serial.Serial.fileno', return_value=1):
with patch('serialx.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
ret = self._device.read()

mock.assert_called_with(1)

def test_read_exception(self):
with patch.object(self._device._device, 'read', side_effect=SerialException):
with patch('serial.Serial.fileno', return_value=1):
with patch.object(self._device._device, 'read', side_effect=OSError):
with patch('serialx.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
with self.assertRaises(CommError):
self._device.read()
Expand All @@ -104,7 +104,7 @@ def test_read_line(self):
side_effect = [chr(x).encode('utf-8') for x in b"testing\r\n"]

with patch.object(self._device._device, 'read', side_effect=side_effect):
with patch('serial.Serial.fileno', return_value=1):
with patch('serialx.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
ret = None
try:
Expand All @@ -116,16 +116,16 @@ def test_read_line(self):

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read', return_value=b'a') as mock:
with patch('serial.Serial.fileno', return_value=1):
with patch('serialx.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer.decode('utf-8'))

def test_read_line_exception(self):
with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]):
with patch('serial.Serial.fileno', return_value=1):
with patch.object(self._device._device, 'read', side_effect=[OSError, OSError]):
with patch('serialx.Serial.fileno', return_value=1):
with patch.object(select, 'select', return_value=[[1], [], []]):
with self.assertRaises(CommError):
self._device.read_line()
Expand Down