@@ -94,7 +94,7 @@ describe('QueueingEventProcessor', async () => {
9494 await expect ( processor . onRunning ( ) ) . resolves . not . toThrow ( ) ;
9595 } ) ;
9696
97- it ( 'should start dispatchRepeater and failedEventRepeater' , ( ) => {
97+ it ( 'should start failedEventRepeater' , ( ) => {
9898 const eventDispatcher = getMockDispatcher ( ) ;
9999 const dispatchRepeater = getMockRepeater ( ) ;
100100 const failedEventRepeater = getMockRepeater ( ) ;
@@ -107,7 +107,6 @@ describe('QueueingEventProcessor', async () => {
107107 } ) ;
108108
109109 processor . start ( ) ;
110- expect ( dispatchRepeater . start ) . toHaveBeenCalledOnce ( ) ;
111110 expect ( failedEventRepeater . start ) . toHaveBeenCalledOnce ( ) ;
112111 } ) ;
113112
@@ -167,14 +166,33 @@ describe('QueueingEventProcessor', async () => {
167166
168167 processor . start ( ) ;
169168 await processor . onRunning ( ) ;
170- for ( let i = 0 ; i < 100 ; i ++ ) {
169+ for ( let i = 0 ; i < 99 ; i ++ ) {
171170 const event = createImpressionEvent ( `id-${ i } ` ) ;
172171 await processor . process ( event ) ;
173172 }
174173
175174 expect ( eventDispatcher . dispatchEvent ) . toHaveBeenCalledTimes ( 0 ) ;
176175 } ) ;
177176
177+ it ( 'should start the dispatchRepeater if it is not running' , async ( ) => {
178+ const eventDispatcher = getMockDispatcher ( ) ;
179+ const dispatchRepeater = getMockRepeater ( ) ;
180+
181+ const processor = new BatchEventProcessor ( {
182+ eventDispatcher,
183+ dispatchRepeater,
184+ batchSize : 100 ,
185+ } ) ;
186+
187+ processor . start ( ) ;
188+ await processor . onRunning ( ) ;
189+
190+ const event = createImpressionEvent ( 'id-1' ) ;
191+ await processor . process ( event ) ;
192+
193+ expect ( dispatchRepeater . start ) . toHaveBeenCalledOnce ( ) ;
194+ } ) ;
195+
178196 it ( 'should dispatch events if queue is full and clear queue' , async ( ) => {
179197 const eventDispatcher = getMockDispatcher ( ) ;
180198 const mockDispatch : MockInstance < typeof eventDispatcher . dispatchEvent > = eventDispatcher . dispatchEvent ;
@@ -190,30 +208,33 @@ describe('QueueingEventProcessor', async () => {
190208 await processor . onRunning ( ) ;
191209
192210 let events : ProcessableEvent [ ] = [ ] ;
193- for ( let i = 0 ; i < 100 ; i ++ ) {
211+ for ( let i = 0 ; i < 99 ; i ++ ) {
194212 const event = createImpressionEvent ( `id-${ i } ` ) ;
195213 events . push ( event ) ;
196214 await processor . process ( event ) ;
197215 }
198216
199217 expect ( eventDispatcher . dispatchEvent ) . toHaveBeenCalledTimes ( 0 ) ;
200218
201- let event = createImpressionEvent ( 'id-100' ) ;
219+ let event = createImpressionEvent ( 'id-99' ) ;
220+ events . push ( event ) ;
202221 await processor . process ( event ) ;
203-
222+
204223 expect ( eventDispatcher . dispatchEvent ) . toHaveBeenCalledTimes ( 1 ) ;
205224 expect ( eventDispatcher . dispatchEvent . mock . calls [ 0 ] [ 0 ] ) . toEqual ( buildLogEvent ( events ) ) ;
206225
207- events = [ event ] ;
208- for ( let i = 101 ; i < 200 ; i ++ ) {
226+ events = [ ] ;
227+
228+ for ( let i = 100 ; i < 199 ; i ++ ) {
209229 const event = createImpressionEvent ( `id-${ i } ` ) ;
210230 events . push ( event ) ;
211231 await processor . process ( event ) ;
212232 }
213233
214234 expect ( eventDispatcher . dispatchEvent ) . toHaveBeenCalledTimes ( 1 ) ;
215235
216- event = createImpressionEvent ( 'id-200' ) ;
236+ event = createImpressionEvent ( 'id-199' ) ;
237+ events . push ( event ) ;
217238 await processor . process ( event ) ;
218239
219240 expect ( eventDispatcher . dispatchEvent ) . toHaveBeenCalledTimes ( 2 ) ;
@@ -257,6 +278,40 @@ describe('QueueingEventProcessor', async () => {
257278 expect ( eventDispatcher . dispatchEvent . mock . calls [ 1 ] [ 0 ] ) . toEqual ( buildLogEvent ( [ newEvent ] ) ) ;
258279 } ) ;
259280
281+ it ( 'should flush queue immediately regardless of batchSize, if event processor is disposable' , async ( ) => {
282+ const eventDispatcher = getMockDispatcher ( ) ;
283+ const mockDispatch : MockInstance < typeof eventDispatcher . dispatchEvent > = eventDispatcher . dispatchEvent ;
284+ mockDispatch . mockResolvedValue ( { } ) ;
285+
286+ const dispatchRepeater = getMockRepeater ( ) ;
287+ const failedEventRepeater = getMockRepeater ( ) ;
288+
289+ const processor = new BatchEventProcessor ( {
290+ eventDispatcher,
291+ dispatchRepeater,
292+ failedEventRepeater,
293+ batchSize : 100 ,
294+ } ) ;
295+
296+ processor . makeDisposable ( ) ;
297+ processor . start ( ) ;
298+ await processor . onRunning ( ) ;
299+
300+ const events : ProcessableEvent [ ] = [ ] ;
301+ const event = createImpressionEvent ( 'id-1' ) ;
302+ events . push ( event ) ;
303+ await processor . process ( event ) ;
304+
305+ expect ( eventDispatcher . dispatchEvent ) . toHaveBeenCalledTimes ( 1 ) ;
306+ expect ( eventDispatcher . dispatchEvent . mock . calls [ 0 ] [ 0 ] ) . toEqual ( buildLogEvent ( events ) ) ;
307+ expect ( dispatchRepeater . reset ) . toHaveBeenCalledTimes ( 1 ) ;
308+ expect ( dispatchRepeater . start ) . not . toHaveBeenCalled ( ) ;
309+ expect ( failedEventRepeater . start ) . not . toHaveBeenCalled ( ) ;
310+ // eslint-disable-next-line @typescript-eslint/ban-ts-comment
311+ // @ts -ignore
312+ expect ( processor . retryConfig ?. maxRetries ) . toEqual ( 5 ) ;
313+ } ) ;
314+
260315 it ( 'should store the event in the eventStore with increasing ids' , async ( ) => {
261316 const eventDispatcher = getMockDispatcher ( ) ;
262317 const eventStore = getMockSyncCache < EventWithId > ( ) ;
0 commit comments