
Add Realm execution support for parallel operators (Replicate, Repartition, Combine, Reduction)#1641

Open
seemamirch wants to merge 9 commits into flexflow:master from seemamirch:sm/realm-parallel-operators-all

Conversation

@seemamirch

@seemamirch seemamirch commented Apr 23, 2026

Summary

This PR adds CPU-based execution support for all four parallel operators (Replicate, Repartition, Combine, Reduction) to the Realm-based execution backend.

Motivation

Parallel operators are fundamental to distributed training in FlexFlow — they handle data redistribution between devices (repartition), replication (replicate), gathering (combine), and partial sum reduction (reduction). Previously only the graph pass infrastructure existed; this PR adds the Realm execution layer.

Approach

Each parallel op is executed as a Realm copy operation rather than as a regular op task, since these ops perform data movement between devices rather than computation.

Data Movement

| Op | FWD | BWD |
| --- | --- | --- |
| Replicate | broadcast copy to all replicas | sum-reduce replica gradients |
| Repartition | scatter full tensor into shards | gather shards into full tensor |
| Combine | gather shards into full tensor | scatter gradient into shards |
| Reduction | sum-reduce partial tensors | broadcast gradient to all partials |

Key Design Decisions

Offset index space allocation: Each shard instance is allocated with a non-zero origin rect reflecting its position in the full tensor (e.g. shard 1 of a [10,16] tensor split along dim 0 is allocated at [5..9, 0..15]). This allows plain Realm copies between shards and combined tensors to work correctly.

CopyDomain: A new CopyDomain enum (SRC/DST) selects which instance's index space is used as the copy domain. SRC is used when the source is the smaller piece; DST when the destination is the smaller piece.

get_per_device_shape: A new function that correctly computes the per-device tensor size by dividing each dimension by its shard degree, used for instance allocation.

ManyToOne collision fix: Combine FWD and Reduction FWD produce multiple invocations with the same output DynamicValueAttrs. Only the first is registered in the producer map to avoid collision while still marking the value as having a producer.

Changes

New files

  • lib/task-spec/include/task-spec/dynamic_graph/parallel_op_utils.h — shared is_parallel_op_attrs helper
  • lib/realm-execution/test/src/realm-execution/test_op_combine.cc — combine op e2e test
  • lib/realm-execution/test/src/realm-execution/test_op_reduce.cc — reduction op e2e test
  • lib/realm-execution/test/src/realm-execution/test_op_repartition.cc — repartition op e2e test

Modified files

  • shard_expansion.cc — add perform_shard_expansion_one_to_many and perform_shard_expansion_many_to_one generic functions covering all eight FWD/BWD combinations
  • pass_expansion.cc — add perform_pass_expansion_for_parallel_op
  • copy_insertion.cc — guard parallel ops in copy insertion
  • make_dynamic_open_dataflow_graph_from_mapped_pcg.cc — dispatch parallel ops to build_parallel_op_invocation
  • dynamic_open_dataflow_graph.cc — fix ManyToOne collision for combine/reduction FWD
  • pcg_instance.cc — add Realm copy dispatch for all parallel ops
  • realm_context.cc/h — add CopyDomain, create_instance_with_offset, update issue_copy to use get_indexspace()
  • instance_allocation.cc — use offset index spaces for sharded tensor allocation
  • parallel_tensor_dims.cc/h — add get_per_device_dims
  • parallel_tensor_shape.cc/h — add get_per_device_shape
  • task_id_t.cc — return nullopt for all parallel ops in get_init_task_id_for_op_attrs

Testing

Added e2e tests for each parallel op running on 2 CPU devices:

  • test_op_combine.cc — repartition → combine → relu
  • test_op_reduce.cc — repartition → linear → reduction → relu
  • test_op_repartition.cc — repartition → relu


Seema Mirchandaney added 5 commits April 9, 2026 15:49
- Add perform_pass_expansion_for_replicate for fwd/bwd pass expansion
- Add perform_shard_expansion_for_replicate and _bwd for shard expansion
- Add build_replicate_invocation in make_dynamic_open_dataflow_graph
- Add is_replicate_attrs helper and guard replicate in copy_insertion
- Add ReplicateAttrs to TrainingOperationAttrs
- Add SumReductionFloat/Double for backward replicate reduce operation
- Add issue_replicate_bwd in spawn_dynamic_node_invocation
- Fix per_device_op_state init race condition with direct write
- Fix .value() calls on optional per_device_op_state across op impls
- Update issue_copy to support optional reduction op
- Add testcase for replicate op
…ion) in Realm backend (CPU versions)

