Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ publish dummy json data like `{"message": "dummy", "value": 0}\n{"message": "dum
key <YOUR KEY>
flush_interval 10
autocreate_topic false
max_messages 1000
</match>
```

- `autocreate_topic` (optional, default: `false`)
- If set to `true`, specified topic will be created when it doesn't exist.
- `max_messages` (optional, default: `1000`)
- Publishing messages per request to Cloud Pub/Sub. ref: https://cloud.google.com/pubsub/quotas#other_limits

## Pull messages
Use `in_gcloud_pubsub`.
Expand Down
16 changes: 12 additions & 4 deletions lib/fluent/plugin/out_gcloud_pubsub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class GcloudPubSubOutput < BufferedOutput
config_param :topic, :string, :default => nil
config_param :key, :string, :default => nil
config_param :autocreate_topic, :bool, :default => false
config_param :max_messages, :integer, :default => 1000

unless method_defined?(:log)
define_method("log") { $log }
Expand Down Expand Up @@ -50,16 +51,23 @@ def write(chunk)
end

if messages.length > 0
@client.publish do |batch|
messages.each do |m|
batch.publish m
end
messages.each_slice(@max_messages).each do |msg|
publish msg
end
end
rescue => e
log.error "unexpected error", :error=>$!.to_s
log.error_backtrace
raise e
end

def publish(messages)
log.debug "send message topic:#{@client.name} length:#{messages.length.to_s}"
@client.publish do |batch|
messages.each do |m|
batch.publish m
end
end
end
end
end
25 changes: 25 additions & 0 deletions test/plugin/test_out_gcloud_pubsub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def test_configure
assert_equal('key-test', d.instance.key)
assert_equal(false, d.instance.autocreate_topic)
assert_equal(1, d.instance.flush_interval)
assert_equal(1000, d.instance.max_messages)
end

def test_autocreate_topic
Expand All @@ -59,13 +60,37 @@ def test_autocreate_topic
d.instance.start()
end

def test_max_messages
d = create_driver(DEFAULT_CONFIG)

client = mock!
client.name.times(2) { 'topic-test' }
client.publish.times(2)

pubsub_mock = mock!.topic(anything, anything) { client }
gcloud_mock = mock!.pubsub { pubsub_mock }
stub(Gcloud).new { gcloud_mock }

time = Time.parse("2016-07-09 11:12:13 UTC").to_i

# max_messages is default 1000
1001.times do |i|
d.emit({"a" => i}, time)
end

d.run
end

def test_re_raise_errors
d = create_driver(DEFAULT_CONFIG)
chunk = Fluent::MemoryBufferChunk.new('key', 'data')
client = Object.new
def client.publish
raise ReRaisedError
end
def client.name
'test-topic'
end
d.instance.instance_variable_set(:@client, client)

assert_raises ReRaisedError do
Expand Down