This repository was archived by the owner on Mar 27, 2023. It is now read-only.

Commit 98fd75a

changed celery default queue, cleaned up core views, fixed celery queue name in local development
1 parent 0d60383 commit 98fd75a

9 files changed (+35, -49 lines)

.env.template

Lines changed: 2 additions & 2 deletions
@@ -18,8 +18,8 @@ GOOGLE_OAUTH2_SECRET=abcde-fghij
 SOCIAL_AUTH_FACEBOOK_KEY = ''
 SOCIAL_AUTH_FACEBOOK_SECRET = ''

-CELERY_BROKER_URL=redis://redis:6379
-CELERY_RESULT_BACKEND=redis://redis:6379
+CELERY_BROKER_URL=redis://redis:6379/0
+CELERY_RESULT_BACKEND=redis://redis:6379/1

 DJANGO_EMAIL_HOST=mailhog
 DJANGO_EMAIL_PORT=1025

awscdk/awscdk/celery_default.py

Lines changed: 0 additions & 1 deletion
@@ -4,7 +4,6 @@
     aws_ecs as ecs,
     aws_logs as logs,
     aws_cloudformation as cloudformation,
-    aws_sqs as sqs,
 )

backend/apps/core/management/commands/__init__.py

Whitespace-only changes.

backend/apps/core/management/commands/watch_celery.py

Lines changed: 16 additions & 5 deletions
@@ -6,21 +6,32 @@
 https://avilpage.com/2017/05/how-to-auto-reload-celery-workers-in-development.html
 """

+import os
 import shlex
 import subprocess

 from django.core.management.base import BaseCommand
 from django.utils import autoreload


-def restart_celery():
-    cmd = "pkill -9 celery"
+def restart_celery(queue=None, concurrency=None):
+    cmd = 'pkill -9 celery'
     subprocess.call(shlex.split(cmd))
-    cmd = "celery worker --app=backend.celery_app:app --loglevel=info --concurrency=2 --max-memory-per-child=150000"  # noqa
+    cmd = f"celery worker --app=backend.celery_app:app --loglevel=info -Q {queue} -n worker-{queue}@%h --concurrency={os.environ.get('CONCURRENT_WORKERS', 2)} --max-memory-per-child=150000"  # noqa
     subprocess.call(shlex.split(cmd))


 class Command(BaseCommand):
+    def add_arguments(self, parser):
+        parser.add_argument(
+            '-q', '--queue', nargs=1, default='celery', type=str
+        )
+        parser.add_argument('-c', '--concurrency', type=str)
+
     def handle(self, *args, **options):
-        print("Starting celery worker with autoreload...")
-        autoreload.run_with_reloader(restart_celery)
+        queue = options['queue'][0]
+        concurrency = options['concurrency'] or 1
+        print('Starting celery worker with autoreload...')
+        autoreload.run_with_reloader(
+            restart_celery, queue=queue, concurrency=concurrency
+        )
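
For reference, with --queue default (the value docker-compose.yml passes below) and CONCURRENT_WORKERS unset, the f-string above resolves to roughly the following worker invocation (a sketch; the exact string depends on the environment):

celery worker --app=backend.celery_app:app --loglevel=info -Q default -n worker-default@%h --concurrency=2 --max-memory-per-child=150000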

backend/apps/core/tasks.py

Lines changed: 2 additions & 4 deletions
@@ -15,7 +15,6 @@ class BaseTask(celery.Task):
 @task(bind=True, base=BaseTask)
 def debug_task(self):
     time.sleep(10)
-    print("Task is done")


 @periodic_task(
@@ -30,8 +29,8 @@ def debug_periodic_task():
 @task(bind=True, base=BaseTask)
 def send_test_email_task(self):
     send_mail(
-        "Subject here",
-        "Here is the message.",
+        "Email subject",
+        "Email message.",
         "from@example.com",
         ["to@example.com"],
         fail_silently=False,
@@ -40,6 +39,5 @@ def send_test_email_task(self):

 @task(bind=True, base=BaseTask)
 def sleep_task(self, seconds):
-    print("sleeping")
     time.sleep(int(seconds))
     return f"Slept {seconds} seconds"

backend/apps/core/urls.py

Lines changed: 1 addition & 3 deletions
@@ -3,9 +3,7 @@
 from . import views

 urlpatterns = [
-    path("", views.home, name="home"),
-    path("hello-world", views.hello_world, name="hello-world"),
-    path("debug-task/", views.debug_task_view, name="debug-task",),
+    path("health-check/", views.health_check, name="health-check"),
     path("celery/sleep-task/", views.sleep_task_view, name="sleep-task"),
     path(
         "debug/send-test-email/",

backend/apps/core/views.py

Lines changed: 6 additions & 28 deletions
@@ -18,54 +18,32 @@ def get(self, request):
         value = r.get("cached_value")

         if value:
-            print(value)
             count = value
         return JsonResponse({"count": count})

     def post(self, request):
         new_count = int(request.data["count"])
-        print("setting value to cache:")
-        print(new_count)
         r.set("cached_value", new_count)
         new_count = r.get("cached_value")
-        print("value from cache is...")
-        print(new_count)
         return JsonResponse({"count": new_count})

     def delete(self, request):
         r.delete("cached_value")
         return JsonResponse({"count": r.get("cached_value")})


-def hello_world(request):
-    response = JsonResponse(
-        {
-            "message": "Hello, World!",
-            "git_sha": os.environ.get("GIT_SHA", "<git SHA>"),
-            "debug": settings.DEBUG,
-            "format": "JSON",
-            "ssm_param": os.environ.get("MY_PARAM", "param_value"),
-        }
-    )
-    return response
-
-
-def home(request):
-    response = JsonResponse({"message": "Root"})
+def health_check(request):
+    response = JsonResponse({"message": "OK"})
     return response


-def debug_task_view(request):
-    debug_task.delay()
-    return JsonResponse({"message": "Task sent to queue."})
-
-
 @api_view(["POST"])
 def sleep_task_view(request):
     sleep_seconds = request.data.get("seconds")
-    print(sleep_seconds)
-    sleep_task.delay(sleep_seconds)
-    return JsonResponse({"message": "Sleep task submitted"})
+    sleep_task.apply_async(
+        [sleep_seconds], queue=settings.CELERY_QUEUE_DEFAULT
+    )
+    return JsonResponse({"message": f"Sleep task submitted ({sleep_seconds})"})


 def send_test_email(request):
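
The switch from delay() to apply_async() is what lets the view target the named queue: delay(*args) is shorthand for apply_async(args) with no routing options. A minimal sketch of the same call from a Django shell, with the task's import path assumed from the repo layout (not shown in this diff):

from django.conf import settings
from apps.core.tasks import sleep_task  # import path assumed

# Route explicitly to the named queue, as the view above now does, then
# wait on the Redis result backend for the task's return value.
result = sleep_task.apply_async([5], queue=settings.CELERY_QUEUE_DEFAULT)
print(result.get(timeout=30))  # -> "Slept 5 seconds"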

backend/backend/settings/base.py

Lines changed: 7 additions & 2 deletions
@@ -12,6 +12,7 @@

 import os

+from kombu import Queue
 import redis

 # Build paths inside the project like this: os.path.join(BASE_DIR, ...)
@@ -202,8 +203,12 @@
 CELERY_ACCEPT_CONTENT = ["application/json"]
 CELERY_TASK_SERIALIZER = "json"
 CELERY_RESULT_SERIALIZER = "json"
-CELERY_BROKER_URL = f"redis://{REDIS_SERVICE_HOST}:6379/0"  # noqa
-CELERY_RESULT_BACKEND = f"redis://{REDIS_SERVICE_HOST}:6379/0"  # noqa
+CELERY_BROKER_URL = f"redis://{REDIS_SERVICE_HOST}:6379/1"  # noqa
+CELERY_RESULT_BACKEND = f"redis://{REDIS_SERVICE_HOST}:6379/2"  # noqa
+
+CELERY_QUEUE_DEFAULT = 'default'
+
+CELERY_QUEUES = (Queue(CELERY_QUEUE_DEFAULT, routing_key='default'),)


 AUTH_USER_MODEL = "accounts.CustomUser"
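
The CELERY_QUEUES tuple only takes effect once it reaches the Celery app's configuration. One way it could be consumed (a hypothetical sketch; the project's actual backend/celery_app.py is referenced by watch_celery.py but not part of this commit):

from celery import Celery
from django.conf import settings

app = Celery("backend")
# Hypothetical wiring; adjust to however backend/celery_app.py actually loads settings.
app.conf.broker_url = settings.CELERY_BROKER_URL
app.conf.result_backend = settings.CELERY_RESULT_BACKEND
app.conf.task_queues = settings.CELERY_QUEUES  # the kombu Queue tuple defined above
app.autodiscover_tasks()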

docker-compose.yml

Lines changed: 1 addition & 4 deletions
@@ -128,11 +128,8 @@ services:

   celery:
     <<: *backend
-    build:
-      context: ./backend
-      dockerfile: scripts/dev/Dockerfile
     container_name: celery
-    command: bash -c 'python3 manage.py watch_celery'
+    command: bash -c 'python3 manage.py watch_celery --queue default --concurrency=2'
     volumes:
       - ./backend:/code
     ports: []
