Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
592ff64
init
aglinxinyuan Feb 9, 2026
b95465d
update
aglinxinyuan Feb 11, 2026
f83041a
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 11, 2026
d9d0cd9
update
aglinxinyuan Feb 11, 2026
144ae29
update
aglinxinyuan Feb 11, 2026
7b13fef
update
aglinxinyuan Feb 11, 2026
bd5ac3a
update
aglinxinyuan Feb 11, 2026
19be0c1
update
aglinxinyuan Feb 11, 2026
706884f
update
aglinxinyuan Feb 11, 2026
21e6a41
update
aglinxinyuan Feb 11, 2026
24da3e3
update
aglinxinyuan Feb 11, 2026
2ba0fa4
update
aglinxinyuan Feb 11, 2026
a05ffd1
update
aglinxinyuan Feb 11, 2026
44fc0e7
update
aglinxinyuan Feb 11, 2026
846aac2
update
aglinxinyuan Feb 12, 2026
6be7dc5
update
aglinxinyuan Feb 12, 2026
d8338d1
update
aglinxinyuan Feb 12, 2026
36e517e
fix
aglinxinyuan Feb 13, 2026
a4bfbdb
fix
aglinxinyuan Feb 13, 2026
393faac
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
cbb2fc7
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
084f602
update
aglinxinyuan Feb 15, 2026
ae0d4ed
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 15, 2026
55d5cec
update
aglinxinyuan Feb 15, 2026
b8faf93
update
aglinxinyuan Feb 15, 2026
a53506a
update
aglinxinyuan Feb 15, 2026
d44a664
update
aglinxinyuan Feb 15, 2026
1cd48fd
update
aglinxinyuan Feb 15, 2026
e35a332
update
aglinxinyuan Feb 16, 2026
4d18d1d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 17, 2026
30a8562
update
aglinxinyuan Feb 24, 2026
b717fb0
update
aglinxinyuan Feb 24, 2026
da8d6ed
Merge remote-tracking branch 'origin/xinyuan-loop-feb' into xinyuan-l…
aglinxinyuan Feb 24, 2026
04fe614
update
aglinxinyuan Feb 27, 2026
160bc6d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 27, 2026
bd27031
update
aglinxinyuan Feb 28, 2026
99e0f86
update
aglinxinyuan Feb 28, 2026
53ae08b
update
aglinxinyuan Feb 28, 2026
8c7d53c
update
aglinxinyuan Feb 28, 2026
92ab10f
update
aglinxinyuan Mar 1, 2026
bc58566
update
aglinxinyuan Mar 1, 2026
c655856
update
aglinxinyuan Mar 1, 2026
0970a53
update
aglinxinyuan Mar 1, 2026
2b78e2c
update
aglinxinyuan Mar 1, 2026
55f288f
update
aglinxinyuan Mar 1, 2026
2ccde1e
update
aglinxinyuan Mar 1, 2026
9b0d14d
update
aglinxinyuan Mar 1, 2026
3a2d0b9
update
aglinxinyuan Mar 1, 2026
0cfcf2f
update
aglinxinyuan Mar 1, 2026
00e49a5
update
aglinxinyuan Mar 1, 2026
f71dbec
update
aglinxinyuan Mar 1, 2026
565ee71
update
aglinxinyuan Mar 1, 2026
aa444a0
update
aglinxinyuan Mar 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message ControlRequest {
EmptyRequest emptyRequest = 56;
PrepareCheckpointRequest prepareCheckpointRequest = 57;
QueryStatisticsRequest queryStatisticsRequest = 58;
EndIterationRequest endIterationRequest = 59;

// request for testing
Ping ping = 100;
Expand Down Expand Up @@ -278,4 +279,9 @@ enum StatisticsUpdateTarget {
message QueryStatisticsRequest{
repeated core.ActorVirtualIdentity filterByWorkers = 1;
StatisticsUpdateTarget updateTarget = 2;
}

// Notifies a worker that one iteration of a loop region has finished.
message EndIterationRequest{
  // Identity of the LoopStart worker driving the loop.
  // no_box keeps the scalapb-generated field non-optional (not wrapped in Option).
  // Renamed LoopStartId -> loopStartId to match this file's lowerCamelCase
  // field convention (e.g. filterByWorkers, updateTarget); field number unchanged,
  // so the change is wire-compatible.
  core.ActorVirtualIdentity loopStartId = 1 [(scalapb.field).no_box = true];
  // Iteration counter being ended — presumably the just-completed iteration; confirm with sender.
  int32 iteration = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ service WorkerService {
rpc EndWorker(EmptyRequest) returns (EmptyReturn);
rpc StartChannel(EmptyRequest) returns (EmptyReturn);
rpc EndChannel(EmptyRequest) returns (EmptyReturn);
rpc EndIteration(EndIterationRequest) returns (EmptyReturn);
rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue);
rpc NoOperation(EmptyRequest) returns (EmptyReturn);
Expand Down
7 changes: 7 additions & 0 deletions amber/src/main/python/core/models/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ def __init__(
self.__dict__.update(table.to_pandas().iloc[0].to_dict())
self.schema = Schema(table.schema)

@classmethod
def from_tuple(cls, tuple, schema):
    # Build a State from an existing Tuple plus its Schema, bypassing the
    # normal constructor path (which reads attributes out of a pyarrow Table).
    # NOTE(review): the parameter name `tuple` shadows the Python builtin;
    # consider renaming (rename would affect keyword callers, so flagged only).
    # NOTE(review): assumes cls() is callable with no arguments — confirm that
    # __init__'s table parameter is optional.
    obj = cls()
    # Copy every attribute of the tuple directly onto the instance dict,
    # mirroring how __init__ populates fields from a table row.
    obj.__dict__.update(tuple.as_dict())
    obj.schema = schema
    return obj

def add(
self, key: str, value: any, value_type: Optional[AttributeType] = None
) -> None:
Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def process_state(self, state: State) -> None:
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
self._switch_context()
self._set_output_state(executor.process_state(state, port_id))

except Exception as err:
Expand Down
2 changes: 1 addition & 1 deletion amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def _process_ecm(self, ecm_element: ECMElement):

if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT:
self.context.pause_manager.resume(PauseType.ECM_PAUSE)

self._switch_context()
if self.context.tuple_processing_manager.current_internal_marker:
{
StartChannel: self._process_start_channel,
Expand Down
4 changes: 2 additions & 2 deletions amber/src/main/python/core/storage/document_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument:
if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
if resource_type in {VFSResourceType.RESULT, VFSResourceType.STATE}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
Expand Down Expand Up @@ -96,7 +96,7 @@ def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
if parsed_uri.scheme == "vfs":
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
if resource_type in {VFSResourceType.RESULT, VFSResourceType.STATE}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import typing
from loguru import logger
from pyarrow import Table
from typing import Union
from pyarrow import Table

from core.architecture.sendsemantics.broad_cast_partitioner import (
BroadcastPartitioner,
Expand All @@ -34,7 +34,7 @@
from core.architecture.sendsemantics.round_robin_partitioner import (
RoundRobinPartitioner,
)
from core.models import Tuple, InternalQueue, DataFrame, DataPayload
from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
from core.util import Stoppable, get_one_of
Expand Down Expand Up @@ -125,6 +125,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) -> typing.Iterator[DataFrame
if receiver == self.worker_actor_id:
yield self.tuples_to_data_frame(tuples)

def emit_state_with_filter(self, state: State) -> typing.Iterator[StateFrame]:
    """
    Flush the given state through the partitioner and yield only the frames
    addressed to this worker.

    Each flushed payload that is itself a State is wrapped in a StateFrame;
    any other payload (a batch of tuples) is converted to a data frame.
    """
    for receiver, payload in self.partitioner.flush_state(state):
        if receiver != self.worker_actor_id:
            continue
        if isinstance(payload, State):
            yield StateFrame(payload)
        else:
            yield self.tuples_to_data_frame(payload)

def run(self) -> None:
"""
Main execution logic that reads tuples from the materialized storage and
Expand All @@ -149,6 +158,19 @@ def run(self) -> None:
tup.cast_to_schema(self.tuple_schema)
for data_frame in self.tuple_to_batch_with_filter(tup):
self.emit_payload(data_frame)
try:
state_document, state_schema = DocumentFactory.open_document(
f"{self.uri}/state"
)
state_iterator = state_document.get()
for state in state_iterator:
for state_frame in self.emit_state_with_filter(
State.from_tuple(state, state_schema)
):
self.emit_payload(state_frame)
except ValueError:
pass

self.emit_ecm("EndChannel", EmbeddedControlMessageType.PORT_ALIGNMENT)
self._finished = True
except Exception as err:
Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/storage/vfs_uri_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
RESULT = "result"
RUNTIME_STATISTICS = "runtimeStatistics"
CONSOLE_MESSAGES = "consoleMessages"
STATE = "state"


class VFSURIFactory:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ abstract class AmberProcessor(
with Serializable {

/** FIFO & exactly once */
val inputGateway: InputGateway = new NetworkInputGateway(this.actorId)
val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId)

// 1. Unified Output
val outputGateway: NetworkOutputGateway =
Expand All @@ -55,7 +55,7 @@ abstract class AmberProcessor(
}
)
// 2. RPC Layer
val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId)
val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId)
val asyncRPCServer: AsyncRPCServer =
new AsyncRPCServer(outputGateway, actorId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class WorkflowScheduler(
this.physicalPlan = updatedPhysicalPlan
}

def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
// Returns the next set of regions to schedule, or an empty set once the
// schedule is exhausted. Advancing goes through loopNext() so that loop
// regions can be replayed for multiple iterations.
def getNextRegions: Set[Region] =
  if (schedule.hasNext) schedule.loopNext() else Set()

// True while a schedule exists and still has region sets left to emit.
// Option(...) maps a null schedule to None, matching the original null guard.
def hasPendingRegions: Boolean = Option(schedule).exists(_.hasNext)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ case class WorkflowExecution() {
* @throws AssertionError if the `RegionExecution` has already been initialized.
*/
def initRegionExecution(region: Region): RegionExecution = {
regionExecutions.remove(region.id)
// ensure the region execution hasn't been initialized already.
assert(
!regionExecutions.contains(region.id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging {
private var portId: Option[PortIdentity] = None

def acceptMessage(msg: WorkflowFIFOMessage): Unit = {
//channel remove
val seq = msg.sequenceNumber
val payload = msg.payload
if (isDuplicated(seq)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
enforcers += enforcer
}

/**
  * Forgets the incoming control channel originating at `from`, discarding any
  * FIFO bookkeeping this gateway kept for that sender. No-op if the channel
  * was never registered.
  */
def removeControlChannel(from: ActorVirtualIdentity): Unit = {
  val controlChannelId = ChannelIdentity(from, actorId, isControl = true)
  inputChannels.remove(controlChannelId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ class NetworkOutputGateway(
idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement()
}

/**
  * Drops the outgoing control channel addressed to `to`, resetting the
  * sequence-number counter tracked for it. No-op if the channel was never
  * registered.
  */
def removeControlChannel(to: ActorVirtualIdentity): Unit = {
  val controlChannelId = ChannelIdentity(actorId, to, isControl = true)
  idToSequenceNums.remove(controlChannelId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package org.apache.texera.amber.engine.architecture.messaginglayer
import org.apache.texera.amber.core.state.State
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.BufferedItemWriter
import org.apache.texera.amber.core.storage.result.ResultSchema
import org.apache.texera.amber.core.tuple._
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity}
Expand Down Expand Up @@ -124,6 +125,9 @@ class OutputManager(
: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
mutable.HashMap()

private val ECMWriters: mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] =
mutable.HashMap()

/**
* Add down stream operator and its corresponding Partitioner.
*
Expand Down Expand Up @@ -232,6 +236,10 @@ class OutputManager(
})
}

// NOTE(review): placeholder implementation — the `tuple` argument is ignored and a
// hard-coded single-column row ("erge") is written instead. This looks like debug
// scaffolding; it should either persist the actual ECM payload or be removed.
// Also: ECMWriters(outputPortId) throws if no writer was registered for the port
// (see setupOutputStorageWriterThread) — confirm that is the intended contract.
def saveECMToStorageIfNeeded(tuple: Tuple, outputPortId: PortIdentity): Unit = {
this.ECMWriters(outputPortId).putOne(new Tuple(ResultSchema.ecmSchema, Array("erge")))
}

/**
* Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that
* the output port is properly completed. If the output port does not need storage, no action will be done.
Expand Down Expand Up @@ -280,6 +288,10 @@ class OutputManager(
}

private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = {
this.ECMWriters(portId) = DocumentFactory
.createDocument(storageUri.resolve("ecm"), ResultSchema.ecmSchema)
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]
val bufferedItemWriter = DocumentFactory
.openDocument(storageUri)
._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ class RegionExecutionCoordinator(
val actorRef = actorRefService.getActorRef(workerId)
// Remove the actorRef so that no other actors can find the worker and send messages.
actorRefService.removeActorRef(workerId)
asyncRPCClient.inputGateway.removeControlChannel(workerId)
asyncRPCClient.outputGateway.removeControlChannel(workerId)
gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter()
}
}.toSeq
Expand Down Expand Up @@ -209,14 +211,15 @@ class RegionExecutionCoordinator(
regionExecution: RegionExecution,
attempt: Int = 1
): Future[Unit] = {
terminateWorkers(regionExecution).rescue { case err =>
logger.warn(
s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.",
err
)
Future
.sleep(killRetryDelay)(killRetryTimer)
.flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))
terminateWorkers(regionExecution).rescue {
case err =>
logger.warn(
s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.",
err
)
Future
.sleep(killRetryDelay)(killRetryTimer)
.flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))
}
}

Expand Down Expand Up @@ -568,7 +571,18 @@ class RegionExecutionCoordinator(
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
DocumentFactory.createDocument(storageUriToAdd, schema)

if (region.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) {
try {
DocumentFactory.openDocument(storageUriToAdd)
} catch {
case _: Exception =>
DocumentFactory.createDocument(storageUriToAdd, schema)
}
} else {
DocumentFactory.createDocument(storageUriToAdd, schema)
}

WorkflowExecutionsResource.insertOperatorPortResultUri(
eid = eid,
globalPortId = outputPortId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@

package org.apache.texera.amber.engine.architecture.scheduling

import org.apache.texera.amber.core.executor.OpExecWithClassName
import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc}
import org.apache.texera.amber.util.JSONUtils.objectMapper

case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] {
private var currentLevel = levelSets.keys.minOption.getOrElse(0)
private var loopStartLevel = currentLevel
private var iteration = 10
private var i = 1

def getRegions: List[Region] = levelSets.values.flatten.toList

Expand All @@ -31,4 +38,24 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat
currentLevel += 1
regions
}

// Loop-aware variant of next(): returns the region set at the current level and
// advances the level pointer, rewinding to just before the loop's start level
// whenever a LoopEnd region is emitted and fewer than `iteration` passes have run.
// NOTE(review): loop boundaries are detected by logical-operator-id string
// prefixes ("LoopStart-operator-"/"LoopEnd-operator-") — fragile; and `iteration`
// is currently a hard-coded field (10) since the readValue line below is
// commented out. Confirm before relying on the iteration count.
def loopNext(): Set[Region] = {
val regions = levelSets(currentLevel)
if (
regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-")))
) {
//iteration = objectMapper.readValue(regions.head.getOperators.head.opExecInitInfo.asInstanceOf[OpExecWithClassName].descString, classOf[LoopStartOpDesc]).iteration
// Remember the level *before* the LoopStart region: after the unconditional
// +1 below, a rewind to loopStartLevel re-emits the LoopStart level itself.
loopStartLevel = currentLevel - 1
}
if (
regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))
) {
// A LoopEnd region finished one pass; jump back while passes remain.
if (i < iteration) {
currentLevel = loopStartLevel
i += 1
}
}
// Advance past the level just returned (or past loopStartLevel after a rewind).
currentLevel += 1
regions
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
with ResumeHandler
with StartHandler
with EndHandler
with EndIterationHandler
with StartChannelHandler
with EndChannelHandler
with AssignPortHandler
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.engine.architecture.worker.promisehandlers

import com.twitter.util.Future
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
EmptyRequest,
EndIterationRequest
}
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer

// RPC handler for the EndIteration control message sent when a loop iteration
// completes. Currently a stub: the executor match performs no action for any
// executor type and the handler simply acknowledges the request.
// NOTE(review): the intended per-executor behavior (LoopEndOpExec dispatching
// nextIteration, finalizing output for the iteration) is present only as
// commented-out scaffolding below — complete or remove before merge.
trait EndIterationHandler {
this: DataProcessorRPCHandlerInitializer =>

override def endIteration(
request: EndIterationRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
// Match is currently a no-op for every executor; `case _ =>` has an empty body.
dp.executor match {
//case _: LoopEndOpExec =>
//workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker))
case _ =>
//dp.processOnFinish()
//dp.outputManager.finalizeIteration(request.worker)
}
// Acknowledge immediately regardless of executor type.
EmptyReturn()
}
}
Loading
Loading