Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 68 additions & 56 deletions sqlonfhir/sqlonfhir.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def evaluate(resources, view_definition):
if "resource" not in view_definition:
raise Exception("View Definition is missing resource type.")

norm = normalize(view_definition)
constants = view_definition.get("constant", [])
norm = normalize(view_definition, constants)
result = []
evaluator = ViewDefinitionEvaluator()
for resource in resources:
Expand All @@ -42,7 +43,7 @@ def evaluate(resources, view_definition):
or resource["resourceType"] != view_definition["resource"]
):
continue
result += evaluator.call_fn(norm, resource, view_definition)
result += evaluator.call_fn(norm, resource)
return result


Expand All @@ -61,29 +62,25 @@ def __init__(self):

def eval_fhirpath(self, resource, path):
if path not in self.fhirpath_cache:
# $this is not in the FHIRPath spec, replacing as per
# reference implementation
replaced_path = path.replace("$this", "identity()")
self.fhirpath_cache[path] = compile(
replaced_path,
path,
model=models["r4"],
options={"userInvocationTable": self.user_invocation_table},
)

return self.fhirpath_cache[path](resource)

def union_all(self, expr, resource, view_definition):
def union_all(self, expr, resource):
result = []
for expression in expr["unionAll"]:
result += self.call_fn(expression, resource, view_definition)
result += self.call_fn(expression, resource)
return result

def for_each(self, expr, resource, view_definition):
def for_each(self, expr, resource):
result = []
replaced_path = self.replace_constants(expr["forEach"], view_definition)
selections = self.eval_fhirpath(resource, replaced_path)
selections = self.eval_fhirpath(resource, expr["forEach"])
for selection in selections:
result += self.select(expr, selection, view_definition)
result += self.select(expr, selection)
return result

def get_all_child_columns(self, expression):
Expand All @@ -99,40 +96,36 @@ def get_all_child_columns(self, expression):
empty_record |= self.get_all_child_columns(selection)
return empty_record

def for_each_or_null(self, expr, resource, view_definition):
def for_each_or_null(self, expr, resource):
result = []
selections = self.eval_fhirpath(resource, expr["forEachOrNull"])
if len(selections) == 0:
return [self.get_all_child_columns(expr)]
for selection in selections:
result += self.select(expr, selection, view_definition)
result += self.select(expr, selection)
return result

def select(self, expr, resource, view_definition):
def select(self, expr, resource):
if "where" in expr:
for condition in expr["where"]:
replaced_path = self.replace_constants(
condition["path"], view_definition
)
val = self.eval_fhirpath(resource, replaced_path)
val = self.eval_fhirpath(resource, condition["path"])
if len(val) == 0 or not val[0]:
return []
elif not isinstance(val[0], bool):
raise Exception("Where clause did not evaluate to boolean")
sub_selections = []
for selection in expr["select"]:
selection_evaluation = self.call_fn(selection, resource, view_definition)
selection_evaluation = self.call_fn(selection, resource)
if selection_evaluation != []:
sub_selections.append(selection_evaluation)
else:
return []
return self.row_product(sub_selections)

def column(self, expr, resource, view_definition):
def column(self, expr, resource):
record = {}
for column in expr["column"]:
replaced_path = self.replace_constants(column["path"], view_definition)
value = self.eval_fhirpath(resource, replaced_path)
value = self.eval_fhirpath(resource, column["path"])
if "collection" in column and column["collection"]:
record[column["name"]] = value
elif len(value) == 1:
Expand All @@ -143,25 +136,24 @@ def column(self, expr, resource, view_definition):
raise Exception("Unexpected multiple values")
return [record]

def call_fn(self, expr, resource, view_definition):
def call_fn(self, expr, resource):
if "forEachOrNull" in expr:
return self.for_each_or_null(expr, resource, view_definition)
return self.for_each_or_null(expr, resource)
elif "forEach" in expr:
return self.for_each(expr, resource, view_definition)
return self.for_each(expr, resource)
elif "select" in expr:
return self.select(expr, resource, view_definition)
return self.select(expr, resource)
elif "unionAll" in expr:
return self.union_all(expr, resource, view_definition)
return self.union_all(expr, resource)
elif "column" in expr:
return self.column(expr, resource, view_definition)
return self.column(expr, resource)

