Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 21 additions & 46 deletions pysus/api/_impl/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@
from typing import Literal

import pandas as pd
from anyio import to_thread
from pysus.api.client import PySUS
from pysus.api.ducklake.catalog import CatalogDataset, CatalogFile, DatasetGroup
from pysus.api.types import State
from sqlalchemy.orm import joinedload
from tqdm import tqdm


Expand Down Expand Up @@ -302,66 +299,44 @@ def list_files(
"CNES",
"CIHA",
],
client: Literal["FTP", "DadosGov"] | None = None,
group: str | None = None,
state: str | None = None,
year: int | list[int] | None = None,
month: int | list[int] | None = None,
**kwargs,
) -> pd.DataFrame:
"""List catalog files for a dataset, filtered by group/state/year/month."""
"""List catalog files filtered by client, group, state, year, and month."""

async def _list():
async with PySUS() as pysus:
ducklake = await pysus.get_ducklake()
if ducklake._Session is None:
await ducklake.connect()

def _query():
with ducklake._Session() as session:
q = session.query(CatalogFile).options(
joinedload(CatalogFile.dataset),
joinedload(CatalogFile.group),
)

if dataset:
q = q.join(CatalogDataset).filter(
CatalogDataset.name == dataset.lower()
)
years = [year] if isinstance(year, int) else (year or [None])
months = [month] if isinstance(month, int) else (month or [None])

if group:
q = q.join(DatasetGroup).filter(
DatasetGroup.name == group
records = []
for y in years:
for m in months:
records.extend(
await pysus.query(
client=client,
dataset=dataset,
group=group,
state=state,
year=y,
month=m,
)

if state:
q = q.filter(CatalogFile.state == state.upper())

years = [year] if isinstance(year, int) else (year or [])
months = (
[month] if isinstance(month, int) else (month or [])
)

if years:
q = q.filter(CatalogFile.year.in_(years))
if months:
q = q.filter(CatalogFile.month.in_(months))

results = q.all()
session.expunge_all()
return results

records = await to_thread.run_sync(_query)

return [
{
"name": r.path.split("/")[-1],
"path": r.path,
"name": str(r.path).split("/")[-1],
"path": str(r.path),
"dataset": r.dataset.name if r.dataset else None,
"group": r.group.name if r.group else None,
"year": r.year,
"month": r.month,
"state": r.state,
"modify": r.origin_modified,
"year": r.record.year,
"month": r.record.month,
"state": r.record.state,
"modify": r.record.origin_modified,
}
for r in records
]
Expand Down
2 changes: 2 additions & 0 deletions pysus/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ def get_completed_remote_paths(self) -> set[str]:

async def query(
self,
client: Literal["DadosGov", "FTP"] | None = None,
dataset: str | None = None,
group: str | None = None,
state: str | None = None,
Expand All @@ -426,6 +427,7 @@ async def query(
await self.get_ducklake()
if self._ducklake is not None:
return await self._ducklake.query(
client=client,
dataset=dataset,
group=group,
state=state,
Expand Down
29 changes: 27 additions & 2 deletions pysus/api/ducklake/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from collections.abc import Callable
from pathlib import Path
from typing import Any
from typing import Any, Literal

import boto3
import httpx
Expand Down Expand Up @@ -334,13 +334,14 @@ def _upload():

async def query(
self,
client: Literal["FTP", "DadosGov"] | None = None,
dataset: str | None = None,
group: str | None = None,
state: str | None = None,
year: int | None = None,
month: int | None = None,
) -> list[File]:
"""Query catalog files by dataset, group, state, year, and/or month."""
"""Filter catalog files by client, dataset, group, state, year."""
if not self._Session:
await self.connect()

Expand Down Expand Up @@ -380,6 +381,30 @@ def _query():
return results

records = await to_thread.run_sync(_query)

if client:
prefix = f"public/data/{client.lower()}/"
records = [r for r in records if r.path.startswith(prefix)]
else:
ftp = [r for r in records if r.path.startswith("public/data/ftp/")]
dadosgov = [
r for r in records if r.path.startswith("public/data/dadosgov/")
]
ftp_keys = set()
for r in ftp:
stem = Path(r.path).stem
key = (r.dataset_id, r.year, r.month, stem)
ftp_keys.add(key)

def has_ftp_match(r):
stem = Path(r.path).stem
if stem.endswith(".csv"):
stem = stem[:-4]
key = (r.dataset_id, r.year, r.month, stem)
return key in ftp_keys

records = ftp + [r for r in dadosgov if not has_ftp_match(r)]

return [
File(
path=r.path,
Expand Down
5 changes: 5 additions & 0 deletions pysus/management/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ async def upload(

if row:
dataset_id = row[0]
origin_val = "'FTP'" if is_ftp else "'API'"
cursor.execute(
f"UPDATE pysus.datasets SET origin = {origin_val} "
f"WHERE id = {dataset_id}"
)
else:
cursor.execute("SELECT MAX(id) FROM pysus.datasets")
max_id = cursor.fetchone()[0]
Expand Down
15 changes: 12 additions & 3 deletions pysus/tests/api/ducklake/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,29 @@ async def test_is_authenticated_false_no_credentials(self):

@pytest.mark.asyncio
async def test_is_authenticated_with_credentials(self):
from unittest.mock import patch

client = DuckLake()
await client.login(access_key="key", secret_key="secret")
with patch.object(client, "_load_catalog"):
await client.login(access_key="key", secret_key="secret")
assert client._is_authenticated is True

@pytest.mark.asyncio
async def test_login_sets_credentials(self):
from unittest.mock import patch

client = DuckLake()
await client.login(access_key="key", secret_key="secret")
with patch.object(client, "_load_catalog"):
await client.login(access_key="key", secret_key="secret")
assert client.credentials is not None

@pytest.mark.asyncio
async def test_login_creates_s3_client(self):
from unittest.mock import patch

client = DuckLake()
await client.login(access_key="key", secret_key="secret")
with patch.object(client, "_load_catalog"):
await client.login(access_key="key", secret_key="secret")
assert client._s3_client is not None
client._s3_client = None

Expand Down
3 changes: 3 additions & 0 deletions pysus/tests/api/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ async def test_query_with_dataset(self, test_db_path, tmp_path):
result = await client.query(dataset="sinan")

mock_ducklake.query.assert_called_once_with(
client=None,
dataset="sinan",
group=None,
state=None,
Expand All @@ -227,6 +228,7 @@ async def test_query_with_group(self, test_db_path):
await client.query(dataset="sinan", group="DENGUE")

mock_ducklake.query.assert_called_once_with(
client=None,
dataset="sinan",
group="DENGUE",
state=None,
Expand Down Expand Up @@ -258,6 +260,7 @@ async def test_query_with_all_params(self, test_db_path):
)

mock_ducklake.query.assert_called_once_with(
client=None,
dataset="sinasc",
group="DC",
state="SP",
Expand Down
Loading