Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 249 additions & 24 deletions malariagen_data/plasmodium.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pandas as pd
import xarray
import zarr
import ipyleaflet

from malariagen_data.util import (
DIM_ALLELE,
Expand Down Expand Up @@ -50,20 +51,246 @@ def _load_config(self, data_config):
config_content = json.load(json_conf)
return config_content

def sample_metadata(self):
def sample_metadata(
self,
sample_query=None,
sample_query_options=None,
):
"""Access sample metadata and return as pandas dataframe.

Parameters
----------
sample_query : str or None, optional
A pandas query string to filter samples, e.g.
``"QC pass == True"`` or ``"Country == 'Ghana'"``.
Column names with spaces are supported without backticks.
If None, all samples are returned.
sample_query_options : dict or None, optional
A dictionary of arguments passed through to pandas query(),
e.g. ``parser``, ``engine``, ``local_dict``.

Returns
-------
df : pandas.DataFrame
A dataframe of sample metadata on the samples that were sequenced as part of this resource.
Includes the time and place of collection, quality metrics, and accesion numbers.
One row per sample.
A dataframe of sample metadata, one row per sample.
"""
if self._cache_sample_metadata is None:
path = os.path.join(self._path, self.CONF["metadata_path"])
with self._fs.open(path) as f:
self._cache_sample_metadata = pd.read_csv(f, sep="\t", na_values="")
return self._cache_sample_metadata

df = self._cache_sample_metadata

if sample_query is not None:
sample_query_options = sample_query_options or {}
sample_query_options.setdefault("engine", "python")
df = df.query(sample_query, **sample_query_options)
df = df.reset_index(drop=True)

return df.copy()

def plot_samples_interactive_map(
self,
sample_query=None,
sample_query_options=None,
basemap="mapnik",
center=(-2, 20),
zoom=3,
height=500,
width="100%",
min_samples=1,
count_by="Population",
):
"""Plot an interactive map showing sampling locations using ipyleaflet.

One marker is shown per unique first-level administrative division.
Hovering over a marker shows the location name, country, years
collected, and sample counts broken down by the ``count_by`` column
(default ``"Population"``).

Parameters
----------
sample_query : str or None, optional
A pandas query string to filter samples before plotting,
e.g. ``"QC pass == True"`` or ``"Year > 2015"``.
If None, all samples are shown.
sample_query_options : dict or None, optional
A dictionary of arguments passed through to pandas query(),
e.g. ``parser``, ``engine``, ``local_dict``.
basemap : str or dict or TileLayer or TileProvider, optional
Basemap from ipyleaflet or other TileLayer provider. Strings are
abbreviations mapped to corresponding basemaps, available values
are ['mapnik', 'natgeoworldmap', 'opentopomap', 'positron',
'satellite', 'worldimagery', 'worldstreetmap', 'worldtopomap'].
Defaults to 'mapnik'.
center : tuple of (int or float, int or float), optional
Location to center the map. Defaults to (-2, 20).
zoom : int or float, optional
Initial zoom level. Defaults to 3.
height : int or str, optional
Height of the map in pixels or other CSS units. Defaults to 500.
width : int or str, optional
Width of the map in pixels or other CSS units. Defaults to '100%'.
min_samples : int, optional
Minimum number of samples required to show a marker for a given
location. Defaults to 1.
count_by : str, optional
Metadata column to report counts of samples by for each location.
Defaults to ``"Population"``.

Returns
-------
ipyleaflet.Map
"""
if isinstance(height, int):
height = f"{height}px"
if isinstance(width, int):
width = f"{width}px"

df = self.sample_metadata(
sample_query=sample_query,
sample_query_options=sample_query_options,
)
_column_candidates = {
"admin_div": ["Admin level 1", "First-level administrative division"],
"lat": ["Admin level 1 latitude", "Lat"],
"lon": ["Admin level 1 longitude", "Long"],
}

def _resolve_col(key, candidates):
for c in candidates:
if c in df.columns:
return c
raise ValueError(
f"Cannot find a column for {key!r}; tried {candidates}. "
f"Available columns: {sorted(df.columns.tolist())}"
)

col_admin = _resolve_col("admin_div", _column_candidates["admin_div"])
col_lat = _resolve_col("lat", _column_candidates["lat"])
col_lon = _resolve_col("lon", _column_candidates["lon"])

location_key = ["Country", col_admin, col_lat, col_lon]

# Validate count_by column exists
if count_by not in df.columns:
raise ValueError(
f"{count_by!r} is not a column in the sample metadata. "
f"Available columns: {sorted(df.columns.tolist())}"
)

# Build pivot: one row per location, one column per count_by value
df_pivot = df.pivot_table(
index=location_key,
columns=count_by,
values="Sample",
aggfunc="count",
fill_value=0,
observed=True,
)
df_pivot = df_pivot.reset_index()

# Aggregate year and study info per location for tooltip
df_aggs = (
df.groupby(location_key)
.agg(
{
"Year": lambda x: ", ".join(
str(int(y)) for y in sorted(x.dropna().unique())
),
"Study": lambda x: ", ".join(
str(s) for s in sorted(x.dropna().unique())
),
}
)
.reset_index()
)

df_pivot = df_pivot.merge(df_aggs, on=location_key)

# Unique count_by values for building the tooltip breakdown
count_factors = sorted(df[count_by].dropna().unique())

# Basemap abbreviations — inlined from anoph/map_params.py
_basemap_abbrev_candidates = {
"mapnik": lambda: ipyleaflet.basemaps.OpenStreetMap.Mapnik,
"natgeoworldmap": lambda: ipyleaflet.basemaps.Esri.NatGeoWorldMap,
"opentopomap": lambda: ipyleaflet.basemaps.OpenTopoMap,
"positron": lambda: ipyleaflet.basemaps.CartoDB.Positron,
"satellite": lambda: ipyleaflet.basemaps.Gaode.Satellite,
"worldimagery": lambda: ipyleaflet.basemaps.Esri.WorldImagery,
"worldstreetmap": lambda: ipyleaflet.basemaps.Esri.WorldStreetMap,
"worldtopomap": lambda: ipyleaflet.basemaps.Esri.WorldTopoMap,
}

# Resolve basemap — mirrors mosquito basemap handling exactly
if isinstance(basemap, str):
basemap_str = basemap.lower()
if basemap_str not in _basemap_abbrev_candidates:
raise ValueError(
f"Basemap abbreviation not recognised: {basemap_str!r}; "
f"try one of {list(_basemap_abbrev_candidates.keys())}"
)
try:
basemap_provider = _basemap_abbrev_candidates[basemap_str]()
except Exception:
# Fall back to mapnik if provider is unavailable
basemap_provider = ipyleaflet.basemaps.OpenStreetMap.Mapnik
elif basemap is None:
basemap_provider = ipyleaflet.basemaps.OpenStreetMap.Mapnik
else:
# User passed a TileLayer / TileProvider / dict directly
basemap_provider = basemap

# Create map
samples_map = ipyleaflet.Map(
center=center,
zoom=zoom,
basemap=basemap_provider,
)
samples_map.add(ipyleaflet.ScaleControl(position="bottomleft"))
samples_map.layout.height = height
samples_map.layout.width = width

# Add one marker per location
for _, row in df_pivot.iterrows():
lat = row["Admin level 1 latitude"]
lon = row["Admin level 1 longitude"]

# Skip rows with missing coordinates
if pd.isna(lat) or pd.isna(lon):
continue

# Build tooltip — mirrors mosquito API style
title = f"Admin level 1: {row['Admin level 1']}"
title += f"\nCountry: {row['Country']}"
title += f"\nYears: {row['Year']}"
title += f"\nStudies: {row['Study']}"

total = 0
breakdown = []
for factor in count_factors:
if factor in row:
n = row[factor]
total += n
if n > 0:
breakdown.append(f"{n} {factor}")

# Respect min_samples threshold
if total < min_samples:
continue

title += f"\nTotal samples: {total}"
title += f"\nBy {count_by}: " + "; ".join(breakdown)

marker = ipyleaflet.Marker(
location=(lat, lon),
draggable=False,
title=title,
)
samples_map.add(marker)

return samples_map

def _open_variant_calls_zarr(self):
"""Open variant calls zarr.
Expand Down Expand Up @@ -274,29 +501,27 @@ def genome_sequence(self, region="*", inline_array=True, chunks="native"):

"""
genome = self.open_genome()
if type(region) not in [tuple, list] and region != "*" and region is not None:
d = self._subset_genome_sequence_region(

if region == "*" or region is None:
regions = self.contigs
elif isinstance(region, (list, tuple)):
regions = list(region)
else:
regions = [region]

sequences = [
self._subset_genome_sequence_region(
genome=genome,
region=region,
region=r,
inline_array=inline_array,
chunks=chunks,
)
else:
region = tuple(region)
if region == tuple("*"):
region = self.contigs
d = da.concatenate(
[
self._subset_genome_sequence_region(
genome=genome,
region=r,
inline_array=inline_array,
chunks=chunks,
)
for r in region
]
)
return d
for r in regions
]

if len(sequences) == 1:
return sequences[0]
return da.concatenate(sequences)

def genome_features(self, attributes=("ID", "Parent", "Name")):
"""Access genome feature annotations.
Expand Down