1 change: 1 addition & 0 deletions CHANGES.md
@@ -69,6 +69,7 @@

## New Features / Improvements

* Introduced an indicator for aggregations and timers that fire during a pipeline drain, allowing users and sinks to recognize and appropriately handle potentially incomplete or partial data ([#36884](https://github.com/apache/beam/issues/36884)).
* Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go/Python) ([#38349](https://github.com/apache/beam/issues/38349)).
* TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to
encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
11 changes: 9 additions & 2 deletions website/www/site/content/en/blog/looping-timers.md
@@ -221,11 +221,18 @@ public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<St
      public void onTimer(
          OnTimerContext c,
          @StateId("key") ValueState<String> key,
-         @TimerId("loopingTimer") Timer loopingTimer) {
+         @TimerId("loopingTimer") Timer loopingTimer,
+         CausedByDrain drain) {

        LOG.info("Timer @ {} fired", c.timestamp());
        c.output(KV.of(key.read(), 0));

+       // Check if drain is in progress and avoid resetting the timer
+       if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
+         LOG.info("Drain in progress, stopping looping timer.");
+         return;
+       }
+
        // If we do not put in a “time to live” value, then the timer would loop forever
        Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
        if (nextTimer.isBefore(stopTimerTime)) {
@@ -347,4 +354,4 @@ support for dealing with this use case in production.


  Runner specific notes:
- Google Cloud Dataflow Runners Drain feature does not support looping timers (Link to matrix)
+ Support for cancelling looping timers on drain is currently limited to the Dataflow runner and is still being implemented (see [Issue #36884](https://github.com/apache/beam/issues/36884)).
83 changes: 83 additions & 0 deletions website/www/site/content/en/documentation/programming-guide.md
@@ -7501,6 +7501,89 @@ class BufferDoFn(DoFn):
{{< code_sample "sdks/go/examples/snippets/04transforms.go" batching_dofn_example >}}
{{< /highlight >}}

#### 11.5.3. Looping timers {#looping-timers}

Looping timers are a pattern where a timer sets another timer for a future time, creating a loop. This is useful for producing periodic outputs or heartbeats in the absence of data for a specific key.

When draining a pipeline, these loops must terminate so that the pipeline can finish. In the Java SDK, you can add a `CausedByDrain` parameter to the `@OnTimer` method to check whether the timer firing was induced by a drain operation. **Note:** `CausedByDrain` is set only by certain runners. Check the [capability matrix](/documentation/runners/capability-matrix/) for more details.

{{< highlight java >}}
public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
  @TimerId("loopingTimer")
  private final TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Integer> element,
      @TimerId("loopingTimer") Timer timer,
      OutputReceiver<KV<String, Integer>> output) {

    // Set initial timer
    timer.offset(Duration.standardMinutes(1)).setRelative();
    output.output(element);
  }

  @OnTimer("loopingTimer")
  public void onTimer(
      @Key String key,
      @TimerId("loopingTimer") Timer timer,
      OutputReceiver<KV<String, Integer>> output,
      CausedByDrain drain) {

    output.output(KV.of(key, 0));

    // End the loop if this firing was caused by a drain: do not set the next timer
    if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
      return;
    }

    // Set next timer
    timer.offset(Duration.standardMinutes(1)).setRelative();
  }
}
{{< /highlight >}}

{{< highlight py >}}
# Python does not currently support detecting drain in OnTimer.
# The following example demonstrates a looping timer without drain support,
# using event time.

from apache_beam import DoFn
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import TimerSpec, on_timer
from apache_beam.utils.timestamp import Duration

class LoopingTimerDoFn(DoFn):
  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)

  def process(self, element, ts=DoFn.TimestampParam, timer=DoFn.TimerParam(TIMER)):
    timer.set(ts + Duration(seconds=60))
    yield element

  @on_timer(TIMER)
  def on_timer(self, key=DoFn.KeyParam, timestamp=DoFn.TimestampParam, timer=DoFn.TimerParam(TIMER)):
    yield (key, 0)
    # Without drain detection this timer re-arms forever, so the loop never ends on its own.
    timer.set(timestamp + Duration(seconds=60))
{{< /highlight >}}

{{< highlight go >}}
// Go does not currently support detecting drain in OnTimer.
// The following example demonstrates a looping timer without drain support,
// using event time.

type LoopingTimerFn struct {
	Timer timers.EventTime
}

func (fn *LoopingTimerFn) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) {
	nextTime := et.ToTime().Add(60 * time.Second)
	fn.Timer.Set(tp, nextTime)
	emit(key, value)
}

func (fn *LoopingTimerFn) OnTimer(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(string, int)) {
	emit(key, 0)
	// Loops forever, cannot handle drain safely if it never stops.
	nextTime := et.ToTime().Add(60 * time.Second)
	fn.Timer.Set(tp, nextTime)
}
{{< /highlight >}}
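
The control flow behind the pattern can also be sketched without Beam. The following is a minimal pure-Python simulation, offered only as an illustration: `PERIOD`, `on_timer`, and `simulate` are hypothetical names, and the "runner" is a simple loop for a single key, not how any Beam runner actually schedules timers. It assumes that, on drain, each pending timer fires once with a drain flag set.

```python
PERIOD = 60  # seconds between firings; mirrors the one-minute offset used above


def on_timer(fire_time, caused_by_drain, emit, set_timer):
    """Timer callback: always emit, but re-arm only outside a drain."""
    emit((fire_time, caused_by_drain))
    if caused_by_drain:
        return  # leave no timer pending so the drain can complete
    set_timer(fire_time + PERIOD)


def simulate(first_fire, watermark, drain):
    """Hypothetical single-key runner loop (an assumption, not Beam's logic)."""
    fired = []
    pending = [first_fire]  # the loop keeps at most one timer pending

    def set_timer(fire_time):
        pending.append(fire_time)

    # Normal operation: fire timers that are at or before the watermark.
    while pending and pending[0] <= watermark:
        on_timer(pending.pop(0), False, fired.append, set_timer)

    # Drain: each remaining timer fires once, flagged as caused by the drain.
    while drain and pending:
        on_timer(pending.pop(0), True, fired.append, set_timer)

    return fired, pending
```

With the drain check in place, `simulate(60, 180, drain=True)` ends with no pending timer. If `on_timer` re-armed unconditionally, the second loop in this sketch would never exit, which is the situation the `CausedByDrain` check is meant to avoid.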


## 12. Splittable `DoFns` {#splittable-dofns}

2 changes: 1 addition & 1 deletion website/www/site/data/capability_matrix.yaml
@@ -1475,7 +1475,7 @@ capability-matrix:
      - class: dataflow
        l1: "Partially"
        l2:
-       l3: Dataflow has a native drain operation, but it does not work in the presence of event time timer loops. Final implemention pending model support.
+       l3: Dataflow has a native drain operation, but draining event time timer loops is supported only on the non-portable runner.
      - class: prism
        l1: "No"
        l2: