Skip to content
127 changes: 96 additions & 31 deletions mmwave/dataloader/adc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from enum import Enum

import numpy as np
import time


class CMD(Enum):
Expand Down Expand Up @@ -54,11 +55,8 @@ def __str__(self):
# DYNAMIC
BYTES_IN_FRAME = (ADC_PARAMS['chirps'] * ADC_PARAMS['rx'] * ADC_PARAMS['tx'] *
ADC_PARAMS['IQ'] * ADC_PARAMS['samples'] * ADC_PARAMS['bytes'])
BYTES_IN_FRAME_CLIPPED = (BYTES_IN_FRAME // BYTES_IN_PACKET) * BYTES_IN_PACKET
PACKETS_IN_FRAME = BYTES_IN_FRAME / BYTES_IN_PACKET
PACKETS_IN_FRAME_CLIPPED = BYTES_IN_FRAME // BYTES_IN_PACKET
UINT16_IN_PACKET = BYTES_IN_PACKET // 2
UINT16_IN_FRAME = BYTES_IN_FRAME // 2
DELETE_INCOMPLETE_FRAMES_AFTER_SECONDS = 1.0


class DCA1000:
Expand Down Expand Up @@ -116,7 +114,7 @@ def __init__(self, static_ip='192.168.33.30', adc_ip='192.168.33.180',
self.packet_count = []
self.byte_count = []

self.frame_buff = []
self.frame_buff = {}

self.curr_buff = None
self.last_frame = None
Expand Down Expand Up @@ -157,7 +155,7 @@ def close(self):
self.config_socket.close()

def read(self, timeout=1):
""" Read in a single packet via UDP
""" Read in a single frame via UDP

Args:
timeout (float): Time to wait for packet before moving on
Expand All @@ -169,34 +167,25 @@ def read(self, timeout=1):
# Configure
self.data_socket.settimeout(timeout)

# Frame buffer
ret_frame = np.zeros(UINT16_IN_FRAME, dtype=np.uint16)

# Wait for start of next frame
while True:
packet_num, byte_count, packet_data = self._read_data_packet()
if byte_count % BYTES_IN_FRAME_CLIPPED == 0:
packets_read = 1
ret_frame[0:UINT16_IN_PACKET] = packet_data
break

# Read in the rest of the frame
# Read packets until a full frame is read
while True:
# Read UDP packet
packet_num, byte_count, packet_data = self._read_data_packet()
packets_read += 1

if byte_count % BYTES_IN_FRAME_CLIPPED == 0:
self.lost_packets = PACKETS_IN_FRAME_CLIPPED - packets_read
return ret_frame

curr_idx = ((packet_num - 1) % PACKETS_IN_FRAME_CLIPPED)
try:
ret_frame[curr_idx * UINT16_IN_PACKET:(curr_idx + 1) * UINT16_IN_PACKET] = packet_data
except:
pass

if packets_read > PACKETS_IN_FRAME_CLIPPED:
packets_read = 0
# Place data from UDP packet in frame buffer
frame_num, frame_data = self._place_data_packet_in_frame_buffer(
byte_count=byte_count,
payload=packet_data
)

if frame_data is not None:
# Remove incomplete frames from frame buffer which exceed a timeout
dropped_frames = self._delete_incomplete_frames(timeout_seconds=DELETE_INCOMPLETE_FRAMES_AFTER_SECONDS)
if dropped_frames:
ids = ", ".join(str(f) for f in dropped_frames)
print(f"WARNING: Dropped Frame(s) {ids} since they weren't complete.")
# Return the complete frame
return frame_data

def _send_command(self, cmd, length='0000', body='', timeout=1):
"""Helper function to send a single commmand to the FPGA
Expand Down Expand Up @@ -236,6 +225,82 @@ def _read_data_packet(self):
byte_count = struct.unpack('>Q', b'\x00\x00' + data[4:10][::-1])[0]
packet_data = np.frombuffer(data[10:], dtype=np.uint16)
return packet_num, byte_count, packet_data

def _place_data_packet_in_frame_buffer(self, byte_count: int, payload: np.ndarray):
"""Helper function to place one UDP packet at the correct position in the frame buffer

Args:
byte_count (int): cumulative Bytes before this payload (from DCA1000 header)
payload (np.ndarray): uint16 from the UDP packet

Returns:
(int, np.ndarray): Complete frame as a tuple of (frame_num, frame_data),
(None, None) if no frame is complete yet
"""

offset = byte_count // 2 # Absolute position in UDP packet stream
idx = 0 # Read-index of payload
remaining = payload.size # Number of uint16 to process
completed = (None, None) # Tuple of (frame_id, frame_data) for complete captured frame

while remaining > 0:
# Determine which frame_id this data chunk belongs to
frame_id = offset // UINT16_IN_FRAME
# Determine which packet number this is within the frame
packet_num_within_frame = offset % UINT16_IN_FRAME
n_uint16_to_frame_end = UINT16_IN_FRAME - packet_num_within_frame

# Determine the size chunk of the data which is written to buffer
# (detect if the frame border is within this packet or not)
chunk_size = min(remaining, n_uint16_to_frame_end)

# Create buffer within frame_buff obj for this frame if neccessary
buf = self.frame_buff.setdefault(
frame_id,
{
'data': np.empty(UINT16_IN_FRAME, dtype=np.uint16),
'filled': np.zeros(UINT16_IN_FRAME, dtype=bool),
'first_seen': time.time()
}
)

# Write chunk to appropriate position in the frame's buffer
start = packet_num_within_frame
end = packet_num_within_frame + chunk_size
buf['data'][start:end] = payload[idx:idx+chunk_size]
buf['filled'][start:end] = True

# If all packets for the frame have been read, add it to completed tuple
# (but do not return yet, as otherwise the rest of the packet data is lost)
if buf['filled'].all():
completed = (frame_id, buf['data'].copy())
del self.frame_buff[frame_id]

# Persist in helper vars that chunk has been read
offset += chunk_size
idx += chunk_size
remaining -= chunk_size

return completed

def _delete_incomplete_frames(self, timeout_seconds: float=0.2):
"""Helper function to delete incomplete frames from frame buffer which exceed a given timeout

Args:
timeout_seconds (float): Time after which incomplete frames are deleted

Returns:
List[int]: List of frame numbers which were deleted (can be empty)
"""
now = time.time()
to_delete = []
for frame_number, buf in self.frame_buff.items():
if now - buf['first_seen'] > timeout_seconds:
to_delete.append(frame_number)
for frame_number in to_delete:
del self.frame_buff[frame_number]

return to_delete

def _listen_for_error(self):
"""Helper function to try and read in for an error message from the FPGA
Expand Down