Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/lhm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

require 'lhm/table_name'
require 'lhm/table'
require 'lhm/progress'
require 'lhm/invoker'
require 'lhm/throttler'
require 'lhm/version'
Expand All @@ -11,6 +12,8 @@
require 'lhm/proxysql_helper'
require 'lhm/connection'
require 'lhm/test_support'
require 'lhm/connection'
require 'lhm/test_support'
require 'lhm/railtie' if defined?(Rails::Railtie)
require 'logger'

Expand Down Expand Up @@ -106,6 +109,14 @@ def self.logger=(new_logger)
@@logger = new_logger
end

def progress=(new_progress)
@@progress = new_progress
end

def progress
@@progress
end

def self.logger
@@logger ||=
begin
Expand Down
8 changes: 8 additions & 0 deletions lib/lhm/atomic_switcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,13 @@ def validate
def execute
@connection.execute(atomic_switch, should_retry: true, log_prefix: LOG_PREFIX)
end

def update_state_before_execute
Lhm.progress.update_state(Lhm::STATE_SWITCHING_TABLES)
end

def update_state_after_execute
Lhm.progress.update_state(Lhm::STATE_SWITCHED_TABLES)
end
end
end
28 changes: 21 additions & 7 deletions lib/lhm/chunker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Schmidt
require 'lhm/command'
require 'lhm/sql_helper'
require 'lhm/printer'
require 'lhm/chunk_insert'
require 'lhm/chunk_finder'

Expand Down Expand Up @@ -38,9 +37,11 @@ def initialize(migration, connection = nil, options = {})
end

def execute
@start_time = Time.now
start_time = Time.now

return if @chunk_finder.table_empty?
Lhm.progress.update_before_copy(@chunk_finder.start, @chunk_finder.limit)

@next_to_insert = @start
while @next_to_insert <= @limit || (@start == @limit)
stride = @throttler.stride
Expand All @@ -50,11 +51,12 @@ def execute
affected_rows = ChunkInsert.new(@migration, @connection, bottom, top, @retry_options).insert_and_return_count_of_rows_created
expected_rows = top - bottom + 1

# Only log the chunker progress every 5 minutes instead of every iteration
# Only log the chunker progress every 1 minute instead of every iteration
current_time = Time.now
if current_time - @start_time > (5 * 60)
Lhm.logger.info("Inserted #{affected_rows} rows into the destination table from #{bottom} to #{top}")
@start_time = current_time
if current_time - start_time > 60
Lhm.logger.info("Inserted #{Lhm.progress.rows_written} rows into the destination table
upto #{top} with a speed of #{Lhm.progress.copy_speed}")
start_time = current_time
end

if affected_rows < expected_rows
Expand All @@ -67,6 +69,7 @@ def execute

@next_to_insert = top + 1
@printer.notify(bottom, @limit)
Lhm.progress.update_during_copy(affected_rows)

break if @start == @limit
end
Expand All @@ -76,6 +79,18 @@ def execute
raise
end

def update_state_before_execute
Lhm.progress.update_state(Lhm::STATE_COPYING)
end

def update_state_after_execute
Lhm.progress.update_state(Lhm::STATE_COPYING_DONE)
end

def update_state_when_revert
Lhm.progress.update_state(Lhm::STATE_COPYING_FAILED)
end

private

def raise_on_non_pk_duplicate_warning
Expand Down Expand Up @@ -110,6 +125,5 @@ def validate
return if @chunk_finder.table_empty?
@chunk_finder.validate
end

end
end
31 changes: 30 additions & 1 deletion lib/lhm/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@ def run(&block)

if block_given?
before
update_state_before_block
block.call(self)
after
update_state_after_block
else
execute
update_state_before_execute
value = execute
update_state_after_execute

value
end
rescue => e
Lhm.logger.error "Error in class=#{self.class}, reverting. exception=#{e.class} message=#{e.message}"
update_state_when_revert

revert
raise
end
Expand All @@ -31,6 +39,27 @@ def validate
def revert
end

# This method is called by the inherited class to update the state when there is an error raised
# either during the execution of class.execute or inside a block being ran by the class.
def update_state_when_revert
end

# This method is called by the inherited class before the class.execute method starts.
def update_state_before_execute
end

# This method is called by the inherited class after the class.execute methods completes
def update_state_after_execute
end

# This method is called before a block is ran by the inherited class.
def update_state_before_block
end

# This method is called after a block is ran by the inherited class.
def update_state_after_block
end

def execute
raise NotImplementedError.new(self.class.name)
end
Expand Down
13 changes: 13 additions & 0 deletions lib/lhm/entangler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ def revert
after
end

def update_state_before_block
Lhm.progress.update_state(Lhm::STATE_SETUP_DONE)
end

def update_state_when_revert
Lhm.progress.update_state(Lhm::STATE_SETUP_FAILED)
end

def update_state_after_block
Lhm.progress.update_state(Lhm::STATE_TRIGGERS_DROPPED)
end


private

def strip(sql)
Expand Down
12 changes: 12 additions & 0 deletions lib/lhm/locked_switcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,17 @@ def execute
@connection.execute(tagged(stmt))
end
end

def update_state_before_execute
Lhm.progress.update_state(Lhm::STATE_SWITCHING_TABLES)
end

def update_state_after_execute
Lhm.progress.update_state(Lhm::STATE_SWITCHED_TABLES)
end

def update_state_when_revert
Lhm.progress.update_state(Lhm::STATE_SWITCHING_TABLES_FAILED)
end
end
end
4 changes: 4 additions & 0 deletions lib/lhm/migrator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,9 @@ def assert_valid_idx_name(index_name)
raise ArgumentError, 'index_name must be a string or symbol'
end
end

def update_state_when_revert
Lhm.progress.update_state(Lhm::STATE_SETUP_FAILED)
end
end
end
3 changes: 3 additions & 0 deletions lib/lhm/printer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def notify(lowest, highest)
write(message)
end

def notify_progress(progress)
end

def end
write('100% complete')
end
Expand Down
Loading