Ruby 3.3.5 - Consumer Pub Sub - Crashing with Cancelled error recurrently #27448

@rmzoni

Environment details

OS: linux
Ruby version: 3.3.5
Gem name and version: google-cloud-pubsub v2.19.0

Steps to reproduce

  • Start ConsumeEvents.new(config).consume (class below) on Ruby 3.3.5.
  • After a few minutes the error occurs and the process stops.
  • The error is: 1:CANCELLED. debug_error_string:{UNKNOWN:Error received from peer {grpc_message:"CANCELLED", grpc_status:1, created_time:"2024-10-15T13:48:37.628426185+00:00"}} (GRPC::Cancelled)

Note: this error recurs every 5 to 10 minutes, and each time the pod in our Kubernetes cluster is restarted.

Code example

class ConsumeEvents
  def initialize(config, overrides = {})
    @config = config
    @subscriber_service = overrides.fetch(:subscriber_service) { ::Events::PubSub::Subscriber.new }
    @subscribers = {}
    @logger = overrides.fetch(:logger) { GRPC.logger }
  end

  def consume
    # Gracefully shut down the subscriber on program exit, blocking until
    # all received messages have been processed or n seconds have passed
    at_exit { stop_subscribers }

    loop do
      subscribe_to_configured_topics
      # Block, letting processing threads continue in the background
      sleep(15.seconds)
    end
  end

  def subscribe_to_configured_topics
    @config.each do |consumer_config|
      next if @subscribers.key?(consumer_config.subscription)

      subscriber = nil
      begin
        subscriber = @subscriber_service
          .with_topic_id(consumer_config.topic)
          .with_subscription_name(consumer_config.subscription) # The error is raised here
          # This method runs the following code:
          # @pubsub = overrides.fetch(:pubsub) { Google::Cloud::Pubsub.new(project_id: ENV.fetch("PUBSUB_PROJECT_ID"), emulator_host: ENV.fetch("PUBSUB_EMULATOR_HOST", nil)) }
          # @pubsub.subscription(subscriber_name) || topic.subscribe(subscriber_name, **opts)
          .listen do |received_message|
            process_message(received_message, consumer_config)
          end
      rescue Events::PubSub::Subscriber::TopicNotFound => e
        set_error_on_span("Could not subscribe to topic: `#{consumer_config.key}`. Reason: #{e.message}")
        next
      rescue => e
        message = "General error #{e.class}: `#{consumer_config.key}` - `#{consumer_config.topic}` - `#{consumer_config.subscription}`. Reason: #{e.message}"
        set_error_on_span(message)
        @logger.error(message)

        raise StandardError, message
      end

      subscriber.on_error { |exception| handle_error(exception, consumer_config) }

      subscriber.start

      @logger.info("Subscribed to #{consumer_config.topic}")
      @subscribers[consumer_config.subscription] = subscriber
    end
  end

  ...

end

Full backtrace

usr/local/bundle/gems/grpc-1.66.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:29:in `check_status': 1:CANCELLED. debug_error_string:{UNKNOWN:Error received from peer {grpc_message:"CANCELLED", grpc_status:1, created_time:"2024-10-15T13:48:37.628426185+00:00"}} (GRPC::Cancelled)
    from /app/app/infra/events/pub_sub/subscription.rb:18:in `subscribe_to'
    from /app/app/infra/events/pub_sub/subscriber.rb:22:in `with_subscription_name'
    from /app/app/infra/events/pub_sub/consumer/consume_events.rb:32:in `block in subscribe_to_configured_topics'
    from /app/app/infra/events/pub_sub/consumer/config.rb:53:in `each'
    from /app/app/infra/events/pub_sub/consumer/config.rb:53:in `each'
    from /app/app/infra/events/pub_sub/consumer/consume_events.rb:25:in `subscribe_to_configured_topics'
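
The backtrace shows the error surfacing inside with_subscription_name, where the generic rescue re-raises it as a StandardError and the process exits. One possible mitigation, assuming the CANCELLED status is transient (not confirmed here), is to retry the subscription lookup with backoff before giving up. The following is only a sketch: with_retries and TransientError are hypothetical names, and TransientError stands in for GRPC::Cancelled so the snippet runs without the grpc gem.

```ruby
# Hypothetical stand-in for GRPC::Cancelled so this sketch runs without the grpc gem.
# In the real consumer, GRPC::Cancelled would be passed via `on:` instead (assumption).
class TransientError < StandardError; end

# Runs the block, retrying with exponential backoff when one of the
# error classes in `on` is raised. Once `attempts` is exhausted, the
# last error is re-raised unchanged so the caller still sees it.
# `sleeper` is injectable so the wait can be stubbed out in tests.
def with_retries(attempts: 3, base_delay: 0.5, on: [TransientError], sleeper: ->(s) { sleep(s) })
  tries = 0
  begin
    tries += 1
    yield tries
  rescue *on
    raise if tries >= attempts
    # Delay doubles on each retry: base_delay, 2 * base_delay, 4 * base_delay, ...
    sleeper.call(base_delay * (2**(tries - 1)))
    retry
  end
end

# Usage: the block fails twice with the stand-in error, then succeeds.
result = with_retries(attempts: 3, base_delay: 0.01) do |attempt|
  raise TransientError, "1:CANCELLED" if attempt < 3
  :subscribed
end
puts result
```

In the consumer above, the call chain ending in with_subscription_name could be wrapped in such a block. This does not explain why the CANCELLED status is returned in the first place; it would only keep the pod from restarting on each occurrence.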
