Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Gemfile.lock
.bundle
.idea
.ruby-version
examples/remote_executor_db.sqlite
vendor/
examples/*.sqlite
/Gemfile.local.rb
/test.log
2 changes: 1 addition & 1 deletion .rubocop_todo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Metrics/CyclomaticComplexity:
# Offense count: 135
# Configuration parameters: CountComments, CountAsOne, ExcludedMethods.
Metrics/MethodLength:
Max: 47
Max: 44

# Offense count: 4
# Configuration parameters: CountComments, CountAsOne.
Expand Down
72 changes: 72 additions & 0 deletions examples/actors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative 'example_helper'

class ExampleActor
def initialize
@value = 0
end

def increment
@value += 1
STDOUT.puts "Value incremented to #{@value}"
end
end

def server_world
ExampleHelper.create_world do |config|
config.persistence_adapter = persistence_adapter
config.connector = connector
config.managed_actors.add('example', class: ExampleActor)
end
end

def client_world
ExampleHelper.create_world do |config|
config.persistence_adapter = persistence_adapter
config.connector = connector
end
end

def db_path
File.expand_path('actor_remote_executor_db.sqlite', __dir__)
end

def persistence_conn_string
ENV['DB_CONN_STRING'] || "sqlite://#{db_path}"
end

def persistence_adapter
Dynflow::PersistenceAdapters::Sequel.new persistence_conn_string
end

def connector
proc { |world| Dynflow::Connectors::Database.new(world) }
end

command = ARGV.first || 'server'

if $PROGRAM_NAME == __FILE__
case command
when 'server'
puts <<~MSG
The server is starting…. You can send the work to it by running:

#{$PROGRAM_NAME} client

MSG

world = server_world
world.managed_actors['example'].tell(:increment)
ExampleHelper.run_web_console(world)
when 'client'
world = client_world
100.times do |i|
world.message_actor('example', 'increment', [])
end
else
puts "Unknown command #{command}"
exit 1
end
end
2 changes: 1 addition & 1 deletion examples/remote_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def run_client(count)
when 'client'
RemoteExecutorExample.run_client(ARGV[1]&.to_i)
else
puts "Unknown command #{comment}"
puts "Unknown command #{command}"
exit 1
end
elsif defined?(Sidekiq)
Expand Down
56 changes: 55 additions & 1 deletion lib/dynflow/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def inspect
Concurrent::Actor::Envelope.prepend(EnvelopeBacktraceExtension)

# Common parent for all the Dynflow actors defining some defaults
# that we preffer here.
# that we prefer here.
class Actor < Concurrent::Actor::Context
module LogWithFullBacktrace
def log(level, message = nil, &block)
Expand Down Expand Up @@ -149,4 +149,58 @@ def behaviour_definition
Concurrent::Actor::Behaviour::ErrorsOnUnknownMessage]
end
end

class ManagedActor < Actor
def initialize(implementation)
@implementation = implementation.new
# TODO: validate timer_options, start_timer?
if @implementation.respond_to?(:timer_options)
@timer = Concurrent::TimerTask.new(@implementation.timer_options) do
reference.tell(:tick)
end
end
end

def start_timer
@timer.execute if @timer
end

def stop_timer
@timer.shutdown if @timer
end

def on_message(message)
method, *args = message

case method
when :start
@implementation.start if @implementation.respond_to?(:start)
when :terminate
stop_timer
@implementation.stop if @implementation.respond_to?(:stop)
args.first.fulfill true
return
else
@implementation.send(method, *args)
end

return unless @implementation.respond_to?(:start_timer?)

if @implementation.start_timer?
start_timer
else
stop_timer
end
end

def on_envelope(envelope)
# TODO: "run_user_code" ?
# TODO: ::Foreman.settings.load_values
# TODO: this should be in foreman-tasks?
return super unless defined? ::Rails
::Rails.application.executor.wrap do
super
end
end
end
end
21 changes: 21 additions & 0 deletions lib/dynflow/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def queues
@queues ||= @config.queues.finalized_config(self)
end

def managed_actors
@config.managed_actors.actors
end

def method_missing(name)
return @cache[name] if @cache.key?(name)
value = @config.send(name)
Expand All @@ -48,6 +52,19 @@ def method_missing(name)
end
end

class ManagedActorsConfig
attr_reader :actors

def initialize
@actors = {}
end

def add(name, options = {})
raise ArgumentError, "Actor #{name} is already defined" if @actors.key?(name)
@actors[name] = options
end
end

class QueuesConfig
attr_reader :queues

Expand Down Expand Up @@ -79,6 +96,10 @@ def queues
@queues ||= QueuesConfig.new
end

def managed_actors
@managed_actors ||= ManagedActorsConfig.new
end

config_attr :logger_adapter, LoggerAdapters::Abstract do
LoggerAdapters::Simple.new
end
Expand Down
11 changes: 11 additions & 0 deletions lib/dynflow/coordinator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,17 @@ def self.inherited(klass)
end
end

class SingletonActorLock < LockByWorld
def initialize(world, actor_name)
super(world)
@data[:id] = self.class.lock_id(actor_name)
end

def self.lock_id(actor_name)
"actor:#{actor_name}"
end
end

class DelayedExecutorLock < LockByWorld
def initialize(world)
super
Expand Down
8 changes: 7 additions & 1 deletion lib/dynflow/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ module Dispatcher
optional: Algebrick::Types::Boolean
end

ActorMessage = type do
fields! actor_name: String,
message: String,
args: Array
end

Execution = type do
fields! execution_plan_id: String
end
Expand All @@ -33,7 +39,7 @@ module Dispatcher
fields! execution_plan_id: String
end

variants Event, Execution, Ping, Status, Planning, Halt
variants ActorMessage, Event, Execution, Ping, Status, Planning, Halt
end

Response = Algebrick.type do
Expand Down
14 changes: 13 additions & 1 deletion lib/dynflow/dispatcher/client_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ def dispatch_request(request, client_world_id, request_id)
ignore_unknown = event.optional
find_executor(event.execution_plan_id)
end),
(on ~ActorMessage do |event|
find_actor_executor(event.actor_name)
end),
(on ~Halt do |event|
executor = find_executor(event.execution_plan_id)
executor == Dispatcher::UnknownWorld ? AnyExecutor : executor
Expand Down Expand Up @@ -207,6 +210,15 @@ def find_executor(execution_plan_id)
Dispatcher::UnknownWorld
end

def find_actor_executor(actor_name)
# TODO: viable to check world config for singleton property before checking for lock?
actor_lock = @world.coordinator.find_locks(class: Coordinator::SingletonActorLock.name,
id: "actor:#{actor_name}").first
return actor_lock.world_id if actor_lock

AnyExecutor
end

def track_request(finished, request, timeout)
id_suffix = @last_id_suffix += 1
id = "#{@world.id}-#{id_suffix}"
Expand Down Expand Up @@ -240,7 +252,7 @@ def resolve_tracked_request(id, error = nil)
(on Execution.(execution_plan_id: ~any) do |uuid|
@world.persistence.load_execution_plan(uuid)
end),
(on Event | Ping | Halt do
(on ActorMessage | Event | Ping | Halt do
true
end)
@tracked_requests.delete(id).success! resolve_to
Expand Down
16 changes: 16 additions & 0 deletions lib/dynflow/dispatcher/executor_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def initialize(world, semaphore)

def handle_request(envelope)
match(envelope.message,
on(ActorMessage) { handle_actor_message(envelope, envelope.message) },
on(Planning) { perform_planning(envelope, envelope.message) },
on(Execution) { perform_execution(envelope, envelope.message) },
on(Event) { perform_event(envelope, envelope.message) },
Expand All @@ -26,6 +27,21 @@ def perform_planning(envelope, planning)
respond(envelope, Failed[e.message])
end

def handle_actor_message(envelope, actor_message)
actor = @world.managed_actors[actor_message.actor_name]

unless actor
respond(envelope, Failed["Actor #{actor_message.actor_name} not found"])
return
end

actor.tell([actor_message.message.to_sym, *actor_message.args])

respond(envelope, Accepted)
rescue Dynflow::Error => e
respond(envelope, Failed[e.message])
end

def perform_execution(envelope, execution)
allocate_executor(execution.execution_plan_id, envelope.sender_id, envelope.request_id)
execution_lock = Coordinator::ExecutionLock.new(@world, execution.execution_plan_id, envelope.sender_id, envelope.request_id)
Expand Down
7 changes: 2 additions & 5 deletions lib/dynflow/rails/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class Configuration
# the orchestration tied to the models.
attr_accessor :disable_active_record_actions

delegate :managed_actors, :queues, to: :world_config

def initialize
self.pool_size = 5
self.remote = ::Rails.env.production?
Expand Down Expand Up @@ -165,11 +167,6 @@ def world_config
end
end

# expose the queues definition to Rails developers
def queues
world_config.queues
end

protected

def default_sequel_adapter_options(world)
Expand Down
Loading
Loading