Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
409 changes: 0 additions & 409 deletions .idea/workspace.xml

This file was deleted.

6 changes: 2 additions & 4 deletions hardwarelibrary/communication/communicationport.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ def readString(self, endPoint=None) -> str:
except CommunicationReadTimeout as err:
raise CommunicationReadTimeout("Only obtained {0}".format(data))

string = data.decode(encoding='utf-8')

return string
return data.decode('utf-8', 'replace')

def writeString(self, string, endPoint=None) -> int:
nBytes = 0
with self.portLock:
data = bytearray(string, "utf-8")
data = bytearray(string, 'utf-8', 'replace')
nBytes = self.writeData(data, endPoint)

return nBytes
Expand Down
15 changes: 8 additions & 7 deletions hardwarelibrary/communication/debugport.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,19 @@ def bytesAvailable(self, endPoint=0):
return len(self.outputBuffers[endPoint])

def flush(self):
self.buffers = [bytearray(),bytearray()]
self.inputBuffers = [bytearray()]
self.outputBuffers = [bytearray()]

def readData(self, length, endPoint=None):
if endPoint is None:
endPointIndex = 0
endPoint = 0

with self.portLock:
time.sleep(self.delay*random.random())
data = bytearray()
for i in range(0, length):
if len(self.outputBuffers[endPointIndex]) > 0:
byte = self.outputBuffers[endPointIndex].pop(0)
if len(self.outputBuffers[endPoint]) > 0:
byte = self.outputBuffers[endPoint].pop(0)
data.append(byte)
else:
raise CommunicationReadTimeout("Unable to read data")
Expand All @@ -76,12 +77,12 @@ def readData(self, length, endPoint=None):

def writeData(self, data, endPoint=None):
if endPoint is None:
endPointIndex = 0
endPoint = 0

with self.portLock:
self.inputBuffers[endPointIndex].extend(data)
self.inputBuffers[endPoint].extend(data)

self.processInputBuffers(endPointIndex=endPointIndex)
self.processInputBuffers(endPoint)

return len(data)

Expand Down
14 changes: 9 additions & 5 deletions hardwarelibrary/communication/serialport.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def matchAnyPort(cls, idVendor=None, idProduct=None, serialNumber=None):
return None

@classmethod
def matchPorts(cls, idVendor=None, idProduct=None, serialNumber=None):
def matchPortObjects(cls, idVendor=None, idProduct=None, serialNumber=None):
# We must provide idVendor, idProduct and serialNumber
# or idVendor and idProduct
# or idVendor
Expand All @@ -81,7 +81,7 @@ def matchPorts(cls, idVendor=None, idProduct=None, serialNumber=None):
pass


# It sometimes happens on macOS that the ports are "doubled" because two user-space DriverExtension
# It sometimes happens on macOS that the ports are "doubled" because two user-space DriverExtension
# prepare a port (FTDI and Apple's for instance). If that is the case, then we try to remove duplicates

portObjects = []
Expand All @@ -102,14 +102,18 @@ def matchPorts(cls, idVendor=None, idProduct=None, serialNumber=None):
if re.search(serialNumber, port.serial_number, re.IGNORECASE):
portObjects.append(port)

return portObjects

@classmethod
def matchPorts(cls, idVendor=None, idProduct=None, serialNumber=None):
portObjects = cls.matchPortObjects(idVendor, idProduct, serialNumber)
ports = []
# portsAlreadyAdded = []
for port in portObjects:
# uniqueIdentifier = (port.vid, port.pid, port.serial_number)
# if not (uniqueIdentifier in portsAlreadyAdded):
# if not (uniqueIdentifier in portsAlreadyAdded):
ports.append(port.device)
# portsAlreadyAdded.append(uniqueIdentifier)
# portsAlreadyAdded.append(uniqueIdentifier)

return ports

Expand Down Expand Up @@ -191,7 +195,7 @@ def readString(self, endPoint=0):
with self.portLock:
data = self.port.read_until(expected=self.terminator)

return data.decode()
return data.decode('utf-8', 'replace')

def readData(self, length, endPoint=0) -> bytearray:
with self.portLock:
Expand Down
2 changes: 1 addition & 1 deletion hardwarelibrary/communication/usbport.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,6 @@ def readString(self, endPoint=None) -> str:
try:
data += self.readData(length=1, endPoint=endPoint)
if data[-1] == 10: # How to write '\n' ?
return data.decode(encoding='utf-8')
return data.decode('utf-8', 'replace')
except Exception as err:
raise IOError("Unable to read string terminator: {0}".format(err))
3 changes: 3 additions & 0 deletions hardwarelibrary/irises/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

from .irisdevice import IrisDevice
from .uniblitzai25device import UniblitzAI25Device
10 changes: 10 additions & 0 deletions hardwarelibrary/irises/driverAI25/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# driverAI25

Arduino Uno driver to control the Uniblitz AI25 Auto Iris. Requires a 12V 2A power supply (2.1 x 5.5 mm barrel jack).

You need to install `NeoSWSerial` in the Arduino IDE Library Manager.

Reference manual: https://www.uniblitz.com/wp-content/uploads/2021/03/ai25-direct-control-v1-2.pdf

