Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions google-cloud-pubsub-v1/.owlbot.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

OwlBot.modifier path: "lib/google/cloud/pubsub/v1/schema_service/client.rb" do |content|
content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator"
end
OwlBot.modifier path: "lib/google/cloud/pubsub/v1/subscription_admin/client.rb" do |content|
content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator"
end
OwlBot.modifier path: "lib/google/cloud/pubsub/v1/topic_admin/client.rb" do |content|
content.gsub! "::Logger", "::Logger, ::Google::Logging::GoogleSdkLoggerDelegator"
end

OwlBot.move_files
1 change: 1 addition & 0 deletions google-cloud-pubsub/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ gem "retriable", "~> 3.1.2"
gem "simplecov", "~> 0.22"
gem "yard", "~> 0.9.37"
gem "yard-doctest", "~> 0.1.17"
gem "google-logging-utils", "~> 0.3.0"
51 changes: 51 additions & 0 deletions google-cloud-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,57 @@ module GRPC
end
```

### Enabling library level logging

This library includes an opt-in logging mechanism that provides detailed information about high-level operations. These logs are useful for troubleshooting and monitoring the client's behavior. When enabled, logs are tagged with subtags to indicate the operation type.

The following subtags are used:

* `callback-delivery`: Logs when a message is delivered to the user-provided callback.
* `callback-exceptions`: Logs any exceptions raised from the user callback.
* `ack-nack`: Logs when a message is acknowledged (`ack`) or negatively acknowledged (`nack`).
* `ack-batch`: Logs the reason and size of acknowledgement batches sent to the server.
* `publish-batch`: Logs the reason and size of message batches sent to the server for publishing.
* `expiry`: Logs when a message's lease expires and it is dropped from client-side lease management.
* `subscriber-streams`: Logs key events in the subscriber's streaming connection, such as opening, closing, and errors.
* `subscriber-flow-control`: Logs when the subscriber's client-side flow control is paused or resumed.

**WARNING:** These logs may contain message data in plaintext, which could include sensitive information. Ensure you are practicing good data hygiene with your application logs. It is recommended to enable this logging only for debugging purposes and not permanently in production.

To enable logging across all of Google Cloud Ruby SDK Gems, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` environment variable to `all`.
To enable logging for just the google-cloud-pubsub gem, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` environment variable to a comma separated string that contains `pubsub`
To disable logging across all of Google Cloud Ruby SDK Gems, set the `GOOGLE_SDK_RUBY_LOGGING_GEMS` to `none`

```sh
export GOOGLE_SDK_RUBY_LOGGING_GEMS=pubsub
```

You can programmatically configure a custom logger. The logger can be set globally for the Pub/Sub library, or provided on a per-client basis.

To set a logger globally, configure it on the `Google::Cloud` configuration object:

```ruby
require "google/cloud/pubsub"
require "logger"

# Configure a global logger for the pubsub library
Google::Cloud.configure.pubsub.logger = Logger.new "my-app.log"
```

Alternatively, you can provide a logger directly to the `PubSub` client initializer. If a logger instance is provided, it will override any globally configured logger.

```ruby
require "google/cloud/pubsub"
require "logger"

