11 changes: 9 additions & 2 deletions arq_admin/queue.py
@@ -168,9 +168,16 @@ async def _get_job_id_to_status_map(self) -> Dict[str, JobStatus]:
         job_ids_with_prefixes = (match.groupdict() for match in regex_matches_from_arq_keys if match is not None)
 
         job_ids_to_scores = {key[0].decode('utf-8'): key[1] for key in job_ids_with_scores}
+        job_ids_in_queue = set(job_ids_to_scores.keys())
         job_ids_to_prefixes = dict(sorted(
             # not only ensure that we don't get key error but also filter out stuff that's not a client job
-            ([key['job_id'], key['prefix']] for key in job_ids_with_prefixes if key['prefix'] in PREFIX_PRIORITY),
+            (
+                [key['job_id'], key['prefix']]
+                for key in job_ids_with_prefixes
+                if key['prefix'] in PREFIX_PRIORITY and (
+                    key['job_id'] in job_ids_in_queue or key['prefix'] == 'result'
+                )
+            ),
             # make sure that more specific indices go after less specific ones
             key=lambda job_id_with_prefix: PREFIX_PRIORITY[job_id_with_prefix[-1]],
         ))
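For illustration, here is a minimal standalone sketch of what the new filter and sort achieve. The prefix names and PREFIX_PRIORITY values below are made-up assumptions for the example, not the real constants from arq_admin/queue.py:

# Hypothetical priorities: a higher value means a more specific status.
PREFIX_PRIORITY = {'queued': 0, 'in-progress': 1, 'result': 2}

job_ids_in_queue = {'job1'}  # IDs that still have a score in the queue's sorted set
job_ids_with_prefixes = [
    {'job_id': 'job1', 'prefix': 'queued'},
    {'job_id': 'job1', 'prefix': 'in-progress'},
    {'job_id': 'job2', 'prefix': 'result'},     # finished job, no longer in the queue
    {'job_id': 'job3', 'prefix': 'queued'},     # stale key: not in the queue, not a result
    {'job_id': 'job4', 'prefix': 'unrelated'},  # not a client job at all
]

job_ids_to_prefixes = dict(sorted(
    (
        [key['job_id'], key['prefix']]
        for key in job_ids_with_prefixes
        if key['prefix'] in PREFIX_PRIORITY and (
            key['job_id'] in job_ids_in_queue or key['prefix'] == 'result'
        )
    ),
    # pairs are sorted by priority, so more specific prefixes come last
    key=lambda job_id_with_prefix: PREFIX_PRIORITY[job_id_with_prefix[-1]],
))

print(job_ids_to_prefixes)  # {'job1': 'in-progress', 'job2': 'result'}

Because dict() keeps the last value seen per key and the pairs are sorted by priority, a more specific prefix overwrites a less specific one for the same job, while stale keys for jobs that are neither in the queue nor finished are dropped.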
@@ -189,4 +196,4 @@ def _get_job_status_from_raw_data(self, prefix: str, zscore: Optional[int]) -> JobStatus:
             return JobStatus.in_progress
         if zscore:
             return JobStatus.deferred if zscore > timestamp_ms() else JobStatus.queued
-        return JobStatus.not_found
+        return JobStatus.not_found  # pragma: nocover
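As a side note on the hunk above: a job whose sorted-set score lies in the future is considered deferred, otherwise it is queued. A rough standalone sketch of that decision, using a local stand-in for arq's JobStatus and an assumed timestamp_ms() helper that returns the current epoch time in milliseconds:

import time
from enum import Enum
from typing import Optional


class JobStatus(str, Enum):
    in_progress = 'in_progress'
    deferred = 'deferred'
    queued = 'queued'
    not_found = 'not_found'


def timestamp_ms() -> int:
    # assumed equivalent of the timestamp_ms() helper used in queue.py
    return int(time.time() * 1000)


def status_from_zscore(zscore: Optional[int]) -> JobStatus:
    # a score later than "now" means the job was enqueued with a defer time
    if zscore:
        return JobStatus.deferred if zscore > timestamp_ms() else JobStatus.queued
    return JobStatus.not_found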
4 changes: 2 additions & 2 deletions arq_admin/views.py
@@ -21,7 +21,7 @@ class QueueListView(ListView):

     def get_queryset(self) -> List[QueueStats]:
         result = asyncio.run(self._gather_queues())
-        return result
+        return result  # pragma: nocover
 
     def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
         context = super().get_context_data(**kwargs)
@@ -60,7 +60,7 @@ def job_status(self) -> str:
     def get_queryset(self) -> List[JobInfo]:
         queue_name = self.kwargs['queue_name']  # pragma: no cover
         jobs = asyncio.run(self._get_queue_jobs(queue_name))
-        return sorted(jobs, key=attrgetter('enqueue_time'))
+        return sorted(jobs, key=attrgetter('enqueue_time'))  # pragma: nocover
 
     def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
         context = super().get_context_data(**kwargs)
34 changes: 33 additions & 1 deletion tests/test_views.py
@@ -6,10 +6,11 @@
 from django.contrib.messages import get_messages
 from django.http import HttpResponseRedirect
 from django.template.response import TemplateResponse
-from django.test import AsyncClient
+from django.test import AsyncClient, override_settings
 from django.urls import reverse
 
 from arq_admin.queue import Queue
+from tests.settings import REDIS_SETTINGS
 
 
 @pytest.mark.asyncio()
@@ -125,3 +126,34 @@ async def test_post_job_abort_view(
     assert len(messages) == 1
     message = messages[0]
     assert message.tags == message_tag
+
+
+@pytest.mark.asyncio()
+@pytest.mark.django_db()
+@pytest.mark.usefixtures('django_login')
+@override_settings(ARQ_QUEUES={
+    default_queue_name: REDIS_SETTINGS,
+    'arq:queue2': REDIS_SETTINGS,
+})
+async def test_two_queues_detail_views(async_client: AsyncClient, redis: ArqRedis) -> None:
+    second_queue_name = 'arq:queue2'
+    import arq_admin.settings as arq_admin_settings
+    from django.conf import settings as django_settings
+    arq_admin_settings.ARQ_QUEUES = django_settings.ARQ_QUEUES
+
+    await redis.enqueue_job('successful_task', _job_id='job1', _queue_name=default_queue_name)
+    await redis.enqueue_job('successful_task', _job_id='job2', _queue_name=second_queue_name)
+
+    # Check detail view for default queue
+    url1 = reverse('arq_admin:all_jobs', kwargs={'queue_name': default_queue_name})
+    result1 = await async_client.get(url1)
+    assert isinstance(result1, TemplateResponse)
+    assert len(result1.context_data['object_list']) == 1
+    assert result1.context_data['object_list'][0].job_id == 'job1'
+
+    # Check detail view for second queue
+    url2 = reverse('arq_admin:all_jobs', kwargs={'queue_name': second_queue_name})
+    result2 = await async_client.get(url2)
+    assert isinstance(result2, TemplateResponse)
+    assert len(result2.context_data['object_list']) == 1
+    assert result2.context_data['object_list'][0].job_id == 'job2'