-
Notifications
You must be signed in to change notification settings - Fork 2
Open
Description
import ctypes
from ctypes import c_int, c_char_p, POINTER, c_uint, byref, CFUNCTYPE, c_void_p, c_ulong
import os
from datetime import datetime, timedelta
import time
import serial
from openmovement.load import CwaData
# Load the shared library
lib = ctypes.CDLL('/usr/local/lib/libomapi.dylib')
OM_VERSION = 108
OM_METADATA_SIZE = 448
OM_FAILED = lambda result: result < 0
OM_DATETIME_ZERO = 0
OM_DATETIME_INFINITE = 0xFFFFFFFF
OmDownloadCallback = CFUNCTYPE(None, c_void_p, c_int, c_int, c_int)
DOWNLOAD_CALLBACK = {
1: "OM_DOWNLOAD_PROGRESS",
2: "OM_DOWNLOAD_COMPLETE",
3: "OM_DOWNLOAD_CANCELLED",
4: "OM_DOWNLOAD_ERROR"
}
def call_and_log(func, message, *args):
if len(args) >= 0:
result = func(*args)
else:
result = func()
if OM_FAILED(result):
raise Exception(f"Failed: {message} , Error: {result}")
else:
print(f"Success: {message}, Result: {result}, Args: {args}")
return result
def initialize():
call_and_log(lib.OmStartup, "Initialize OMAPI library", OM_VERSION)
def shutdown():
call_and_log(lib.OmShutdown, "Shutdown OMAPI library", OM_VERSION)
def get_device_ids():
device_ids = (c_int * 10)()
count = lib.OmGetDeviceIds(device_ids, 10)
return device_ids[:count]
def send_command(device_id, command):
command = f"{command}\r\n".encode('utf-8')
response = ctypes.create_string_buffer(1024) # Allocate buffer for the response
result = lib.OmCommand(device_id, command, response, len(response), None, 0, None, 0)
if OM_FAILED(result):
raise Exception(f"Failed: Send command to device, Error: {result}")
else:
print(f"Success: {command}, Result: {result}, Response: {response.value}")
return response.value
# Configure and start recording on a device
def start_recording(device_id, delay=1):
call_and_log(lib.OmGetBatteryLevel, "Battery level for device", device_id)
call_and_log(lib.OmGetBatteryHealth, "Battery health for device", device_id)
call_and_log(lib.OmGetMemoryHealth, "Memory health for device", device_id)
send_command(device_id, "FORMAT W")
send_command(device_id, "SESSION 0")
send_command(device_id, "RATE 10,500")
now = datetime.now()
start_time = now + timedelta(seconds=delay)
formatted_start_time = start_time.strftime("%Y-%m-%d,%H:%M:%S")
send_command(device_id, f"HIBERNATE {formatted_start_time}")
send_command(device_id, "COMMIT")
# Stop recording on a device
def stop_recording(device_id):
now = datetime.now()
formatted_datetime = now.strftime("%Y-%m-%d,%H:%M:%S")
print("Stopping session-------------------")
send_command(device_id, "SESSION")
send_command(device_id, f"STOP {formatted_datetime}")
send_command(device_id, "COMMIT")
def download_callback(reference, device_id, status, value):
if status in (1, 2, 3):
print(f"DOWNLOAD #{device_id}: {DOWNLOAD_CALLBACK[status]}")
elif status == 4:
print(f"DOWNLOAD #{device_id}: Error. (Diagnostic 0x{value:04x})")
else:
print(f"DOWNLOAD #{device_id}: Unexpected status {status} / 0x{value:04x}")
def download(device_id, path):
session_id = c_int()
call_and_log(lib.OmGetSessionId, f"Get session ID for device", device_id, byref(session_id))
print(f"DOWNLOAD #{device_id}: SessionId = {session_id.value}")
start_time = c_ulong()
stop_time = c_ulong()
result = lib.OmGetDelays(device_id, byref(start_time), byref(stop_time))
if result != 0:
raise Exception(f"Failed: Get delays for device, Error: {result}")
start_time_value = datetime.fromtimestamp(start_time.value/65536).strftime("%Y-%m-%d,%H:%M:%S")
stop_time_value = datetime.fromtimestamp(stop_time.value/65536).strftime("%Y-%m-%d,%H:%M:%S")
file_name = f"{session_id.value}_{device_id}_{start_time_value}_{stop_time_value}.cwa"
file_path = os.path.join(path, file_name)
# Set the download callback before starting the download
download_callback_c = OmDownloadCallback(download_callback)
lib.OmSetDownloadCallback(download_callback_c, None)
result = lib.OmBeginDownloading(device_id, 0, -1, file_path.encode('utf-8'))
if OM_FAILED(result):
print(f"ERROR: OmBeginDownloading() {result}")
def record_main():
initialize()
device_ids = get_device_ids()
for device_id in device_ids:
start_recording(device_id)
shutdown()
def stop_main():
initialize()
device_ids = get_device_ids()
for device_id in device_ids:
stop_recording(device_id)
shutdown()
def download_main(dir):
initialize()
device_ids = get_device_ids()
for device_id in device_ids:
download(device_id, dir)
shutdown()
def convert_cws_files(dir):
fn = os.listdir(dir)
fn = [f for f in fn if f.endswith(".cwa")]
for filename in fn:
file_path = os.path.join(dir, filename)
with CwaData(file_path, include_gyro=False, include_temperature=True) as cwa_data:
# As a pandas DataFrame
samples = cwa_data.get_samples()
samples.to_csv(f"{file_path[:-3]}.csv", index=False)
if __name__ == "__main__":
record_main()
time.sleep(10)
stop_main()
dir = "/Users/aa/Downloads"
download_main(dir)
I always get OmBeginDownloading() -9 error code, where I think means that the device is not accessible/occupied in some process. I am not sure where I understand it wrong.
Metadata
Metadata
Assignees
Labels
No labels