Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
381 changes: 51 additions & 330 deletions bin/proto/MLDataFormats_pb2.py

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions docker/pulsar/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ RUN apk add --no-cache \
bash \
python3 \
py3-pip \
py3-grpcio \
py3-yaml \
gcompat \
libgcc \
Expand All @@ -105,9 +104,15 @@ RUN apk add --no-cache \
RUN apk upgrade --no-cache

# Python dependencies
# The pinned grpcio and protobuf versions should be compatible with the generated Protobuf and gRPC stubs used
# in Pulsar Functions Python runtime. You should also update the grpcio version in src/update_python_protobuf_stubs.sh
# and regenerate the Python stubs if you change the grpcio version here. Please see
# pulsar-functions/instance/src/main/python/README.md for more details.
ARG PULSAR_CLIENT_PYTHON_VERSION
RUN pip3 install --break-system-packages --no-cache-dir \
--only-binary grpcio \
--only-binary \
grpcio==1.73.1 \
protobuf==6.31.1 \
pulsar-client[all]==${PULSAR_CLIENT_PYTHON_VERSION} \
kazoo

Expand Down
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ flexible messaging model and an intuitive client API.</description>
<pulsar.broker.compiler.release>${maven.compiler.target}</pulsar.broker.compiler.release>
<pulsar.client.compiler.release>17</pulsar.client.compiler.release>

<pulsar.client.python.version>3.7.0</pulsar.client.python.version>
<pulsar.client.python.version>3.8.0</pulsar.client.python.version>

<IMAGE_JDK_MAJOR_VERSION>21</IMAGE_JDK_MAJOR_VERSION>