# Provide a logger directly to the client
custom_logger = Logger.new "pubsub-client.log"
pubsub = Google::Cloud::PubSub.new logger: custom_logger
```

If no custom logger is configured, a default logger that writes to standard output will be used.


## Supported Ruby Versions

This library is supported on Ruby 3.1+.
Expand Down
4 changes: 4 additions & 0 deletions google-cloud-pubsub/lib/google-cloud-pubsub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
require "google/cloud" unless defined? Google::Cloud.new
require "google/cloud/config"
require "googleauth"
require "logger"

module Google
module Cloud
Expand Down Expand Up @@ -142,6 +143,8 @@ def self.pubsub project_id = nil,
"https://www.googleapis.com/auth/pubsub"
]

default_logger = Logger.new $stdout

config.add_field! :project_id, default_project, match: String, allow_nil: true
config.add_alias! :project, :project_id
config.add_field! :credentials, default_creds, match: [String, Hash, Google::Auth::Credentials], allow_nil: true
Expand All @@ -153,4 +156,5 @@ def self.pubsub project_id = nil,
config.add_field! :on_error, nil, match: Proc
config.add_field! :endpoint, nil, match: String
config.add_field! :universe_domain, nil, match: String
config.add_field! :logger, default_logger, match: Logger, allow_nil: true
end
16 changes: 13 additions & 3 deletions google-cloud-pubsub/lib/google/cloud/pubsub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


require "google-cloud-pubsub"
require "google/cloud/pubsub/logging"
require "google/cloud/pubsub/project"
require "google/cloud/config"
require "google/cloud/env"
Expand Down Expand Up @@ -70,11 +71,16 @@ module PubSub
#
# * `https://www.googleapis.com/auth/pubsub`
# @param [Numeric] timeout Default timeout to use in requests. Optional.
# @param [String] universe_domain A custom universe domain. Optional.
# @param [String] endpoint Override of the endpoint host name. Optional.
# If the param is nil, uses the default endpoint.
# @param [String] emulator_host Pub/Sub emulator host. Optional.
# If the param is nil, uses the value of the `emulator_host` config.
# @param universe_domain [String] A custom universe domain. Optional.
# @param [Logger] logger Optional Logger instance for emitting
# library-level debug logs. If not provided, it will default to
# configure.logger, which defaults to Logger.new STDOUT if not set. To
# enable logging, set environment variable GOOGLE_SDK_RUBY_LOGGING_GEMS
# to "all" or a comma separated list of gem names, including "pubsub".
#
# @return [Google::Cloud::PubSub::Project]
#
Expand All @@ -92,13 +98,15 @@ def self.new project_id: nil,
timeout: nil,
universe_domain: nil,
endpoint: nil,
emulator_host: nil
emulator_host: nil,
logger: nil
project_id ||= default_project_id
scope ||= configure.scope
timeout ||= configure.timeout
endpoint ||= configure.endpoint
universe_domain ||= configure.universe_domain
emulator_host ||= configure.emulator_host
logger ||= configure.logger

if emulator_host
credentials = :this_channel_is_insecure
Expand All @@ -114,10 +122,12 @@ def self.new project_id: nil,
project_id = project_id.to_s # Always cast to a string
raise ArgumentError, "project_id is missing" if project_id.empty?

logging = Google::Cloud::PubSub::Logging.create logger
service = PubSub::Service.new project_id, credentials,
host: endpoint,
timeout: timeout,
universe_domain: universe_domain
universe_domain: universe_domain,
logging: logging
PubSub::Project.new service
end

Expand Down
21 changes: 11 additions & 10 deletions google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &cal
end
batch_action = batch.add msg, callback
if batch_action == :full
publish_batches!
publish_batches! reason: "batch full"
elsif @published_at.nil?
# Set initial time to now to start the background counter
@published_at = Time.now
Expand All @@ -180,7 +180,7 @@ def stop
break if @stopped

@stopped = true
publish_batches! stop: true
publish_batches! stop: true, reason: "shutdown"
@cond.signal
@publish_thread_pool.shutdown
end
Expand Down Expand Up @@ -234,7 +234,7 @@ def stop! timeout = nil
# @return [AsyncPublisher] returns self so calls can be chained.
def flush
synchronize do
publish_batches!
publish_batches! reason: "manual flush"
@cond.signal
end

Expand Down Expand Up @@ -313,7 +313,7 @@ def run_background
time_since_first_publish = Time.now - @published_at
if time_since_first_publish > @interval
# interval met, flush the batches...
publish_batches!
publish_batches! reason: "interval timeout"
@cond.wait
else
# still waiting for the interval to publish the batch...
Expand Down Expand Up @@ -347,28 +347,28 @@ def stop_publish ordering_key, err
end
end