Pinout:
![driverAI25 pinout](driverAI25_pinout.jpg)
44 changes: 44 additions & 0 deletions hardwarelibrary/irises/driverAI25/driverAI25.ino
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Uniblitz AI25 Auto Iris driver v1.1
* for Arduino Uno/Nano/Micro
*
* Pin 2 -> AI25 Interrupt (Micro USB3 pin 9)
* Pin 3 -> AI25 RX (Micro USB3 pin 7)
* Pin 4 -> AI25 TX (Micro USB3 pin 6)
*/

#include <NeoSWSerial.h>

const byte interruptPin = 2;
NeoSWSerial irisSerial(4, 3); // Driver RX/TX -> AI25 TX/RX
String command; // input buffer

void setup()
{
Serial.begin(9600);
irisSerial.begin(9600);
pinMode(2, OUTPUT);
while (!Serial); // Wait for serial port to connect; needed for native USB only
Serial.println("driverAI25 Ready");
}

void loop()
{
if (Serial.available())
{
command = Serial.readStringUntil('\n');

if (command == "V")
{
Serial.println("driverAI25 v1.1");
return;
}

digitalWrite(interruptPin, HIGH);
delay(2);
digitalWrite(interruptPin, LOW);

irisSerial.println(command); // send command
Serial.println(irisSerial.readStringUntil('\n')); // return reply
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
118 changes: 118 additions & 0 deletions hardwarelibrary/irises/irisdevice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#
# irisdevice.py
#

from enum import Enum
from hardwarelibrary.physicaldevice import *
from hardwarelibrary.notificationcenter import NotificationCenter, Notification


class IrisNotification(Enum):
willMove = "willMove"
didMove = "didMove"
didGetPosition = "didGetPosition"


class IrisDevice(PhysicalDevice):
def __init__(self, serialNumber: str, idProduct: int, idVendor: int):
super().__init__(serialNumber, idProduct, idVendor)
self.minStep = 0 # closed
self.maxStep = 2**31 # open
self.micronsPerStep = 1000
self.minAperture = -1 # microns when closed

def convertMicronsToStep(self, microns: float) -> int:
""" Converts an aperture size in microns to a native step position. """
if self.micronsPerStep == 0:
raise ValueError("micronsPerStep == 0")
if self.minAperture < 0:
raise ValueError("minAperture < 0")

return round((microns - self.minAperture) / self.micronsPerStep + self.minStep)

def convertStepToMicrons(self, step: int) -> float:
""" Converts a native step position to an aperture size in microns. """
if self.minAperture < 0:
raise ValueError("minAperture < 0")

return (step - self.minStep) * self.micronsPerStep + self.minAperture

def currentStep(self) -> int:
""" Returns the current native position. """
step = self.doGetCurrentStep()
NotificationCenter().postNotification(IrisNotification.didGetPosition, notifyingObject=self, userInfo=step)
return step

def aperture(self) -> float:
""" Returns the current aperture size in microns. """
return self.convertStepToMicrons(self.currentStep())

def home(self):
NotificationCenter().postNotification(IrisNotification.willMove, notifyingObject=self)
self.doHome()
NotificationCenter().postNotification(IrisNotification.didMove, notifyingObject=self)

def moveTo(self, step: int):
NotificationCenter().postNotification(IrisNotification.willMove, notifyingObject=self, userInfo=step)
self.doMoveTo(step)
NotificationCenter().postNotification(IrisNotification.didMove, notifyingObject=self, userInfo=step)

def moveToMicrons(self, microns):
self.moveTo(self.convertMicronsToStep(microns))

def moveBy(self, steps: int):
NotificationCenter().postNotification(IrisNotification.willMove, notifyingObject=self, userInfo=steps)
self.doMoveBy(steps)
NotificationCenter().postNotification(IrisNotification.didMove, notifyingObject=self, userInfo=steps)

def moveByMicrons(self, microns):
self.moveBy(round(microns / self.micronsPerStep))

def isValidStep(self, step: int) -> bool:
return min(self.minStep, self.maxStep) < step < max(self.minStep, self.maxStep)

def isValidAperture(self, microns: float) -> bool:
return self.isValidStep(self.convertMicronsToStep(microns))

def doGetCurrentStep(self) -> int:
raise NotImplementedError

def doHome(self):
raise NotImplementedError

def doMoveTo(self, step):
raise NotImplementedError

def doMoveBy(self, steps):
raise NotImplementedError


class DebugIrisDevice(IrisDevice):
classIdProduct = 0xFFFD
classIdVendor = debugClassIdVendor

def __init__(self):
super().__init__("debug", DebugIrisDevice.classIdProduct, DebugIrisDevice.classIdVendor)
self.minStep = 0 # iris closed
self.maxStep = 100 # iris open
self.micronsPerStep = 100
self.minAperture = 0 # microns when closed
self.lastSetStep = 0

def doHome(self):
self.lastSetStep = 0

def doGetCurrentStep(self):
return self.lastSetStep

def doMoveTo(self, step):
self.lastSetStep = step

def doMoveBy(self, steps):
self.lastSetStep += steps

def doInitializeDevice(self):
pass

def doShutdownDevice(self):
pass
Loading