Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
language: ruby

rvm:
- 1.9.3
- 2.0
- 2.1
- 2.2.2
- 2.3.4
- 2.4.1

gemfile:
- Gemfile
Expand Down
2 changes: 1 addition & 1 deletion fluent-plugin-gcloud-pubsub.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Gem::Specification.new do |gem|
gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
gem.require_paths = ['lib']

gem.add_runtime_dependency "fluentd", "~> 0.12.0"
gem.add_runtime_dependency "fluentd", [">= 0.14.15", "< 2"]
gem.add_runtime_dependency "gcloud", "= 0.6.3"
gem.add_runtime_dependency "fluent-plugin-buffer-lightening", ">= 0.0.2"

Expand Down
34 changes: 12 additions & 22 deletions lib/fluent/plugin/in_gcloud_pubsub.rb
Original file line number Diff line number Diff line change
@@ -1,40 +1,34 @@
require 'gcloud'
require 'fluent/input'
require 'fluent/parser'
require 'fluent/plugin/input'
require 'fluent/plugin/parser'

module Fluent
module Fluent::Plugin
class GcloudPubSubInput < Input
Fluent::Plugin.register_input('gcloud_pubsub', self)

helpers :parser, :thread

config_param :tag, :string
config_param :project, :string, :default => nil
config_param :topic, :string, :default => nil
config_param :subscription, :string, :default => nil
config_param :topic, :string
config_param :subscription, :string
config_param :key, :string, :default => nil
config_param :pull_interval, :integer, :default => 5
config_param :max_messages, :integer, :default => 100
config_param :return_immediately, :bool, :default => true

unless method_defined?(:log)
define_method("log") { $log }
end

unless method_defined?(:router)
define_method("router") { Fluent::Engine }
config_section :parse do
config_set_default :@type, 'json'
end

def configure(conf)
super

raise Fluent::ConfigError, "'topic' must be specified." unless @topic
raise Fluent::ConfigError, "'subscription' must be specified." unless @subscription

configure_parser(conf)
end

def configure_parser(conf)
@parser = Fluent::TextParser.new
@parser.configure(conf)
@parser = parser_create
end

def start
Expand All @@ -44,7 +38,7 @@ def start
topic = pubsub.topic @topic
@client = topic.subscription @subscription
@stop_subscribing = false
@subscribe_thread = Thread.new(&method(:subscribe))
@subscribe_thread = thread_create(:in_gcloud_pubsub_input, &method(:subscribe))
end

def shutdown
Expand All @@ -55,10 +49,6 @@ def shutdown
end

private
def configure_parser(conf)
@parser = Fluent::TextParser.new
@parser.configure(conf)
end

def subscribe
until @stop_subscribing
Expand Down Expand Up @@ -87,7 +77,7 @@ def subscribe
end

def parse_messages(messages)
es = MultiEventStream.new
es = Fluent::MultiEventStream.new
messages.each do |m|
convert_line_to_event(m.message.data, es)
end
Expand Down
43 changes: 25 additions & 18 deletions lib/fluent/plugin/out_gcloud_pubsub.rb
Original file line number Diff line number Diff line change
@@ -1,34 +1,33 @@
require 'gcloud'
require 'fluent/output'
require 'fluent/plugin/output'

module Fluent
class GcloudPubSubOutput < BufferedOutput
module Fluent::Plugin
class GcloudPubSubOutput < Output
Fluent::Plugin.register_output('gcloud_pubsub', self)

config_set_default :buffer_type, 'lightening'
config_set_default :flush_interval, 1
config_set_default :try_flush_interval, 0.05
config_set_default :buffer_chunk_records_limit, 900
config_set_default :buffer_chunk_limit, 9437184
config_set_default :buffer_queue_limit, 64
helpers :compat_parameters

DEFAULT_BUFFER_TYPE = "memory"

config_param :project, :string, :default => nil
config_param :topic, :string, :default => nil
config_param :topic, :string
config_param :key, :string, :default => nil
config_param :autocreate_topic, :bool, :default => false

unless method_defined?(:log)
define_method("log") { $log }
end

unless method_defined?(:router)
define_method("router") { Fluent::Engine }
config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
# In v0.14, buffer configurations are renamed.
# see: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/buffer.rb
config_set_default :flush_interval, 1
config_set_default :try_flush_interval, 0.05
config_set_default :chunk_limit_records, 900
config_set_default :chunk_limit_size, 9437184
config_set_default :queue_limit_length, 64
end

def configure(conf)
compat_parameters_convert(conf, :buffer)
super

raise Fluent::ConfigError, "'topic' must be specified." unless @topic
end

def start
Expand All @@ -42,6 +41,14 @@ def format(tag, time, record)
[tag, time, record].to_msgpack
end

def formatted_to_msgpack_binary?
true
end

def multi_workers_ready?
true
end

def write(chunk)
messages = []

Expand Down
4 changes: 2 additions & 2 deletions test/plugin/test_in_gcloud_pubsub.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
require_relative "../test_helper"

require 'fluent/test/driver/input'

class GcloudPubSubInputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end

def create_driver(conf=CONFIG)
Fluent::Test::InputTestDriver.new(Fluent::GcloudPubSubInput).configure(conf)
Fluent::Test::Driver::Input.new(Fluent::Plugin::GcloudPubSubInput).configure(conf)
end

def test_configure
Expand Down
19 changes: 14 additions & 5 deletions test/plugin/test_out_gcloud_pubsub.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require_relative "../test_helper"

require 'fluent/test/driver/output'

class GcloudPubSubOutputTest < Test::Unit::TestCase
DEFAULT_CONFIG = <<-EOC
Expand All @@ -16,7 +16,7 @@ def setup
end

def create_driver(conf)
Fluent::Test::BufferedOutputTestDriver.new(Fluent::GcloudPubSubOutput).configure(conf)
Fluent::Test::Driver::Output.new(Fluent::Plugin::GcloudPubSubOutput).configure(conf)
end

def test_configure
Expand All @@ -32,7 +32,7 @@ def test_configure
assert_equal('topic-test', d.instance.topic)
assert_equal('key-test', d.instance.key)
assert_equal(false, d.instance.autocreate_topic)
assert_equal(1, d.instance.flush_interval)
assert_equal(1, d.instance.buffer_config.flush_interval)
end

def test_autocreate_topic
Expand All @@ -47,7 +47,9 @@ def test_autocreate_topic

assert_equal(true, d.instance.autocreate_topic)

chunk = Fluent::MemoryBufferChunk.new('key', 'data')
tag, time, record = "tag", Fluent::Engine.now, {"a" => "b"}
metadata = d.instance.metadata_for_test(tag, time, record)
chunk = d.instance.buffer.generate_chunk(metadata)
client = mock!
client.topic("topic-test", autocreate: true).once

Expand All @@ -61,7 +63,14 @@ def test_autocreate_topic

def test_re_raise_errors
d = create_driver(DEFAULT_CONFIG)
chunk = Fluent::MemoryBufferChunk.new('key', 'data')
tag, time, record = "tag", Fluent::Engine.now, {"a" => "b"}
metadata = d.instance.metadata_for_test(tag, time, record)
chunk = d.instance.buffer.generate_chunk(metadata).tap do |c|
c.append([d.instance.format(tag, time, record)])
end
# For chunk#msgpack_each
chunk.extend Fluent::ChunkMessagePackEventStreamer

client = Object.new
def client.publish
raise ReRaisedError
Expand Down