Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from core.models.payload import DataPayload, DataFrame
from core.models.state import State
from core.storage.document_factory import DocumentFactory
from core.storage.vfs_uri_factory import VFSURIFactory
from core.storage.runnables.port_storage_writer import (
PortStorageWriter,
PortStorageWriterElement,
Expand Down Expand Up @@ -87,6 +88,10 @@ def __init__(self, worker_id: str):
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

self._port_state_writers: typing.Dict[
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

def is_missing_output_ports(self):
"""
This method is only used for ensuring correct region execution.
Expand Down Expand Up @@ -124,9 +129,12 @@ def add_output_port(
def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str):
"""
Create a separate thread for saving output tuples of a port
to storage in batch.
to storage in batch, and open a long-lived buffered writer for
state materialization on the same port.
"""
document, _ = DocumentFactory.open_document(storage_uri)
document, _ = DocumentFactory.open_document(
VFSURIFactory.result_uri(storage_uri)
)
buffered_item_writer = document.writer(str(get_worker_index(self.worker_id)))
writer_queue = Queue()
port_storage_writer = PortStorageWriter(
Expand All @@ -144,6 +152,29 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str):
writer_thread,
)

state_document, _ = DocumentFactory.open_document(
VFSURIFactory.state_uri(storage_uri)
)
state_buffered_item_writer = state_document.writer(
str(get_worker_index(self.worker_id))
)
state_writer_queue = Queue()
state_port_writer = PortStorageWriter(
buffered_item_writer=state_buffered_item_writer,
queue=state_writer_queue,
)
state_writer_thread = threading.Thread(
target=state_port_writer.run,
daemon=True,
name=f"port_state_writer_thread_{port_id}",
)
state_writer_thread.start()
self._port_state_writers[port_id] = (
state_writer_queue,
state_port_writer,
state_writer_thread,
)

def get_port(self, port_id=None) -> WorkerPort:
    """Return the worker's output port.

    NOTE(review): ``port_id`` is currently ignored — the first registered
    port is always returned, which presumes a single-output-port worker;
    confirm with callers before relying on multi-port lookups here.
    """
    registered_ports = list(self._ports.values())
    return registered_ports[0]

Expand Down Expand Up @@ -171,6 +202,20 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
PortStorageWriterElement(data_tuple=tuple_)
)

def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
    """Enqueue a state row for persistence on the port state writer(s).

    When ``port_id`` is omitted, the same state row is fanned out to every
    output port's state table. This mirrors the broadcast-to-all-workers
    behavior on the emit side: state is shared context, not per-key data,
    so every downstream operator (and every worker reading the
    materialization) needs the full set.

    :param state: the State to persist; serialized once via ``to_tuple()``.
    :param port_id: target port, or ``None`` to broadcast to all ports.
    """
    element = PortStorageWriterElement(data_tuple=state.to_tuple())
    if port_id is not None:
        # Targeted write: silently a no-op when the port has no state writer.
        if port_id in self._port_state_writers:
            self._port_state_writers[port_id][0].put(element)
        return
    # Broadcast: push the same element onto every state writer queue.
    for state_queue, _writer, _thread in self._port_state_writers.values():
        state_queue.put(element)

Comment thread
aglinxinyuan marked this conversation as resolved.
def close_port_storage_writers(self) -> None:
"""
Flush the buffers of port storage writers and wait for all the
Expand All @@ -184,6 +229,11 @@ def close_port_storage_writers(self) -> None:
for _, _, writer_thread in self._port_storage_writers.values():
# This blocking call will wait for all the writer to finish commit
writer_thread.join()
for _, state_writer, _ in self._port_state_writers.values():
state_writer.stop()
for _, _, state_writer_thread in self._port_state_writers.values():
state_writer_thread.join()
self._port_state_writers.clear()

def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None:
"""
Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def process_input_state(self) -> None:
payload=batch,
)
)
self.context.output_manager.save_state_to_storage_if_needed(output_state)

def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
"""
Expand Down
107 changes: 59 additions & 48 deletions amber/src/main/python/core/storage/document_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,35 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument:
if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
iceberg_schema = amber_schema_to_iceberg_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
else:
raise ValueError(f"Resource type {resource_type} is not supported")
match resource_type:
case VFSResourceType.RESULT:
namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
case VFSResourceType.STATE:
namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
case _:
raise ValueError(f"Resource type {resource_type} is not supported")

storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
iceberg_schema = amber_schema_to_iceberg_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
namespace,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
namespace,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)

else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document"
Expand All @@ -96,30 +101,36 @@ def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
if parsed_uri.scheme == "vfs":
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema
else:
raise ValueError(f"Resource type {resource_type} is not supported")
match resource_type:
case VFSResourceType.RESULT:
namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
case VFSResourceType.STATE:
namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
case _:
raise ValueError(f"Resource type {resource_type} is not supported")

storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
namespace,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
namespace,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema

else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
from core.architecture.sendsemantics.round_robin_partitioner import (
RoundRobinPartitioner,
)
from core.models import Tuple, InternalQueue, DataFrame, DataPayload
from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
from core.storage.vfs_uri_factory import VFSURIFactory
from core.util import Stoppable, get_one_of
from core.util.runnable import Runnable
from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
Expand Down Expand Up @@ -132,14 +133,28 @@ def run(self) -> None:
emits an EndChannel ECM. Use the same partitioner implementation as that in
output manager, where a tuple is batched by the partitioner and only
selected as the input of this worker according to the partitioner.

States and tuples are persisted to separate tables, so the original
interleaving is lost and replay has to pick an order: we replay states
first because downstream operators typically need their state set up
before they process the incoming tuples. Every state is broadcast to
every downstream worker -- no partitioner filtering, unlike the tuple
loop. State is shared context (e.g. config / counters), not per-key
data, so each worker needs the full set.
"""
try:
self.materialization, self.tuple_schema = DocumentFactory.open_document(
self.uri
VFSURIFactory.result_uri(self.uri)
)
self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT)
storage_iterator = self.materialization.get()

state_document, _ = DocumentFactory.open_document(
Comment thread
aglinxinyuan marked this conversation as resolved.
VFSURIFactory.state_uri(self.uri)
)
for state_row in state_document.get():
self.emit_payload(StateFrame(State.from_tuple(state_row)))

storage_iterator = self.materialization.get()
# Iterate and process tuples.
for tup in storage_iterator:
if self._stopped:
Expand Down
3 changes: 3 additions & 0 deletions amber/src/main/python/core/storage/storage_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class StorageConfig:
ICEBERG_REST_CATALOG_URI = None
ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
ICEBERG_TABLE_RESULT_NAMESPACE = None
ICEBERG_TABLE_STATE_NAMESPACE = None
ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
ICEBERG_TABLE_COMMIT_BATCH_SIZE = None

Expand All @@ -51,6 +52,7 @@ def initialize(
rest_catalog_uri,
rest_catalog_warehouse_name,
table_result_namespace,
table_state_namespace,
directory_path,
commit_batch_size,
s3_endpoint,
Expand All @@ -71,6 +73,7 @@ def initialize(
cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name

cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_TABLE_STATE_NAMESPACE = table_state_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)

Expand Down
17 changes: 14 additions & 3 deletions amber/src/main/python/core/storage/vfs_uri_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
RESULT = "result"
RUNTIME_STATISTICS = "runtimeStatistics"
CONSOLE_MESSAGES = "consoleMessages"
STATE = "state"


class VFSURIFactory:
Expand Down Expand Up @@ -88,12 +89,22 @@ def extract_value(key: str) -> str:
)

@staticmethod
def create_result_uri(workflow_id, execution_id, global_port_id) -> str:
"""Creates a URI pointing to a result storage."""
base_uri = (
def create_port_base_uri(workflow_id, execution_id, global_port_id) -> str:
"""Base URI for a port. Result and state URIs derive from it via
`result_uri` / `state_uri`.
"""
return (
f"{VFSURIFactory.VFS_FILE_URI_SCHEME}:///wid/{workflow_id.id}"
f"/eid/{execution_id.id}/globalportid/"
f"{serialize_global_port_identity(global_port_id)}"
)

@staticmethod
def result_uri(base_uri: str) -> str:
    """Build the URI of the result resource that lives under *base_uri*."""
    resource_suffix = VFSResourceType.RESULT.value
    return f"{base_uri}/{resource_suffix}"

@staticmethod
def state_uri(base_uri: str) -> str:
    """Build the URI of the state resource that lives under *base_uri*."""
    resource_suffix = VFSResourceType.STATE.value
    return f"{base_uri}/{resource_suffix}"
2 changes: 2 additions & 0 deletions amber/src/main/python/texera_run_python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def init_loguru_logger(stream_log_level) -> None:
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_table_state_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
s3_endpoint,
Expand All @@ -68,6 +69,7 @@ def init_loguru_logger(stream_log_level) -> None:
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_table_state_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
s3_endpoint,
Expand Down
Loading
Loading