Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "sqlonfhir"
version = "0.0.1"
version = "0.0.2"
authors = [
{ name="Stuart Mackle", email="stuart.mackle@sas.com" },
{ name="SAS Healthcare Solutions", email="support@sas.com" }
Expand Down
341 changes: 173 additions & 168 deletions sqlonfhir/sqlonfhir.py
Original file line number Diff line number Diff line change
@@ -1,165 +1,188 @@
# Copyright © 2025, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from fhirpathpy import evaluate
from fhirpathpy import compile
from fhirpathpy.models import models


# This is here to change identified.ofType(boolean) to identifiedBoolean.ofType(boolean)
# This function should be replaced at some point as fhirpathpy and the SQL on FHIR specifications converge
def escape_path(path):
return path.replace("$this", "identity()")


def eval_fhirpath(resource, path):
path = escape_path(path)
user_invocation_table = {
"getReferenceKey": {
"fn": get_reference_key,
"arity": {0: [], 1: ["Identifier"]},
},
"getResourceKey": {"fn": get_resource_key},
"identity": {"fn": identity},
}
e = evaluate(
resource,
path,
options={"userInvocationTable": user_invocation_table},
model=models["r4"],
)
return e


def identity(resource):
return resource


def union_all(expr, resource, view_definition):
result = []
for expression in expr["unionAll"]:
result += call_fn(expression, resource, view_definition)
return result


def for_each(expr, resource, view_definition):
result = []
replaced_path = replace_constants(expr["forEach"], view_definition)
selections = eval_fhirpath(resource, replaced_path)
for selection in selections:
result += select(expr, selection, view_definition)
return result


def get_all_child_columns(expression):
empty_record = {}
if "column" in expression:
for column in expression["column"]:
empty_record[column["name"]] = None
if "select" in expression:
for selection in expression["select"]:
empty_record = empty_record | get_all_child_columns(selection)
if "unionAll" in expression:
for selection in expression["unionAll"]:
empty_record = empty_record | get_all_child_columns(selection)
return empty_record


def for_each_or_null(expr, resource, view_definition):
result = []
selections = eval_fhirpath(resource, expr["forEachOrNull"])
if len(selections) == 0:
return [get_all_child_columns(expr)]
for selection in selections:
result += select(expr, selection, view_definition)
return result


def row_product(parts):
if len(parts) == 1:
return parts[0]
rows = [{}]
new_rows = None
for part in parts:
new_rows = []
for partial_row in part:
for row in rows:
new_rows.append(partial_row | row)
rows = new_rows
return rows


def select(expr, resource, view_definition):
if "where" in expr:
for condition in expr["where"]:
replaced_path = replace_constants(condition["path"], view_definition)
val = eval_fhirpath(resource, replaced_path)
if len(val) == 0:
return []
elif not val[0]:
return []
elif type(val[0]) is not bool:
raise Exception("Where clause did not evaluate to boolean")
sub_selections = []
for selection in expr["select"]:
selection_evaluation = call_fn(selection, resource, view_definition)
if selection_evaluation != []:
sub_selections.append(selection_evaluation)
else:
return []
return row_product(sub_selections)


def column(expr, resource, view_definition):
record = {}
for column in expr["column"]:
replaced_path = replace_constants(column["path"], view_definition)
value = eval_fhirpath(resource, replaced_path)
if "collection" in column and column["collection"]:
record[column["name"]] = value
elif len(value) == 1:
record[column["name"]] = value[0]
elif len(value) == 0:
record[column["name"]] = None
else:
raise Exception("Unexpected multiple values")
return [record]


def call_fn(expr, resource, view_definition):
if "forEachOrNull" in expr:
return for_each_or_null(expr, resource, view_definition)
elif "forEach" in expr:
return for_each(expr, resource, view_definition)
elif "select" in expr:
return select(expr, resource, view_definition)
elif "unionAll" in expr:
return union_all(expr, resource, view_definition)
elif "column" in expr:
return column(expr, resource, view_definition)


def get_resource_key(ctx):
return ctx[0]["id"]


def get_reference_key(ctx, identifier=None):
if identifier and not ctx[0]["reference"].startswith(identifier):
return None
index = ctx[0]["reference"].find("/")
return ctx[0]["reference"][index + 1 :]


def eval(resources, view_definition):
norm = normalize(view_definition)
result = []
evaluator = ViewDefinitionEvaluator()
for resource in resources:
if resource["resourceType"] != view_definition["resource"]:
continue
result += call_fn(norm, resource, view_definition)
result += evaluator.call_fn(norm, resource, view_definition)
return result


# View Definition Evaluation
class ViewDefinitionEvaluator:
def __init__(self):
self.fhirpath_cache = {}

def eval_fhirpath(self, resource, path):
# $this is not in the FHIRPath spec, replacing as per
# reference implementation
path = path.replace("$this", "identity()")
# Add functions to FHIRPath that are needed for SQL on FHIR
user_invocation_table = {
"getReferenceKey": {
"fn": self.get_reference_key,
"arity": {0: [], 1: ["Identifier"]},
},
"getResourceKey": {"fn": self.get_resource_key},
"identity": {"fn": self.identity},
}

if path not in self.fhirpath_cache:
self.fhirpath_cache[path] = compile(
path,
model=models["r4"],
options={"userInvocationTable": user_invocation_table},
)

return self.fhirpath_cache[path](resource)

def union_all(self, expr, resource, view_definition):
result = []
for expression in expr["unionAll"]:
result += self.call_fn(expression, resource, view_definition)
return result

def for_each(self, expr, resource, view_definition):
result = []
replaced_path = self.replace_constants(expr["forEach"], view_definition)
selections = self.eval_fhirpath(resource, replaced_path)
for selection in selections:
result += self.select(expr, selection, view_definition)
return result

def get_all_child_columns(self, expression):
empty_record = {}
if "column" in expression:
for column in expression["column"]:
empty_record[column["name"]] = None
if "select" in expression:
for selection in expression["select"]:
empty_record = empty_record | self.get_all_child_columns(selection)
if "unionAll" in expression:
for selection in expression["unionAll"]:
empty_record = empty_record | self.get_all_child_columns(selection)
return empty_record

def for_each_or_null(self, expr, resource, view_definition):
result = []
selections = self.eval_fhirpath(resource, expr["forEachOrNull"])
if len(selections) == 0:
return [self.get_all_child_columns(expr)]
for selection in selections:
result += self.select(expr, selection, view_definition)
return result

def select(self, expr, resource, view_definition):
if "where" in expr:
for condition in expr["where"]:
replaced_path = self.replace_constants(
condition["path"], view_definition
)
val = self.eval_fhirpath(resource, replaced_path)
if len(val) == 0 or not val[0]:
return []
elif type(val[0]) is not bool:
raise Exception("Where clause did not evaluate to boolean")
sub_selections = []
for selection in expr["select"]:
selection_evaluation = self.call_fn(selection, resource, view_definition)
if selection_evaluation != []:
sub_selections.append(selection_evaluation)
else:
return []
return self.row_product(sub_selections)

def column(self, expr, resource, view_definition):
record = {}
for column in expr["column"]:
replaced_path = self.replace_constants(column["path"], view_definition)
value = self.eval_fhirpath(resource, replaced_path)
if "collection" in column and column["collection"]:
record[column["name"]] = value
elif len(value) == 1:
record[column["name"]] = value[0]
elif len(value) == 0:
record[column["name"]] = None
else:
raise Exception("Unexpected multiple values")
return [record]

def call_fn(self, expr, resource, view_definition):
if "forEachOrNull" in expr:
return self.for_each_or_null(expr, resource, view_definition)
elif "forEach" in expr:
return self.for_each(expr, resource, view_definition)
elif "select" in expr:
return self.select(expr, resource, view_definition)
elif "unionAll" in expr:
return self.union_all(expr, resource, view_definition)
elif "column" in expr:
return self.column(expr, resource, view_definition)

# Utility functions
@staticmethod
def row_product(parts):
if len(parts) == 1:
return parts[0]
rows = [{}]
new_rows = None
for part in parts:
new_rows = []
for partial_row in part:
for row in rows:
new_rows.append(partial_row | row)
rows = new_rows
return rows

@staticmethod
def replace_constants(path, view_definition):
for constant in view_definition.get("constant") or []:
value_key = [vk for vk in constant.keys() if vk.startswith("value")]
if "valueBoolean" in constant:
path = path.replace(
"%" + constant["name"], str(constant["valueBoolean"]).lower()
)
elif value_key[0] in [
"valueInteger",
"valueDecimal",
"valueUnsignedInt",
"valuePositiveInt",
]:
path = path.replace("%" + constant["name"], str(constant[value_key[0]]))
elif len(value_key) == 1:
path = path.replace(
"%" + constant["name"], "'" + constant[value_key[0]] + "'"
)
return path

# FHIRPath Helper Functions
@staticmethod
def get_resource_key(ctx):
return ctx[0]["id"]

@staticmethod
def get_reference_key(ctx, identifier=None):
if identifier and not ctx[0]["reference"].startswith(identifier):
return None
index = ctx[0]["reference"].find("/")
return ctx[0]["reference"][index + 1 :]

@staticmethod
def identity(resource):
return resource


# View Definition Normalization & Validation
def normalize(view):
# Make sure we only operate on keys we have implemented for
current_functions = view.keys() & {
"select",
"column",
Expand All @@ -170,14 +193,17 @@ def normalize(view):
if "forEach" in view or "forEachOrNull" in view:
if "select" not in view:
view["select"] = []
# Move column & unionAll under forEach for evaluation for_each() -> union_all()/column()
view = move_functions(
view, "select", current_functions - {"forEach", "select", "forEachOrNull"}
)
view["select"] = [normalize(selection) for selection in view["select"]]
elif "select" in view:
view = move_functions(view, "select", current_functions - {"select"})
view["select"] = [normalize(selection) for selection in view["select"]]
# if unionAll and column are present, we want to select so they are merged at the same level
# if unionAll and column are present make sure it is evaluated as row_product(unionAll + column)
# instead of union_all(column()). We also require select() to make sure both get evaluated
# and union_all doesn't take precedence
elif "unionAll" in view and "column" in view:
view["select"] = []
view = move_functions(view, "select", current_functions - {"select"})
Expand All @@ -199,27 +225,6 @@ def move_functions(view, function, sub_functions):
return view


def replace_constants(path, view_definition):
for constant in view_definition.get("constant") or []:
value_key = [vk for vk in constant.keys() if vk.startswith("value")]
if "valueBoolean" in constant:
path = path.replace(
"%" + constant["name"], str(constant["valueBoolean"]).lower()
)
elif value_key[0] in [
"valueInteger",
"valueDecimal",
"valueUnsignedInt",
"valuePositiveInt",
]:
path = path.replace("%" + constant["name"], str(constant[value_key[0]]))
elif len(value_key) == 1:
path = path.replace(
"%" + constant["name"], "'" + constant[value_key[0]] + "'"
)
return path


def validate_union_all(union_all):
all_column_orderings = []
for expression in union_all:
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.