@@ -2,29 +2,36 @@ module Stackify
22 class MsgsQueue < SizedQueue
33 include MonitorMixin
44
5+ attr_accessor :worker
6+
57 CHUNK_MIN_WEIGHT = 50
68 ERROR_SIZE = 10
79 LOG_SIZE = 1
810 DELAY_WAITING = 1
911
1012 def initialize
1113 super ( Stackify . configuration . queue_max_size )
14+ start_worker
15+ end
16+
17+ alias :old_push :push
18+
19+ def start_worker
1220 if Stackify ::Utils . is_mode_on? Stackify ::MODES [ :logging ]
1321 @send_interval = ScheduleDelay . new
14- worker = MsgsQueueWorker . new
22+ @ worker = MsgsQueueWorker . new
1523 task = update_send_interval_task
16- worker . async_perform @send_interval , task
24+ @ worker. async_perform @send_interval , task
1725 else
1826 Stackify . internal_log :warn , '[MsgsQueue]: Logging is disabled at configuration!'
1927 end
2028 end
2129
22- alias :old_push :push
23-
2430 def push_remained_msgs
31+ Stackify . internal_log :debug , "[MsgsQueue] push_remained_msgs() alive? = #{ @worker . alive? } "
2532 wait_until_all_workers_will_add_msgs
2633 self . synchronize do
27- Stackify . internal_log :info , 'All remained logs are going to be sent'
34+ Stackify . internal_log :info , '[MsgsQueue] All remained logs are going to be sent'
2835 Stackify . shutdown_all
2936 if self . length > 0
3037 Stackify . logs_sender . send_logs ( pop_all )
@@ -34,6 +41,11 @@ def push_remained_msgs
3441 end
3542
3643 def add_msg msg
44+ Stackify . internal_log :debug , "[MsgsQueue] add_msg() Is worker <#{ @worker . name } > alive? = #{ @worker . alive? } "
45+ if !@worker . alive?
46+ start_worker
47+ Stackify . internal_log :debug , "[MsgsQueue] add_msg() Newly created worker <#{ @worker . name } >"
48+ end
3749 self . synchronize do
3850 Stackify ::Utils . do_only_if_authorized_and_mode_is_on Stackify ::MODES [ :logging ] do
3951 old_push ( msg )
@@ -73,7 +85,6 @@ def update_send_interval_task
7385 Stackify ::ScheduleTask . new properties do
7486 processed_count = calculate_processed_msgs_count
7587 i = @send_interval . update_by_sent_num! processed_count
76- Stackify . internal_log :debug , "MsgsQueue: send_interval is updated to #{ i } "
7788 i
7889 end
7990 end
@@ -104,7 +115,6 @@ def push_one_chunk
104115 break
105116 end
106117 end
107-
108118 Stackify . logs_sender . send_logs ( chunk ) if chunk . length > 0
109119 chunk_weight
110120 end
0 commit comments