Each parallel op is handled via Realm copies rather than op tasks:
- Replicate FWD: broadcast copy; BWD: sum-reduce replica gradients
- Repartition FWD: scatter into shards; BWD: gather shards into full tensor
- Combine FWD: gather shards into full tensor; BWD: scatter gradient into shards
- Reduction FWD: sum-reduce partials; BWD: broadcast gradient to all partials

Key implementation details:
- Parallel ops have no ComputationGraphOpAttrs equivalent
- Instance allocation uses offset index spaces for sharded tensors
- issue_copy uses actual instance index space via get_indexspace()
- Add CopyDomain::SRC/DST to select correct copy domain
- Combine FWD and Reduction FWD register only first invocation in ManyToOne
- Add get_per_device_shape() for correct per-device tensor size
- Add perform_shard_expansion_one_to_many and _many_to_one generic functions
- Add parallel_op_utils.h shared header for is_parallel_op_attrs
- Add CopyDomain enum and create_instance_with_offset to RealmContext
- Add multi-cpu tests for the parallel operators
@seemamirch
Author

@lockshaw @elliottslaughter - please review

Seema Mirchandaney and others added 4 commits April 28, 2026 15:54
Key changes:
- Add CUDA reduction registration via Realm::Cuda::add_cuda_redop_kernels
- Add apply_cuda/fold_cuda methods to SumReductionFloat and SumReductionDouble
- Add REALM_CUDA_HD decorators for host/device compatibility
- Add atomicAdd GPU paths with pre-Pascal CAS fallback for double
- Move register_reductions() to realm_reduction_cuda.cu compiled with NVCC
- Update CMakeLists.txt to use manual add_library with LANGUAGES CXX CUDA
- Add GPU test cases for Combine, Repartition and Reduction ops

The reduction registration uses create_reduction_op + add_cuda_redop_kernels
which registers both CPU and GPU paths in a single call. Realm handles
per-device kernel translation automatically for all GPUs.
Add GPU support for parallel operators in the Realm backend
…d CPU kernels

Implements several key components for the Realm-based execution backend:

External Tensor Instance Support
- Add create_external_instance to RealmContext — wraps existing memory buffer
  in a Realm region instance using layout cloned from a temporary instance
- Add create_external_tensor to RealmContext — allocates in Z_COPY or
  SYSTEM_MEM (CPU-accessible) and creates external instance
- Update perform_instance_allocation to handle preallocated external tensors
- Fix dynamic_tensor_accessor_from_instance to use get_per_device_shape
  instead of get_piece_shape so accessor shapes correctly reflect per-device
  tensor sizes for sharded tensors

Parallel Operator Ordering Fix
- Fix topological ordering in dynamic_open_dataflow_graph.cc — many-to-one
  parallel op FWD shards (combine, reduction) now always precede their
  consumers by updating value_map to the latest producer instead of skipping
  duplicate outputs

CPU Kernel Implementations
- element_unary_kernels_cpu: implement cpu_forward_kernel and
  cpu_backward_kernel for RELU
- element_binary_kernels_cpu: implement cpu_forward_kernel (EW_ADD, EW_SUB,
  EW_MUL, EW_DIV) and cpu_backward_kernel (EW_ADD, EW_SUB, EW_MUL); add
  num_elements parameter to both cpu kernels and forward/backward_kernel
- linear_kernels_cpu: fix cpu_backward_kernel relu backward — was returning
  binary mask instead of grad * (output > 0)

ProfilingSettings Type Safety
- Change measure_iters to positive_int and warmup_iters to nonnegative_int
  in profiling_settings.dtg.toml — guarantees kernels always execute
- Update cpu_profiling_wrapper to use int_from_positive_int for division
  and loop bounds

Per-Device Op State for CPU Ops
- Add has_per_device_op_state() to ITaskArgumentAccessor, TaskArgumentAccessor,
  and LocalTaskArgumentAccessor
- Guard get_per_device_op_state() calls in element_unary and element_binary
  forward/backward impls with has_per_device_op_state()
- Fix per_device_op_state_init_task to early-return for CPU ops that return
  nullopt from init_kernel instead of panicking on assert_unwrap

Tests
- test_op_replicate: add external input instance test with value verification
- test_op_combine: add external input instance test with value verification
- test_op_repartition: add external input instance test with value verification
- test_op_reduce: add external input instance test with value verification
- test_e2e (local-execution): disable loss decrease check pending CPU loss
  forward kernel implementation
Support CPU external tensor instances, additional CPU kernels, ProfilingSettings updates, per-device state, and tests with value verification