Add Realm execution support for parallel operators (Replicate, Repartition, Combine, Reduction)#1641
Open
seemamirch wants to merge 9 commits intoflexflow:masterfrom
Open
Conversation
added 5 commits
April 9, 2026 15:49
- Add perform_pass_expansion_for_replicate for fwd/bwd pass expansion - Add perform_shard_expansion_for_replicate and _bwd for shard expansion - Add build_replicate_invocation in make_dynamic_open_dataflow_graph - Add is_replicate_attrs helper and guard replicate in copy_insertion - Add ReplicateAttrs to TrainingOperationAttrs - Add SumReductionFloat/Double for backward replicate reduce operation - Add issue_replicate_bwd in spawn_dynamic_node_invocation - Fix per_device_op_state init race condition with direct write - Fix .value() calls on optional per_device_op_state across op impls - Update issue_copy to support optional reduction op - Add testcase for replicate op
- Add perform_pass_expansion_for_replicate for fwd/bwd pass expansion - Add perform_shard_expansion_for_replicate and _bwd for shard expansion - Add build_replicate_invocation in make_dynamic_open_dataflow_graph - Add is_replicate_attrs helper and guard replicate in copy_insertion - Add ReplicateAttrs to TrainingOperationAttrs - Add SumReductionFloat/Double for backward replicate reduce operation - Add issue_replicate_bwd in spawn_dynamic_node_invocation - Fix per_device_op_state init race condition with direct write - Fix .value() calls on optional per_device_op_state across op impls - Update issue_copy to support optional reduction op - Add testcase for replicate op
…ion) in Realm backend (CPU versions) Each parallel op is handled via Realm copies rather than op tasks: - Replicate FWD: broadcast copy; BWD: sum-reduce replica gradients - Repartition FWD: scatter into shards; BWD: gather shards into full tensor - Combine FWD: gather shards into full tensor; BWD: scatter gradient into shards - Reduction FWD: sum-reduce partials; BWD: broadcast gradient to all partials Key implementation details: - Parallel ops have no ComputationGraphOpAttrs equivalent - Instance allocation uses offset index spaces for sharded tensors - issue_copy uses actual instance index space via get_indexspace() - Add CopyDomain::SRC/DST to select correct copy domain - Combine FWD and Reduction FWD register only first invocation in ManyToOne - Add get_per_device_shape() for correct per-device tensor size - Add perform_shard_expansion_one_to_many and _many_to_one generic functions - Add parallel_op_utils.h shared header for is_parallel_op_attrs - Add CopyDomain enum and create_instance_with_offset to RealmContext - Add multi-cpu tests for the parallel operators
Author
|
@lockshaw @elliottslaughter - please review |
Key changes: - Add CUDA reduction registration via Realm::Cuda::add_cuda_redop_kernels - Add apply_cuda/fold_cuda methods to SumReductionFloat and SumReductionDouble - Add REALM_CUDA_HD decorators for host/device compatibility - Add atomicAdd GPU paths with pre-Pascal CAS fallback for double - Move register_reductions() to realm_reduction_cuda.cu compiled with NVCC - Update CMakeLists.txt to use manual add_library with LANGUAGES CXX CUDA - Add GPU test cases for Combine, Repartition and Reduction ops The reduction registration uses create_reduction_op + add_cuda_redop_kernels which registers both CPU and GPU paths in a single call. Realm handles per-device kernel translation automatically for all GPUs.
Add GPU support for parallel operators in the Realm backend
…d CPU kernels Implements several key components for the Realm-based execution backend: External Tensor Instance Support - Add create_external_instance to RealmContext — wraps existing memory buffer in a Realm region instance using layout cloned from a temporary instance - Add create_external_tensor to RealmContext — allocates in Z_COPY or SYSTEM_MEM (CPU-accessible) and creates external instance - Update perform_instance_allocation to handle preallocated external tensors - Fix dynamic_tensor_accessor_from_instance to use get_per_device_shape instead of get_piece_shape so accessor shapes correctly reflect per-device tensor sizes for sharded tensors Parallel Operator Ordering Fix - Fix topological ordering in dynamic_open_dataflow_graph.cc — many-to-one parallel op FWD shards (combine, reduction) now always precede their consumers by updating value_map to the latest producer instead of skipping duplicate outputs CPU Kernel Implementations - element_unary_kernels_cpu: implement cpu_forward_kernel and cpu_backward_kernel for RELU - element_binary_kernels_cpu: implement cpu_forward_kernel (EW_ADD, EW_SUB, EW_MUL, EW_DIV) and cpu_backward_kernel (EW_ADD, EW_SUB, EW_MUL); add num_elements parameter to both cpu kernels and forward/backward_kernel - linear_kernels_cpu: fix cpu_backward_kernel relu backward — was returning binary mask instead of grad * (output > 0) ProfilingSettings Type Safety - Change measure_iters to positive_int and warmup_iters to nonnegative_int in profiling_settings.dtg.toml — guarantees kernels always execute - Update cpu_profiling_wrapper to use int_from_positive_int for division and loop bounds Per-Device Op State for CPU Ops - Add has_per_device_op_state() to ITaskArgumentAccessor, TaskArgumentAccessor, and LocalTaskArgumentAccessor - Guard get_per_device_op_state() calls in element_unary and element_binary forward/backward impls with has_per_device_op_state() - Fix per_device_op_state_init_task to early-return for CPU ops that return nullopt from init_kernel instead of panicking on assert_unwrap Tests - test_op_replicate: add external input instance test with value verification - test_op_combine: add external input instance test with value verification - test_op_repartition: add external input instance test with value verification - test_op_reduce: add external input instance test with value verification - test_e2e (local-execution): disable loss decrease check pending CPU loss forward kernel implementation
Support CPU external tensor instances, additional CPU kernels, ProfilingSettings update, per device state and tests with value verification
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR implements support for all four parallel operators (Replicate, Repartition, Combine, Reduction) CPU based in the Realm-based execution backend.
Motivation
Parallel operators are fundamental to distributed training in FlexFlow — they handle data redistribution between devices (repartition), replication (replicate), gathering (combine), and partial sum reduction (reduction). Previously only the graph pass infrastructure existed; this PR adds the Realm execution layer.
Approach
Each parallel op is executed as a Realm copy operation rather than a regular op task, since they involve data movement between devices rather than computation.
Data Movement
Key Design Decisions
Offset index space allocation: Each shard instance is allocated with a non-zero origin rect reflecting its position in the full tensor (e.g. shard 1 of a [10,16] tensor split along dim 0 is allocated at [5..9, 0..15]). This allows plain Realm copies between shards and combined tensors to work correctly.
CopyDomain: A new
CopyDomainenum (SRC/DST) selects which instance's index space is used as the copy domain.SRCis used when the source is the smaller piece;DSTwhen the destination is the smaller piece.get_per_device_shape: A new function that correctly computes the per-device tensor size by dividing each dimension by its shard degree, used for instance allocation.
ManyToOne collision fix: Combine FWD and Reduction FWD produce multiple invocations with the same output
DynamicValueAttrs. Only the first is registered in the producer map to avoid collision while still marking the value as having a producer.Changes
New files
lib/task-spec/include/task-spec/dynamic_graph/parallel_op_utils.h— sharedis_parallel_op_attrshelperlib/realm-execution/test/src/realm-execution/test_op_combine.cc— combine op e2e testlib/realm-execution/test/src/realm-execution/test_op_reduce.cc— reduction op e2e testlib/realm-execution/test/src/realm-execution/test_op_repartition.cc— repartition op e2e testModified files
shard_expansion.cc— addperform_shard_expansion_one_to_manyandperform_shard_expansion_many_to_onegeneric functions covering all eight FWD/BWD combinationspass_expansion.cc— addperform_pass_expansion_for_parallel_opcopy_insertion.cc— guard parallel ops in copy insertionmake_dynamic_open_dataflow_graph_from_mapped_pcg.cc— dispatch parallel ops tobuild_parallel_op_invocationdynamic_open_dataflow_graph.cc— fix ManyToOne collision for combine/reduction FWDpcg_instance.cc— add Realm copy dispatch for all parallel opsrealm_context.cc/h— addCopyDomain,create_instance_with_offset, updateissue_copyto useget_indexspace()instance_allocation.cc— use offset index spaces for sharded tensor allocationparallel_tensor_dims.cc/h— addget_per_device_dimsparallel_tensor_shape.cc/h— addget_per_device_shapetask_id_t.cc— returnnulloptfor all parallel ops inget_init_task_id_for_op_attrsTesting
Added e2e tests for each parallel op running on 2 CPU devices:
test_op_combine.cc— repartition → combine → relutest_op_reduce.cc— repartition → linear → reduction → relutest_op_repartition.cc— repartition → reluThis change is