Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and start a new "In Progress" section above it.
- Support experimental `corsa_compress_v2` and `corsa_decompress_v2` processes ([Open-EO/openeo-geotrellis-extensions#702](https://github.com/Open-EO/openeo-geotrellis-extensions/issues/702))
- Dry run: pass through `align` argument of `resample_spatial` operation ([Open-EO/openeo-geopyspark-driver#1662](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1662))
- `BoundingBox.round_to_resolution()`: add `offset_x` and `offset_y` parameters ([Open-EO/openeo-geopyspark-driver#1662](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1662))
- Support logging added value for synchronous requests ([Open-EO/openeo-geopyspark-driver#1436](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1436))


## 0.138.0
Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.139.0a8"
__version__ = "0.139.0a9"
5 changes: 4 additions & 1 deletion openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable, Container, Any, Tuple

import flask
from openeo_driver.processgraph.definitions import ProcessGraphFlatDict

import openeo_driver.util.view_helpers
from openeo.utils.version import ComparableVersion
Expand Down Expand Up @@ -786,7 +787,7 @@ class Processing(MicroService):
def get_process_registry(self, api_version: Union[str, ComparableVersion]) -> ProcessRegistry:
raise NotImplementedError

def evaluate(self, process_graph: dict, env: EvalEnv = None):
def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, not a big fan of this change: this makes the dry-run aspect leak into the generic Processing.evaluate() API, which is not intended to be concerned with dry and wet run aspects.

For example: this will break the aggregator, which also implements this API, but has no dry-run concept

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it possible to pass the tracer trough the existing EvalEnv instead?

"""Evaluate given process graph (flat dict format)."""
raise NotImplementedError

Expand Down Expand Up @@ -1063,6 +1064,8 @@ def request_costs(
job_options: Union[dict, None] = None,
request_id: str,
success: bool,
process_graph: Union[ProcessGraphFlatDict, None] = None,
tracer: Union[DryRunDataTracer, None] = None,
) -> Optional[float]:
"""
Report resource usage of (current) synchronous processing request and get associated cost.
Expand Down
10 changes: 5 additions & 5 deletions openeo_driver/processgraph/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from openeo.utils.version import ComparableVersion

from openeo_driver.backend import OpenEoBackendImplementation, Processing
from openeo_driver.dry_run import DryRunDataTracer
from openeo_driver.errors import OpenEOApiException
from openeo_driver.processes import DEFAULT_NAMESPACE, ProcessArgs, ProcessRegistry, ProcessSpec
from openeo_driver.specs import SPECS_ROOT
Expand Down Expand Up @@ -238,7 +239,7 @@ def get_basic_env(self, api_version: str = OPENEO_API_VERSION_DEFAULT) -> EvalEn
}
)

def evaluate(self, process_graph: dict, env: EvalEnv = None):
def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True):
from openeo_driver.processgraph.evaluator import evaluate
return evaluate(process_graph=process_graph, env=env or self.get_basic_env(), do_dry_run=False)

Expand All @@ -257,9 +258,10 @@ def get_process_registry(self, api_version: Union[str, ComparableVersion]) -> Pr
else:
raise OpenEOApiException(message=f"No process support for openEO version {api_version}")

def evaluate(self, process_graph: dict, env: EvalEnv = None):
def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True):
from openeo_driver.processgraph.evaluator import evaluate
return evaluate(process_graph=process_graph, env=env)

return evaluate(process_graph=process_graph, env=env, do_dry_run=do_dry_run)

def validate(self, process_graph: dict, env: EvalEnv = None):
from openeo_driver.processgraph.evaluator import evaluate, _collect_end_nodes, convert_node
Expand Down Expand Up @@ -304,5 +306,3 @@ def validate(self, process_graph: dict, env: EvalEnv = None):

def extra_validation(self, process_graph, env, result, source_constraints):
return []


25 changes: 18 additions & 7 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
LINK_REL,
)
from openeo_driver.datacube import DriverMlModel
from openeo_driver.dry_run import DryRunDataTracer
from openeo_driver.errors import (
FeatureUnsupportedException,
FilePathInvalidException,
Expand Down Expand Up @@ -725,8 +726,22 @@ def result(user: User):
}
)

tracer = DryRunDataTracer()

request_costs = functools.partial(
backend_implementation.request_costs,
user=user,
job_options=job_options,
request_id=request_id,
process_graph=process_graph,
tracer=tracer,
)

try:
result = backend_implementation.processing.evaluate(process_graph=process_graph, env=env)
result = backend_implementation.processing.evaluate(
process_graph=copy.deepcopy(process_graph), env=env, do_dry_run=tracer
)

_log.info(f"`POST /result`: {type(result)}")

if result is None:
Expand All @@ -742,18 +757,14 @@ def result(user: User):
result = to_save_result(data=result)
response = result.create_flask_response()

costs = backend_implementation.request_costs(
success=True, user=user, request_id=request_id, job_options=job_options
)
costs = request_costs(success=True)
if costs:
# TODO not all costs are accounted for so don't expose in "OpenEO-Costs" yet
response.headers["OpenEO-Costs-experimental"] = costs

except Exception:
# TODO: also send "OpenEO-Costs" header on failure
backend_implementation.request_costs(
success=False, user=user, request_id=request_id, job_options=job_options
)
request_costs(success=False)
raise

# Add request id as "OpenEO-Identifier" like we do for batch jobs.
Expand Down
8 changes: 5 additions & 3 deletions tests/test_views_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from openeo_driver.datacube import DriverDataCube, DriverVectorCube
from openeo_driver.datastructs import ResolutionMergeArgs, SarBackscatterArgs
from openeo_driver.dry_run import ProcessType
from openeo_driver.dry_run import ProcessType, DryRunDataTracer
from openeo_driver.dummy import dummy_backend
from openeo_driver.dummy.dummy_backend import DummyVisitor
from openeo_driver.errors import (
Expand Down Expand Up @@ -4308,13 +4308,13 @@ def test_vector_buffer_returns_error_on_empty_result_geometry(api):
(None, None, None),
# request_costs override
(
lambda user, request_id, success, job_options: 1234 + isinstance(user, User),
lambda user, job_options, request_id, success, process_graph, tracer: 1234 + isinstance(user, User),
None,
"1235",
),
# Extra job options handling
(
lambda user, request_id, success, job_options: 1234 * job_options.get("extra", 0),
lambda user, job_options, request_id, success, process_graph, tracer: 1234 * job_options.get("extra", 0),
{"extra": 2},
"2468",
),
Expand Down Expand Up @@ -4363,6 +4363,8 @@ def test_synchronous_processing_request_costs(
job_options=job_options,
success=success,
request_id="r-abc123",
process_graph=pg,
tracer=dirty_equals.IsInstance(DryRunDataTracer),
)


Expand Down