Skip to content

Add cooperative rebalancing consumer example#2248

Open
jmathai-confluent wants to merge 1 commit into
masterfrom
feat/1420-cooperative-rebalancing-example
Open

Add cooperative rebalancing consumer example#2248
jmathai-confluent wants to merge 1 commit into
masterfrom
feat/1420-cooperative-rebalancing-example

Conversation

@jmathai-confluent
Copy link
Copy Markdown
Member

Summary

Adds a new cooperative_consumer.py example demonstrating how to use the cooperative-sticky partition assignment strategy with incremental rebalance callbacks (on_assign, on_revoke, on_lost). This is a commonly requested example that shows how cooperative rebalancing differs from the default eager strategy.

Fixes #1420

Example overview

The example creates a consumer configured with partition.assignment.strategy: cooperative-sticky and subscribes to one or more topics. It demonstrates:

  • Incremental on_assign — logs partitions as they are incrementally assigned
  • Incremental on_revoke — commits offsets for only the partitions being transferred to another consumer (other partitions continue processing without interruption)
  • on_lost — handles partitions lost due to session/poll timeout (no commit attempted since the consumer is no longer in the group)
  • Explains why setting on_lost is important when committing in on_revoke

Usage: python cooperative_consumer.py <bootstrap-brokers> <group> <topic1> [<topic2> ..]

Verification script

A verification script (verify_example.sh) validates the example without requiring a running Kafka broker.

Expected output

The script should print PASS for all 5 checks: file exists, compiles, callbacks defined, cooperative-sticky configured, and usage message on bad args.

Actual output

Using Python: /usr/bin/python3 (Python 3.9.6)

--- Test 1: File exists ---
PASS: cooperative_consumer.py exists

--- Test 2: Module compiles ---
PASS: cooperative_consumer.py compiles successfully

--- Test 3: Callbacks are defined ---
PASS: on_assign, on_revoke, and on_lost are all defined and callable

--- Test 4: cooperative-sticky configured ---
PASS: cooperative-sticky partition assignment strategy configured

--- Test 5: Usage message on bad args ---
PASS: Usage message printed on missing args

=== All verification tests PASSED ===

Script

#!/usr/bin/env bash
set -euo pipefail

# Verification script for cooperative_consumer.py
# Checks that the example has valid syntax, callbacks are defined,
# and the cooperative-sticky configuration is correctly applied.
# Does NOT require a running Kafka broker.

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
EXAMPLE="${SCRIPT_DIR}/cooperative_consumer.py"

# Find a working python with confluent_kafka installed
PYTHON=""
for candidate in python3 python /usr/bin/python3; do
    if command -v "$candidate" &>/dev/null && "$candidate" -c "import confluent_kafka" &>/dev/null; then
        PYTHON="$candidate"
        break
    fi
done

if [ -z "$PYTHON" ]; then
    echo "FAIL: No Python with confluent_kafka installed found"
    exit 1
fi

echo "Using Python: $PYTHON ($($PYTHON --version 2>&1))"

# Test 1: File exists
echo ""
echo "--- Test 1: File exists ---"
if [[ ! -f "$EXAMPLE" ]]; then
    echo "FAIL: cooperative_consumer.py not found"
    exit 1
fi
echo "PASS: cooperative_consumer.py exists"

# Test 2: Module compiles
echo ""
echo "--- Test 2: Module compiles ---"
$PYTHON -m py_compile "$EXAMPLE"
echo "PASS: cooperative_consumer.py compiles successfully"

# Test 3: Callbacks are defined and callable
echo ""
echo "--- Test 3: Callbacks are defined ---"
$PYTHON -c "
import importlib.util, sys
spec = importlib.util.spec_from_file_location('cooperative_consumer', '$EXAMPLE')
mod = importlib.util.module_from_spec(spec)
mod.__name__ = 'cooperative_consumer'
spec.loader.exec_module(mod)
assert callable(mod.on_assign), 'on_assign not callable'
assert callable(mod.on_revoke), 'on_revoke not callable'
assert callable(mod.on_lost), 'on_lost not callable'
print('PASS: on_assign, on_revoke, and on_lost are all defined and callable')
"

# Test 4: cooperative-sticky strategy is in the source
echo ""
echo "--- Test 4: cooperative-sticky configured ---"
if grep -q "'partition.assignment.strategy': 'cooperative-sticky'" "$EXAMPLE"; then
    echo "PASS: cooperative-sticky partition assignment strategy configured"
else
    echo "FAIL: cooperative-sticky not found in example"
    exit 1
fi

# Test 5: Usage message on bad args
echo ""
echo "--- Test 5: Usage message on bad args ---"
OUTPUT=$($PYTHON "$EXAMPLE" 2>&1 || true)
if echo "$OUTPUT" | grep -q "Usage:"; then
    echo "PASS: Usage message printed on missing args"
else
    echo "FAIL: Expected usage message, got: $OUTPUT"
    exit 1
fi

echo ""
echo "=== All verification tests PASSED ==="

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@sonarqube-confluent
Copy link
Copy Markdown

Quality Gate passed Quality Gate passed

Issues
0 New issues
0 Fixed issues
0 Accepted issues

Measures
0 Security Hotspots
No data about Coverage
No data about Duplication

See analysis details on SonarQube

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Cooperative rebalancing example

1 participant