Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,693 changes: 4,675 additions & 18 deletions notebooks/tutorial/MultipleSources.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/pipeline/src/pyearthtools/pipeline/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ def undo(self, sample):
@property
def iteration_order(self) -> tuple[Any, ...]:
"""Get ordering from `iterator`"""

if self.iterator is None:
raise ValueError("Cannot iterate over pipeline if iterator is not set.")
return tuple(i for i in self.iterator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.


from functools import reduce
from typing import TypeVar, Union, Optional, Any

import xarray as xr
Expand Down Expand Up @@ -43,6 +44,32 @@ def join(self, sample: tuple[Union[xr.Dataset, xr.DataArray], ...]) -> xr.Datase
def unjoin(self, sample: Any) -> tuple:
return super().unjoin(sample)

class InterpLike(Joiner):
"""
Merge a tuple of xarray object's.

Currently cannot undo this operation
"""

_override_interface = "Serial"

def __init__(self, reference_dataset=None, merge_kwargs: Optional[dict[str, Any]] = None):
super().__init__()
self.record_initialisation()
self.reference_dataset = reference_dataset
self._merge_kwargs = merge_kwargs

def join(self, sample: tuple[Union[xr.Dataset, xr.DataArray], ...]) -> xr.Dataset:
"""Join sample"""
# merged = reduce(lambda a, b: a.interp_like(b), sample)
reference = self.reference_dataset
interped = [i.interp_like(reference) for i in sample]
merged = xr.merge(interped)
return merged

def unjoin(self, sample: Any) -> tuple:
raise NotImplementedError("Not Implemented")


class Concatenate(Joiner):
"""
Expand Down
Loading