Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 273 additions & 10 deletions src/bertron_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
"""

import requests
from typing import List, Dict, Any, Optional
from typing import List, Dict, Any, Optional, Literal
from dataclasses import dataclass
import logging
from urllib.parse import urljoin
import json

# Import pydantic Entity from bertron_schema_pydantic
from schema.datamodel.bertron_schema_pydantic import Entity
Expand Down Expand Up @@ -85,6 +86,105 @@ def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
logger.error(f"API request failed: {e}")
raise BertronAPIError(f"API request failed: {e}")

def create_property_filter(self,
                           property_name: Optional[str] = None,
                           property_id: Optional[str] = None,
                           property_value: Any = None,
                           property_type: Optional[Literal["raw", "numeric", "regex", "range"]] = "raw") -> Dict[str, Any]:
    """
    Create a filter dictionary for querying entities using $elemMatch.

    All conditions are placed inside a single $elemMatch so that they must
    match the SAME property object in the properties array, preventing
    false matches across different property objects.

    Args:
        property_name: Name of the property to filter on (e.g., "depth", "elevation").
            Matched against the "attribute.label" field.
        property_id: ID of the property to filter on (e.g., "MIXS:0000018").
            Matched against the "attribute.id" field.
        property_value: Value to filter by. When None, only the attribute
            conditions are emitted and property_type is ignored.
        property_type: Type of filter - must be one of:
            - "raw": Match exact value in the value or raw_value fields
            - "numeric": Match exact value in the numeric_value field
            - "regex": Match regex pattern in the value field
            - "range": Match numeric range; property_value must be a
              two-element list or tuple [min, max]

    Returns:
        Dictionary of the form {"properties": {"$elemMatch": {...}}}

    Raises:
        ValueError: If neither property_name nor property_id is given, if
            property_type is not one of the supported kinds (only checked
            when property_value is not None), or if a "range" filter is
            requested with a property_value that is not a two-element
            list or tuple.

    Examples:
        # Filter by property name and raw value
        depth_filter = client.create_property_filter(
            property_name="depth",
            property_value="0 - 0.1m",
            property_type="raw"
        )

        # Filter by property ID and numeric value
        numeric_filter = client.create_property_filter(
            property_id="MIXS:0000093",
            property_value=24,
            property_type="numeric"
        )

        # Filter by numeric range
        range_filter = client.create_property_filter(
            property_name="elevation",
            property_value=[20, 30],
            property_type="range"
        )
    """
    if not property_name and not property_id:
        raise ValueError("Either property_name or property_id must be provided")

    # Build the $elemMatch conditions, starting with the attribute filters
    elem_match_conditions: Dict[str, Any] = {}
    if property_name:
        elem_match_conditions["attribute.label"] = property_name
    if property_id:
        elem_match_conditions["attribute.id"] = property_id

    # If no value specified, just filter by attribute
    if property_value is None:
        return {"properties": {"$elemMatch": elem_match_conditions}}

    # Add value filters based on property_type
    if property_type == "regex":
        elem_match_conditions["value"] = {"$regex": property_value}

    elif property_type == "numeric":
        elem_match_conditions["numeric_value"] = property_value

    elif property_type == "range":
        # Validate the shape up front: without this a scalar fails with an
        # opaque TypeError, a short list with IndexError, and a two-character
        # string would be silently indexed character by character.
        if not isinstance(property_value, (list, tuple)) or len(property_value) != 2:
            raise ValueError(
                "For property_type 'range', property_value must be a "
                "list or tuple of exactly two elements: [min, max]"
            )
        range_min, range_max = property_value

        # A property may carry either a single numeric_value or a
        # minimum_numeric_value/maximum_numeric_value pair; use $or so that
        # either representation matches. The second branch is an interval
        # overlap test against [range_min, range_max].
        elem_match_conditions["$or"] = [
            {
                "numeric_value": {
                    "$gte": range_min,
                    "$lte": range_max
                }
            },
            {
                "minimum_numeric_value": {"$lte": range_max},
                "maximum_numeric_value": {"$gte": range_min}
            }
        ]

    elif property_type == "raw":
        # Match either value or raw_value field
        elem_match_conditions["$or"] = [
            {"value": property_value},
            {"raw_value": property_value}
        ]

    else:
        raise ValueError("Invalid property_type. Must be one of 'raw', 'numeric', 'regex', 'range'")

    return {"properties": {"$elemMatch": elem_match_conditions}}


def health_check(self) -> Dict[str, Any]:
"""
Check the health of the BERtron API server.
Expand Down Expand Up @@ -153,7 +253,11 @@ def find_entities(
return QueryResponse(entities=entities, count=response["count"])

def find_nearby_entities(
self, latitude: float, longitude: float, radius_meters: float
self,
latitude: float,
longitude: float,
radius_meters: float,
filter_dict: Optional[Dict[str, Any]] = None,
) -> QueryResponse:
"""
Find entities within a specified radius of a geographic point.
Expand All @@ -166,11 +270,14 @@ def find_nearby_entities(
Returns:
QueryResponse containing nearby entities (sorted by distance)
"""
params = {
params: Dict[str, Any] = {
"latitude": latitude,
"longitude": longitude,
"radius_meters": radius_meters,
}

if filter_dict:
params["filter_json"] = json.dumps(filter_dict)

response = self._make_request("GET", "/bertron/geo/nearby", params=params)
entities = [Entity(**doc) for doc in response["documents"]]
Expand All @@ -194,6 +301,7 @@ def find_entities_in_bounding_box(
southwest_lng: float,
northeast_lat: float,
northeast_lng: float,
filter_dict: Optional[Dict[str, Any]] = None
) -> QueryResponse:
"""
Find entities within a rectangular bounding box.
Expand All @@ -207,12 +315,15 @@ def find_entities_in_bounding_box(
Returns:
QueryResponse containing entities within the bounding box
"""
params = {
params: Dict[str, Any] = {
"southwest_lat": southwest_lat,
"southwest_lng": southwest_lng,
"northeast_lat": northeast_lat,
"northeast_lng": northeast_lng,
}

if filter_dict:
params["filter_json"] = json.dumps(filter_dict)

response = self._make_request("GET", "/bertron/geo/bbox", params=params)
entities = [Entity(**doc) for doc in response["documents"]]
Expand All @@ -232,7 +343,7 @@ def find_entities_in_bounding_box(
metadata=metadata,
)

def find_entities_by_source(self, source: str, filter_dict: Optional[Dict[str, Any]] = None) -> QueryResponse:
    """
    Find entities from a specific BER data source.

    Args:
        source: Value the "ber_data_source" field must equal
        filter_dict: Optional additional filter conditions applied together
            with the source constraint

    Returns:
        QueryResponse containing entities from the specified source
    """
    source_filter: Dict[str, Any] = {"ber_data_source": source}
    if not filter_dict:
        return self.find_entities(filter_dict=source_filter)

    if "ber_data_source" in filter_dict:
        # A plain dict merge would let the caller's key silently replace the
        # explicit source argument; $and keeps both constraints in force.
        combined_filter: Dict[str, Any] = {"$and": [source_filter, dict(filter_dict)]}
    else:
        combined_filter = {**source_filter, **filter_dict}
    return self.find_entities(filter_dict=combined_filter)

def find_entities_by_entity_type(self, entity_type: str, filter_dict: Optional[Dict[str, Any]] = None) -> QueryResponse:
    """
    Find entities of a specific entity type.

    Args:
        entity_type: Value the "entity_type" field must equal
        filter_dict: Optional additional filter conditions applied together
            with the entity type constraint

    Returns:
        QueryResponse containing entities of the specified type
    """
    type_filter: Dict[str, Any] = {"entity_type": entity_type}
    if not filter_dict:
        return self.find_entities(filter_dict=type_filter)

    if "entity_type" in filter_dict:
        # A plain dict merge would let the caller's key silently replace the
        # explicit entity_type argument; $and keeps both constraints in force.
        combined_filter: Dict[str, Any] = {"$and": [type_filter, dict(filter_dict)]}
    else:
        combined_filter = {**type_filter, **filter_dict}
    return self.find_entities(filter_dict=combined_filter)

def search_entities_by_name(
self, name_pattern: str, case_sensitive: bool = False
Expand All @@ -276,7 +393,11 @@ def search_entities_by_name(
return self.find_entities(filter_dict=regex_filter)

def get_entities_in_region(
self, center_lat: float, center_lng: float, radius_km: float
self,
center_lat: float,
center_lng: float,
radius_km: float,
filter_dict: Optional[Dict[str, Any]] = None
) -> QueryResponse:
"""
Convenience method to find entities in a region (radius in kilometers).
Expand All @@ -290,7 +411,7 @@ def get_entities_in_region(
QueryResponse containing entities in the specified region
"""
radius_meters = radius_km * 1000
return self.find_nearby_entities(center_lat, center_lng, radius_meters)
return self.find_nearby_entities(center_lat, center_lng, radius_meters, filter_dict=filter_dict)

def close(self):
"""Close the HTTP session."""
Expand Down Expand Up @@ -357,6 +478,148 @@ def __exit__(self, exc_type, exc_val, exc_tb):
)
print(f"Found {pnw_entities.count} entities in Yellowstone region")

# ===== PROPERTY FILTERING EXAMPLES =====
print("\n" + "="*60)
print("PROPERTY FILTERING EXAMPLES")
print("="*60)

# Example 1: Filter by raw value (string match)
# Find entities with depth "0 - 0.1m"
print("\n1. Filter by raw value (depth = '0 - 0.1m')...")
depth_filter = client.create_property_filter(
property_name="depth",
property_id=None,
property_value="0 - 0.1m",
property_type="raw"
)
depth_entities = client.find_entities(filter_dict=depth_filter)
print(f"Found {depth_entities.count} entities with depth '0 - 0.1m'")

# Example 2: Filter by numeric value
# Find entities with elevation of exactly 24 meters
print("\n2. Filter by numeric value (elevation = 24)...")
elevation_filter = client.create_property_filter(
property_name="elevation",
property_id=None,
property_value=24,
property_type="numeric"
)
elevation_entities = client.find_entities(filter_dict=elevation_filter)
print(f"Found {elevation_entities.count} entities with elevation = 24m")

# Example 3: Filter by property ID instead of name
# Find entities with total phosphorous using MIXS ID
print("\n3. Filter by property ID (MIXS:0000117 - total phosphorous)...")
phosphorous_filter = client.create_property_filter(
property_name=None,
property_id="MIXS:0000117",
property_value=2.2,
property_type="numeric"
)
phosphorous_entities = client.find_entities(filter_dict=phosphorous_filter)
print(f"Found {phosphorous_entities.count} entities with total phosphorous = 2.2 ppm")

# Example 4: Filter by numeric range
# Find entities with elevation between 20 and 30 meters
print("\n4. Filter by numeric range (elevation between 20-30m)...")
elevation_range_filter = client.create_property_filter(
property_name="elevation",
property_id=None,
property_value=[20, 30],
property_type="range"
)
elevation_range_entities = client.find_entities(filter_dict=elevation_range_filter)
print(f"Found {elevation_range_entities.count} entities with elevation between 20-30m")

# Example 5: Filter by regex pattern
# Find entities with collection dates in June 2025
print("\n5. Filter by regex pattern (collection date in June 2025)...")
date_filter = client.create_property_filter(
property_name="collection date",
property_id=None,
property_value="2025-06-.*",
property_type="regex"
)
june_entities = client.find_entities(filter_dict=date_filter)
print(f"Found {june_entities.count} entities collected in June 2025")

# Example 6: Filter by controlled vocabulary value
# Find entities with env_broad_scale = "terrestrial biome"
print("\n6. Filter by controlled vocabulary (env_broad_scale = 'terrestrial biome')...")
env_filter = client.create_property_filter(
property_name="env_broad_scale",
property_id=None,
property_value="terrestrial biome",
property_type="raw"
)
terrestrial_entities = client.find_entities(filter_dict=env_filter)
print(f"Found {terrestrial_entities.count} entities with terrestrial biome")

# Example 7: Combine property filter with geographic search
# Find entities near Florida with specific depth
print("\n7. Combine property filter with geographic search...")
depth_filter_geo = client.create_property_filter(
property_name="depth",
property_value="0 - 0.1m",
property_type="raw"
)
florida_depth_entities = client.get_entities_in_region(
center_lat=28.1,
center_lng=-81.4,
radius_km=100,
filter_dict=depth_filter_geo
)
print(f"Found {florida_depth_entities.count} entities near Florida with depth '0 - 0.1m'")

# Example 8: Combine property filter with source filter
# Find NMDC entities with specific elevation
print("\n8. Combine property filter with source filter...")
elevation_filter_nmdc = client.create_property_filter(
property_name="elevation",
property_id=None,
property_value=24,
property_type="numeric"
)
nmdc_elevation_entities = client.find_entities_by_source(
source="NMDC",
filter_dict=elevation_filter_nmdc
)
print(f"Found {nmdc_elevation_entities.count} NMDC entities with elevation = 24m")

# Example 9: Filter by range for depth (using minimum/maximum values)
# Find entities with depth overlapping 0-0.2m range
print("\n9. Filter by depth range (0-0.2m)...")
depth_range_filter = client.create_property_filter(
property_name="depth",
property_id=None,
property_value=[0, 0.2],
property_type="range"
)
depth_range_entities = client.find_entities(filter_dict=depth_range_filter)
print(f"Found {depth_range_entities.count} entities with depth in range 0-0.2m")

# Example 10: Multiple property filters combined
# Find entities with both specific depth AND elevation
print("\n10. Multiple property filters (depth AND elevation)...")
combined_filter = {
"$and": [
client.create_property_filter(
property_name="depth",
property_id=None,
property_value="0 - 0.1m",
property_type="raw"
),
client.create_property_filter(
property_name="elevation",
property_id=None,
property_value=24,
property_type="numeric"
)
]
}
combined_entities = client.find_entities(filter_dict=combined_filter)
print(f"Found {combined_entities.count} entities with depth '0 - 0.1m' AND elevation = 24m")

except BertronAPIError as e:
print(f"API Error: {e}")
except Exception as e:
Expand Down