-
Notifications
You must be signed in to change notification settings - Fork 567
Description
Environment details
OS: linux
Ruby version: 3.3.5
Gem name and version: google-cloud-pubsub v2.19.0
Steps to reproduce
- just start the ConsumeEvents.new.consume method bellow using ruby 3.3.5
- After a few minutes the error will happen and the process will stop
- The error is: 1:CANCELLED. debug_error_string:{UNKNOWN:Error received from peer {grpc_message:"CANCELLED", grpc_status:1, created_time:"2024-10-15T13:48:37.628426185+00:00"}} (GRPC::Cancelled)
obs. This error occurs in a interval of 5 to 10 minutes and the POD in our K8s got restarted.
Code example
class ConsumeEvents
def initialize(config, overrides = {})
@config = config
@subscriber_service = overrides.fetch(:subscriber_service) { ::Events::PubSub::Subscriber.new }
@subscribers = {}
@logger = overrides.fetch(:logger) { GRPC.logger }
end
def consume
# Gracefully shut down the subscriber on program exit, blocking until
# all received messages have been processed or n seconds have passed
at_exit { stop_subscribers }
loop do
subscribe_to_configured_topics
# Block, letting processing threads continue in the background
sleep(15.seconds)
end
end
def subscribe_to_configured_topics
@config.each do |consumer_config|
next if @subscribers.key?(consumer_config.subscription)
subscriber = nil
begin
subscriber = @subscriber_service
.with_topic_id(consumer_config.topic)
.with_subscription_name(consumer_config.subscription) ## Error happen here
## This methos run this code herer
# @pubsub = overrides.fetch(:pubsub) { Google::Cloud::Pubsub.new(project_id: ENV.fetch("PUBSUB_PROJECT_ID"), emulator_host: ENV.fetch("PUBSUB_EMULATOR_HOST", nil)) }
# @pubsub.subscription(subscriber_name) || topic.subscribe(subscriber_name, **opts)
.listen do |received_message|
process_message(received_message, consumer_config)
end
rescue Events::PubSub::Subscriber::TopicNotFound => e
set_error_on_span("Could not subscribe to topic: `#{consumer_config.key}`. Reason: #{e.message}")
next
rescue => e
message = "General error #{e.class}: `#{consumer_config.key}` - `#{consumer_config.topic}` - `#{consumer_config.subscription}`. Reason: #{e.message}"
set_error_on_span(message)
@logger.error(message)
raise StandardError.new(message)
end
subscriber.on_error { |exception| handle_error(exception, consumer_config) }
subscriber.start
@logger.info("Subscribed to #{consumer_config.topic}")
@subscribers[consumer_config.subscription] = subscriber
end
end
...
endFull backtrace
usr/local/bundle/gems/grpc-1.66.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:29:in check_status': 1:CANCELLED. debug_error_string:{UNKNOWN:Error received from peer {grpc_message:"CANCELLED", grpc_status:1, created_time:"2024-10-15T13:48:37.628426185+00:00"}} (GRPC::Cancelled) from /app/app/infra/events/pub_sub/subscription.rb:18:in subscribe_to'
│ from /app/app/infra/events/pub_sub/subscriber.rb:22:in with_subscription_name' │ from /app/app/infra/events/pub_sub/consumer/consume_events.rb:32:in block in subscribe_to_configured_topics'
│ from /app/app/infra/events/pub_sub/consumer/config.rb:53:in each' │ from /app/app/infra/events/pub_sub/consumer/config.rb:53:in each'
│ from /app/app/infra/events/pub_sub/consumer/consume_events.rb:25:in `subscribe_to_configured_topics'