Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ license = "MIT"
authors = [{ name = "Flowdacity" }]
requires-python = ">=3.12"
keywords = [
"flowdacity",
"tailback",
"queue",
"job queue",
"task queue",
Expand All @@ -20,7 +22,7 @@ keywords = [
"leaky bucket",
]
classifiers = [
"Development Status :: 3 - Alpha",
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
Expand All @@ -31,6 +33,7 @@ classifiers = [
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: System :: Distributed Computing",
"Topic :: Utilities",
"Typing :: Typed",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Verify the package has py.typed marker and type annotations

# Check for py.typed marker file
echo "=== Checking for py.typed marker file ==="
fd -t f '^py\.typed$' src/

# Check for type annotations in Python source files
echo -e "\n=== Checking for type annotations in source files ==="
rg -t py --type-not=python -t py -C2 'def \w+\([^)]*:\s*\w+|^\s*\w+:\s*\w+' src/tailback/ | head -n 50

# Check if __init__.py or main modules have type annotations
echo -e "\n=== Checking key module files ==="
fd -e py -t f . src/tailback/ -x head -n 30 {}

Repository: flowdacity/tailback

Length of output: 9121


🏁 Script executed:

#!/bin/bash
# Analyze type annotation coverage across the codebase

echo "=== Files with type hints (checking for function/variable annotations) ==="
fd -e py -t f . src/tailback/ | while read file; do
  # Count lines with type hints (simple heuristic: contains ': ' in function defs/variables)
  hints=$(grep -E '^\s*(def|class|@dataclass|\w+:).*:.*(\w+|\[.*\]|None|int|str|bool|Any)' "$file" 2>/dev/null | wc -l)
  lines=$(wc -l < "$file")
  if [ "$hints" -gt 0 ]; then
    echo "$file: $hints type-hinted lines / $lines total"
  fi
done

echo -e "\n=== Python version requirement check ==="
rg 'python.*=' pyproject.toml | head -5

echo -e "\n=== Checking function definitions without return type hints ==="
fd -e py -t f . src/tailback/ -x grep -l 'def ' {} \; | while read file; do
  no_return=$(grep -c '^[[:space:]]*def [^:]*:$' "$file" 2>/dev/null || echo 0)
  if [ "$no_return" -gt 0 ]; then
    echo "$file: $no_return functions without return type annotation"
  fi
done

Repository: flowdacity/tailback

Length of output: 1003


Remove the "Typing :: Typed" classifier or add comprehensive type annotations throughout the codebase.

The package has a py.typed marker file but lacks meaningful type annotations. Only 1 type-hinted line exists across the entire codebase, while 89+ functions are missing return type annotations. Per PEP 561, the presence of py.typed signals to type checkers that the package contains inline type hints; without them, this classifier misleads users about the package's type support. Either add comprehensive type annotations or remove this classifier until typing coverage is substantially improved.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pyproject.toml` at line 36, The package advertises typing support via the
"Typing :: Typed" classifier while only a py.typed marker exists and the
codebase lacks real type hints; remove the "Typing :: Typed" classifier from
pyproject.toml (the line containing "Typing :: Typed") unless you plan to add
comprehensive annotations, or conversely add PEP 484 annotations across the code
(start with public APIs and functions missing return annotations) and include a
py.typed marker; specifically either delete the "Typing :: Typed" entry from
pyproject.toml or add inline type hints (parameter and return types) for the
many untyped functions so the classifier accurately reflects the package.

]
Comment on lines 24 to 37
dependencies = ["msgpack>=1.1.2", "redis[hiredis]>=7.1.0"]

Expand Down
122 changes: 76 additions & 46 deletions src/tailback/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details.

from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass
from typing import Any, cast

from tailback.config import TailbackConfig
from tailback.exceptions import BadArgumentException
from tailback.keys import RedisKeys
from tailback.responses import (
RedisValue,
decode_redis_value,
format_dequeue_response,
format_metrics_counts,
Expand All @@ -24,6 +27,9 @@
validate_metrics_arguments,
)

RedisCall = tuple[list[str], list[Any]]
StatusResponse = dict[str, str]


@dataclass(frozen=True)
class ClearQueuePlan:
Expand All @@ -35,40 +41,40 @@ class ClearQueuePlan:
queue_type: str
queue_id: str

def payload_member(self, job_id):
def payload_member(self, job_id: str) -> str:
return "%s:%s:%s" % (self.queue_type, self.queue_id, job_id)


class BaseTailback(object):
"""Shared non-I/O behavior for async and sync Tailback clients."""

def __init__(self, config):
self._r = None
self._scripts = None
self.config = TailbackConfig.from_mapping(config)
self._keys = RedisKeys(self.config.queue.key_prefix)
def __init__(self, config: Mapping[str, Any]) -> None:
self._r: Any = None
self._scripts: Any = None
self.config: TailbackConfig = TailbackConfig.from_mapping(config)
self._keys: RedisKeys = RedisKeys(self.config.queue.key_prefix)

self._key_prefix = self.config.queue.key_prefix
self._job_expire_interval = int(self.config.queue.job_expire_interval)
self._default_job_requeue_limit = int(
self._key_prefix: str = self.config.queue.key_prefix
self._job_expire_interval: int = int(self.config.queue.job_expire_interval)
self._default_job_requeue_limit: int = int(
self.config.queue.default_job_requeue_limit
)

def redis_client(self):
def redis_client(self) -> Any | None:
return self._r

def _current_timestamp(self):
def _current_timestamp(self) -> str:
return str(generate_epoch())

def _build_enqueue_call(
self,
payload,
interval,
job_id,
queue_id,
queue_type,
requeue_limit,
):
payload: Any,
interval: int,
job_id: str,
queue_id: str,
queue_type: str,
requeue_limit: int | None,
) -> RedisCall:
enqueue_args = validate_enqueue_arguments(
payload,
interval,
Expand All @@ -89,54 +95,74 @@ def _build_enqueue_call(
]
return keys, args

def _build_dequeue_call(self, queue_type):
def _build_dequeue_call(self, queue_type: str) -> RedisCall:
validate_dequeue_arguments(queue_type)
return [self._key_prefix, queue_type], [
self._current_timestamp(),
self._job_expire_interval,
]

def _build_finish_call(self, job_id, queue_id, queue_type):
def _build_finish_call(
self,
job_id: str,
queue_id: str,
queue_type: str,
) -> RedisCall:
validate_finish_arguments(job_id, queue_id, queue_type)
return [self._key_prefix, queue_type], [queue_id, job_id]

def _build_interval_call(self, interval, queue_id, queue_type):
def _build_interval_call(
self,
interval: int,
queue_id: str,
queue_type: str,
) -> RedisCall:
validate_interval_arguments(interval, queue_id, queue_type)
keys = [
self._keys.interval_hash,
self._keys.interval_member(queue_type, queue_id),
]
return keys, [interval]

def _build_requeue_call(self, queue_type, timestamp):
def _build_requeue_call(self, queue_type: RedisValue, timestamp: str) -> RedisCall:
queue_type = decode_redis_value(queue_type)
return [self._key_prefix, queue_type], [timestamp]

def _build_global_metrics_call(self):
def _build_global_metrics_call(self) -> RedisCall:
return [self._key_prefix], [self._current_timestamp()]

def _build_queue_metrics_call(self, queue_type, queue_id):
def _build_queue_metrics_call(self, queue_type: str, queue_id: str) -> RedisCall:
return [self._keys.job_queue(queue_type, queue_id)], [self._current_timestamp()]

def _validate_metrics_call(self, queue_type, queue_id):
def _validate_metrics_call(
self,
queue_type: str | None,
queue_id: str | None,
) -> None:
validate_metrics_arguments(queue_type, queue_id)
if not queue_type and queue_id:
raise BadArgumentException(
"`queue_id` should be accompanied by `queue_type`."
)

def _queue_type_metrics_keys(self, queue_type):
def _queue_type_metrics_keys(self, queue_type: str) -> tuple[str, str]:
return (
self._keys.ready_queue_set(queue_type),
self._keys.active_queue_set(queue_type),
)

def _queue_length_key(self, queue_type, queue_id):
def _queue_length_key(self, queue_type: str, queue_id: str) -> str:
validate_get_queue_length_arguments(queue_type, queue_id)
return self._keys.job_queue(queue_type, queue_id)

def _clear_queue_plan(self, queue_type, queue_id):
def _clear_queue_plan(
self,
queue_type: str | None,
queue_id: str | None,
) -> ClearQueuePlan:
validate_clear_queue_arguments(queue_type, queue_id)
queue_type = cast(str, queue_type)
queue_id = cast(str, queue_id)
return ClearQueuePlan(
primary_set=self._keys.ready_queue_set(queue_type),
job_queue=self._keys.job_queue(queue_type, queue_id),
Expand All @@ -147,26 +173,26 @@ def _clear_queue_plan(self, queue_type, queue_id):
queue_id=queue_id,
)

def _finish_response(self, finish_response):
def _finish_response(self, finish_response: int) -> StatusResponse:
if finish_response == 0:
return {"status": "failure"}
return {"status": "success"}

def _interval_response(self, interval_response):
def _interval_response(self, interval_response: int) -> StatusResponse:
if interval_response == 0:
return {"status": "failure"}
return {"status": "success"}

def _dequeue_response(self, dequeue_response):
def _dequeue_response(self, dequeue_response: Sequence[Any]) -> dict[str, Any]:
return format_dequeue_response(dequeue_response)

def _global_metrics_response(
self,
active_queue_types,
ready_queue_types,
enqueue_details,
dequeue_details,
):
active_queue_types: Iterable[RedisValue],
ready_queue_types: Iterable[RedisValue],
enqueue_details: Sequence[Any],
dequeue_details: Sequence[Any],
) -> dict[str, Any]:
enqueue_counts, dequeue_counts = format_metrics_counts(
enqueue_details,
dequeue_details,
Expand All @@ -178,18 +204,22 @@ def _global_metrics_response(
"dequeue_counts": dequeue_counts,
}

def _queue_type_metrics_response(self, ready_queues, active_queues):
def _queue_type_metrics_response(
self,
ready_queues: Iterable[RedisValue],
active_queues: Iterable[RedisValue],
) -> dict[str, Any]:
return {
"status": "success",
"queue_ids": format_queue_ids(ready_queues, active_queues),
}

def _queue_metrics_response(
self,
queue_length,
enqueue_details,
dequeue_details,
):
queue_length: int | str | bytes,
enqueue_details: Sequence[Any],
dequeue_details: Sequence[Any],
) -> dict[str, Any]:
enqueue_counts, dequeue_counts = format_metrics_counts(
enqueue_details,
dequeue_details,
Expand All @@ -201,23 +231,23 @@ def _queue_metrics_response(
"dequeue_counts": dequeue_counts,
}

def _decode_redis_value(self, value):
def _decode_redis_value(self, value: RedisValue) -> str:
return decode_redis_value(value)

def _decode_requeue_job(self, job):
def _decode_requeue_job(self, job: RedisValue) -> tuple[str, str]:
queue_id, job_id = decode_redis_value(job).split(":")
return queue_id, job_id

def _clear_queue_empty_response(self):
def _clear_queue_empty_response(self) -> StatusResponse:
return {"status": "Failure", "message": "No queued calls found"}

def _clear_queue_removed_response(self):
def _clear_queue_removed_response(self) -> StatusResponse:
return {
"status": "Success",
"message": "Successfully removed all queued calls",
}

def _clear_queue_purged_response(self):
def _clear_queue_purged_response(self) -> StatusResponse:
return {
"status": "Success",
"message": "Successfully removed all queued calls and purged related resources",
Expand Down
Loading
Loading