Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14,553 changes: 14,553 additions & 0 deletions schema/single_site.json

Large diffs are not rendered by default.

1,433 changes: 0 additions & 1,433 deletions schema/single_site_matching.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ description = "A library of curricula for the VrForaging task."
authors = [
{name = "Bruno Cruz", email = "bruno.cruz@alleninstitute.org"},
{name = "Tiffany Ona", email = "tiffany.ona@alleninstitute.org"},
{name = "Galen Lynch", email = "galen@galenlynch.com"},
]
license = "MIT"
version = "1.0.0rc6"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Single Site curriculum

A **single-site, non-baited** patch-foraging bandit, similar to the dynamic
foraging uncoupled, unbaited task. Each patch is one odor-marked reward site (a
single accept/reject decision; no stored/baited reward). Two reward odors carry
block-switching reward probabilities, so the animal must track relative action
value. Code: `metrics.py`, `stages.py`, `policies.py`, `helpers.py`, `curriculum.py`.

## Stages

Four stages, in order; the first three shape, the last is terminal.

```
learn_to_stop → learn_to_choose → probability_grid_short_delay → probability_grid_long_delay
```

| stage | goal | environment | within-session updater | stop / delay |
|---|---|---|---|---|
| `learn_to_stop` | a real stop in one session | 2 odors A,B, both `p_reward=1.0` | `STOP_VELOCITY_THRESHOLD` 60→8 (gain ×0.93) | stop **1.0 s** (fixed); delay 0.5 s |
| `learn_to_choose` | high-contrast discrimination | alternating `(0.9, 0.1)` / `(0.1, 0.9)` blocks | `REWARD_DELAY_OFFSET` 0→0.3 (+0.002) | stop 1.0 s; delay 0.5→0.8 s |
| `probability_grid_short_delay` | grid + grow patience | 13-block band + distractor C (occ `0.475/0.475/0.05`) | `REWARD_DELAY_OFFSET` 0→1.5 (+0.01) | stop 1.0 s; delay `0.2 + Exp(0.4)`, [0.2, 2.5] s + the ramp |
| `probability_grid_long_delay` | terminal / analysis | same 13-block band | none | stop 1.0 s; delay `0.2 + Exp(2.1)`, [0.2, 7.0] s (stationary) |

Reward is 7 µL; velocity threshold is 8 cm/s from `learn_to_choose` on; `learn_to_stop`
starts at 60.

## Transition gates

| from → to | fires when |
|---|---|
| `learn_to_stop` → `learn_to_choose` | `last_stop_threshold_updater ≤ 8`, `n_seen ≥ 250`, `n_visited ≥ 150` |
| `learn_to_choose` → `probability_grid_short_delay` | `last_reward_delay_offset_updater ≥ 0.25`, `n_seen ≥ 200`, `n_visited ≥ 50`, `visit_ratio ≤ 0.7` |
| `probability_grid_short_delay` → `probability_grid_long_delay` | `last_reward_delay_offset_updater ≥ 1.3`, `n_seen ≥ 300`, `n_visited ≥ 100`, `0.3 ≤ visit_ratio ≤ 0.7` |

`visit_ratio = n_patches_visited / n_patches_seen`.

## Cross-session policies (start policies)

Applied on the next in-session day; on a stage transition the task resets to its
defaults and these apply from the following session.

- `p_learn_to_stop` — seed `STOP_VELOCITY_THRESHOLD` from the prior session end ×1.2
(eased), clamped to [8, 60].
- `p_reward_water_gate` — hold `p_reward = 1.0` while the prior session collected
< 0.6 mL water; drop to 0.8 once the animal reliably earns. Keys on water actually
collected, so a non-earning animal is never penalized.
- `p_learn_to_run` — ease `learn_to_stop` geometry from compressed toward full,
scaled by prior locomotion (`n_patches_seen / 150`).
- `p_seed_reward_delay` — seed `REWARD_DELAY_OFFSET` from the prior session end ×0.8.

## Probability-grid band

