Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions splitio/events/events_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""sdk internal events task."""
import logging
import threading
import abc

_LOGGER = logging.getLogger(__name__)

class EventsTaskBase(object, metaclass=abc.ABCMeta):
"""task template."""

@abc.abstractmethod
def is_running(self):
"""Return whether the task is running."""

@abc.abstractmethod
def start(self):
"""Start task."""

@abc.abstractmethod
def stop(self):
"""Stop task."""

class EventsTask(EventsTaskBase):
"""sdk internal events processing task."""

_centinel = object()

def __init__(self, notify_internal_events, internal_events_queue):
"""
Class constructor.

:param synchronize_segment: handler to perform segment synchronization on incoming event
:type synchronize_segment: function

:param segment_queue: queue with segment updates notifications
:type segment_queue: queue
"""
self._internal_events_queue = internal_events_queue
self._handler = notify_internal_events
self._running = False
self._worker = None

def is_running(self):
"""Return whether the working is running."""
return self._running

def _run(self):
"""Run worker handler."""
while self.is_running():
event = self._internal_events_queue.get()
if not self.is_running():
break

if event == self._centinel:
continue

_LOGGER.debug('Processing sdk internal event: %s', event.internal_event)
try:
self._handler(event.internal_event, event.metadata)
except Exception:
_LOGGER.error('Exception raised in events manager')
_LOGGER.debug('Exception information: ', exc_info=True)

def start(self):
"""Start worker."""
if self.is_running():
_LOGGER.debug('Worker is already running')
return
self._running = True

_LOGGER.debug('Starting Event Task worker')
self._worker = threading.Thread(target=self._run, name='EventsTaskWorker', daemon=True)
self._worker.start()

def stop(self):
"""Stop worker."""
_LOGGER.debug('Stopping Event Task worker')
if not self.is_running():
_LOGGER.debug('Worker is not running. Ignoring.')
return
self._running = False
self._internal_events_queue.put(self._centinel)
23 changes: 23 additions & 0 deletions splitio/models/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,29 @@ def notification_type(self):
def split_name(self):
return self._split_name

class SdkInternalEventNotification(object): # pylint: disable=too-many-instance-attributes
"""SdkInternalEventNotification model object."""

def __init__(self, internal_event, metadata):
"""
Class constructor.

:param internal_event: internal event object
:type channel: SdkInternalEvent
:param metadata: metadata associated with event
:type change_number: EventsMetadata

"""
self._internal_event = internal_event
self._metadata = metadata

@property
def internal_event(self):
return self._internal_event

@property
def metadata(self):
return self._metadata

_NOTIFICATION_MAPPERS = {
Type.SPLIT_UPDATE: lambda c, d: SplitChangeNotification(c, Type.SPLIT_UPDATE, d['changeNumber']),
Expand Down
74 changes: 74 additions & 0 deletions tests/events/test_events_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""EventsManager test module."""
import pytest
import queue
import time

from splitio.models.events import SdkInternalEvent
from splitio.models.notification import SdkInternalEventNotification
from splitio.events.events_metadata import EventsMetadata
from splitio.events.events_metadata import SdkEventType
from splitio.events.events_task import EventsTask


class EventsTaskTests(object):
"""Tests for EventsManager."""

internal_event = None
metadata = None

def test_firing_events(self):
events_queue = queue.Queue()
events_task = EventsTask(self._event_callback, events_queue)

events_task.start()
assert events_task.is_running()

metadata = EventsMetadata(SdkEventType.FLAG_UPDATE, { "feature1" })
events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, metadata))
time.sleep(.5)
assert self.internal_event == SdkInternalEvent.SDK_READY
self._verify_metadata(metadata)

self._reset_flags()
events_queue.put(SdkInternalEventNotification(SdkInternalEvent.RB_SEGMENTS_UPDATED, metadata))
time.sleep(.5)
assert self.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED
self._verify_metadata(metadata)

events_task.stop()
time.sleep(.5)
assert not events_task.is_running()

def test_on_error(self):
events_queue = queue.Queue()

def handler_sync(internal_event, metadata):
raise Exception('some')

events_task = EventsTask(handler_sync, events_queue)
events_task.start()
assert events_task.is_running()

events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))

with pytest.raises(Exception):
events_task._handler()

assert events_task.is_running()
events_task.stop()
time.sleep(1)
assert not events_task.is_running()

def _reset_flags(self):
self.internal_event = None
self.metadata = None

def _event_callback(self, internal_event, metadata):
self.internal_event = internal_event
self.metadata = metadata

def _verify_metadata(self, metadata):
assert metadata.get_type() == self.metadata.get_type()
assert metadata.get_names() == self.metadata.get_names()