The WitsKit SQL storage module provides a robust, timeseries-optimized database backend for storing and querying WITS drilling data. It supports SQLite, PostgreSQL, and MySQL databases with full async operation support.
- Timeseries Optimized: Database schema designed for efficient time-based queries
- Multiple Databases: Support for SQLite, PostgreSQL, and MySQL
- Async Operations: Non-blocking database operations for high-performance streaming
- Batch Processing: Efficient bulk insertions for streaming data
- Symbol Normalization: Automatic storage of WITS symbol definitions
- Rich Querying: Time-based filtering, symbol filtering, and statistical queries
- CLI Integration: Direct integration with witskit CLI commands
Install with SQL dependencies:
# SQLite support (included by default)
pip install witskit[sql]
# PostgreSQL support
pip install witskit[postgres]
# MySQL support
pip install witskit[mysql]
# All database support
pip install witskit[sql,postgres,mysql]

Stream WITS data directly to a SQLite database:
# Stream from TCP source to SQLite
witskit stream tcp://192.168.1.100:12345 --sql-db sqlite:///drilling_data.db
# Stream from serial port to PostgreSQL
witskit stream serial:///dev/ttyUSB0 --sql-db postgresql://user:pass@localhost/wits_db
# Batch processing for better performance
witskit stream tcp://localhost:12345 --sql-db sqlite:///data.db --sql-batch-size 50

Query the stored data with powerful filtering:
# List available symbols
witskit sql-query sqlite:///drilling_data.db --list-symbols
# Query specific drilling parameters
witskit sql-query sqlite:///drilling_data.db --symbols "0108,0113" --limit 100
# Time-based queries
witskit sql-query sqlite:///drilling_data.db \
--symbols "0108" \
--start "2024-01-01T10:00:00" \
--end "2024-01-01T12:00:00"
# Export to CSV
witskit sql-query sqlite:///drilling_data.db \
--symbols "0108,0113" \
--format csv \
--output drilling_data.csv

The SQL storage uses a normalized, timeseries-optimized schema:
wits_symbols - Symbol definitions (metadata)
- symbol_code (Primary Key): 4-digit WITS symbol code
- name: Human-readable symbol name
- description: Detailed description
- data_type: Data type (A/F/S/L)
- fps_unit, metric_unit: Unit information
- record_type: WITS record type
wits_frames - Frame metadata
- id (Primary Key): Auto-increment frame ID
- timestamp: Frame timestamp
- source: Data source identifier
- raw_data: Original raw WITS frame
- created_at: Database insertion time
wits_data_points - Individual measurements (main timeseries table)
- id (Primary Key): Auto-increment ID
- frame_id: Reference to wits_frames
- symbol_code: Reference to wits_symbols
- timestamp: Measurement timestamp
- source: Data source identifier
- raw_value: Original string value
- numeric_value: Parsed numeric value (if applicable)
- string_value: Parsed string value (if applicable)
- unit: Measurement unit
wits_sources - Source tracking and statistics
- source (Primary Key): Source identifier
- first_seen, last_seen: Activity timestamps
- total_frames, total_data_points: Statistics
- is_active: Active status flag
The schema includes optimized indexes for timeseries queries:
- Primary timeseries: (symbol_code, timestamp)
- Source-specific: (symbol_code, source, timestamp)
- Time range: (timestamp, symbol_code)
- Latest values: (symbol_code, source, timestamp DESC)
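The tables and indexes described above can be sketched as SQLite DDL. This is an illustrative reconstruction from the field lists in this document, not the exact DDL that SQLWriter generates; column types and constraints may differ in the real schema.

```python
# Illustrative sketch of the documented schema; the real DDL is
# generated by SQLWriter and may differ in types and constraints.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE wits_symbols (
    symbol_code TEXT PRIMARY KEY,   -- 4-digit WITS symbol code
    name        TEXT,               -- human-readable symbol name
    description TEXT,
    data_type   TEXT,               -- A/F/S/L
    fps_unit    TEXT,
    metric_unit TEXT,
    record_type INTEGER
);
CREATE TABLE wits_frames (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp  TEXT NOT NULL,
    source     TEXT,
    raw_data   TEXT,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE wits_data_points (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    frame_id      INTEGER REFERENCES wits_frames(id),
    symbol_code   TEXT REFERENCES wits_symbols(symbol_code),
    timestamp     TEXT NOT NULL,
    source        TEXT,
    raw_value     TEXT,
    numeric_value REAL,
    string_value  TEXT,
    unit          TEXT
);
CREATE TABLE wits_sources (
    source            TEXT PRIMARY KEY,
    first_seen        TEXT,
    last_seen         TEXT,
    total_frames      INTEGER DEFAULT 0,
    total_data_points INTEGER DEFAULT 0,
    is_active         INTEGER DEFAULT 1
);
-- Composite indexes matching the documented query patterns
CREATE INDEX idx_ts_primary ON wits_data_points (symbol_code, timestamp);
CREATE INDEX idx_ts_source  ON wits_data_points (symbol_code, source, timestamp);
CREATE INDEX idx_ts_range   ON wits_data_points (timestamp, symbol_code);
CREATE INDEX idx_ts_latest  ON wits_data_points (symbol_code, source, timestamp DESC);
""")
tables = [r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'wits_%' ORDER BY name")]
print(tables)
```

The main timeseries table, wits_data_points, stores one row per measurement, so the composite indexes above are what keep symbol- and time-filtered scans cheap.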
import asyncio
from witskit.storage.sql_writer import SQLWriter, DatabaseConfig
async def store_data():
    # Create database configuration
    config = DatabaseConfig.sqlite("drilling_data.db")

    # Initialize writer
    sql_writer = SQLWriter(config)
    await sql_writer.initialize()

    try:
        # Store decoded WITS frames
        await sql_writer.store_frames(decoded_frames)

        # Query data points
        async for data_point in sql_writer.query_data_points(
            symbol_codes=["0108", "0113"],
            start_time=start_time,
            end_time=end_time,
            limit=1000
        ):
            print(f"{data_point.timestamp}: {data_point.parsed_value}")
    finally:
        await sql_writer.close()

# Run async function
asyncio.run(store_data())

SQLite
config = DatabaseConfig.sqlite("path/to/database.db", echo=False)

PostgreSQL
config = DatabaseConfig.postgresql(
host="localhost",
port=5432,
username="postgres",
password="password",
database="wits_data"
)

MySQL
config = DatabaseConfig.mysql(
host="localhost",
port=3306,
username="root",
password="password",
database="wits_data"
)

Time-based filtering
from datetime import datetime, timedelta
# Query last hour of data
start_time = datetime.now() - timedelta(hours=1)
async for dp in sql_writer.query_data_points(
    symbol_codes=["0108"],
    start_time=start_time,
    limit=1000
):
    print(f"Depth: {dp.parsed_value} {dp.unit}")

Source filtering
# Query data from specific source
async for dp in sql_writer.query_data_points(
    symbol_codes=["0108", "0113"],
    source="tcp://192.168.1.100:12345"
):
    print(f"{dp.symbol_name}: {dp.parsed_value}")

Frame-based queries
# Query complete frames with all data points
async for frame in sql_writer.query_frames(
    start_time=start_time,
    end_time=end_time,
    limit=10
):
    print(f"Frame from {frame.source}: {len(frame.data_points)} measurements")

For high-throughput streaming, use batch processing:
batch = []
batch_size = 100
for frame_data in stream:
    decoded_frame = decode_frame(frame_data)
    batch.append(decoded_frame)

    if len(batch) >= batch_size:
        await sql_writer.store_frames(batch)
        batch.clear()

# Store remaining batch
if batch:
    await sql_writer.store_frames(batch)

The schema is optimized for common query patterns:
- Timeseries queries: Use symbol_code + timestamp filters
- Latest values: Query with ORDER BY timestamp DESC LIMIT 1
- Range queries: Use timestamp-based WHERE clauses
- Multi-source: Include source in WHERE clause for optimal performance
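The latest-value and range patterns above can be sketched in plain SQL against the documented wits_data_points table. The column subset and sample rows here are illustrative; the point is that both queries are served by the composite (symbol_code, source, timestamp) indexes.

```python
# Sketch of the documented query patterns using an in-memory SQLite
# stand-in for wits_data_points (columns reduced for illustration).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE wits_data_points (
    symbol_code TEXT, source TEXT, timestamp TEXT, numeric_value REAL)""")
conn.execute(
    "CREATE INDEX idx_latest ON wits_data_points (symbol_code, source, timestamp DESC)")
conn.executemany("INSERT INTO wits_data_points VALUES (?,?,?,?)", [
    ("0108", "tcp://rig:12345", "2024-01-01T10:00:00", 1500.0),
    ("0108", "tcp://rig:12345", "2024-01-01T10:00:05", 1500.2),
    ("0113", "tcp://rig:12345", "2024-01-01T10:00:05", 55.0),
])

# Latest value for one symbol: ORDER BY timestamp DESC LIMIT 1
latest = conn.execute(
    "SELECT numeric_value FROM wits_data_points "
    "WHERE symbol_code = ? ORDER BY timestamp DESC LIMIT 1",
    ("0108",)).fetchone()

# Range query: timestamp-based WHERE clause plus symbol filter
count = conn.execute(
    "SELECT COUNT(*) FROM wits_data_points "
    "WHERE symbol_code = ? AND timestamp >= ? AND timestamp < ?",
    ("0108", "2024-01-01T10:00:00", "2024-01-01T11:00:00")).fetchone()[0]
print(latest[0], count)
```

With witskit itself, the same filters are expressed through the symbol_codes, source, start_time, and end_time parameters of query_data_points.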
For production deployments, configure appropriate connection pools:
config = DatabaseConfig.postgresql(
host="localhost",
database="wits_data",
pool_size=10, # Number of persistent connections
max_overflow=20 # Additional connections under load
)

witskit stream <source> [OPTIONS]
SQL Storage Options:
--sql-db TEXT Database URL (sqlite:///path.db, postgresql://...)
--sql-batch-size INTEGER Batch size for SQL inserts (default: 100)
--sql-echo               Echo SQL statements for debugging

witskit sql-query <database> [OPTIONS]
Options:
--symbols TEXT Comma-separated symbol codes (e.g., "0108,0113")
--start TEXT Start time (ISO format: 2024-01-01T10:00:00)
--end TEXT End time (ISO format: 2024-01-01T12:00:00)
--source TEXT Filter by data source
--limit INTEGER Maximum records to return (default: 1000)
--format TEXT Output format: table, json, csv (default: table)
--output PATH Output file
--list-symbols List available symbols in database
--time-range     Show time range of data in database

# Start streaming from TCP source to database
witskit stream tcp://drilling-rig:12345 \
--sql-db sqlite:///rig_data.db \
--sql-batch-size 50 \
--metric
# In another terminal, query the live data
witskit sql-query sqlite:///rig_data.db \
--symbols "0108,0113,0114" \
--start "2024-01-01T08:00:00" \
--format table

# 1. Check what data is available
witskit sql-query sqlite:///drilling_data.db --list-symbols
witskit sql-query sqlite:///drilling_data.db --time-range
# 2. Export drilling parameters for analysis
witskit sql-query sqlite:///drilling_data.db \
--symbols "0108,0113,0114,0115" \
--start "2024-01-01T00:00:00" \
--end "2024-01-01T23:59:59" \
--format csv \
--output daily_drilling_report.csv
# 3. Query specific time periods
witskit sql-query sqlite:///drilling_data.db \
--symbols "0108" \
--start "2024-01-01T14:30:00" \
--end "2024-01-01T15:30:00" \
--format json \
--output depth_analysis.json

- Use batch processing for high-throughput streaming (50-100 frames per batch)
- Include time filters in queries to leverage indexes effectively
- Use appropriate connection pools for production deployments
- Monitor database size and implement retention policies for long-term operations
- Use specific symbol filters rather than querying all symbols when possible
- Consider partitioning large tables by time for very high-volume deployments
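A retention policy can be as simple as periodically deleting data points older than a cutoff. The sketch below runs against an in-memory stand-in for the documented wits_data_points table; the 30-day window and the fixed "now" are illustrative choices, not witskit defaults.

```python
# Sketch of a time-based retention policy (assumed 30-day window).
import sqlite3
from datetime import datetime, timedelta, timezone

RETENTION_DAYS = 30  # illustrative retention window

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE wits_data_points (symbol_code TEXT, timestamp TEXT, numeric_value REAL)")
now = datetime(2024, 2, 15, tzinfo=timezone.utc)  # fixed for reproducibility
conn.executemany(
    "INSERT INTO wits_data_points VALUES (?,?,?)",
    [("0108", (now - timedelta(days=d)).isoformat(), 1500.0 + d) for d in (1, 10, 45, 90)],
)

# ISO-8601 timestamps in a uniform timezone sort chronologically,
# so a plain string comparison implements the cutoff.
cutoff = (now - timedelta(days=RETENTION_DAYS)).isoformat()
deleted = conn.execute(
    "DELETE FROM wits_data_points WHERE timestamp < ?", (cutoff,)).rowcount
conn.commit()
remaining = conn.execute("SELECT COUNT(*) FROM wits_data_points").fetchone()[0]
print(deleted, remaining)  # 2 2
```

Running a sweep like this on a schedule (cron, or a background task in the streaming process) keeps the main timeseries table bounded; on very large tables, time-based partitioning makes the same cleanup a cheap partition drop.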
Import Errors
# Install SQL dependencies
pip install witskit[sql]
# For specific databases
pip install witskit[postgres] # PostgreSQL
pip install witskit[mysql]     # MySQL

Connection Issues
# Test database connection
witskit sql-query <database_url> --list-symbols
# Enable SQL debugging
witskit stream <source> --sql-db <database_url> --sql-echo

Performance Issues
- Increase batch size: --sql-batch-size 200
- Use connection pooling for production
- Add indexes for custom query patterns
- Consider database-specific optimizations (PostgreSQL: work_mem, MySQL: innodb_buffer_pool_size)
Monitor database performance:
-- SQLite: Check database size
SELECT page_count * page_size AS size_bytes FROM pragma_page_count(), pragma_page_size();
-- PostgreSQL: Check table sizes
SELECT schemaname, tablename, pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables WHERE schemaname = 'public';
-- MySQL: Check table sizes
SELECT table_name, ROUND(((data_length + index_length) / 1024 / 1024), 2) AS 'Size (MB)'
FROM information_schema.TABLES WHERE table_schema = 'wits_data';