Blocks are `(p_A, p_B)` pairs from the 5×5 grid over `{0.1, 0.3, 0.5, 0.7, 0.9}`,
kept only where the **summed** reward probability is in `{0.8, 1.0, 1.2}` → **13
blocks** (vs the full 24). This holds environmental reward rate roughly constant
(per-site offered rate 0.38–0.57) while preserving relative-value contrast
`|p_A − p_B|` up to 0.8 — so an unlucky block can't reward-starve the animal, and
no rich-everywhere block kills the incentive to skip.

## Corridor geometry (cm)

| stage | reward site | inter-site | inter-patch |
|---|---|---|---|
| `learn_to_stop` (compressed → full) | 25 → 40 | 10 → 15 | `25 + Exp(50)`, [25,90] → `50 + Exp(120)`, [50,150] |
| later stages | 50 | 15 | `30 + Exp(60)`, [30,190] |

## Metrics (`metrics_from_dataset`)

`n_patches_visited` = `ChoiceFeedback` count · `n_patches_seen` = `ActivePatch` count ·
`last_{stop_threshold,reward_delay_offset}_updater` = last value of the in-session
updater · `total_water_consumed` = summed `GiveReward` (mL).
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from typing import Any, Type, TypeVar

import aind_behavior_curriculum
import pydantic
from aind_behavior_curriculum import (
StageTransition,
Trainer,
TrainerState,
create_curriculum,
)
from aind_behavior_vr_foraging.task_logic import AindVrForagingTaskLogic

from .. import __semver__
from ..cli import CurriculumCliArgs, CurriculumSuggestion
from ..utils import metrics_from_dataset_path, trainer_state_from_file
from .metrics import SingleSiteMetrics
from .stages import (
make_s_learn_to_choose,
make_s_learn_to_stop,
make_s_probability_grid_long_delay,
make_s_probability_grid_short_delay,
)

CURRICULUM_NAME = "SingleSite"
PKG_LOCATION = ".".join(__name__.split(".")[:-1])

TModel = TypeVar("TModel", bound=pydantic.BaseModel)


# ============================================================
# Stage transitions
# ============================================================


def st_s_learn_to_stop_to_s_learn_to_choose(metrics: SingleSiteMetrics) -> bool:
# Stop duration is fixed at 1.0 s now, so graduation is gated on the velocity
# threshold reaching its floor plus enough qualifying stops in a session.
if metrics.last_stop_threshold_updater is None:
return False
return (
(metrics.last_stop_threshold_updater <= 8)
and (metrics.n_patches_seen >= 250)
and (metrics.n_patches_visited >= 150)
)


def st_s_learn_to_choose_to_s_probability_grid_short_delay(metrics: SingleSiteMetrics) -> bool:
# Graduate once the subject is discriminating (visiting <= 70% of patches seen)
# and the within-session reward delay has started to grow.
if metrics.last_reward_delay_offset_updater is None or metrics.n_patches_seen == 0:
return False
visit_ratio = metrics.n_patches_visited / metrics.n_patches_seen
return (
(metrics.n_patches_seen >= 200)
and (metrics.n_patches_visited >= 50)
and (visit_ratio <= 0.7)
and (metrics.last_reward_delay_offset_updater >= 0.25)
)


def st_s_probability_grid_short_delay_to_s_probability_grid_long_delay(metrics: SingleSiteMetrics) -> bool:
# probability_grid_short ramps REWARD_DELAY_OFFSET 0 -> 1.5 (folded from the old
# three_contrast); graduate to the long-delay stage once delay is grown and the
# subject is harvesting in the 0.3-0.7 visit-ratio band.
if metrics.last_reward_delay_offset_updater is None or metrics.n_patches_seen == 0:
return False
visit_ratio = metrics.n_patches_visited / metrics.n_patches_seen
return (
(metrics.n_patches_seen >= 300)
and (metrics.n_patches_visited >= 100)
and (0.3 <= visit_ratio <= 0.7)
and (metrics.last_reward_delay_offset_updater >= 1.3)
)


