Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 308 additions & 0 deletions add-optional-cache.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions docs/backends.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ basic operations.

Existing backends are listed below; more might come in the future.

See also :doc:`store_caching` for optional Store-level caching with a secondary backend.

posixfs
-------

Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

installation
store
store_caching
backends
servers
changes
Expand Down
2 changes: 2 additions & 0 deletions docs/store.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ API can be much simpler:

Store operations (and per-op timing and volume) are logged at DEBUG log level.

See also :doc:`store_caching` for optional Store-level caching with a secondary backend.

Keys
----

Expand Down
49 changes: 49 additions & 0 deletions docs/store_caching.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
Store caching
=============

The ``Store`` can optionally use a second backend as a local cache for selected
namespaces, which is especially useful when the primary backend is remote
slower or otherwise more "expensive" than the cache.

Configuration
-------------

- ``cache_url`` or ``cache_backend``: where cached data is stored
- ``cache``: mapping of namespace to cache mode

Cache modes are configured with ``CacheMode`` or string aliases:

- ``CacheMode.C_OFF`` or ``"off"``: bypass cache completely.
- ``CacheMode.C_MIRROR`` or ``"mirror"``: always read from primary backend,
but update the cache after successful primary backend reads and writes.
- ``CacheMode.C_CACHE`` or ``"cache"``: read-through + write-through.
For now, only content-hash addressed namespaces should use this mode.

Behavior
--------

- Cache keys are identical to primary backend keys (same nesting).
- Soft-deleted items are cached under the same ``.del`` name as primary.
- Soft delete/undelete (``move(delete=True|undelete=True)``) renames cache
entries in lockstep with primary backend names.
- Cache failures are non-fatal and logged as warnings.

Limitations
-----------

- No cache eviction.
- No proactive cache validation/revalidation.
- If an object is deleted in the primary backend by another client, the local
cache will still have a stale object.

Statistics
----------

``Store.stats`` includes cache counters:

- ``cache_hits``
- ``cache_misses``
- ``cache_errors``
- ``cache_bytes_read``
- ``cache_bytes_written``
- ``cache_hit_ratio``
170 changes: 166 additions & 4 deletions src/borgstore/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from binascii import hexlify
from collections import Counter
from contextlib import contextmanager
import enum
import logging
import os
import time
Expand All @@ -30,6 +31,23 @@
logger = logging.getLogger(__name__)


class CacheMode(enum.Enum):
C_OFF = "off"
C_CACHE = "cache"
C_MIRROR = "mirror"

@classmethod
def from_str(cls, value):
if isinstance(value, cls):
return value
if isinstance(value, str):
try:
return cls(value.lower())
except ValueError as err:
raise ValueError(f"unknown CacheMode: {value!r}") from err
raise ValueError(f"unknown CacheMode: {value!r}")


