from __future__ import annotations
from typing import TYPE_CHECKING, Generic, TypeVar
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
from types import TracebackType
from typing import Any, Self
from zarr.abc.store import ByteRequest
from zarr.core.buffer import Buffer, BufferPrototype
from zarr.abc.store import Store
T_Store = TypeVar("T_Store", bound=Store)
[docs]
class WrapperStore(Store, Generic[T_Store]):
"""
Store that wraps an existing Store.
By default all of the store methods are delegated to the wrapped store instance, which is
accessible via the ``._store`` attribute of this class.
Use this class to modify or extend the behavior of the other store classes.
"""
_store: T_Store
def __init__(self, store: T_Store) -> None:
self._store = store
[docs]
@classmethod
async def open(cls: type[Self], store_cls: type[T_Store], *args: Any, **kwargs: Any) -> Self:
store = store_cls(*args, **kwargs)
await store._open()
return cls(store=store)
def __enter__(self) -> Self:
return type(self)(self._store.__enter__())
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
return self._store.__exit__(exc_type, exc_value, traceback)
async def _open(self) -> None:
await self._store._open()
async def _ensure_open(self) -> None:
await self._store._ensure_open()
[docs]
async def is_empty(self, prefix: str) -> bool:
return await self._store.is_empty(prefix)
@property
def _is_open(self) -> bool:
return self._store._is_open
@_is_open.setter
def _is_open(self, value: bool) -> None:
raise NotImplementedError("WrapperStore must be opened via the `_open` method")
[docs]
async def clear(self) -> None:
return await self._store.clear()
@property
def read_only(self) -> bool:
return self._store.read_only
def _check_writable(self) -> None:
return self._store._check_writable()
def __eq__(self, value: object) -> bool:
return type(self) is type(value) and self._store.__eq__(value._store) # type: ignore[attr-defined]
def __str__(self) -> str:
return f"wrapping-{self._store}"
def __repr__(self) -> str:
return f"WrapperStore({self._store.__class__.__name__}, '{self._store}')"
[docs]
async def get(
self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> Buffer | None:
return await self._store.get(key, prototype, byte_range)
[docs]
async def get_partial_values(
self,
prototype: BufferPrototype,
key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
return await self._store.get_partial_values(prototype, key_ranges)
[docs]
async def exists(self, key: str) -> bool:
return await self._store.exists(key)
[docs]
async def set(self, key: str, value: Buffer) -> None:
await self._store.set(key, value)
[docs]
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
return await self._store.set_if_not_exists(key, value)
async def _set_many(self, values: Iterable[tuple[str, Buffer]]) -> None:
await self._store._set_many(values)
@property
def supports_writes(self) -> bool:
return self._store.supports_writes
@property
def supports_deletes(self) -> bool:
return self._store.supports_deletes
[docs]
async def delete(self, key: str) -> None:
await self._store.delete(key)
@property
def supports_listing(self) -> bool:
return self._store.supports_listing
[docs]
def list(self) -> AsyncIterator[str]:
return self._store.list()
[docs]
def list_prefix(self, prefix: str) -> AsyncIterator[str]:
return self._store.list_prefix(prefix)
[docs]
def list_dir(self, prefix: str) -> AsyncIterator[str]:
return self._store.list_dir(prefix)
[docs]
async def delete_dir(self, prefix: str) -> None:
return await self._store.delete_dir(prefix)
[docs]
def close(self) -> None:
self._store.close()
async def _get_many(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
async for req in self._store._get_many(requests):
yield req