-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync_cpu_cam.py
More file actions
97 lines (84 loc) · 2.83 KB
/
sync_cpu_cam.py
File metadata and controls
97 lines (84 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
'''
To be used on the Raspberry Pi to regulate capture speeds.
This will send UDP and TTL pulses to a variety of devices to ensure synchronous recording.
'''
import pigpio
from timeit import default_timer as timer
import socket
import numpy as np
import time
import argparse
# Command-line options: recording length plus the trigger rate of each camera.
ap = argparse.ArgumentParser()
ap.add_argument(
    "-l", "--length",
    type=float,
    default=1,
    help="desired video length in minutes",
)
ap.add_argument(
    "-t", "--ttl",
    type=int,
    default=10,
    help="ttl frequency",
)
ap.add_argument(
    "-u", "--USB",
    type=int,
    default=20,
    help="UDP frequency for USB cameras",
)
# NOTE(review): --dict is accepted (and the file is opened by argparse), but
# nothing visible in this script reads args['dict'] -- confirm it is still needed.
ap.add_argument(
    "-d", "--dict",
    type=argparse.FileType('r'),
    nargs=1,
    required=False,
    help="name/host/port dict for communication",
)
# Parsed options as a plain dict keyed by long option name
# ('length', 'ttl', 'USB', 'dict').
args = vars(ap.parse_args())
# initial parameters for camera rates
fpscMOS = args['ttl']  # fps for cMOS camera (TTL-triggered)
fpsUSB = args['USB']  # fps for USB camera (UDP-triggered)
mlen = args['length']  # in minutes
timesec = mlen * 60  # recording length in seconds
lag = 0  # extra seconds added to every tick period (0.0002 was tried)

# define how the communication will occur through ethernet
# name / hosts / ports are parallel lists: index 0 is the USB link,
# index 1 is the cMOS link.
name = ['sUSB', 'sCMOS']
hosts = ['10.42.0.1', '']
ports = [8936, 8937]

# calculating looping parameters
# The schedule ticks at the faster of the two camera rates:
#   nframe - number of ticks in the whole recording (min * 60 sec/min * fps)
#   step   - seconds between consecutive ticks
#   fac    - ratio faster-rate / slower-rate (always >= 1)
if fpscMOS >= fpsUSB:
    nframe = timesec * fpscMOS
    step = 1 / fpscMOS + lag
    fac = fpscMOS / fpsUSB
else:
    # This branch used to be guarded by `if fpscMOS << fpsUSB:`. That is a
    # left shift, which is truthy for any non-zero rate, so it always ran and
    # overwrote the values computed above.
    nframe = timesec * fpsUSB
    step = 1 / fpsUSB + lag
    fac = fpsUSB / fpscMOS
# Tick timestamps in seconds from the start; the final entry is the end time.
ran = np.arange(0, (mlen * 60) + step, step)
#ran = np.arange(0, 10 + step, step)
#server and communication setup
def setupServer(name, host, port):
    """Create a UDP socket and try to bind it to ``(host, port)``.

    A failed bind is only reported on stdout; the socket is returned either
    way, so the caller always gets a usable datagram socket.

    Args:
        name: Label used in the status message.
        host: Local address to bind to ('' means all interfaces).
        port: Local UDP port to bind to.

    Returns:
        The ``AF_INET`` / ``SOCK_DGRAM`` socket (bound if the bind succeeded).
    """
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    print("Socket created.")
    address = (host, port)
    try:
        udp_sock.bind(address)
        print('Socket', name, 'bound to ', host, ' : ', port)
    except OSError as bind_error:  # socket.error is an alias of OSError
        print(bind_error)
    return udp_sock
#TTL write
def pulse(GPIO, dur):
    """Emit one TTL pulse: drive `GPIO` high, wait `dur` seconds, drive it low.

    Uses the module-level pigpio handle `pi`, which must exist before the
    first call.

    Args:
        GPIO: Pin number passed to ``pi.write``.
        dur: Pulse width in seconds.
    """
    pi.write(GPIO, 1)  # high
    try:
        time.sleep(dur)  # in sec
    finally:
        # Always bring the line back down, even if the sleep is interrupted
        # (e.g. Ctrl-C), so the trigger is never left asserted.
        pi.write(GPIO, 0)  # low
# initialize handles for TTL and UDP
sCMOS = setupServer(name[1], hosts[1], ports[1])  # UDP to CMOS CPU
sUSB = setupServer(name[0], hosts[0], ports[0])  # UDP to USB CPU
pi = pigpio.pi()  # TTL

# Ticks between triggers for each device: the faster camera fires on every
# tick, the slower one on every `fac`-th tick. With ttl <= USB this matches
# the previous behaviour exactly (USB every tick, CMOS every `fac` ticks).
# cmos_every is always a float so the CMOS frame index keeps its float
# formatting (e.g. '3.0'); the USB index is always sent as an int.
# NOTE(review): `i % fac == 0` only lines up when one rate is an integer
# multiple of the other -- confirm non-integer ratios are never used.
cmos_is_faster = fpscMOS > fpsUSB
if cmos_is_faster:
    cmos_every, usb_every = 1.0, fac
    usb_total = nframe // usb_every
else:
    cmos_every, usb_every = fac, 1
    usb_total = nframe
cmos_total = nframe // cmos_every

# for loop sending information
try:
    t0 = timer()
    pi.write(23, 1)  # Cam Acquire
    # `deadline` is the scheduled start time (seconds from t0) of the next tick.
    for i, deadline in enumerate(ran[1:]):
        # send TTL
        if (i % cmos_every) == 0:
            pulse(24, 0.01)  # Trigger for acquisition
            # send UDP to CMOS CPU
            # NOTE(review): the CMOS frame index is sent only on trigger
            # ticks here -- confirm it is not meant to go out on every tick.
            sCMOS.sendto(str.encode(str(i // cmos_every)),
                         (hosts[1], ports[1]))
        # send UDP to USB CPU
        if (i % usb_every) == 0:
            sUSB.sendto(str.encode(str(int(i // usb_every))),
                        (hosts[0], ports[0]))
        if (i % 100) == 0:
            print('Triggering USB camera frame', int(i // usb_every),
                  'of', usb_total)
            print('Triggering CMOS camera frame', (i // cmos_every),
                  'of', cmos_total)
        # Sleep until the next tick. If this tick overran its slot the
        # remaining time is negative; time.sleep() raises ValueError for a
        # negative value, so clamp to zero and carry on immediately.
        tsleep = deadline - (timer() - t0)
        time.sleep(max(tsleep, 0))
finally:
    # Release hardware and sockets even when the loop is interrupted.
    try:
        pi.write(23, 0)  # end of acquisition
    finally:
        print("Shutting down.")
        pi.stop()
        sCMOS.close()
        sUSB.close()