Expand Down Expand Up @@ -1996,6 +1996,8 @@ flexible messaging model and an intuitive client API.</description>
<exclude>**/.mvn/**</exclude>
<exclude>**/generated/**</exclude>
<exclude>**/zk-3.5-test-data/*</exclude>
<exclude>**/*_pb2.py</exclude>
<exclude>**/*_pb2_grpc.py</exclude>
</excludes>
</licenseSet>
</licenseSets>
Expand Down Expand Up @@ -2149,6 +2151,8 @@ flexible messaging model and an intuitive client API.</description>
<exclude>**/*.dylib</exclude>
<exclude>**/*.patch</exclude>
<exclude>src/test/resources/*.txt</exclude>
<exclude>**/*_pb2.py</exclude>
<exclude>**/*_pb2_grpc.py</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
1,861 changes: 95 additions & 1,766 deletions pulsar-functions/instance/src/main/python/Function_pb2.py

Large diffs are not rendered by default.

690 changes: 34 additions & 656 deletions pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,29 +1,14 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings

import InstanceCommunication_pb2 as InstanceCommunication__pb2
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2

GRPC_GENERATED_VERSION = '1.73.1'
GRPC_VERSION = grpc.__version__


class InstanceControlStub(object):
"""Missing associated documentation comment in .proto file."""
Expand All @@ -38,27 +23,27 @@ def __init__(self, channel):
'/proto.InstanceControl/GetFunctionStatus',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
response_deserializer=InstanceCommunication__pb2.FunctionStatus.FromString,
)
_registered_method=True)
self.GetAndResetMetrics = channel.unary_unary(
'/proto.InstanceControl/GetAndResetMetrics',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
)
_registered_method=True)
self.ResetMetrics = channel.unary_unary(
'/proto.InstanceControl/ResetMetrics',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
)
_registered_method=True)
self.GetMetrics = channel.unary_unary(
'/proto.InstanceControl/GetMetrics',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
)
_registered_method=True)
self.HealthCheck = channel.unary_unary(
'/proto.InstanceControl/HealthCheck',
request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
response_deserializer=InstanceCommunication__pb2.HealthCheckResult.FromString,
)
_registered_method=True)


class InstanceControlServicer(object):
Expand Down Expand Up @@ -126,6 +111,7 @@ def add_InstanceControlServicer_to_server(servicer, server):
generic_handler = grpc.method_handlers_generic_handler(
'proto.InstanceControl', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('proto.InstanceControl', rpc_method_handlers)


# This class is part of an EXPERIMENTAL API.
Expand All @@ -143,11 +129,21 @@ def GetFunctionStatus(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetFunctionStatus',
return grpc.experimental.unary_unary(
request,
target,
'/proto.InstanceControl/GetFunctionStatus',
google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
InstanceCommunication__pb2.FunctionStatus.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def GetAndResetMetrics(request,
Expand All @@ -160,11 +156,21 @@ def GetAndResetMetrics(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetAndResetMetrics',
return grpc.experimental.unary_unary(
request,
target,
'/proto.InstanceControl/GetAndResetMetrics',
google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
InstanceCommunication__pb2.MetricsData.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def ResetMetrics(request,
Expand All @@ -177,11 +183,21 @@ def ResetMetrics(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/ResetMetrics',
return grpc.experimental.unary_unary(
request,
target,
'/proto.InstanceControl/ResetMetrics',
google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
google_dot_protobuf_dot_empty__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def GetMetrics(request,
Expand All @@ -194,11 +210,21 @@ def GetMetrics(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/GetMetrics',
return grpc.experimental.unary_unary(
request,
target,
'/proto.InstanceControl/GetMetrics',
google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
InstanceCommunication__pb2.MetricsData.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def HealthCheck(request,
Expand All @@ -211,8 +237,18 @@ def HealthCheck(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/proto.InstanceControl/HealthCheck',
return grpc.experimental.unary_unary(
request,
target,
'/proto.InstanceControl/HealthCheck',
google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
InstanceCommunication__pb2.HealthCheckResult.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
40 changes: 40 additions & 0 deletions pulsar-functions/instance/src/main/python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Pulsar Functions Python Runtime

### Updating Protobuf and gRPC generated stubs

When using generated Protobuf and gRPC stubs (`*_pb2.py`, `*_pb2_gprc.py`), the generated code should be
updated when the grpcio and protobuf Python packages are updated. This is due to the fact that generated
Protobuf and gRPC stubs are not necessarily compatible across different versions of these packages at runtime.
The compatibility policy of Protobuf is documented in
[Protobuf's "Cross-Version Runtime Guarantee"](https://protobuf.dev/support/cross-version-runtime-guarantee/),
which states that cross-version runtime support isn't guaranteed. gRPC follows a similar policy.

In Pulsar's [Docker image](../../../../../docker/pulsar/Dockerfile), the `grpcio` and `protobuf` packages are
pinned to specific versions. Whenever these versions are updated, the `PYTHON_GRPCIO_VERSION`
in [src/update_python_protobuf_stubs.sh](../../../../../src/update_python_protobuf_stubs.sh) should also be updated
and the generated stubs should be regenerated with this script to ensure compatibility.

To update the generated stubs, run the following command in the project root directory:

```bash
# run this command from the project root directory
src/update_python_protobuf_stubs.sh
```

Alternatively, you can run this command to install the required tools in a docker container and update the stubs:

```bash
# run this command from the project root directory
src/update_python_protobuf_stubs_with_docker.sh
```

When the script is run, it will also print such information to the console:

```
libprotoc library included in grpcio-tools will be used:
libprotoc 31.0
The compatible matching protobuf package version in Python is prefixed with '6.'
Ensure that you are using a compatible version of the protobuf package such as 6.31.0 (or a matching patch version).
```

When pinning the `protobuf` package in your Python project follow this guidance to ensure compatibility of the generated stubs with the `protobuf` package version.
84 changes: 84 additions & 0 deletions src/update_python_protobuf_stubs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# This script generates Python gRPC and Protobuf stubs from the .proto files
# Set the version of PYTHON_GRPCIO_VERSION to the version which matches the version of grpcio in the Dockerfile
PYTHON_GRPCIO_VERSION=${PYTHON_GRPCIO_VERSION:-"1.73.1"}

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"

# Create a temporary virtual environment to avoid polluting the global Python environment
tempvenv=$(mktemp -d /tmp/pulsar-venv.XXXXXX)
python3 -m venv $tempvenv
source $tempvenv/bin/activate

# install the required packages for protobuf and grpc
echo "Installing grpc-tools $PYTHON_GRPCIO_VERSION..."
python3 -m pip install grpcio-tools==$PYTHON_GRPCIO_VERSION

echo "libprotoc library included in grpcio-tools will be used:"
python3 -m grpc_tools.protoc --version
echo "The compatible matching protobuf package version in Python is prefixed with '6.'"
echo "Ensure that you are using a compatible version of the protobuf package such as 6.$(python3 -m grpc_tools.protoc --version | awk '{print $2}') (or a matching patch version)."

cd $SCRIPT_DIR/..
echo "Generating Python gRPC and Protobuf stubs from the .proto files..."

# Generate Python gRPC and Protobuf stubs from the .proto files

# Generate stubs for Function.proto and InstanceCommunication.proto, used for Pulsar Functions Python runtime
python3 -m grpc_tools.protoc \
--proto_path=pulsar-functions/proto/src/main/proto \
--python_out=pulsar-functions/instance/src/main/python \
pulsar-functions/proto/src/main/proto/Function.proto

# Remove the strict version checking in the generated file
sed -i '/^_runtime_version\.ValidateProtobufRuntimeVersion($/,/^)$/d' \
pulsar-functions/instance/src/main/python/Function_pb2.py

python3 -m grpc_tools.protoc \
--proto_path=pulsar-functions/proto/src/main/proto \
--python_out=pulsar-functions/instance/src/main/python \
--grpc_python_out=pulsar-functions/instance/src/main/python \
pulsar-functions/proto/src/main/proto/InstanceCommunication.proto

# Remove the strict version checking in the generated file
sed -i '/^_runtime_version\.ValidateProtobufRuntimeVersion($/,/^)$/d' \
pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
sed -i '/^_version_not_supported = False$/,/^ )$/d' \
pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py

# Generate stubs for MLDataFormats.proto, used for managed-ledger python scripts
python3 -m grpc_tools.protoc \
--proto_path=managed-ledger/src/main/proto \
--python_out=bin/proto \
managed-ledger/src/main/proto/MLDataFormats.proto

# Remove the strict version checking in the generated file
sed -i '/^_runtime_version\.ValidateProtobufRuntimeVersion($/,/^)$/d' \
bin/proto/MLDataFormats_pb2.py

echo "Python gRPC and Protobuf stubs generated successfully."

# Deactivate and remove the temporary virtual environment
deactivate
rm -rf $tempvenv
Loading
Loading