# ============================================================
# Curriculum definition
# ============================================================

curriculum_class: Type[aind_behavior_curriculum.Curriculum[AindVrForagingTaskLogic]] = create_curriculum(
CURRICULUM_NAME, __semver__, (AindVrForagingTaskLogic,), pkg_location=PKG_LOCATION
)
CURRICULUM = curriculum_class()

CURRICULUM.add_stage_transition(
make_s_learn_to_stop(),
make_s_learn_to_choose(),
StageTransition(st_s_learn_to_stop_to_s_learn_to_choose),
)
CURRICULUM.add_stage_transition(
make_s_learn_to_choose(),
make_s_probability_grid_short_delay(),
StageTransition(st_s_learn_to_choose_to_s_probability_grid_short_delay),
)
CURRICULUM.add_stage_transition(
make_s_probability_grid_short_delay(),
make_s_probability_grid_long_delay(),
StageTransition(st_s_probability_grid_short_delay_to_s_probability_grid_long_delay),
)

# ==============================================================================
# Create a Trainer that uses the curriculum to bootstrap suggestions
# ==============================================================================

TRAINER = Trainer(CURRICULUM)


def run_curriculum(args: CurriculumCliArgs) -> CurriculumSuggestion[TrainerState[Any], Any]:
metrics: aind_behavior_curriculum.Metrics
trainer_state = trainer_state_from_file(args.input_trainer_state, TRAINER)
metrics = metrics_from_dataset_path(args.data_directory, trainer_state)
trainer_state = TRAINER.evaluate(trainer_state, metrics)
return CurriculumSuggestion(trainer_state=trainer_state, metrics=metrics, version=__semver__)
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Reusable builders for the single-site curriculum: small task-logic constructors
plus numeric helpers. Stage definitions live in ``stages.py``; this module is only
construction plumbing."""

from typing import Any, Optional

from aind_behavior_services.task import distributions
from aind_behavior_vr_foraging import task_logic

#: Water volume (microlitres) delivered per harvested reward.
REWARD_AMOUNT_UL: float = 7.0


def clamp(value: float, minimum: float, maximum: float) -> float:
return max(minimum, min(value, maximum))


def lerp(start: float, end: float, fraction: float) -> float:
return start + (end - start) * fraction


def make_default_operation_control(velocity_threshold: float) -> task_logic.OperationControl:
"""Operation control shared by every stage: movable spout off, audio effectively
silent, no odor flow, only the stop-velocity threshold varies."""
return task_logic.OperationControl(
movable_spout_control=task_logic.MovableSpoutControl(enabled=False),
audio_control=task_logic.AudioControl(duration=0.2, frequency=9999),
odor_control=task_logic.OdorControl(),
position_control=task_logic.PositionControl(
frequency_filter_cutoff=5,
velocity_threshold=velocity_threshold,
),
)


def make_reward_delay(offset: float, mean: float, max_delay: float) -> distributions.ExponentialDistribution:
"""Stochastic reward delay: ``offset + Exp(mean)`` truncated to ``[offset, max_delay]`` (s)."""
return distributions.ExponentialDistribution(
distribution_parameters=distributions.ExponentialDistributionParameters(rate=1.0 / mean),
scaling_parameters=distributions.ScalingParameters(offset=offset),
truncation_parameters=distributions.TruncationParameters(min=offset, max=max_delay),
)


def make_patch(
label: str,
state_index: int,
odor_index: int,
p_reward: float,
stop_duration: float = 0.5,
reward_amount: float = REWARD_AMOUNT_UL,
inter_site_length: float = 15,
reward_site_length: float = 40,
inter_patch_min_length: float = 50,
inter_patch_mean_length: float = 150,
inter_patch_max_length: float = 500,
delay: Optional[distributions.Distribution] = None,
) -> task_logic.Patch:
"""A single odor-marked reward site. One accept/reject decision per patch
(``OnChoice``/``OnRejection`` count 1); reward is non-baited."""
if delay is None:
delay = task_logic.scalar_value(0.5)
return task_logic.Patch(
label=label,
state_index=state_index,
odor_specification=[1 if i == odor_index else 0 for i in range(3)],
patch_terminators=[
task_logic.PatchTerminatorOnChoice(count=task_logic.scalar_value(1)),
task_logic.PatchTerminatorOnRejection(count=task_logic.scalar_value(1)),
],
reward_specification=task_logic.RewardSpecification(
amount=task_logic.scalar_value(reward_amount),
probability=task_logic.scalar_value(p_reward),
available=task_logic.scalar_value(999999),
delay=delay,
operant_logic=task_logic.OperantLogic(
is_operant=False,
stop_duration=stop_duration,
time_to_collect_reward=100000,
grace_distance_threshold=10,
),
),
patch_virtual_sites_generator=task_logic.PatchVirtualSitesGenerator(
inter_patch=task_logic.VirtualSiteGenerator(
render_specification=task_logic.RenderSpecification(contrast=1),
label=task_logic.VirtualSiteLabels.INTERPATCH,
length_distribution=distributions.ExponentialDistribution(
distribution_parameters=distributions.ExponentialDistributionParameters(
rate=1.0 / inter_patch_mean_length
),
scaling_parameters=distributions.ScalingParameters(offset=inter_patch_min_length),
truncation_parameters=distributions.TruncationParameters(
min=inter_patch_min_length,
max=inter_patch_max_length,
),
),
),
inter_site=task_logic.VirtualSiteGenerator(
render_specification=task_logic.RenderSpecification(contrast=0.5),
label=task_logic.VirtualSiteLabels.INTERSITE,
length_distribution=task_logic.scalar_value(inter_site_length),
),
reward_site=task_logic.VirtualSiteGenerator(
render_specification=task_logic.RenderSpecification(contrast=0.5),
label=task_logic.VirtualSiteLabels.REWARDSITE,
length_distribution=task_logic.scalar_value(reward_site_length),
),
),
)


def make_block(
p_rewards: tuple[float, Optional[float], Optional[float]],
n_min_patches: int = 100,
block_length_exp_mean: float = 25,
block_length_max: Optional[float] = None,
first_state_occupancy: Optional[list[float]] = None,
make_patch_kwargs: Optional[dict[str, Any]] = None,
) -> task_logic.Block:
"""A block of OdorA/B/C patches. ``p_rewards`` gives each odor's reward
probability; a ``None`` entry omits that odor (so ``(1.0, 1.0, None)`` is a
two-odor block). Block length is ``n_min_patches + Exp(block_length_exp_mean)``
truncated to ``[n_min_patches, block_length_max]`` patches."""
make_patch_kwargs = make_patch_kwargs or {}
patches = [make_patch(label="OdorA", state_index=0, odor_index=0, p_reward=p_rewards[0], **make_patch_kwargs)]
if p_rewards[1] is not None:
patches.append(
make_patch(label="OdorB", state_index=1, odor_index=1, p_reward=p_rewards[1], **make_patch_kwargs)
)
if p_rewards[2] is not None:
patches.append(
make_patch(label="OdorC", state_index=2, odor_index=2, p_reward=p_rewards[2], **make_patch_kwargs)
)

if first_state_occupancy is None:
first_state_occupancy = [1.0 / len(patches)] * len(patches)
if block_length_max is None:
block_length_max = n_min_patches + 50
return task_logic.Block(
environment=task_logic.MarkovEnvironment(
first_state_occupancy=list(first_state_occupancy),
transition_matrix=[list(first_state_occupancy) for _ in range(len(patches))],
patches=patches,
),
end_conditions=[
task_logic.BlockEndConditionPatchCount(
value=distributions.ExponentialDistribution(
distribution_parameters=distributions.ExponentialDistributionParameters(
rate=1 / block_length_exp_mean
),
scaling_parameters=distributions.ScalingParameters(offset=n_min_patches),
truncation_parameters=distributions.TruncationParameters(min=n_min_patches, max=block_length_max),
)
)
],
)
Loading
Loading