
[AURON #1853] Convert Flink Calc operators to Native Calc operators #2283

Open
weiqingy wants to merge 8 commits into apache:master from weiqingy:AURON-1853-impl

Contributor

@weiqingy weiqingy commented May 24, 2026

Which issue does this PR close?

Closes #1853

Rationale for this change

FlinkAuronCalcOperator (#1857) can execute Flink Calc plans natively but is unreachable from real Flink SQL jobs — the planner instantiates Flink's stock StreamExecCalc which builds a JVM codegen operator via CodeGenOperatorFactory<RowData>. This PR closes the loop: a shadowed StreamExecCalc in auron-flink-planner (at the same FQCN as Flink's, picked up by classpath ordering) attempts to translate its projection + condition RexNodes into a native Project[Filter?[FFIReader]] plan using the converter framework (#1856 / #1859). On success, it constructs a FlinkAuronCalcOperator inline; on any failure, it falls back transparently to Flink's stock Calc via super.translateToPlanInternal.

After this PR, a SELECT a + 1 FROM t query routes through Auron's native arithmetic instead of Flink's codegen-Calc bytecode — the first end-to-end Flink-on-Auron acceleration path is operational. Subsequent sub-issues (#1860 / #1861 / #1862 / #1863 / #1864) layer on more RexNode converters; #1865 adds source-Calc fusion. This PR provides the substitution mechanism they all plug into.

What changes are included in this PR?

Six commits, each independently reviewable.

Commit 1: Register built-in RexNode converters; add fallback config option

  • FlinkNodeConverterFactory ships the three built-in converters (RexInputRefConverter, RexLiteralConverter, RexCallConverter) on its singleton at class load.
  • FlinkAuronConfiguration gains FAIL_BACK_FLINK_ENGINE_ENABLED (boolean, default true, key auron.failback.flink.engine.enabled).

Commit 2: Shadow Flink's StreamExecCalc to attempt native Calc

  • New org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc in auron-flink-planner at the same FQCN as Flink's; classpath ordering surfaces this class to the planner.
  • translateToPlanInternal builds Project[Filter?[FFIReader-placeholder]] via tryBuildAuronPlan(...); the FFIReader leaf carries a placeholder resource ID that FlinkAuronCalcOperator.open() rewrites at runtime, per the #1857 (Introduce FlinkAuronCalcOperator) contract.
  • Observability: WARN per unique unsupported RexNode class (deduplicated by ThreadLocal<Set>) + WARN per plan-composition exception (with stack trace).
  • FlinkAuronCalcOperator.NativeRuntimeFactory now extends java.io.Serializable so the operator survives TaskManager dispatch. Interface is @VisibleForTesting package-private; public API unchanged.

Commit 3: Add AuronCalcRewriteITCase for end-to-end Flink SQL coverage

  • Four SQL-level tests against TestValuesTableFactory: multi-column native projection, filter-with-fallback (> not yet converter-supported), unsupported-function silent fallback (UPPER), mixed Calcs (UNION ALL with one convertible + one non-convertible).

Commit 4: Decouple StreamExecCalcTest from javac's StreamExecCalc binding

  • Replaced direct StreamExecCalc.peekWarnEmitCountForTest() calls with a new invokeStaticInt(Class, String) reflection helper, matching the existing invokeStatic pattern.
  • Dropped @Override on CapturingTranslator.translateToFlinkCalc; runtime virtual dispatch routes correctly via the loaded shadow.
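A minimal sketch of the reflection pattern described in Commit 4, assuming a no-argument static test seam that returns an int. `ShadowedTarget` is a hypothetical stand-in for the shadowed class; only the helper name `invokeStaticInt(Class, String)` comes from the PR text, and its body here is a guess at the shape, not the PR's code.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for a class whose test seams javac may not see at compile time. */
final class ShadowedTarget {
    private static int warnCount = 3;

    static int peekWarnEmitCountForTest() {
        return warnCount;
    }
}

final class ReflectionSeams {
    private ReflectionSeams() {}

    /** Invokes a no-arg static method by name and returns its int result. */
    static int invokeStaticInt(Class<?> owner, String methodName) {
        try {
            Method m = owner.getDeclaredMethod(methodName);
            m.setAccessible(true); // the seam is package-private
            return (Integer) m.invoke(null); // null receiver: static method
        } catch (ReflectiveOperationException e) {
            throw new AssertionError("cannot invoke " + owner.getName() + "#" + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeStaticInt(ShadowedTarget.class, "peekWarnEmitCountForTest"));
    }
}
```

Because the method is looked up by name at runtime, the test compiles even when javac binds the class name to a JAR that lacks the seam.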

Commit 5: Complete native Calc runtime path for Flink

Three runtime defects fixed at root cause:

  • INPUT_BATCH_STATISTICS_ENABLE added to FlinkAuronConfiguration (default false). The native execution context queries this key by reflection on every Auron-native plan; absence on FlinkAuronConfiguration (only SparkAuronConfiguration had it) NPE'd before the first batch.
  • FlinkAuronCalcOperator.open now builds a metric tree mirroring the plan shape via a new buildMetricTree(plan, mg) helper. Native finalization walks the metric tree in lockstep with Project[Filter?[FFIReader]] and calls MetricNode.getChild(i); the previous Collections.emptyList() IOOBE'd.
  • Native runtime construction deferred from open() to first non-empty drainNative(). JniBridge.callNative spawns a tokio task that immediately streams output through the FFI Reader; on the first pull from an empty exporter the reader saw EOF and called exporter.close(), nulling the writer. reinitExporterAndRuntime now leaves nativeRuntime null between cycles. FlinkAuronCalcOperatorTest's multi-cycle drain contract was updated (factory invocations = drain cycles, not 1 + N).
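To illustrate the metric-tree fix in Commit 5, here is a rough sketch of building a metric tree that mirrors a plan tree one-to-one. `PlanNode` and `MetricNode` below are simplified, hypothetical types and not Auron's classes; the real `buildMetricTree(plan, mg)` also takes a metric group, which is omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MetricTreeSketch {
    /** Hypothetical plan node: a name plus child plan nodes. */
    static final class PlanNode {
        final String name;
        final List<PlanNode> children;

        PlanNode(String name, PlanNode... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /** Hypothetical metric node holding one child per plan child. */
    static final class MetricNode {
        final String name;
        final List<MetricNode> children = new ArrayList<>();

        MetricNode(String name) {
            this.name = name;
        }

        MetricNode getChild(int i) {
            return children.get(i); // throws IndexOutOfBoundsException if the shapes diverge
        }
    }

    /** Recursively creates one metric node per plan node, preserving child order. */
    static MetricNode buildMetricTree(PlanNode plan) {
        MetricNode node = new MetricNode(plan.name);
        for (PlanNode child : plan.children) {
            node.children.add(buildMetricTree(child));
        }
        return node;
    }

    public static void main(String[] args) {
        PlanNode plan = new PlanNode("Project", new PlanNode("Filter", new PlanNode("FFIReader")));
        MetricNode root = buildMetricTree(plan);
        System.out.println(root.getChild(0).getChild(0).name);
    }
}
```

With an empty children list at the root, the first `getChild(0)` would fail, which matches the IndexOutOfBoundsException described above.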

Commit 6: Read Arrow timestamp unit from vector instead of assuming microseconds

  • ArrowTimestampColumnVector.getTimestamp previously hard-coded floorDiv(raw, 1000). FlinkArrowUtils.toArrowSchema already maps Flink TIMESTAMP precision to four Arrow units (SECOND / MILLISECOND / MICROSECOND / NANOSECOND) and TimestampWriter.doWrite writes symmetrically across all four — only the reader was unit-blind, which shrank 2026-03-16T12:03 down to 1970-01-21T12:41:02.580 on Timestamp(Millisecond) vectors.
  • Now reads TimeUnit from vector.getField().getType() at construction and dispatches on it (SECOND × 1000, MILLISECOND identity, MICROSECOND ÷ 1000, NANOSECOND ÷ 1_000_000).
  • New FlinkArrowReaderTest cases (testTimestampMilliVector, testTimestampSecVector, testTimestampNanoVector).
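The unit dispatch in Commit 6 can be sketched in plain Java as follows. The `Unit` enum is a local, hypothetical stand-in for Arrow's time unit, and the example assumes the timestamp from the description is in UTC; the actual reader works on Arrow vectors rather than raw longs.

```java
import java.time.Instant;

final class TimestampUnitSketch {
    /** Hypothetical local enum; the PR reads the unit from the vector's field type. */
    enum Unit { SECOND, MILLISECOND, MICROSECOND, NANOSECOND }

    /** Converts a raw epoch value in the given unit to epoch milliseconds. */
    static long toEpochMillis(long raw, Unit unit) {
        switch (unit) {
            case SECOND:
                return Math.multiplyExact(raw, 1000L);
            case MILLISECOND:
                return raw;
            case MICROSECOND:
                return Math.floorDiv(raw, 1000L);
            case NANOSECOND:
                return Math.floorDiv(raw, 1_000_000L);
            default:
                throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        long rawMillis = Instant.parse("2026-03-16T12:03:00Z").toEpochMilli();
        // Unit-blind read: treats a millisecond value as microseconds.
        System.out.println(Instant.ofEpochMilli(Math.floorDiv(rawMillis, 1000L)));
        // Unit-aware read: millisecond values pass through unchanged.
        System.out.println(Instant.ofEpochMilli(toEpochMillis(rawMillis, Unit.MILLISECOND)));
    }
}
```

The first print reproduces the 1970 date reported in the bug; the second recovers the original instant.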

Are there any user-facing changes?

  1. Native acceleration for Calc operators: any Flink SQL SELECT … WHERE … whose projection and condition use only converter-supported RexNodes now runs through Auron's native engine. Today's supported set (from #1859, Convert Math operators to Auron Native operators): RexInputRef, RexLiteral, and RexCall with SqlKind in {+, -, *, /, %, MINUS_PREFIX, PLUS_PREFIX, CAST}.

  2. Two new config options on FlinkAuronConfiguration:

    • flink.auron.failback.flink.engine.enabled (boolean, default true). When true, an unsupported RexNode silently falls back to Flink's codegen Calc. When false, conversion failure throws IllegalStateException at job submission.
    • flink.auron.input.batch.statistics.enable (boolean, default false). Mirrors Spark's INPUT_BATCH_STATISTICS_ENABLE; queried by the native engine on every Auron-native plan.
  3. WARN log lines on fallback — TaskManager logs surface per-unique-RexNode-class lines like Auron StreamExecCalc fallback (node 17): unsupported RexNode org.apache.calcite.rex.RexFieldAccess; using Flink CodeGen Calc. and per-exception lines including the stack trace.
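The interaction between native conversion and the fallback flag from item 2 can be sketched roughly as below. Every name here is hypothetical: the real logic lives in the shadowed `translateToPlanInternal`, works on Flink transformations rather than generic suppliers, and also logs the WARN lines from item 3.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class FallbackGateSketch {
    private FallbackGateSketch() {}

    /**
     * Returns the native result when conversion succeeds. Otherwise returns the stock
     * result if fallback is enabled, or throws IllegalStateException if it is not.
     */
    static <T> T translate(Supplier<Optional<T>> tryNative, Supplier<T> stock, boolean fallbackEnabled) {
        Optional<T> nativePlan;
        try {
            nativePlan = tryNative.get();
        } catch (RuntimeException e) {
            nativePlan = Optional.empty(); // composition failure is treated like "unsupported"
        }
        if (nativePlan.isPresent()) {
            return nativePlan.get();
        }
        if (!fallbackEnabled) {
            throw new IllegalStateException("native conversion failed and fallback is disabled");
        }
        return stock.get();
    }

    public static void main(String[] args) {
        String converted = translate(() -> Optional.of("native"), () -> "stock", true);
        String fellBack = translate(() -> Optional.<String>empty(), () -> "stock", true);
        System.out.println(converted);
        System.out.println(fellBack);
    }
}
```

Setting the flag to false turns the second call into a thrown exception, which is the strict mode suggested later in the thread for CI runs.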

No deprecations. No removed APIs.

How was this patch tested?

CI: 123 / 123 checks passing.

Unit tests:

  • StreamExecCalcTest — 12 tests: plan-build paths, fallback paths, strict mode, WARN dedup.
  • FlinkAuronCalcOperatorTest — 14 tests: open / processElement / drain / close lifecycle, including the deferred-native-runtime contract.
  • FlinkArrowReaderTest — 25 tests; new testTimestamp{Milli,Sec,Nano}Vector lock the Arrow time-unit dispatch.
  • FlinkAuronConfigurationTest, FlinkNodeConverterFactoryTest.

Integration tests (real StreamTableEnvironment against TestValuesTableFactory / Kafka source):

  • AuronCalcRewriteITCase — 4 tests covering native + fallback + mixed paths.
  • AuronFlinkCalcITCase.testPlus: SELECT `int` + 1 FROM T1 end-to-end through native.
  • AuronKafkaSourceITCase — 4 windowed-aggregation tests; post-aggregate Calc routes through the native operator.

Command:

./build/mvn test -Pspark-3.5,scala-2.12,flink-1.18 \
    -pl auron-flink-extension/auron-flink-planner,auron-flink-extension/auron-flink-runtime

Build the native library first via ./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 to exercise the Auron-execution paths.

Checkstyle: 0 violations on both modules.

weiqingy added 3 commits May 21, 2026 23:35
…k config option

Prerequisite infrastructure for the shadowed StreamExecCalc landing in
a subsequent commit:

- FlinkNodeConverterFactory now ships the three built-in converters
  (RexInputRefConverter, RexLiteralConverter, RexCallConverter) on the
  singleton at class load, so production callers reach a usable
  factory without explicit registration. Tests creating fresh
  instances via the package-private constructor stay unaffected.

- FlinkAuronConfiguration gains FAIL_BACK_FLINK_ENGINE_ENABLED
  (boolean, default true), keyed auron.failback.flink.engine.enabled.
  Controls whether conversion failure falls back to the Flink engine
  silently (default) or fails the job at submit time.

Tests: FlinkAuronConfigurationTest 2/2, FlinkNodeConverterFactoryTest
9/9, checkstyle 0 violations.

Issue: apache#1853
The shadowed class lives at the same FQCN as Flink's stock
StreamExecCalc; classpath ordering (auron-flink-planner ahead of
flink-table-planner via the standard auron-flink-assembly packaging)
makes the planner construct this class whenever it builds a Calc
ExecNode. Same pattern as Apache Gluten's gluten-flink.

At translateToPlanInternal time, the class attempts to translate its
projection + condition into a native Project[Filter?[FFIReader]] plan
using the converter framework. On success, constructs a
FlinkAuronCalcOperator inline and wraps it in a OneInputTransformation.
On any failure (unsupported RexNode or exception during composition),
falls back to Flink's stock CodeGenOperator via
super.translateToPlanInternal — gated by FAIL_BACK_FLINK_ENGINE_ENABLED
(default true falls back, false throws IllegalStateException).

Observability: WARN per unique unsupported RexNode class
(deduplicated), WARN per plan-composition exception (per-occurrence
with stack trace). Per-submission INFO summary deferred — Flink
1.18's PlannerBase exposes no clean submission-end hook.

Also: NativeRuntimeFactory now extends java.io.Serializable
(@VisibleForTesting interface). Without the marker, Flink's operator
dispatch to TaskManagers throws NotSerializableException — a latent
defect in apache#1857 that the E2E ITCase in the next commit will exercise.
Marker interfaces are non-breaking.

Tests: StreamExecCalcTest 12/12, FlinkAuronCalcOperatorTest 14/14,
checkstyle 0 violations.

Issue: apache#1853
…QL coverage

Four tests exercising distinct paths of the shadowed StreamExecCalc:

- testMultiColumnArithmeticProjection — Auron path with multi-expression
  projection (`int + 1, int * 2`).
- testFilterAndProjectEndToEnd — Calc-with-condition; GREATER_THAN is
  not yet converter-supported, so this verifies fallback-path
  correctness. The Auron-side Filter[FFIReader] plan-shape coverage
  lands once a predicate-returning converter does.
- testFallbackOnUnsupportedExprStillExecutes — UPPER(string) triggers
  silent fallback; the job emits the correct UPPERed rows.
- testMixedSupportedAndUnsupportedCalcs — UNION ALL of one convertible
  and one non-convertible Calc; verifies per-Calc granularity at the
  job-level correctness layer.

No duplicate with AuronFlinkCalcITCase.testPlus (single-expression
arithmetic). Two of the four tests pass without the native library;
the other two share testPlus's native-lib prerequisite.

Tests: compile clean, checkstyle 0 violations.

Issue: apache#1853
@github-actions github-actions Bot added the flink label May 24, 2026
weiqingy added 3 commits May 24, 2026 11:03
…ecCalc binding

The shadowed StreamExecCalc shares its FQCN with Flink's stock class.
Maven's scala-maven-plugin testCompile classpath ordering is not always
deterministic across environments: on Linux CI runners, javac resolves
StreamExecCalc to flink-table-planner_2.12-1.18.1.jar instead of the
local target/classes, so symbols only present on the shadow
(peekWarnEmitCountForTest, translateToFlinkCalc) are not visible at
compile time.

- Add invokeStaticInt helper and reach peekWarnEmitCountForTest via
  reflection, matching the existing pattern used for
  resetWarnDedupForTest.
- Drop @Override on CapturingTranslator.translateToFlinkCalc so javac
  no longer requires the parent class to declare it. Runtime virtual
  dispatch is unaffected: the loaded StreamExecCalc is the shadow,
  signatures match, and translateToPlanInternal's invocation routes
  to the subclass override as before.

Tested: StreamExecCalcTest 12/12 locally on JDK 8 + JDK 11; spotless
and checkstyle clean; isolated javac run against only the stock Flink
JAR (no local target/classes) compiles the test cleanly, reproducing
the CI classpath condition.
Three end-to-end runtime defects surfaced once the shadowed StreamExecCalc
began converting Calcs that other tests were already exercising:

- FlinkAuronConfiguration was missing INPUT_BATCH_STATISTICS_ENABLE.
  The native execution context queries it on every Auron-native plan via
  JniBridge.getConfValue, which resolves config keys by reflection on the
  active AuronConfiguration class; absence of the field threw NPE before
  the first batch executed.

- FlinkAuronCalcOperator.open built the FlinkMetricNode with an empty
  children list, but native finalization walks the metric tree in lockstep
  with the plan tree (Project[Filter?[FFIReader]]). The first
  MetricNode.getChild(0) call therefore threw IndexOutOfBoundsException.
  buildMetricTree now mirrors the plan shape so each plan node has a
  matching metric child.

- The native runtime was constructed eagerly in open(), but
  JniBridge.callNative spawns a tokio task that immediately starts
  streaming output through the FFI Reader. On the first pull from an
  empty exporter the reader sees end-of-stream and calls exporter.close(),
  nulling the writer; the next processElement then NPE'd on
  writer.write(row). Construction is now deferred to the first non-empty
  drainNative call, and reinitExporterAndRuntime leaves nativeRuntime
  null between cycles so subsequent drains also start against a populated
  buffer.

Tests updated: FlinkAuronCalcOperatorTest's multi-cycle drain contract
asserts factoryCalls == drain cycles (no eager open()/reinit construction
inflating the count).

Issue: apache#1853
…assuming microseconds

ArrowTimestampColumnVector hard-coded a divide-by-1000 on every read,
treating the underlying value as microseconds. When native produces a
plan with Timestamp(Millisecond) output — e.g. a TUMBLE window's
window_start column — the raw millisecond value was divided by 1000
again, shrinking 2026 timestamps down to 1970 (off-by-1000x).

Now read the unit from the vector's ArrowType.Timestamp metadata at
construction and convert per unit (SEC / MS / US / NS) in
getTimestamp. Added FlinkArrowReaderTest coverage for the three units
that were previously untested.

Issue: apache#1853
@weiqingy
Contributor Author

@Tartarus0zm Could you please help review this PR when you get a chance? Thanks!

@@ -0,0 +1,329 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Contributor

This classpath-ordering assumption is not enforced anywhere — if JAR order changes, the shadow silently stops working with no error or warning. Consider adding a startup assertion that verifies Class.forName("...StreamExecCalc") resolves to Auron's classloader, and throw IllegalStateException if it doesn't.

Contributor Author

Thanks for raising this — the silent-degradation failure mode is real and worth surfacing.

A literal Class.forName("...StreamExecCalc") assertion inside the shadow turns out to be tautological though: if our shadow's static initializer runs, our class is already the cached one for that FQCN by definition, so the check trivially succeeds. In the case you're actually worried about — Flink's stock StreamExecCalc resolved first — our shadow's <clinit> never runs and the assertion is silently skipped.

For an independent check to fire, it would need to live on a planner-side Auron class whose loading is decoupled from the shadow. Flink 1.18 doesn't expose an SPI for the planner side (which is why we shadowed instead of using ExecNodeGraphProcessor here in the first place). On the runtime side, FlinkAuronCalcOperator only runs when the shadow already succeeded, so it's too late to be an independent witness.

I went with a practical alternative in b52fe386: a one-shot INFO log on the first translateToPlanInternal call per JVM —

Auron StreamExecCalc shadow active (loaded from file:/.../auron-flink-planner-...jar).

Absence of this line under SQL load is a positive-signal indication of silent degradation, and the embedded code-source location surfaces "wrong JAR resolved" cases too. Combined with FAIL_BACK_FLINK_ENGINE_ENABLED=false for strict CI runs (already in this PR — it turns per-Calc fallback into a thrown exception), operators get two complementary detection paths. Class-level Javadoc documents both.
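A rough sketch of the one-shot activation line with an embedded code-source location, using only JDK calls. The class name, message text, and the choice to return the line instead of logging it are assumptions made for illustration; the commit referenced above may be structured differently.

```java
import java.security.CodeSource;
import java.util.concurrent.atomic.AtomicBoolean;

final class ActivationLogSketch {
    private static final AtomicBoolean LOGGED = new AtomicBoolean(false);

    private ActivationLogSketch() {}

    /** Describes where a class was loaded from; bootstrap classes have no code source. */
    static String describeOrigin(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return (src == null || src.getLocation() == null) ? "<unknown>" : src.getLocation().toString();
    }

    /** Returns the activation line on the first call per JVM, and null on later calls. */
    static String activationLineOnce(Class<?> cls) {
        if (!LOGGED.compareAndSet(false, true)) {
            return null; // already reported once in this JVM
        }
        return "shadow active (loaded from " + describeOrigin(cls) + ")";
    }

    public static void main(String[] args) {
        System.out.println(activationLineOnce(ActivationLogSketch.class));
        System.out.println(activationLineOnce(ActivationLogSketch.class)); // null: already logged
    }
}
```

Note that this only reports which JAR or directory the running class came from. As discussed above, it cannot fire when the stock class is resolved first, so the signal is the absence of the line.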

A true independent planner-side check would need a new Auron-controlled entry point on the planner (custom Planner subclass, or an SPI we don't currently use). Happy to file that as a follow-up if you'd like — it's a broader architectural change than this PR was scoped for.

Contributor

@weiqingy @yew1eb Does this scenario actually exist in practice? Using Auron in a Flink setup is an explicit, user-aware action — users need to manually place the Auron jar into the Flink lib directory, and this is something we should clearly require in our user manual. As part of that step, users can also remove the Flink planner jar from the lib directory, since our Auron Flink jar already depends on the Flink planner jar. So I think this might not be a real-world scenario. What do you all think?

Contributor Author

Thanks both — engaging with this carefully because the thread touches the deployment model rather than the code itself. Full analysis (Spark/Gluten contrast, A1/A2 tradeoff tables, the gluten-flink procedure breakdown, and the SPI verification across Flink 1.18 / 1.20 / 2.0 / master) is now in design-doc Rev 4 on my fork: AURON-1853-DESIGN.md → Rev 4. Short summary inline below.

The discussion points at a question the design left implicit: how is Auron deployed into Flink, and what guarantees the JAR-shadowing mechanism activates? Two real options on the table:

A1 — current overlay. Drop auron-flink-planner.jar into $FLINK_HOME/lib/ alongside Flink's flink-table-planner.jar. Both jars present at runtime; Auron's wins via JVM directory traversal order (typically alphabetical, but not spec-guaranteed across JVM vendors). This is what the PR currently implements.

A2 — shaded uber-jar replacement. Auron ships auron-flink-planner-shaded.jar containing flink-table-planner's classes with Auron's StreamExecCalc (and any other Auron overrides) substituted in. The user replaces $FLINK_HOME/lib/flink-table-planner.jar with this shaded jar. Only one StreamExecCalc exists on the classpath — activation is structural rather than ordering-dependent.

A natural-seeming third option — "remove flink-table-planner.jar from lib/, keep only Auron's overlay jar" — doesn't link without shading: Auron's StreamExecCalc extends CommonExecCalc and transitively depends on classes that live in flink-table-planner.jar. Removing that jar NoClassDefFoundErrors at class-load. So "only Auron's jar in lib/" actually working implies bundling the planner content into Auron's jar — which is A2.

For prior art I checked gluten-flink's documented procedure (Flink.md): they don't drop their jars directly in lib/ — they place them in $FLINK_HOME/gluten_lib/ and edit $FLINK_HOME/bin/config.sh to prepend gluten_lib/* to FLINK_CLASSPATH. More deterministic than A1 (the shell-script edit makes ordering an explicit operator action), but it's 2 user steps vs A2's 1, and config.sh edits are fragile in containerized/managed Flink deployments where config.sh may be regenerated or version-locked.

UX comparison across systems:

| System | User steps | Activation determinism |
| --- | --- | --- |
| Auron-Spark | drop jar + spark.sql.extensions=... config | Spark SPI (explicit class load) |
| Gluten-Spark | drop jar + spark.plugins=... config | Same Spark SPI |
| gluten-flink | place jars in gluten_lib/ + edit bin/config.sh | Classpath ordering (explicit operator action) |
| Auron-Flink A1 (current) | drop in lib/ | Classpath ordering (implicit, JVM filesystem traversal) |
| Auron-Flink A2 | replace one jar in lib/ | Structural (exactly one class on the classpath) |

A2 is the only Flink-side path that matches Spark's UX clarity — one user action, deterministic outcome. (Flink 1.18, 1.20, 2.0, and current master all lack an ExecNode-substitution SPI; no proposed FLIP in flight. So JAR overlay or shaded uber-jar is the choice for the foreseeable future.)

My current leaning: A2, held lightly. Reasoning:

  • Activation is structural, not procedural — the classpath-ordering concern goes away by construction; no need to verify via the INFO activation log (which can be kept or simplified).
  • Future ExecNode shadows (#1860 to #1865) ride on the same clean deployment model.
  • Costs are bounded: ~50 lines of pom.xml shade config; ~30-40MB artifact (flink-table-planner's size; acceptable next to flink-dist*.jar at ~120MB); per-Flink-version re-shading slots into the existing build matrix; one-time NOTICE update for embedded Flink content (Apache projects do this regularly — cf. flink-shaded, spark-hadoop-cloud).

Could we make A2 an immediate follow-up after this PR lands A1?

Contributor

A2 +1

…shadow is loaded

The JAR-shadowing substitution is silent by design: if the classpath
ordering puts flink-table-planner ahead of auron-flink-planner, Flink
resolves its stock StreamExecCalc instead of this one and the rewriter
quietly does nothing.

Emit an INFO log on the first translateToPlanInternal call per JVM with
the resolved class's code source so operators have a positive signal to
grep for. Absence of the line under SQL load means the shadow is not
active. Class Javadoc documents the failure mode and points at
FAIL_BACK_FLINK_ENGINE_ENABLED=false as a complementary stricter signal.
Contributor

@Tartarus0zm Tartarus0zm left a comment

Thanks for your contribution!
It seems we haven't reached consensus on some of the underlying assumptions. Once the user's usage pattern is clearly defined, our solution will likely become much clearer.

* adds noise if a brand-new class shows up. ThreadLocal keeps the bookkeeping off the static
* mutable state path (planner threads in session clusters are reused).
*/
private static final ThreadLocal<Set<Class<? extends RexNode>>> WARN_DEDUP = ThreadLocal.withInitial(HashSet::new);
Contributor

The ThreadLocal approach isn't really a good fit here — it can potentially lead to memory leaks. Compilation typically happens inside the JobManager (JM).
Also, should we throw an exception at unsupported nodes instead? It might be more straightforward/intuitive than just logging.

Contributor Author

Two good catches — addressing both:

(a) ThreadLocal<Set> retention. Agreed. The dedup is intended to be one-per-JVM-per-RexNode-class (so the same unsupported class doesn't spam the log across many job submissions), but ThreadLocal makes it one-per-thread, and JM planner threads in session-cluster mode are long-lived. Fixed in b94b59be: both WARN_DEDUP and WARN_EMIT_COUNT now use non-thread-bound state — a static ConcurrentHashMap.newKeySet() for the dedup (bounded by the small finite set of RexNode subclasses, no unbounded growth) and a static AtomicInteger for the emit counter. Surrounding Javadoc and class-level documentation updated to drop the ThreadLocal / per-planner-thread wording; existing StreamExecCalcTest (12 tests) passes and checkstyle is clean.

(b) Throw vs log on unsupported nodes. This is already configurable in this PR — FAIL_BACK_FLINK_ENGINE_ENABLED (boolean, default true, key auron.failback.flink.engine.enabled):

  • true (default): unsupported RexNode silently falls back to Flink's CodeGen Calc, with the WARN log.
  • false: unsupported RexNode throws IllegalStateException at translation time — job submission fails fast.

I picked true as the default thinking a brand-new deployment would want soft-fallback as the path of least surprise during early adoption. Would you rather see the default flipped to false (strict mode by default, opt-in to fallback)? Either way works — open to it.

Contributor

Regarding question (b), I think the existing option is fine. My original intent was to throw an exception directly for unsupported RexNodes, so we wouldn't need to print logs for each individual node. That said, there would indeed be cases where multiple operators end up throwing duplicate exception logs. Overall, I think the current approach is fine.

…tHashMap

Both WARN_DEDUP and WARN_EMIT_COUNT were ThreadLocal-scoped, which
retains per-thread state on long-lived JM planner threads in session
clusters. Move them to static state — a ConcurrentHashMap.newKeySet()
for the dedup (bounded by the small finite set of RexNode subclasses)
and an AtomicInteger for the test-only emit counter.

The shift also matches the original design intent of dedup at most once
per JVM per unsupported RexNode class, where ThreadLocal had given a
weaker once-per-thread guarantee.
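The move from ThreadLocal to static state described in this commit message could look roughly like the sketch below. The field names follow the message, while the method names, the message text, and the use of `Class<?>` in place of RexNode subclasses are assumptions made to keep the example self-contained.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class WarnDedupSketch {
    // JVM-wide state: bounded by the number of distinct classes ever reported.
    private static final Set<Class<?>> WARN_DEDUP = ConcurrentHashMap.newKeySet();
    private static final AtomicInteger WARN_EMIT_COUNT = new AtomicInteger();

    private WarnDedupSketch() {}

    /** Emits a warning at most once per class per JVM; returns true if it was emitted. */
    static boolean warnOnce(Class<?> unsupported) {
        if (!WARN_DEDUP.add(unsupported)) {
            return false; // some thread already warned about this class
        }
        WARN_EMIT_COUNT.incrementAndGet();
        System.err.println("fallback: unsupported node " + unsupported.getName());
        return true;
    }

    static int peekWarnEmitCount() {
        return WARN_EMIT_COUNT.get();
    }

    /** Test seam: clears the dedup set and the counter. */
    static void reset() {
        WARN_DEDUP.clear();
        WARN_EMIT_COUNT.set(0);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread other = new Thread(() -> warnOnce(String.class));
        other.start();
        other.join();
        warnOnce(String.class); // deduplicated even though it comes from a different thread
        warnOnce(Integer.class);
        System.out.println(peekWarnEmitCount());
    }
}
```

`Set.add` on a concurrent key set is atomic, so two threads racing on the same class still produce a single warning, which a per-thread set cannot guarantee.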
weiqingy added a commit to weiqingy/auron that referenced this pull request May 26, 2026
…y vs shaded uber-jar)

Round 2 review on PR apache#2283 surfaced a question that earlier design
rounds left implicit: how is Auron deployed into a Flink installation,
and what guarantees the JAR-shadowing mechanism activates? The chosen
architecture (shadow class in Flink's package) is unchanged; this Rev
documents the two deployment options and a preferred direction.

A1 — current overlay. Drop the Auron jar into FLINK_HOME/lib/ alongside
flink-table-planner.jar; activation depends on JVM directory traversal
order.

A2 — shaded uber-jar replacement. Auron ships a fat jar that contains
flink-table-planner's classes with Auron overrides substituted in. User
replaces flink-table-planner.jar with the shaded jar; activation is
structural (only one StreamExecCalc class exists on the classpath).

Includes a Spark/Gluten UX comparison, analysis of why gluten-flink's
config.sh-prepend procedure is not separately listed as a third option,
and confirmation that Flink 1.20, 2.0, and current master all lack an
ExecNode-substitution SPI (no proposed FLIP in flight).

Preferred direction (held lightly): A2. Open question to reviewers: A2
in this PR (~2-4 days scope expansion for shade config + NOTICE + smoke
test) or A2 as an immediate follow-up?
Contributor

@Tartarus0zm Tartarus0zm left a comment

Overall I don't have any major concerns — just a few nitpicks.

}

/** Test seam: clears the dedup set and emit counter so independent tests do not share state. */
static void resetWarnDedupForTest() {
Contributor

Use the @VisibleForTesting annotation and call the API directly in the unit tests, rather than going through reflection.

* Test seam: returns the number of WARN log lines actually emitted since the last
* {@link #resetWarnDedupForTest()}.
*/
static int peekWarnEmitCountForTest() {
Contributor

ditto

// Reset id counter so getId()-derived strings are reproducible across tests.
ExecNodeContext.resetIdCounter();
// Reset the per-fallback WARN dedup so each test starts from a clean slate.
invokeStatic(StreamExecCalc.class, "resetWarnDedupForTest");
Contributor

ditto
