One of the bottlenecks in scaling MLTrace occurs when there is a large stream of requests: database writes are currently synchronous, which makes the system difficult to scale. One way to address this is to write entries to the database asynchronously.
Implementation
The Celery task queue will be deployed with Redis as the broker. This should preserve the ordering of writes, and multiple workers can be deployed to speed up task processing.
Create a central Celery task manager to coordinate the tasks that need to be performed. The code block will be similar to:
from celery import Celery
from flask import Flask

app = Flask(__name__)

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=<BROKER_URL>,
        backend=<BACKEND_URL>,
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = make_celery(app)
This should also help developers offload any future tasks to the task queue.
In client.py, create a new function async_create_component with the following code:
@celery.task
def async_create_component(
    name: str, description: str, owner: str, tags: typing.List[str] = []
):
    """Same content as create_component function."""
Todos to enable integration
Storage / DB Layer
Wrap the async_create_component function with the @celery.task decorator to enable async writes.
API Layer
No changes, since this would work under the hood. Optional: add a flag to keep a run synchronous.
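The optional synchronous path could look like the sketch below. The sync parameter name and both helper functions are hypothetical, standing in for the blocking write and the Celery enqueue respectively; they are not part of the current API.

```python
def _write_component(name: str, description: str, owner: str) -> str:
    # Hypothetical blocking write, standing in for the current synchronous path.
    return f"wrote {name}"

def _enqueue_component(name: str, description: str, owner: str) -> str:
    # Hypothetical stand-in for async_create_component.delay(...).
    return f"queued {name}"

def create_component(name: str, description: str, owner: str, sync: bool = False):
    """Hypothetical wrapper: 'sync' keeps the old blocking behavior."""
    if sync:
        return _write_component(name, description, owner)
    return _enqueue_component(name, description, owner)

print(create_component("etl", "Nightly ETL job", "data-team"))             # queued etl
print(create_component("etl", "Nightly ETL job", "data-team", sync=True))  # wrote etl
```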
Query Layer
No changes in the UI layer.
Benchmarking / Testing
Deployment
Update docker-compose to run the Redis broker and Celery workers alongside the app.