# Utility functions
@staticmethod
def row_product(parts):
if len(parts) == 1:
return parts[0]
rows = [{}]
new_rows = None
for part in parts:
new_rows = []
for partial_row in part:
Expand All @@ -170,27 +162,6 @@ def row_product(parts):
rows = new_rows
return rows

@staticmethod
def replace_constants(path, view_definition):
for constant in view_definition.get("constant") or []:
value_key = [vk for vk in constant.keys() if vk.startswith("value")]
if "valueBoolean" in constant:
path = path.replace(
"%" + constant["name"], str(constant["valueBoolean"]).lower()
)
elif value_key[0] in [
"valueInteger",
"valueDecimal",
"valueUnsignedInt",
"valuePositiveInt",
]:
path = path.replace("%" + constant["name"], str(constant[value_key[0]]))
elif len(value_key) == 1:
path = path.replace(
"%" + constant["name"], "'" + constant[value_key[0]] + "'"
)
return path

# FHIRPath Helper Functions
@staticmethod
def get_resource_key(ctx):
Expand All @@ -209,7 +180,7 @@ def identity(resource):


# View Definition Normalization & Validation
def normalize(view):
def normalize(view, constants):
# Make sure we only operate on keys we have implemented for
current_functions = view.keys() & {
"select",
Expand All @@ -219,30 +190,71 @@ def normalize(view):
"forEachOrNull",
}
if "forEach" in view or "forEachOrNull" in view:
if "forEach" in view:
view["forEach"] = replace_constants(view["forEach"], constants)
if "select" not in view:
view["select"] = []
# Move column & unionAll under forEach for evaluation for_each() -> union_all()/column()
view = move_functions(
view, "select", current_functions - {"forEach", "select", "forEachOrNull"}
)
view["select"] = [normalize(selection) for selection in view["select"]]
view["select"] = [
normalize(selection, constants) for selection in view["select"]
]
elif "select" in view:
view = move_functions(view, "select", current_functions - {"select"})
view["select"] = [normalize(selection) for selection in view["select"]]
view["select"] = [
normalize(selection, constants) for selection in view["select"]
]
if "where" in view:
for where_clause in view["where"]:
where_clause["path"] = replace_constants(
where_clause["path"], constants
)
# if unionAll and column are present make sure it is evaluated as row_product(unionAll + column)
# instead of union_all(column()). We also require select() to make sure both get evaluated
# and union_all doesn't take precedence
elif "unionAll" in view and "column" in view:
view["select"] = []
view = move_functions(view, "select", current_functions - {"select"})
view["select"] = [normalize(selection) for selection in view["select"]]
view["select"] = [
normalize(selection, constants) for selection in view["select"]
]
elif "unionAll" in view:
view = move_functions(view, "unionAll", current_functions - {"unionAll"})
view["unionAll"] = [normalize(selection) for selection in view["unionAll"]]
view["unionAll"] = [
normalize(selection, constants) for selection in view["unionAll"]
]
validate_union_all(view["unionAll"])
elif "column" in view:
for column in view["column"]:
column["path"] = replace_constants(column["path"], constants)
return view


def replace_constants(path, constants):
for constant in constants:
value_key = [vk for vk in constant.keys() if vk.startswith("value")]
if "valueBoolean" in constant:
path = path.replace(
"%" + constant["name"], str(constant["valueBoolean"]).lower()
)
elif value_key[0] in [
"valueInteger",
"valueDecimal",
"valueUnsignedInt",
"valuePositiveInt",
]:
path = path.replace("%" + constant["name"], str(constant[value_key[0]]))
elif len(value_key) == 1:
path = path.replace(
"%" + constant["name"], "'" + constant[value_key[0]] + "'"
)
# $this is not in the FHIRPath spec, replacing as per reference implementation
replaced_path = path.replace("$this", "identity()")
return replaced_path


def move_functions(view, function, sub_functions):
if "column" in sub_functions:
view[function].insert(0, {"column": view["column"]})
Expand Down