Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8128dfa
feat(core): add cancel_generation() to ModelOutputThunk
planetf1 Apr 27, 2026
f26cce7
feat(stdlib): add stream_with_chunking() with per-chunk validation (#…
planetf1 Apr 27, 2026
93e7587
test(stdlib): add StreamingMockBackend and streaming orchestration tests
planetf1 Apr 27, 2026
a5d358c
docs: add streaming_chunking example (#901)
planetf1 Apr 27, 2026
39f18a4
docs(stdlib): add Args section to StreamChunkingResult class docstring
planetf1 Apr 28, 2026
36173cb
docs(stdlib): add Raises section to stream_with_chunking docstring
planetf1 Apr 28, 2026
ea6bdb0
fix(stdlib): stream_with_chunking passes one chunk per stream_validat…
planetf1 Apr 28, 2026
35df77f
docs(stdlib): fix example for delta semantics and note validator latency
planetf1 Apr 28, 2026
61448a9
feat(stdlib): flush trailing chunk fragment at end of stream
planetf1 Apr 28, 2026
def10b6
fix(stdlib): address review feedback on streaming validation
planetf1 Apr 28, 2026
da41a06
fix(stdlib): address second-round review feedback
planetf1 Apr 28, 2026
74c009d
docs(stdlib): add Args and Returns sections to chunker flush overrides
planetf1 Apr 28, 2026
3fb501e
fix(stdlib): address third-round review feedback
planetf1 Apr 29, 2026
5850f92
fix(stdlib): stash orchestrator exception and narrow finally except
planetf1 May 1, 2026
4f508fd
feat(core): add cancelled flag on ModelOutputThunk
planetf1 May 5, 2026
5075a47
docs(stdlib): note ChunkingStrategy is text-only
planetf1 May 5, 2026
f0f93b3
test(stdlib): assert cancelled flag reflects cancellation state
planetf1 May 5, 2026
18bfe02
fix(stdlib): address psschwei review comments on streaming
planetf1 May 11, 2026
7fc40a4
fix(stdlib): clone requirements before backend start; cancel peer val…
planetf1 May 13, 2026
d8018dd
fix(core,hf): cooperative cancel via StoppingCriteria backed by threa…
planetf1 May 13, 2026
bf9a62b
fix(stdlib,core,hf): three pre-merge correctness fixes
planetf1 May 13, 2026
9a715d6
fix: address second-review feedback on bf9a62bc
planetf1 May 13, 2026
f3e3501
docs(core): add Raises section to cancel_generation() docstring
planetf1 May 13, 2026
2f2e352
docs(agents): add docstring quality gate to self-review checklist
planetf1 May 13, 2026
66260fe
fix: address review feedback from psschwei + jakelorocco
planetf1 May 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ Use the tool's common name (e.g., GitHub Copilot, Cursor, etc.).
3. New functions typed with concise docstrings?
4. Unit tests added for new functionality?
5. Avoided over-engineering?
6. If the diff adds `raise` statements to library code (`mellea/` but not `test/`), run the docstring quality gate before pushing:
```bash
uv run python tooling/docs-autogen/audit_coverage.py --docs-dir docs/docs/api --quality --fail-on-quality --threshold 100 --orphans
```
Every new `raise` in a public function requires a matching `Raises:` entry — the `build-and-validate` CI job enforces this with `--fail-on-quality`.

## 11. Writing Tests

Expand Down
110 changes: 110 additions & 0 deletions docs/examples/streaming/streaming_chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# pytest: ollama, e2e
Comment thread
planetf1 marked this conversation as resolved.

"""Streaming generation with per-chunk validation using stream_with_chunking().

Demonstrates:
- Subclassing Requirement to override stream_validate() for early-exit checks
- Calling stream_with_chunking() with sentence-level chunking
- Consuming validated chunks via astream() as they arrive
- Awaiting full completion with acomplete() to access final_validations and full_text
"""

import asyncio
import re

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import (
PartialValidationResult,
Requirement,
ValidationResult,
)
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import stream_with_chunking

# Crude sentence-terminator detector. A run of ``.``/``!``/``?`` counts once
# (so "..." and "!!!" are a single terminator). Good enough for an example;
# production code might use spaCy/NLTK for proper sentence segmentation.
_SENTENCE_END = re.compile(r"[.!?]+")


class MaxSentencesReq(Requirement):
"""Fails if the model generates more than *limit* sentences mid-stream.

Counts sentence terminators in the chunk *text* rather than counting
``stream_validate`` calls. This makes the requirement **chunker-agnostic**:
the same instance behaves correctly with sentence, word, or paragraph
chunking, because the semantics depend on content, not on the chunker's
structural decisions.

When writing your own streaming requirements, prefer this content-driven
pattern over coupling the requirement to a specific chunker. Reach for
chunker-coupled logic only when the requirement is genuinely a property
of chunk boundaries (e.g. "no chunk longer than N tokens").
"""

def __init__(self, limit: int) -> None:
super().__init__()
self._limit = limit
self._count = 0

def format_for_llm(self) -> str:
return f"The response must be at most {self._limit} sentences long."

async def stream_validate(
self, chunk: str, *, backend: Backend, ctx: Context
) -> PartialValidationResult:
self._count += len(_SENTENCE_END.findall(chunk))
if self._count > self._limit:
return PartialValidationResult(
"fail",
reason=f"Response exceeded {self._limit} sentence limit mid-stream",
)
return PartialValidationResult("unknown")
Comment thread
jakelorocco marked this conversation as resolved.

async def validate(
self,
backend: Backend,
ctx: Context,
*,
format: type | None = None,
model_options: dict | None = None,
) -> ValidationResult:
return ValidationResult(result=True)
Comment on lines +65 to +73
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think validate and stream_validate should return equivalent results for most requirements.



async def main() -> None:
from mellea.stdlib.session import start_session

m = start_session()
backend = m.backend
ctx = m.ctx

action = Instruction(
"Write a short paragraph about the water cycle in exactly two sentences."
)
req = MaxSentencesReq(limit=3)

result = await stream_with_chunking(
action, backend, ctx, quick_check_requirements=[req], chunking="sentence"
)

print("Streaming chunks as they arrive:")
async for chunk in result.astream():
print(f" CHUNK: {chunk!r}")

await result.acomplete()

print(f"\nCompleted normally: {result.completed}")
print(f"Full text: {result.full_text!r}")

if result.streaming_failures:
for _req, pvr in result.streaming_failures:
print(f"Streaming failure: {pvr.reason}")

if result.final_validations:
for vr in result.final_validations:
print(f"Final validation: {'PASS' if vr.as_bool() else 'FAIL'}")


asyncio.run(main())
69 changes: 69 additions & 0 deletions mellea/backends/huggingface.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.cache_utils import DynamicCache
from transformers.generation.logits_process import LogitsProcessorList
from transformers.generation.stopping_criteria import (
StoppingCriteria,
StoppingCriteriaList,
)
from transformers.generation.streamers import AsyncTextIteratorStreamer
from transformers.generation.utils import GenerateDecoderOnlyOutput
from transformers.modeling_utils import PreTrainedModel
Expand Down Expand Up @@ -74,6 +78,45 @@
)
from .utils import to_chat, to_tool_calls


class _EventStoppingCriteria(StoppingCriteria):
"""StoppingCriteria that signals the model to stop when a threading.Event is set.

Used by LocalHFBackend to implement cooperative cancellation: when
``cancel_generation`` is called, it sets the backing event via
``_cancel_hook`` before cancelling the asyncio task, giving the HF
``model.generate`` thread a chance to exit cleanly rather than running
to completion.
"""

def __init__(self, event: threading.Event) -> None:
self._event = event

def __call__(self, input_ids: Any, scores: Any, **kwargs: Any) -> bool: # type: ignore[override]
return self._event.is_set()


def _install_cancel_stopping_criteria(
generate_options: dict[str, Any], streaming_kwargs: dict[str, Any]
) -> threading.Event:
"""Wire a cooperative-cancel event into the generate call's stopping criteria.

Pops any caller-supplied ``stopping_criteria`` from *generate_options* (to
avoid passing it twice via both ``**generate_options`` and
``**streaming_kwargs``), prepends an :class:`_EventStoppingCriteria` backed
by a fresh ``threading.Event``, and stores the merged list in
*streaming_kwargs*. Returns the event so the caller can arm
``output._cancel_hook = event.set``.
"""
cancel_event = threading.Event()
user_sc = generate_options.pop("stopping_criteria", None)
streaming_kwargs["stopping_criteria"] = StoppingCriteriaList(
[_EventStoppingCriteria(cancel_event)]
+ (list(user_sc) if user_sc is not None else [])
)
return cancel_event


"""A configuration type for the unhappy path: Tokenizer * Model * torch device string

Huggingface backends can initialize themselves from a model string if the transformers `Auto*` classes can be used. Therefore, a TransformersTorchConfig usually isn't required. However, sometimes a model needs special care to instantiate properly, or a custom device type needs to bse used. Instead of trying to do a lot of partial magic, we basically have two modaliites: either the constructor can figure out everything from the model_id, or the user has to provide an entire config.
Expand Down Expand Up @@ -839,6 +882,15 @@ async def _generate_from_context_with_kv_cache(
# Filter out chat template-only options before passing to generate()
generate_options = self._filter_chat_template_only_options(model_options)

# Only install cooperative-cancel plumbing on the streaming path.
# Non-streaming calls have no orchestrator calling cancel_generation(),
# so the hook would be dead code and the StoppingCriteria would silently
# wrap any user-supplied stopping_criteria on every decode step.
if stream:
_cancel_event = _install_cancel_stopping_criteria(
generate_options, streaming_kwargs
)

linearized_ctx = ctx.view_for_generation()
assert linearized_ctx is not None
_input_text, input_ids, merged_cache, attention_mask = (
Expand Down Expand Up @@ -867,6 +919,10 @@ async def _generate_from_context_with_kv_cache(
)

output = ModelOutputThunk(None)
# Arm the cancel hook before creating tasks so a cancel racing
# task creation still finds the hook set.
if stream:
output._cancel_hook = _cancel_event.set
output._start = datetime.datetime.now()
output._context = ctx.view_for_generation()
output._action = action
Expand Down Expand Up @@ -1002,6 +1058,15 @@ async def _generate_from_context_standard(
# Filter out chat template-only options before passing to generate()
generate_options = self._filter_chat_template_only_options(model_options)

# Only install cooperative-cancel plumbing on the streaming path.
# Non-streaming calls have no orchestrator calling cancel_generation(),
# so the hook would be dead code and the StoppingCriteria would silently
# wrap any user-supplied stopping_criteria on every decode step.
if stream:
_cancel_event = _install_cancel_stopping_criteria(
generate_options, streaming_kwargs
)

chat_response = asyncio.to_thread(
self._generate_with_adapter_lock,
"", # Empty for no adapters.
Expand All @@ -1016,6 +1081,10 @@ async def _generate_from_context_standard(
)

output = ModelOutputThunk(None)
# Arm the cancel hook before creating tasks so a cancel racing
# task creation still finds the hook set.
if stream:
output._cancel_hook = _cancel_event.set
output._start = datetime.datetime.now()
output._context = ctx.view_for_generation()
output._action = action
Expand Down
Loading
Loading