def get_backend(url, permissions=None, quota=None):
"""Parse backend URL and return a backend instance (or None)."""
backend = get_file_backend(url, permissions=permissions, quota=quota)
Expand Down Expand Up @@ -66,6 +84,10 @@ def __init__(
backend: Optional[BackendBase] = None,
levels: Optional[dict] = None,
permissions: Optional[dict] = None,
*,
cache: Optional[dict[str, CacheMode | str]] = None,
cache_url: Optional[str] = None,
cache_backend: Optional[BackendBase] = None,
):
self.url = url
if backend is None and url is not None:
Expand All @@ -76,15 +98,53 @@ def __init__(
raise NoBackendGiven("You need to give a backend instance or a backend url.")
self.backend = backend
self.set_levels(levels)
if cache_url is not None and cache_backend is not None:
raise ValueError("Only one of cache_url and cache_backend can be given.")
cache = cache or {}
normalized_cache = {namespace: CacheMode.from_str(mode) for namespace, mode in cache.items()}
has_enabled_namespaces = any(mode != CacheMode.C_OFF for mode in normalized_cache.values())
configured_namespaces = {namespace for namespace, _ in self.levels}
for namespace, mode in normalized_cache.items():
if mode != CacheMode.C_OFF and namespace not in configured_namespaces:
raise ValueError(f"Invalid cache namespace configuration: {namespace!r} not in levels.")
if has_enabled_namespaces and cache_url is None and cache_backend is None:
raise ValueError("cache_url or cache_backend is required for cache modes other than C_OFF.")
self.cache_backend = cache_backend if cache_backend is not None else None
if self.cache_backend is None and cache_url is not None:
self.cache_backend = get_backend(cache_url)
if self.cache_backend is None:
raise BackendURLInvalid(f"Invalid or unsupported Cache Backend URL: {cache_url}")
self._cache_disabled = False
self.cache = normalized_cache
self.cache_namespaces = [
entry
for entry in sorted(
((namespace, mode) for namespace, mode in normalized_cache.items() if mode != CacheMode.C_OFF),
key=lambda item: len(item[0]),
reverse=True,
)
]
self._stats: Counter = Counter()
# this is to emulate additional latency to what the backend actually offers:
self.latency = float(os.environ.get("BORGSTORE_LATENCY", "0")) / 1e6 # [us] -> [s]
# this is to emulate less bandwidth than what the backend actually offers:
self.bandwidth = float(os.environ.get("BORGSTORE_BANDWIDTH", "0")) / 8 # [bits/s] -> [bytes/s]

def __repr__(self):
if self.cache_backend is not None or self.cache_namespaces:
cache_backend = self.cache_backend.__class__.__name__ if self.cache_backend is not None else None
return (
f"<Store(url={self.url!r}, levels={self.levels!r}, "
f"cache_namespaces={self.cache_namespaces!r}, cache_backend={cache_backend!r})>"
)
return f"<Store(url={self.url!r}, levels={self.levels!r})>"

def _cache_mode_for(self, name: str) -> CacheMode:
for namespace, mode in self.cache_namespaces:
if name.startswith(namespace):
return mode
return CacheMode.C_OFF

def set_levels(self, levels: dict, create: bool = False) -> None:
if not levels or not isinstance(levels, dict):
raise ValueError("No or invalid levels configuration given.")
Expand All @@ -101,9 +161,16 @@ def create_levels(self):
for namespace, levels in self.levels:
namespace = namespace.rstrip("/")
level = max(levels)
cache_enabled = (
self.cache_backend is not None
and not self._cache_disabled
and self._cache_mode_for(f"{namespace}/") in {CacheMode.C_CACHE, CacheMode.C_MIRROR}
)
if level == 0:
# flat, we just need to create the namespace directory:
self.backend.mkdir(namespace)
if cache_enabled:
self.cache_backend.mkdir(namespace)
elif level > 0:
# nested, we only need to create the deepest nesting dir layer,
# any missing parent dirs will be created as needed by backend.mkdir.
Expand All @@ -113,16 +180,22 @@ def create_levels(self):
name = f"{namespace}/{dir}" if namespace else dir
nested_name = nest(name, level)
self.backend.mkdir(nested_name[: -2 * level - 1])
if cache_enabled:
self.cache_backend.mkdir(nested_name[: -2 * level - 1])
else:
raise ValueError(f"Invalid levels: {namespace}: {levels}")

def create(self) -> None:
self.backend.create()
if self.cache_backend is not None and not self._cache_disabled:
self.cache_backend.create()
if self.backend.precreate_dirs:
self.create_levels()

def destroy(self) -> None:
self.backend.destroy()
if self.cache_backend is not None:
self.cache_backend.destroy()

def __enter__(self):
self.open()
Expand All @@ -134,9 +207,20 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def open(self) -> None:
self.backend.open()
if self.cache_backend is not None and not self._cache_disabled:
try:
self.cache_backend.open()
except Exception as err:
logger.warning(f"borgstore: cache open failed, disabling cache: {err!r}")
self._cache_disabled = True

def close(self) -> None:
self.backend.close()
if self.cache_backend is not None:
try:
self.cache_backend.close()
except Exception as err:
logger.warning(f"borgstore: cache close failed: {err!r}")

def quota(self) -> dict:
return self.backend.quota()
Expand Down Expand Up @@ -190,9 +274,65 @@ def stats(self):
for key in "load", "store":
v = st.get(f"{key}_volume", 0)
t = st.get(f"{key}_time", 0)
st[f"{key}_throughput"] = v / t
st[f"{key}_throughput"] = v / t if t else 0
st["cache_hits"] = st.get("cache_hits", 0)
st["cache_misses"] = st.get("cache_misses", 0)
st["cache_errors"] = st.get("cache_errors", 0)
st["cache_bytes_read"] = st.get("cache_bytes_read", 0)
st["cache_bytes_written"] = st.get("cache_bytes_written", 0)
st["cache_disabled"] = self._cache_disabled
cache_total = st["cache_hits"] + st["cache_misses"]
st["cache_hit_ratio"] = st["cache_hits"] / cache_total if cache_total else 0
return st

def _cache_get(self, nested_name: str) -> Optional[bytes]:
if self.cache_backend is None or self._cache_disabled:
return None
try:
value = self.cache_backend.load(nested_name)
except ObjectNotFound:
self._stats["cache_misses"] += 1
return None
except Exception as err:
logger.warning(f"borgstore: cache load failed for {nested_name!r}: {err!r}")
self._stats["cache_errors"] += 1
return None
self._stats["cache_hits"] += 1
self._stats["cache_bytes_read"] += len(value)
return value

def _cache_put(self, nested_name: str, value: bytes) -> None:
if self.cache_backend is None or self._cache_disabled:
return
try:
self.cache_backend.store(nested_name, value)
self._stats["cache_bytes_written"] += len(value)
except Exception as err:
logger.warning(f"borgstore: cache store failed for {nested_name!r}: {err!r}")
self._stats["cache_errors"] += 1

def _cache_invalidate(self, nested_name: str) -> None:
if self.cache_backend is None or self._cache_disabled:
return
try:
self.cache_backend.delete(nested_name)
except ObjectNotFound:
pass
except Exception as err:
logger.warning(f"borgstore: cache delete failed for {nested_name!r}: {err!r}")
self._stats["cache_errors"] += 1

def _cache_move(self, old_nested: str, new_nested: str) -> None:
if self.cache_backend is None or self._cache_disabled:
return
try:
self.cache_backend.move(old_nested, new_nested)
except ObjectNotFound:
pass
except Exception as err:
logger.warning(f"borgstore: cache move failed for {old_nested!r}->{new_nested!r}: {err!r}")
self._stats["cache_errors"] += 1

def _get_levels(self, name):
"""Get levels from the configuration depending on the namespace."""
for namespace, levels in self.levels:
Expand Down Expand Up @@ -227,7 +367,21 @@ def info(self, name: str, *, deleted=False) -> ItemInfo:

def load(self, name: str, *, size=None, offset=0, deleted=False) -> bytes:
with self._stats_updater("load", f"load({name!r}, offset={offset}, size={size}, deleted={deleted})"):
result = self.backend.load(self.find(name, deleted=deleted), size=size, offset=offset)
mode = self._cache_mode_for(name)
nested_name = self.find(name, deleted=deleted)
if mode == CacheMode.C_CACHE:
full_value = self._cache_get(nested_name)
if full_value is None:
full_value = self.backend.load(nested_name, size=None, offset=0)
self._cache_put(nested_name, full_value)
elif mode == CacheMode.C_MIRROR:
full_value = self.backend.load(nested_name, size=None, offset=0)
self._cache_put(nested_name, full_value)
else:
result = self.backend.load(nested_name, size=size, offset=offset)
self._stats_update_volume("load", len(result))
return result
result = full_value[offset : (None if size is None else offset + size)]
self._stats_update_volume("load", len(result))
return result

Expand All @@ -236,7 +390,10 @@ def store(self, name: str, value: bytes) -> None:
# - overwrite an existing item (level stays same)
# - write to the last level if no existing item is found.
with self._stats_updater("store", f"store({name!r})"):
self.backend.store(self.find(name), value)
nested_name = self.find(name)
self.backend.store(nested_name, value)
if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}:
self._cache_put(nested_name, value)
self._stats_update_volume("store", len(value))

def hash(self, name: str, algorithm: str = "sha256", *, deleted: bool = False) -> str:
Expand All @@ -250,7 +407,10 @@ def delete(self, name: str, *, deleted=False) -> None:
See also .move(name, delete=True) for "soft" deletion.
"""
with self._stats_updater("delete", f"delete({name!r}, deleted={deleted})"):
self.backend.delete(self.find(name, deleted=deleted))
nested_name = self.find(name, deleted=deleted)
self.backend.delete(nested_name)
if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}:
self._cache_invalidate(nested_name)

def move(
self,
Expand Down Expand Up @@ -287,6 +447,8 @@ def move(
msg = f"rename({name!r}, {new_name!r}, deleted={deleted})"
with self._stats_updater("move", msg + f" [{nested_name!r}, {nested_new_name!r}]"):
self.backend.move(nested_name, nested_new_name)
if self._cache_mode_for(name) in {CacheMode.C_CACHE, CacheMode.C_MIRROR}:
self._cache_move(nested_name, nested_new_name)

def defrag(self, sources, *, target=None, algorithm=None, namespace=None, deleted=False) -> str:
"""
Expand Down
Loading
Loading