Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ jobs:
DFNS_PATH: ${{ github.workspace }}/modflow6/doc/mf6io/mf6ivar/dfn
MODFLOW_DEVTOOLS_AUTO_SYNC: 0
# use --dist loadfile to so tests requiring pytest-virtualenv run on the same worker
run: uv run pytest -v -n auto --dist loadfile --durations 0 --ignore test_download.py --ignore test_models.py --ignore dfns/test_dfns_registry.py
run: uv run pytest -v -n auto --dist loadfile --durations 0 --ignore test_download.py --ignore test_models.py --ignore dfns/test_registry.py

- name: Run network-dependent tests
# only invoke the GH API on one OS and Python version
Expand All @@ -153,7 +153,7 @@ jobs:
TEST_PROGRAMS_REPO: MODFLOW-ORG/modflow6
TEST_PROGRAMS_REF: develop
TEST_PROGRAMS_SOURCE: modflow6
run: uv run pytest -v -n auto --dist loadgroup --durations 0 test_download.py test_models.py dfns/test_dfns_registry.py
run: uv run pytest -v -n auto --dist loadgroup --durations 0 test_download.py test_models.py dfns/test_registry.py

rtd:
name: Docs
Expand Down
212 changes: 212 additions & 0 deletions autotest/dfns/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Tests for the DFNs CLI"""

from pathlib import Path
from unittest.mock import patch

import pytest

from modflow_devtools.dfns.__main__ import main
from modflow_devtools.dfns.registry import RemoteDfnRegistry


@pytest.fixture
def cache_root(tmp_path):
"""Redirect the global DFN cache to a temp directory for test isolation."""
with patch.object(RemoteDfnRegistry, "base_cache_path", return_value=tmp_path):
yield tmp_path


def _populate_cache(cache_root: Path, release_id: str) -> Path:
"""Write a minimal fake DFN file so the registry looks cached."""
repo, tag = release_id.split("@")
tag_dir = cache_root / repo / tag
tag_dir.mkdir(parents=True, exist_ok=True)
(tag_dir / "gwf-chd.toml").write_text("name = 'gwf-chd'\n")
return tag_dir


def _one_registry(release_id: str) -> dict:
return {release_id: RemoteDfnRegistry(release_id=release_id)}


def test_main_help(capsys):
result = main([])
assert result == 0
assert capsys.readouterr().out # some help text printed


def test_info_not_cached(cache_root, capsys):
with patch.object(
RemoteDfnRegistry,
"load_default",
return_value=_one_registry("MODFLOW-ORG/modflow6@6.6.0"),
):
result = main(["info"])
assert result == 0
assert "Not cached" in capsys.readouterr().out


def test_info_cached(cache_root, capsys):
release_id = "MODFLOW-ORG/modflow6@6.6.0"
_populate_cache(cache_root, release_id)
with patch.object(RemoteDfnRegistry, "load_default", return_value=_one_registry(release_id)):
result = main(["info"])
assert result == 0
assert "Cached" in capsys.readouterr().out


def test_info_latest_shows_resolved_tag(cache_root, capsys):
"""For @latest releases, info shows the resolved version that is cached."""
release_id = "MODFLOW-ORG/modflow6@latest"
# simulate a resolved "v6.7.0" tag already on disk
repo, _ = release_id.split("@")
tag_dir = cache_root / repo / "v6.7.0"
tag_dir.mkdir(parents=True)
(tag_dir / "gwf-chd.toml").write_text("name = 'gwf-chd'\n")

with patch.object(RemoteDfnRegistry, "load_default", return_value=_one_registry(release_id)):
result = main(["info"])
assert result == 0
out = capsys.readouterr().out
assert "Cached" in out
assert "latest" in out
assert "v6.7.0" in out


def test_info_multiple_registries(cache_root, capsys):
"""info lists each registry entry."""
ids = ["MODFLOW-ORG/modflow6@6.5.0", "MODFLOW-ORG/modflow6@6.6.0"]
_populate_cache(cache_root, ids[0])
# ids[1] deliberately not cached
registries = {rid: RemoteDfnRegistry(release_id=rid) for rid in ids}
with patch.object(RemoteDfnRegistry, "load_default", return_value=registries):
result = main(["info"])
assert result == 0
out = capsys.readouterr().out
assert "6.5.0" in out
assert "6.6.0" in out


def test_clean_removes_cache(cache_root, capsys):
release_id = "MODFLOW-ORG/modflow6@6.6.0"
_populate_cache(cache_root, release_id)
assert any(cache_root.rglob("*.toml"))

result = main(["clean"])
assert result == 0
assert not cache_root.exists()
assert "clean" in capsys.readouterr().out.lower()


def test_clean_idempotent(cache_root):
"""clean on a non-existent / empty cache silently succeeds."""
result = main(["clean"])
assert result == 0


def test_sync_populates_cache(cache_root, capsys):
release_id = "MODFLOW-ORG/modflow6@6.6.0"
registry = RemoteDfnRegistry(release_id=release_id)

def _fake_sync(force=False):
path = registry.cache_path
path.mkdir(parents=True, exist_ok=True)
(path / "gwf-chd.toml").write_text("name = 'gwf-chd'\n")

with (
patch.object(RemoteDfnRegistry, "load_default", return_value={release_id: registry}),
patch.object(RemoteDfnRegistry, "sync", side_effect=_fake_sync),
):
result = main(["sync"])

assert result == 0
assert registry.cache_path.exists()
assert any(registry.cache_path.iterdir())
assert "Synced" in capsys.readouterr().out


def test_sync_force_flag(cache_root):
"""--force is forwarded to registry.sync."""
release_id = "MODFLOW-ORG/modflow6@6.6.0"
registry = RemoteDfnRegistry(release_id=release_id)
calls: list[bool] = []

def _capturing_sync(force=False):
calls.append(force)
path = registry.cache_path
path.mkdir(parents=True, exist_ok=True)
(path / "gwf-chd.toml").write_text("name = 'gwf-chd'\n")

with (
patch.object(RemoteDfnRegistry, "load_default", return_value={release_id: registry}),
patch.object(RemoteDfnRegistry, "sync", side_effect=_capturing_sync),
):
main(["sync", "--force"])

assert calls == [True]


def test_sync_default_no_force(cache_root):
"""sync without --force passes force=False."""
release_id = "MODFLOW-ORG/modflow6@6.6.0"
registry = RemoteDfnRegistry(release_id=release_id)
calls: list[bool] = []

def _capturing_sync(force=False):
calls.append(force)
path = registry.cache_path
path.mkdir(parents=True, exist_ok=True)
(path / "gwf-chd.toml").write_text("name = 'gwf-chd'\n")

with (
patch.object(RemoteDfnRegistry, "load_default", return_value={release_id: registry}),
patch.object(RemoteDfnRegistry, "sync", side_effect=_capturing_sync),
):
main(["sync"])

assert calls == [False]


def test_sync_error_returns_nonzero(cache_root, capsys):
release_id = "MODFLOW-ORG/modflow6@6.6.0"
registry = RemoteDfnRegistry(release_id=release_id)

with (
patch.object(RemoteDfnRegistry, "load_default", return_value={release_id: registry}),
patch.object(RemoteDfnRegistry, "sync", side_effect=ConnectionError("network down")),
):
result = main(["sync"])

assert result != 0
assert "network down" in capsys.readouterr().err


def test_roundtrip(cache_root, capsys):
"""info (not cached) → sync → info (cached) → clean → info (not cached)."""
release_id = "MODFLOW-ORG/modflow6@6.6.0"
registry = RemoteDfnRegistry(release_id=release_id)
registries = {release_id: registry}

def _fake_sync(force=False):
path = registry.cache_path
path.mkdir(parents=True, exist_ok=True)
(path / "gwf-chd.toml").write_text("name = 'gwf-chd'\n")

with (
patch.object(RemoteDfnRegistry, "load_default", return_value=registries),
patch.object(RemoteDfnRegistry, "sync", side_effect=_fake_sync),
):
main(["info"])
assert "Not cached" in capsys.readouterr().out

main(["sync"])
capsys.readouterr() # discard sync output

main(["info"])
assert "Cached" in capsys.readouterr().out

main(["clean"])
capsys.readouterr() # discard clean output

main(["info"])
assert "Not cached" in capsys.readouterr().out
56 changes: 0 additions & 56 deletions autotest/dfns/test_dfns.py

This file was deleted.

38 changes: 21 additions & 17 deletions autotest/test_dfnmap.py → autotest/dfns/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
import pytest
import yaml

from modflow_devtools.dfn import Dfn, fetch_dfns
from modflow_devtools.dfnmap import migrate
from modflow_devtools.markers import requires_pkg
from modflow_devtools.dfns import fetch_dfns, migrate

FORMATS = ["yaml", "toml", "json"]
MF6_OWNER = "MODFLOW-ORG"
Expand Down Expand Up @@ -37,30 +35,36 @@ def dfn_dir(module_tmpdir):


@pytest.fixture(scope="module", params=FORMATS)
def converted_v2(request, dfn_dir, module_tmpdir):
def v1_1(request, dfn_dir, module_tmpdir):
fmt = request.param
out = module_tmpdir / f"v1_1-{fmt}"
migrate(dfn_dir, out, schema_version="1.1", fmt=fmt)
return out, fmt


@pytest.fixture(scope="module", params=FORMATS)
def v2(request, dfn_dir, module_tmpdir):
fmt = request.param
out = module_tmpdir / f"v2-{fmt}"
migrate(dfn_dir, out, schema_version="2", fmt=fmt)
return out, fmt


@requires_pkg("boltons")
def test_convert_v2(converted_v2):
out, fmt = converted_v2
def test_migrate_v1_1(v1_1):
out, fmt = v1_1
files = list(out.glob(f"*.{fmt}"))
assert files
for p in files:
data = _load(p, fmt)
assert data["name"] == p.stem
assert data["schema_version"] == "2"
assert data["schema_version"] == "1.1"


@requires_pkg("boltons")
def test_roundtrip(converted_v2):
"""Verify Dfn.load can read v2-schema files in any format."""
out, fmt = converted_v2
mode = "rb" if fmt == "toml" else "r"
for p in out.glob(f"*.{fmt}"):
with p.open(mode) as f:
dfn = Dfn.load(f, name=p.stem, version=fmt)
assert any(dfn)
def test_migrate_v2(v2):
out, fmt = v2
files = list(out.glob(f"*.{fmt}"))
assert files
for p in files:
data = _load(p, fmt)
assert data["name"] == p.stem
assert data["schema_version"] == "2"
Loading
Loading