Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type Server struct {
Expand Down Expand Up @@ -80,6 +81,10 @@ func NewServer(port int, execute func(*Job)) *Server {
s.logger.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
opts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 20 * time.Second, // Minimum duration a client should wait before sending a keepalive ping
PermitWithoutStream: true, // Allow pings even if there are no active streams
}),
}
s.server = grpc.NewServer(opts...)
jobpb.RegisterJobServiceServer(s.server, s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;

/** Facade for a {@link List<T>} that keeps track of weight, for cache limit reasons. */
public class WeightedList<T> implements Weighted {
Expand Down Expand Up @@ -72,6 +71,14 @@ public void addAll(List<T> values, long weight) {
}

public void accumulateWeight(long weight) {
this.weight.accumulateAndGet(weight, LongMath::saturatedAdd);
this.weight.accumulateAndGet(
weight,
(first, second) -> {
try {
return Math.addExact(first, second);
} catch (ArithmeticException e) {
return Long.MAX_VALUE;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;

/**
* Adapters which convert a logical series of chunks using continuation tokens over the Beam Fn
Expand Down Expand Up @@ -251,11 +249,15 @@ static class BlocksPrefix<T> extends Blocks<T> implements Shrinkable<BlocksPrefi

@Override
public long getWeight() {
long sum = 8 + blocks.size() * 8L;
for (Block<T> block : blocks) {
sum = LongMath.saturatedAdd(sum, block.getWeight());
try {
long sum = 8 + blocks.size() * 8L;
for (Block<T> block : blocks) {
sum = Math.addExact(sum, block.getWeight());
}
return sum;
} catch (ArithmeticException e) {
return Long.MAX_VALUE;
}
return sum;
}

BlocksPrefix(List<Block<T>> blocks) {
Expand All @@ -280,7 +282,8 @@ public List<Block<T>> getBlocks() {

@AutoValue
abstract static class Block<T> implements Weighted {
private static final Block<Void> EMPTY = fromValues(ImmutableList.of(), 0, null);
private static final Block<Void> EMPTY =
fromValues(WeightedList.of(Collections.emptyList(), 0), null);

@SuppressWarnings("unchecked") // Based upon as Collections.emptyList()
public static <T> Block<T> emptyBlock() {
Expand All @@ -296,37 +299,21 @@ public static <T> Block<T> mutatedBlock(WeightedList<T> values) {
}

public static <T> Block<T> fromValues(List<T> values, @Nullable ByteString nextToken) {
if (values.isEmpty() && nextToken == null) {
return emptyBlock();
}
ImmutableList<T> immutableValues = ImmutableList.copyOf(values);
long listWeight = immutableValues.size() * Caches.REFERENCE_SIZE;
for (T value : immutableValues) {
listWeight = LongMath.saturatedAdd(listWeight, Caches.weigh(value));
}
return fromValues(immutableValues, listWeight, nextToken);
return fromValues(WeightedList.of(values, Caches.weigh(values)), nextToken);
}

public static <T> Block<T> fromValues(
WeightedList<T> values, @Nullable ByteString nextToken) {
if (values.isEmpty() && nextToken == null) {
return emptyBlock();
}
return fromValues(ImmutableList.copyOf(values.getBacking()), values.getWeight(), nextToken);
}

private static <T> Block<T> fromValues(
ImmutableList<T> values, long listWeight, @Nullable ByteString nextToken) {
long weight = LongMath.saturatedAdd(listWeight, 24);
long weight = values.getWeight() + 24;
if (nextToken != null) {
if (nextToken.isEmpty()) {
nextToken = ByteString.EMPTY;
} else {
weight = LongMath.saturatedAdd(weight, Caches.weigh(nextToken));
weight += Caches.weigh(nextToken);
}
}
return new AutoValue_StateFetchingIterators_CachingStateIterable_Block<>(
values, nextToken, weight);
values.getBacking(), nextToken, weight);
}

abstract List<T> getValues();
Expand Down Expand Up @@ -385,12 +372,10 @@ public void remove(Set<Object> toRemoveStructuralValues) {
totalSize += tBlock.getValues().size();
}

ImmutableList.Builder<T> allValues = ImmutableList.builderWithExpectedSize(totalSize);
long weight = 0;
List<T> blockValuesToKeep = new ArrayList<>();
WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
for (Block<T> block : blocks) {
blockValuesToKeep.clear();
boolean valueRemovedFromBlock = false;
List<T> blockValuesToKeep = new ArrayList<>();
for (T value : block.getValues()) {
if (!toRemoveStructuralValues.contains(valueCoder.structuralValue(value))) {
blockValuesToKeep.add(value);
Expand All @@ -402,19 +387,13 @@ public void remove(Set<Object> toRemoveStructuralValues) {
// If any value was removed from this block, need to estimate the weight again.
// Otherwise, just reuse the block's weight.
if (valueRemovedFromBlock) {
allValues.addAll(blockValuesToKeep);
for (T value : blockValuesToKeep) {
weight = LongMath.saturatedAdd(weight, Caches.weigh(value));
}
allValues.addAll(blockValuesToKeep, Caches.weigh(block.getValues()));
} else {
allValues.addAll(block.getValues());
weight = LongMath.saturatedAdd(weight, block.getWeight());
allValues.addAll(block.getValues(), block.getWeight());
}
}

cache.put(
IterableCacheKey.INSTANCE,
new MutatedBlocks<>(Block.fromValues(allValues.build(), weight, null)));
cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
}

/**
Expand Down Expand Up @@ -505,24 +484,21 @@ private void appendHelper(List<T> newValues, long newWeight) {
for (Block<T> block : blocks) {
totalSize += block.getValues().size();
}
ImmutableList.Builder<T> allValues = ImmutableList.builderWithExpectedSize(totalSize);
long weight = 0;
WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
for (Block<T> block : blocks) {
allValues.addAll(block.getValues());
weight = LongMath.saturatedAdd(weight, block.getWeight());
allValues.addAll(block.getValues(), block.getWeight());
}
if (newWeight < 0) {
newWeight = 0;
for (T value : newValues) {
newWeight = LongMath.saturatedAdd(newWeight, Caches.weigh(value));
if (newValues.size() == 1) {
// Optimize weighing of the common value state as single single-element bag state.
newWeight = Caches.weigh(newValues.get(0));
} else {
newWeight = Caches.weigh(newValues);
}
}
allValues.addAll(newValues);
weight = LongMath.saturatedAdd(weight, newWeight);
allValues.addAll(newValues, newWeight);

cache.put(
IterableCacheKey.INSTANCE,
new MutatedBlocks<>(Block.fromValues(allValues.build(), weight, null)));
cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
}

class CachingStateIterator implements PrefetchableIterator<T> {
Expand Down Expand Up @@ -604,7 +580,8 @@ public boolean hasNext() {
return false;
}
// Release the block while we are loading the next one.
currentBlock = Block.emptyBlock();
currentBlock =
Block.fromValues(WeightedList.of(Collections.emptyList(), 0L), ByteString.EMPTY);

@Nullable Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);
Expand Down
32 changes: 17 additions & 15 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ def run(self, test_runner_api: Union[bool, str] = 'AUTO') -> 'PipelineResult':
self.options.get_all_options(
retain_unknown_options=True, display_warnings=True)

# logging.exception("all_options:" + str(opts))
logging.error('runner class:' + str(self.runner))
for error_handler in self._error_handlers:
error_handler.verify_closed()

Expand Down Expand Up @@ -867,7 +869,7 @@ def _infer_result_type(
inputs: Sequence[Union[pvalue.PBegin, pvalue.PCollection]],
result_pcollection: Union[pvalue.PValue, pvalue.DoOutputsTuple]) -> None:
"""Infer and set the output element type for a PCollection.

This function determines the output types of transforms by combining:
1. Concrete input types from previous transforms
2. Type hints declared on the current transform
Expand All @@ -878,43 +880,43 @@ def _infer_result_type(
Type variables (K, V, T, etc.) act as placeholders that get bound to
concrete types through pattern matching. This requires both an input
pattern and an output template:

Input Pattern (from .with_input_types()):
Defines where in the input to find each type variable
Example: Tuple[K, V] means "K is the first element, V is the second"

Output Template (from .with_output_types()):
Defines how to use the bound variables in the output
Example: Tuple[V, K] means "swap the positions"

CONCRETE TYPES VS TYPE VARIABLES
---------------------------------
The system handles these differently:

Concrete Types (e.g., str, int, Tuple[str, int]):
- Used as-is without any binding
- Do not fall back to Any
- Example: .with_output_types(Tuple[str, int]) → Tuple[str, int]

Type Variables (e.g., K, V, T):
- Must be bound through pattern matching
- Require .with_input_types() to provide the pattern
- Fall back to Any if not bound
- Example without pattern: Tuple[K, V] → Tuple[Any, Any]
- Example with pattern: Tuple[K, V] → Tuple[str, int]

BINDING ALGORITHM
-----------------
1. Match: Compare input pattern to concrete input
Pattern: Tuple[K, V]
Concrete: Tuple[str, int]
Result: {K: str, V: int} ← Bindings created

2. Substitute: Apply bindings to output template
Template: Tuple[V, K] ← Note: swapped!
Bindings: {K: str, V: int}
Result: Tuple[int, str] ← Swapped concrete types

Each transform operates in its own type inference scope. Type variables
declared in a parent composite transform do NOT automatically propagate
to child transforms.
Expand All @@ -925,36 +927,36 @@ class MyComposite(PTransform):
def expand(self, pcoll):
# Child scope - parent's K, V are NOT available
return pcoll | ChildTransform()

Type variables that remain unbound after inference fall back to Any:

EXAMPLES
--------
Example 1: Concrete types (no variables)
Input: Tuple[str, int]
Transform: .with_output_types(Tuple[str, int])
Output: Tuple[str, int] ← Used as-is

Example 2: Type variables with pattern (correct)
Input: Tuple[str, int]
Transform: .with_input_types(Tuple[K, V])
.with_output_types(Tuple[V, K])
Binding: {K: str, V: int}
Output: Tuple[int, str] ← Swapped!

Example 3: Type variables without pattern (falls back to Any)
Input: Tuple[str, int]
Transform: .with_output_types(Tuple[K, V]) ← No input pattern!
Binding: None (can't match)
Output: Tuple[Any, Any] ← Fallback

Example 4: Mixed concrete and variables
Input: Tuple[str, int]
Transform: .with_input_types(Tuple[str, V])
.with_output_types(Tuple[str, V])
Binding: {V: int} ← Only V needs binding
Output: Tuple[str, int] ← str passed through, V bound to int

Args:
transform: The PTransform being applied
inputs: Input PCollections (provides concrete types)
Expand Down
10 changes: 6 additions & 4 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,18 @@ def visit_transform(self, applied_ptransform):
# Check whether all transforms used in the pipeline are supported by the
# PrismRunner
if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
_LOGGER.info('Running pipeline with PrismRunner.')
_LOGGER.error('Running pipeline with PrismRunner.')
from apache_beam.runners.portability import prism_runner
runner = prism_runner.PrismRunner()

try:
pr = runner.run_pipeline(pipeline, options)
# This is non-blocking, so if the state is *already* finished, something
# probably failed on job submission.
_LOGGER.error('PrismRunner state:' + str(pr.state))
if (PipelineState.is_terminal(pr.state) and
pr.state != PipelineState.DONE):
_LOGGER.info(
_LOGGER.error(
'Pipeline failed on PrismRunner, falling back to DirectRunner.')
runner = BundleBasedDirectRunner()
else:
Expand All @@ -225,8 +226,8 @@ def visit_transform(self, applied_ptransform):
# If prism fails in Preparing the portable job, then the PortableRunner
# code raises an exception. Catch it, log it, and use the Direct runner
# instead.
_LOGGER.info('Exception with PrismRunner:\n %s\n' % (e))
_LOGGER.info('Falling back to DirectRunner')
_LOGGER.error('Exception with PrismRunner:\n %s\n' % (e))
_LOGGER.error('Falling back to DirectRunner')
runner = BundleBasedDirectRunner()

# Check whether all transforms used in the pipeline are supported by the
Expand All @@ -240,6 +241,7 @@ def visit_transform(self, applied_ptransform):
provision_info = fn_runner.ExtendedProvisionInfo(
beam_provision_api_pb2.ProvisionInfo(
pipeline_options=encoded_options))
_LOGGER.error("Use FnApiRunner")
runner = fn_runner.FnApiRunner(provision_info=provision_info)

return runner.run_pipeline(pipeline, options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ def __init__(
("grpc.http2.min_ping_interval_without_data_ms", 20_000),
]

_LOGGER.error("Starting a few grpc server with options:" + str(options))
self.state = state
self.provision_info = provision_info
self.control_server = grpc.server(
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ def __init__(self, data_buffer_time_limit_ms=0):
self._closed = False
self._exception = None # type: Optional[Exception]

_LOGGER.error("start grpc data channel")

def close(self):
# type: () -> None
self._to_send.put(self._WRITES_FINISHED)
Expand Down
1 change: 1 addition & 0 deletions sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def find_by_ext(root_dir, ext):


# We must generate protos after setup_requires are installed.
# Add some comments here.
def generate_protos_first():
try:
# Pyproject toml build happens in isolated environemnts. In those envs,
Expand Down
Loading