def publish_batches! stop: nil
def publish_batches! stop: nil, reason: "unknown"
@batches.reject! { |_ordering_key, batch| batch.empty? }
@batches.each_value do |batch|
ready = batch.publish! stop: stop
publish_batch_async @topic_name, batch if ready
publish_batch_async @topic_name, batch, reason: reason if ready
end
# Set published_at to nil to wait indefinitely
@published_at = nil
end

def publish_batch_async topic_name, batch
def publish_batch_async topic_name, batch, reason: "unknown"
# TODO: raise unless @publish_thread_pool.running?
return unless @publish_thread_pool.running?

Concurrent::Promises.future_on(
@publish_thread_pool, topic_name, batch
) { |t, b| publish_batch_sync t, b }
@publish_thread_pool, topic_name, batch, reason
) { |t, b, r| publish_batch_sync t, b, reason: r }
end

# rubocop:disable Metrics/AbcSize

def publish_batch_sync topic_name, batch
def publish_batch_sync topic_name, batch, reason: "unknown"
# The only batch methods that are safe to call from the loop are
# rebalance! and reset! because they are the only methods that are
# synchronized.
Expand All @@ -379,6 +379,7 @@ def publish_batch_sync topic_name, batch
grpc = @service.publish topic_name,
items.map(&:msg),
compress: compress && batch.total_message_bytes >= compression_bytes_threshold
service.logging.log_batch "publish-batch", reason, "publish", items.count, items.sum(&:bytesize)
items.zip Array(grpc.message_ids) do |item, id|
@flow_controller.release item.bytesize
next unless item.callback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module PubSub
# end
#
class BatchPublisher

##
# @private The messages to publish
attr_reader :messages
Expand Down Expand Up @@ -117,10 +118,11 @@ def to_gcloud_messages message_ids

##
# @private Call the publish API with arrays of data and attrs.
def publish_batch_messages topic_name, service
def publish_batch_messages topic_name, service, reason: "unknown"
grpc = service.publish topic_name,
messages,
compress: compress && total_message_bytes >= compression_bytes_threshold
service.logging.log_batch "publish-batch", reason, "publish", messages.count, @total_message_bytes
to_gcloud_messages Array(grpc.message_ids)
end
end
Expand Down
79 changes: 79 additions & 0 deletions google-cloud-pubsub/lib/google/cloud/pubsub/logging.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require "logger"
require "google/logging/google_sdk_logger_delegator"
require "google/cloud/config"

module Google
module Cloud
module PubSub
##
# @private
class Logging
LOG_NAME = "pubsub".freeze
VALID_LOG_LEVELS = [:debug, :info, :warn, :error, :fatal].freeze
private_constant :VALID_LOG_LEVELS, :LOG_NAME

##
# @private
def self.create logger
new logger
end

##
# @private
# rubocop:disable Naming/BlockForwarding
def log level, subtag, &message_block
return unless VALID_LOG_LEVELS.include?(level) && block_given?
@logging_delegator.public_send(level, "#{LOG_NAME}:#{subtag}", &message_block)
end
# rubocop:enable Naming/BlockForwarding

##
# @private
def log_batch logger_name, reason, type, num_messages, total_bytes
log :info, logger_name do
"#{reason} triggered #{type} batch of #{num_messages} messages, a total of #{total_bytes} bytes"
end
end

##
# @private
def log_ack_nack ack_ids, type
ack_ids.each do |ack_id|
log :info, "ack-nack" do
"message (ackID #{ack_id}) #{type}"
end
end
end

##
# @private
def log_expiry expired
expired.each do |ack_id, item|
log :info, "expiry" do
"message (ID #{item.message_id}, ackID #{ack_id}) has been dropped from leasing due to a timeout"
end
end
end

private

def initialize logger
@logging_delegator = Google::Logging::GoogleSdkLoggerDelegator.new LOG_NAME, logger
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


require "google/cloud/pubsub/service"
require "google/cloud/pubsub/subscriber"
require "google/cloud/pubsub/message_listener/stream"
require "google/cloud/pubsub/message_listener/timed_unary_buffer"
require "monitor"
Expand Down
Loading
Loading