@@ -18,10 +18,10 @@ import { EventProcessor, ProcessableEvent } from "./event_processor";
1818import { getBatchedAsync , getBatchedSync , Store } from "../utils/cache/store" ;
1919import { EventDispatcher , EventDispatcherResponse , LogEvent } from "./event_dispatcher/event_dispatcher" ;
2020import { buildLogEvent } from "./event_builder/log_event" ;
21- import { BackoffController , ExponentialBackoff , IntervalRepeater , Repeater } from "../utils/repeater/repeater" ;
21+ import { BackoffController , ExponentialBackoff , Repeater } from "../utils/repeater/repeater" ;
2222import { LoggerFacade } from '../logging/logger' ;
2323import { BaseService , ServiceState , StartupLog } from "../service" ;
24- import { Consumer , Fn , Producer } from "../utils/type" ;
24+ import { Consumer , Fn , Maybe , Producer } from "../utils/type" ;
2525import { RunResult , runWithRetry } from "../utils/executor/backoff_retry_runner" ;
2626import { isSuccessStatusCode } from "../utils/http_request_handler/http_util" ;
2727import { EventEmitter } from "../utils/event_emitter/event_emitter" ;
@@ -31,13 +31,16 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message";
3131import { OptimizelyError } from "../error/optimizly_error" ;
3232import { sprintf } from "../utils/fns" ;
3333import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service" ;
34+ import { EVENT_STORE_FULL } from "../message/log_message" ;
3435
3536export const DEFAULT_MIN_BACKOFF = 1000 ;
3637export const DEFAULT_MAX_BACKOFF = 32000 ;
38+ export const MAX_EVENTS_IN_STORE = 500 ;
3739
3840export type EventWithId = {
3941 id : string ;
4042 event : ProcessableEvent ;
43+ notStored ?: boolean ;
4144} ;
4245
4346export type RetryConfig = {
@@ -59,7 +62,7 @@ export type BatchEventProcessorConfig = {
5962
6063type EventBatch = {
6164 request : LogEvent ,
62- ids : string [ ] ,
65+ events : EventWithId [ ] ,
6366}
6467
6568export const LOGGER_NAME = 'BatchEventProcessor' ;
@@ -70,11 +73,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
7073 private eventQueue : EventWithId [ ] = [ ] ;
7174 private batchSize : number ;
7275 private eventStore ?: Store < EventWithId > ;
76+ private eventCountInStore : Maybe < number > = undefined ;
77+ private maxEventsInStore : number = MAX_EVENTS_IN_STORE ;
7378 private dispatchRepeater : Repeater ;
7479 private failedEventRepeater ?: Repeater ;
7580 private idGenerator : IdGenerator = new IdGenerator ( ) ;
7681 private runningTask : Map < string , RunResult < EventDispatcherResponse > > = new Map ( ) ;
77- private dispatchingEventIds : Set < string > = new Set ( ) ;
82+ private dispatchingEvents : Map < string , EventWithId > = new Map ( ) ;
7883 private eventEmitter : EventEmitter < { dispatch : LogEvent } > = new EventEmitter ( ) ;
7984 private retryConfig ?: RetryConfig ;
8085
@@ -84,11 +89,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
8489 this . closingEventDispatcher = config . closingEventDispatcher ;
8590 this . batchSize = config . batchSize ;
8691 this . eventStore = config . eventStore ;
92+
8793 this . retryConfig = config . retryConfig ;
8894
8995 this . dispatchRepeater = config . dispatchRepeater ;
9096 this . dispatchRepeater . setTask ( ( ) => this . flush ( ) ) ;
9197
98+ this . maxEventsInStore = Math . max ( 2 * config . batchSize , MAX_EVENTS_IN_STORE ) ;
9299 this . failedEventRepeater = config . failedEventRepeater ;
93100 this . failedEventRepeater ?. setTask ( ( ) => this . retryFailedEvents ( ) ) ;
94101 if ( config . logger ) {
@@ -111,7 +118,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
111118 }
112119
113120 const keys = ( await this . eventStore . getKeys ( ) ) . filter (
114- ( k ) => ! this . dispatchingEventIds . has ( k ) && ! this . eventQueue . find ( ( e ) => e . id === k )
121+ ( k ) => ! this . dispatchingEvents . has ( k ) && ! this . eventQueue . find ( ( e ) => e . id === k )
115122 ) ;
116123
117124 const events = await ( this . eventStore . operation === 'sync' ?
@@ -138,7 +145,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
138145 ( currentBatch . length > 0 && ! areEventContextsEqual ( currentBatch [ 0 ] . event , event . event ) ) ) {
139146 batches . push ( {
140147 request : buildLogEvent ( currentBatch . map ( ( e ) => e . event ) ) ,
141- ids : currentBatch . map ( ( e ) => e . id ) ,
148+ events : currentBatch ,
142149 } ) ;
143150 currentBatch = [ ] ;
144151 }
@@ -148,7 +155,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
148155 if ( currentBatch . length > 0 ) {
149156 batches . push ( {
150157 request : buildLogEvent ( currentBatch . map ( ( e ) => e . event ) ) ,
151- ids : currentBatch . map ( ( e ) => e . id ) ,
158+ events : currentBatch ,
152159 } ) ;
153160 }
154161
@@ -163,15 +170,15 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
163170 }
164171
165172 const events : ProcessableEvent [ ] = [ ] ;
166- const ids : string [ ] = [ ] ;
173+ const eventWithIds : EventWithId [ ] = [ ] ;
167174
168175 this . eventQueue . forEach ( ( event ) => {
169176 events . push ( event . event ) ;
170- ids . push ( event . id ) ;
177+ eventWithIds . push ( event ) ;
171178 } ) ;
172179
173180 this . eventQueue = [ ] ;
174- return { request : buildLogEvent ( events ) , ids } ;
181+ return { request : buildLogEvent ( events ) , events : eventWithIds } ;
175182 }
176183
177184 private async executeDispatch ( request : LogEvent , closing = false ) : Promise < EventDispatcherResponse > {
@@ -185,10 +192,10 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
185192 }
186193
187194 private dispatchBatch ( batch : EventBatch , closing : boolean ) : void {
188- const { request, ids } = batch ;
195+ const { request, events } = batch ;
189196
190- ids . forEach ( ( id ) => {
191- this . dispatchingEventIds . add ( id ) ;
197+ events . forEach ( ( event ) => {
198+ this . dispatchingEvents . set ( event . id , event ) ;
192199 } ) ;
193200
194201 const runResult : RunResult < EventDispatcherResponse > = this . retryConfig
@@ -205,9 +212,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
205212 this . runningTask . set ( taskId , runResult ) ;
206213
207214 runResult . result . then ( ( res ) => {
208- ids . forEach ( ( id ) => {
209- this . dispatchingEventIds . delete ( id ) ;
210- this . eventStore ?. remove ( id ) ;
215+ events . forEach ( ( event ) => {
216+ this . eventStore ?. remove ( event . id ) ;
217+ if ( ! event . notStored && this . eventCountInStore ) {
218+ this . eventCountInStore -- ;
219+ }
211220 } ) ;
212221 return Promise . resolve ( ) ;
213222 } ) . catch ( ( err ) => {
@@ -216,7 +225,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
216225 this . logger ?. error ( err ) ;
217226 } ) . finally ( ( ) => {
218227 this . runningTask . delete ( taskId ) ;
219- ids . forEach ( ( id ) => this . dispatchingEventIds . delete ( id ) ) ;
228+ events . forEach ( ( event ) => this . dispatchingEvents . delete ( event . id ) ) ;
220229 } ) ;
221230 }
222231
@@ -235,12 +244,12 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
235244 return Promise . reject ( new OptimizelyError ( SERVICE_NOT_RUNNING , 'BatchEventProcessor' ) ) ;
236245 }
237246
238- const eventWithId = {
247+ const eventWithId : EventWithId = {
239248 id : this . idGenerator . getId ( ) ,
240249 event : event ,
241250 } ;
242251
243- await this . eventStore ?. set ( eventWithId . id , eventWithId ) ;
252+ await this . storeEvent ( eventWithId ) ;
244253
245254 if ( this . eventQueue . length > 0 && ! areEventContextsEqual ( this . eventQueue [ 0 ] . event , event ) ) {
246255 this . flush ( ) ;
@@ -253,7 +262,35 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
253262 } else if ( ! this . dispatchRepeater . isRunning ( ) ) {
254263 this . dispatchRepeater . start ( ) ;
255264 }
265+ }
266+
267+ private async findEventCountInStore ( ) : Promise < void > {
268+ if ( this . eventStore && this . eventCountInStore === undefined ) {
269+ try {
270+ const keys = await this . eventStore . getKeys ( ) ;
271+ this . eventCountInStore = keys . length ;
272+ } catch ( e ) {
273+ this . logger ?. error ( e ) ;
274+ }
275+ }
276+ }
256277
278+ private async storeEvent ( eventWithId : EventWithId ) : Promise < void > {
279+ await this . findEventCountInStore ( ) ;
280+ if ( this . eventCountInStore !== undefined && this . eventCountInStore >= this . maxEventsInStore ) {
281+ this . logger ?. info ( EVENT_STORE_FULL , eventWithId . event . uuid ) ;
282+ eventWithId . notStored = true ;
283+ return ;
284+ }
285+
286+ await Promise . resolve ( this . eventStore ?. set ( eventWithId . id , eventWithId ) ) . then ( ( ) => {
287+ if ( this . eventCountInStore !== undefined ) {
288+ this . eventCountInStore ++ ;
289+ }
290+ } ) . catch ( ( e ) => {
291+ eventWithId . notStored = true ;
292+ this . logger ?. error ( e ) ;
293+ } ) ;
257294 }
258295
259296 start ( ) : void {
0 commit comments