From 21b0f2594abdb86015b98c22a22bb8837b39a6b5 Mon Sep 17 00:00:00 2001 From: Tony Burns Date: Fri, 2 Jan 2026 01:07:55 -0500 Subject: [PATCH] feat: add dictionary-like access to Table and Transaction Add support for dictionary operations on tables and transactions: - `table[key]`, `table[key] = record`, `del table[key]` - `pop()`, `popitem()`, `setdefault()`, `update()` Consolidate ReadableMixin into TableMixin which provides the full interface for both Table and Transaction classes. Register TableMixin as a virtual MutableMapping subclass to maintain JSONLT's sorted iteration semantics while passing isinstance checks. --- CHANGELOG.md | 4 + README.md | 32 ++- src/jsonlt/_mixin.py | 473 +++++++++++++++++++++++++++++++++ src/jsonlt/_readable.py | 241 ----------------- src/jsonlt/_table.py | 18 +- src/jsonlt/_transaction.py | 15 +- tests/unit/test_table.py | 204 +++++++++++++- tests/unit/test_transaction.py | 223 +++++++++++++++- 8 files changed, 952 insertions(+), 258 deletions(-) create mode 100644 src/jsonlt/_mixin.py delete mode 100644 src/jsonlt/_readable.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2af455f..a73d724 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [PEP 440](https://peps.python.org/pep-0440/). ## [Unreleased] +### Added + +- Dictionary-like access for `Table` and `Transaction` (`table[key]`, `table[key] = record`, `del table[key]`, `pop`, `popitem`, `setdefault`, `update`) + ## [0.1.0] - 2025-12-31 ### Added diff --git a/README.md b/README.md index 0b8edfc..9b9658f 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,34 @@ except ConflictError as e: print(f"Conflict on key: {e.key}") ``` +## Dictionary-like access + +Tables can be used like dictionaries: + +```python +# Get a record (raises KeyError if not found) +user = table["alice"] + +# Set a record (key in record must match) +table["alice"] = {"id": "alice", "role": "admin"} + +# Delete a record +del table["bob"] + +# Check membership +if "alice" in table: + print("Found alice") + +# Iterate over keys +for key in table: + print(key) + +# Get record count +print(len(table)) +``` + +Methods like `pop()`, `setdefault()`, and `update()` also work. The `keys()`, `values()`, and `items()` methods return sorted lists rather than views to maintain JSONLT's deterministic key ordering. + ## Finding records ```python @@ -171,7 +199,7 @@ table.reload() | `clear()` | Remove all records | | `reload()` | Reload from disk | -The `Table` class also supports `len(table)`, `key in table`, and `for record in table`. +Tables support `table[key]`, `table[key] = record`, `del table[key]`, `len(table)`, `key in table`, and `for key in table`. ### Transaction @@ -184,6 +212,8 @@ The `Table` class also supports `len(table)`, `key in table`, and `for record in | `commit()` | Write to disk | | `abort()` | Discard changes | +Transactions support the same dictionary-like access as tables. + ### Exceptions All exceptions inherit from `JSONLTError`: diff --git a/src/jsonlt/_mixin.py b/src/jsonlt/_mixin.py new file mode 100644 index 0000000..b054841 --- /dev/null +++ b/src/jsonlt/_mixin.py @@ -0,0 +1,473 @@ +"""Mixin class providing full table interface. + +This module provides TableMixin, an abstract base class that implements +all read operations, write operation signatures, and MutableMapping +interface for both Table and Transaction classes. +""" + +from abc import ABC, ABCMeta, abstractmethod +from collections.abc import Iterable, Iterator, Mapping, MutableMapping +from functools import cmp_to_key +from typing import TYPE_CHECKING, ClassVar, TypeGuard, cast, overload + +from ._exceptions import InvalidKeyError +from ._keys import Key, compare_keys +from ._records import extract_key + +if TYPE_CHECKING: + from collections.abc import Callable + + from ._json import JSONObject + from ._keys import KeySpecifier + +__all__ = ["TableMixin"] + + +class TableMixin(ABC): + """Mixin providing full table interface including MutableMapping. + + Subclasses must implement: + - _get_state(): Returns the dict[Key, JSONObject] to read from + - _prepare_read(): Called before each public read operation + - _get_key_specifier(): Return the key specifier + - put(record): Insert or update a record + - delete(key): Delete a record by key + + Subclasses must also have a `_cached_sorted_keys: list[Key] | None` slot. + """ + + __slots__: ClassVar[tuple[str, ...]] = () + + # Subclasses must have this as a slot attribute + _cached_sorted_keys: list[Key] | None + + @abstractmethod + def _get_state(self) -> "dict[Key, JSONObject]": + """Return the state dictionary to read from.""" + ... + + @abstractmethod + def _prepare_read(self) -> None: + """Perform any required setup before a read operation.""" + ... + + @abstractmethod + def _get_key_specifier(self) -> "KeySpecifier": + """Return the key specifier for this table. + + Returns: + The key specifier. + + Raises: + InvalidKeyError: If no key specifier is set. + """ + ... + + @abstractmethod + def put(self, record: "JSONObject") -> None: + """Insert or update a record. + + Args: + record: The record to insert/update. Must contain key fields. + + Raises: + InvalidKeyError: If record is missing key fields, has invalid key + values, or contains $-prefixed fields. + LimitError: If key or record size exceeds limits. + """ + ... + + @abstractmethod + def delete(self, key: Key) -> bool: + """Delete a record by key. Returns whether it existed. + + Args: + key: The key to delete. + + Returns: + True if the key existed, False otherwise. + + Raises: + InvalidKeyError: If key arity doesn't match specifier. + """ + ... + + def _sorted_keys(self) -> list[Key]: + """Return keys sorted by JSONLT key ordering.""" + if self._cached_sorted_keys is None: + self._cached_sorted_keys = sorted( + self._get_state().keys(), key=cmp_to_key(compare_keys) + ) + return self._cached_sorted_keys + + def _sorted_records(self) -> "list[JSONObject]": + """Return records sorted by key order.""" + state = self._get_state() + return [state[k] for k in self._sorted_keys()] + + @staticmethod + def _validate_key(key: Key) -> None: + """Validate that a key is not an empty tuple. + + Args: + key: The key to validate. + + Raises: + InvalidKeyError: If the key is an empty tuple. + """ + if isinstance(key, tuple) and len(key) == 0: + msg = "empty tuple is not a valid key" + raise InvalidKeyError(msg) + + @staticmethod + def _is_valid_tuple_key( + key: tuple[object, ...], + ) -> "TypeGuard[tuple[str | int, ...]]": + """Check if a tuple is a valid Key tuple (all elements are str or int).""" + return all(isinstance(k, (str, int)) for k in key) + + @overload + def get(self, key: Key) -> "JSONObject | None": ... # pragma: no cover + + @overload + def get( + self, key: Key, default: "JSONObject" + ) -> "JSONObject": ... # pragma: no cover + + @overload + def get( + self, key: Key, default: None + ) -> "JSONObject | None": ... # pragma: no cover + + def get(self, key: Key, default: "JSONObject | None" = None) -> "JSONObject | None": + """Get a record by key. + + Args: + key: The key to look up. + default: Value to return if key not found. Defaults to None. + + Returns: + The record if found, otherwise the default value. + + Raises: + InvalidKeyError: If the key is an empty tuple. + """ + self._validate_key(key) + self._prepare_read() + result = self._get_state().get(key) + if result is None: + return default + return result + + def has(self, key: Key) -> bool: + """Check if a key exists. + + Args: + key: The key to check. + + Returns: + True if the key exists, False otherwise. + + Raises: + InvalidKeyError: If the key is an empty tuple. + """ + self._validate_key(key) + self._prepare_read() + return key in self._get_state() + + def all(self) -> "list[JSONObject]": + """Get all records in key order. + + Returns: + A list of all records, sorted by key. + """ + self._prepare_read() + return self._sorted_records() + + def keys(self) -> list[Key]: + """Get all keys in key order. + + Returns: + A list of all keys, sorted. + """ + self._prepare_read() + return self._sorted_keys() + + def values(self) -> "list[JSONObject]": + """Get all records in key order. + + Returns: + A list of all records, sorted by key. + """ + self._prepare_read() + state = self._get_state() + return [state[k] for k in self._sorted_keys()] + + def items(self) -> "list[tuple[Key, JSONObject]]": + """Get all key-value pairs in key order. + + Returns: + A list of (key, record) tuples, sorted by key. + """ + self._prepare_read() + state = self._get_state() + return [(k, state[k]) for k in self._sorted_keys()] + + def count(self) -> int: + """Get the number of records. + + Returns: + The number of records. + """ + self._prepare_read() + return len(self._get_state()) + + @overload + def find( + self, + predicate: "Callable[[JSONObject], bool]", + ) -> "list[JSONObject]": ... # pragma: no cover + + @overload + def find( + self, + predicate: "Callable[[JSONObject], bool]", + *, + limit: int, + ) -> "list[JSONObject]": ... # pragma: no cover + + def find( + self, + predicate: "Callable[[JSONObject], bool]", + *, + limit: "int | None" = None, + ) -> "list[JSONObject]": + """Find records matching a predicate. + + Records are returned in key order. + + Args: + predicate: A function that takes a record and returns True if + it should be included. + limit: Maximum number of records to return. + + Returns: + A list of matching records, in key order. + """ + self._prepare_read() + results: list[JSONObject] = [] + for record in self._sorted_records(): + if predicate(record): + results.append(record) + if limit is not None and len(results) >= limit: + break + return results + + def find_one( + self, + predicate: "Callable[[JSONObject], bool]", + ) -> "JSONObject | None": + """Find the first record matching a predicate. + + Records are checked in key order. + + Args: + predicate: A function that takes a record and returns True. + + Returns: + The first matching record, or None if no match. + """ + self._prepare_read() + for record in self._sorted_records(): + if predicate(record): + return record + return None + + def __getitem__(self, key: Key) -> "JSONObject": + """Get a record by key. + + Args: + key: The key to look up. + + Returns: + The record associated with the key. + + Raises: + KeyError: If the key does not exist. + InvalidKeyError: If the key is an empty tuple. + """ + self._validate_key(key) + self._prepare_read() + state = self._get_state() + if key not in state: + raise KeyError(key) + return state[key] + + def __contains__(self, key: object) -> bool: + """Check if a key exists. + + Args: + key: The key to check. Must be a valid Key type. + + Returns: + True if the key exists, False otherwise. + """ + self._prepare_read() + state = self._get_state() + if isinstance(key, str): + return key in state + if isinstance(key, int): + return key in state + if isinstance(key, tuple): + tuple_key = cast("tuple[object, ...]", key) + if self._is_valid_tuple_key(tuple_key): + return tuple_key in state + return False + + def __setitem__(self, key: Key, value: "JSONObject") -> None: + """Store a record. + + The key parameter must match the key extracted from the record. + This ensures consistency between the lookup key and the record's + actual key fields. + + Args: + key: The key for the record. + value: The record to store. Must contain key fields. + + Raises: + InvalidKeyError: If the key does not match the record's key, + or if the record is invalid. + """ + key_specifier = self._get_key_specifier() + extracted_key = extract_key(value, key_specifier) + if extracted_key != key: + msg = ( + f"key mismatch: provided key {key!r} does not match " + f"record key {extracted_key!r}" + ) + raise InvalidKeyError(msg) + self.put(value) + + def __delitem__(self, key: Key) -> None: + """Delete a record by key. + + Args: + key: The key to delete. + + Raises: + KeyError: If the key does not exist. + """ + existed = self.delete(key) + if not existed: + raise KeyError(key) + + def __iter__(self) -> "Iterator[Key]": + """Iterate over keys in sorted order. + + Returns: + An iterator over all keys. + """ + return iter(self._sorted_keys()) + + def __len__(self) -> int: + """Return the number of records. + + Returns: + The count of records. + """ + return len(self._get_state()) + + def pop(self, key: Key, *args: "JSONObject") -> "JSONObject": + """Remove and return record for key. + + Args: + key: The key to remove. + *args: Optional default value if key not found. + + Returns: + The removed record, or default if provided and key not found. + + Raises: + KeyError: If key not found and no default provided. + """ + if len(args) > 1: + msg = f"pop expected at most 2 arguments, got {1 + len(args)}" + raise TypeError(msg) + try: + value = self[key] + except KeyError: + if args: + return args[0] + raise + del self[key] + return value + + def popitem(self) -> "tuple[Key, JSONObject]": + """Remove and return an arbitrary (key, record) pair. + + Returns: + A (key, record) tuple. + + Raises: + KeyError: If the table is empty. + """ + try: + key = next(iter(self)) + except StopIteration: + msg = "popitem(): table is empty" + raise KeyError(msg) from None + value = self[key] + del self[key] + return key, value + + def setdefault(self, key: Key, default: "JSONObject") -> "JSONObject": + """Get record for key, setting it to default if not present. + + Note: Unlike dict.setdefault(), a default value is required because + JSONLT records must contain key fields. + + Args: + key: The key to look up. + default: The record to insert if key not found. Must contain key fields. + + Returns: + The existing record if found, otherwise the default (after insertion). + + Raises: + InvalidKeyError: If default record's key doesn't match the provided key. + """ + try: + return self[key] + except KeyError: + self[key] = default + return default + + def update( + self, + other: ( + "Mapping[Key, JSONObject] | Iterable[tuple[Key, JSONObject]] | None" + ) = None, + /, + **kwargs: "JSONObject", + ) -> None: + """Update table from mapping/iterable and/or keyword arguments. + + Args: + other: A mapping or iterable of (key, record) pairs. + **kwargs: Additional key=record pairs (keys must be strings). + """ + if other is not None: + if hasattr(other, "keys"): + mapping = cast("Mapping[Key, JSONObject]", other) + for key in mapping: + self[key] = mapping[key] + else: + iterable = cast("Iterable[tuple[Key, JSONObject]]", other) + for key, value in iterable: + self[key] = value + for str_key, value in kwargs.items(): + self[str_key] = value + + +_ = cast("ABCMeta", cast("object", MutableMapping)).register(TableMixin) diff --git a/src/jsonlt/_readable.py b/src/jsonlt/_readable.py deleted file mode 100644 index 4de93dd..0000000 --- a/src/jsonlt/_readable.py +++ /dev/null @@ -1,241 +0,0 @@ -"""Mixin class for readable table-like objects. - -This module provides ReadableMixin, an abstract base class that implements -read operations for both Table and Transaction classes. -""" - -from abc import ABC, abstractmethod -from functools import cmp_to_key -from typing import TYPE_CHECKING, ClassVar, TypeGuard, cast, overload - -from ._keys import Key, compare_keys - -if TYPE_CHECKING: - from collections.abc import Callable, Iterator - - from ._json import JSONObject - - -class ReadableMixin(ABC): - """Abstract mixin providing read operations for table-like objects. - - Subclasses must implement: - - _get_state(): Returns the dict[Key, JSONObject] to read from - - _prepare_read(): Called before each public read operation - - Subclasses must also have a `_cached_sorted_keys: list[Key] | None` slot. - """ - - __slots__: ClassVar[tuple[str, ...]] = () - - # --- Abstract methods for subclasses --- - - @abstractmethod - def _get_state(self) -> "dict[Key, JSONObject]": - """Return the state dictionary to read from.""" - ... - - @abstractmethod - def _prepare_read(self) -> None: - """Perform any required setup before a read operation.""" - ... - - # Subclasses must have this as a slot attribute - _cached_sorted_keys: list[Key] | None - - # --- Private helpers --- - - def _sorted_keys(self) -> list[Key]: - """Return keys sorted by JSONLT key ordering.""" - if self._cached_sorted_keys is None: - self._cached_sorted_keys = sorted( - self._get_state().keys(), key=cmp_to_key(compare_keys) - ) - return self._cached_sorted_keys - - def _sorted_records(self) -> "list[JSONObject]": - """Return records sorted by key order.""" - state = self._get_state() - return [state[k] for k in self._sorted_keys()] - - @staticmethod - def _is_valid_tuple_key( - key: tuple[object, ...], - ) -> "TypeGuard[tuple[str | int, ...]]": - """Check if a tuple is a valid Key tuple (all elements are str or int).""" - return all(isinstance(k, (str, int)) for k in key) - - @staticmethod - def _validate_key(key: Key) -> None: - """Validate that a key is not an empty tuple. - - Args: - key: The key to validate. - - Raises: - InvalidKeyError: If the key is an empty tuple. - """ - if isinstance(key, tuple) and len(key) == 0: - from ._exceptions import InvalidKeyError # noqa: PLC0415 - - msg = "empty tuple is not a valid key" - raise InvalidKeyError(msg) - - # --- Read methods --- - - def get(self, key: Key) -> "JSONObject | None": - """Get a record by key. - - Args: - key: The key to look up. - - Returns: - The record if found, None otherwise. - - Raises: - InvalidKeyError: If the key is an empty tuple. - """ - self._validate_key(key) - self._prepare_read() - return self._get_state().get(key) - - def has(self, key: Key) -> bool: - """Check if a key exists. - - Args: - key: The key to check. - - Returns: - True if the key exists, False otherwise. - - Raises: - InvalidKeyError: If the key is an empty tuple. - """ - self._validate_key(key) - self._prepare_read() - return key in self._get_state() - - def all(self) -> "list[JSONObject]": - """Get all records in key order. - - Returns: - A list of all records, sorted by key. - """ - self._prepare_read() - return self._sorted_records() - - def keys(self) -> list[Key]: - """Get all keys in key order. - - Returns: - A list of all keys, sorted. - """ - self._prepare_read() - return self._sorted_keys() - - def items(self) -> "list[tuple[Key, JSONObject]]": - """Get all key-value pairs in key order. - - Returns: - A list of (key, record) tuples, sorted by key. - """ - self._prepare_read() - state = self._get_state() - return [(k, state[k]) for k in self._sorted_keys()] - - def count(self) -> int: - """Get the number of records. - - Returns: - The number of records. - """ - self._prepare_read() - return len(self._get_state()) - - def __len__(self) -> int: - """Return the number of records.""" - return self.count() - - def __contains__(self, key: object) -> bool: - """Check if a key exists. - - Args: - key: The key to check. Must be a valid Key type. - - Returns: - True if the key exists, False otherwise. - """ - if isinstance(key, str): - return self.has(key) - if isinstance(key, int): - return self.has(key) - if isinstance(key, tuple): - tuple_key = cast("tuple[object, ...]", key) - if self._is_valid_tuple_key(tuple_key): - return self.has(tuple_key) - return False - - def __iter__(self) -> "Iterator[JSONObject]": - """Iterate over all records in key order.""" - yield from self.all() - - @overload - def find( - self, - predicate: "Callable[[JSONObject], bool]", - ) -> "list[JSONObject]": ... # pragma: no cover - - @overload - def find( - self, - predicate: "Callable[[JSONObject], bool]", - *, - limit: int, - ) -> "list[JSONObject]": ... # pragma: no cover - - def find( - self, - predicate: "Callable[[JSONObject], bool]", - *, - limit: "int | None" = None, - ) -> "list[JSONObject]": - """Find records matching a predicate. - - Records are returned in key order. - - Args: - predicate: A function that takes a record and returns True if - it should be included. - limit: Maximum number of records to return. - - Returns: - A list of matching records, in key order. - """ - self._prepare_read() - results: list[JSONObject] = [] - for record in self._sorted_records(): - if predicate(record): - results.append(record) - if limit is not None and len(results) >= limit: - break - return results - - def find_one( - self, - predicate: "Callable[[JSONObject], bool]", - ) -> "JSONObject | None": - """Find the first record matching a predicate. - - Records are checked in key order. - - Args: - predicate: A function that takes a record and returns True. - - Returns: - The first matching record, or None if no match. - """ - self._prepare_read() - for record in self._sorted_records(): - if predicate(record): - return record - return None diff --git a/src/jsonlt/_table.py b/src/jsonlt/_table.py index 38e7bd3..f9a0eeb 100644 --- a/src/jsonlt/_table.py +++ b/src/jsonlt/_table.py @@ -30,7 +30,7 @@ validate_key_arity, validate_key_length, ) -from ._readable import ReadableMixin +from ._mixin import TableMixin from ._reader import parse_table_content from ._records import build_tombstone, extract_key, validate_record from ._state import compute_logical_state @@ -44,7 +44,7 @@ __all__ = ["Table"] -class Table(ReadableMixin): +class Table(TableMixin): """A JSONLT table backed by a file. The Table class provides the primary interface for working with JSONLT @@ -564,6 +564,19 @@ def _require_key_specifier(self) -> KeySpecifier: raise InvalidKeyError(msg) return self._key_specifier + @override + def _get_key_specifier(self) -> KeySpecifier: + """Return the key specifier for this table. + + Returns: + The key specifier. + + Raises: + InvalidKeyError: If no key specifier is set. + """ + return self._require_key_specifier() + + @override def put(self, record: "JSONObject") -> None: """Insert or update a record. @@ -663,6 +676,7 @@ def _write_with_lock( raise FileError(msg) from None return self._write_with_lock(line, key, record, _retries=_retries + 1) + @override def delete(self, key: Key) -> bool: """Delete a record by key. diff --git a/src/jsonlt/_transaction.py b/src/jsonlt/_transaction.py index 986e3ab..859a2bc 100644 --- a/src/jsonlt/_transaction.py +++ b/src/jsonlt/_transaction.py @@ -14,7 +14,7 @@ from ._exceptions import LimitError, TransactionError from ._json import serialize_json, utf8_byte_length from ._keys import Key, KeySpecifier, validate_key_arity, validate_key_length -from ._readable import ReadableMixin +from ._mixin import TableMixin from ._records import build_tombstone, extract_key, validate_record if TYPE_CHECKING: @@ -22,7 +22,7 @@ from ._table import Table -class Transaction(ReadableMixin): +class Transaction(TableMixin): """A transaction for atomic operations on a JSONLT table. Transactions provide snapshot isolation: reads see a consistent snapshot @@ -131,6 +131,16 @@ def _prepare_read(self) -> None: """Ensure the transaction is still active.""" self._require_active() + @override + def _get_key_specifier(self) -> KeySpecifier: + """Return the key specifier for this transaction. + + Returns: + The key specifier. + """ + return self._key_specifier + + @override def put(self, record: "JSONObject") -> None: """Insert or update a record in the transaction. @@ -172,6 +182,7 @@ def put(self, record: "JSONObject") -> None: self._snapshot[key] = record_copy self._cached_sorted_keys = None + @override def delete(self, key: Key) -> bool: """Delete a record by key in the transaction. diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 7e33cf6..fe9b42b 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -1,4 +1,5 @@ import time +from collections.abc import MutableMapping from typing import TYPE_CHECKING import pytest @@ -9,6 +10,9 @@ from collections.abc import Callable from pathlib import Path + from jsonlt._json import JSONObject + from jsonlt._keys import Key + from tests.fakes.fake_filesystem import FakeFileSystem @@ -966,18 +970,18 @@ def test_contains_with_invalid_tuple_returns_false(self, tmp_path: "Path") -> No assert (1, 3.14) not in table assert (None, "x") not in table - def test_iter_yields_records_in_key_order(self, tmp_path: "Path") -> None: + def test_iter_yields_keys_in_key_order(self, tmp_path: "Path") -> None: table_path = tmp_path / "test.jsonlt" # Write in reverse order _ = table_path.write_text('{"id": "c"}\n{"id": "a"}\n{"id": "b"}\n') table = Table(table_path, key="id") - records = list(table) + keys = list(table) - assert len(records) == 3 - assert records[0] == {"id": "a"} - assert records[1] == {"id": "b"} - assert records[2] == {"id": "c"} + assert len(keys) == 3 + assert keys[0] == "a" + assert keys[1] == "b" + assert keys[2] == "c" def test_iter_on_empty_table(self, make_table: "Callable[..., Table]") -> None: table = make_table() @@ -1257,3 +1261,191 @@ def test_compact_recreates_deleted_file( table.compact() assert table_path in fake_fs.files + + +class TestTableMutableMapping: + def test_getitem_existing_key(self, tmp_path: "Path") -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "alice", "role": "admin"}\n') + table = Table(table_path, key="id") + + result = table["alice"] + + assert result == {"id": "alice", "role": "admin"} + + def test_getitem_missing_key_raises_keyerror( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with pytest.raises(KeyError) as exc_info: + _ = table["nonexistent"] + + assert exc_info.value.args[0] == "nonexistent" + + def test_setitem_with_matching_key( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + table["alice"] = {"id": "alice", "role": "admin"} + + assert table.get("alice") == {"id": "alice", "role": "admin"} + + def test_setitem_with_mismatched_key_raises( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with pytest.raises(InvalidKeyError, match="key mismatch"): + table["alice"] = {"id": "bob", "role": "admin"} + + def test_delitem_existing_key(self, tmp_path: "Path") -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "alice", "role": "admin"}\n') + table = Table(table_path, key="id") + + del table["alice"] + + assert table.get("alice") is None + + def test_delitem_missing_key_raises_keyerror( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with pytest.raises(KeyError) as exc_info: + del table["nonexistent"] + + assert exc_info.value.args[0] == "nonexistent" + + def test_isinstance_mutablemapping( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + assert isinstance(table, MutableMapping) + + def test_values_returns_records_in_key_order(self, tmp_path: "Path") -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "bob", "v": 2}\n{"id": "alice", "v": 1}\n') + table = Table(table_path, key="id") + + result = table.values() + + assert result == [{"id": "alice", "v": 1}, {"id": "bob", "v": 2}] + + def test_pop_existing_key(self, tmp_path: "Path") -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "alice", "role": "admin"}\n') + table = Table(table_path, key="id") + + result = table.pop("alice") + + assert result == {"id": "alice", "role": "admin"} + assert "alice" not in table + + def test_pop_missing_key_with_default( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + default: JSONObject = {"id": "default", "role": "none"} + + result = table.pop("nonexistent", default) + + assert result == default + + def test_pop_missing_key_without_default_raises( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with pytest.raises(KeyError): + _ = table.pop("nonexistent") + + def test_pop_too_many_arguments_raises( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with pytest.raises(TypeError, match="pop expected at most 2 arguments"): + _ = table.pop("key", {}, {}) + + def test_popitem_returns_first_key_value_pair(self, tmp_path: "Path") -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "bob", "v": 2}\n{"id": "alice", "v": 1}\n') + table = Table(table_path, key="id") + + result = table.popitem() + + # Returns first in sorted order (alice comes before bob) + assert result == ("alice", {"id": "alice", "v": 1}) + assert "alice" not in table + assert "bob" in table + + def test_popitem_empty_table_raises( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with pytest.raises(KeyError, match="table is empty"): + _ = table.popitem() + + def test_setdefault_existing_key_returns_existing(self, tmp_path: "Path") -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "alice", "role": "admin"}\n') + table = Table(table_path, key="id") + default: JSONObject = {"id": "alice", "role": "user"} + + result = table.setdefault("alice", default) + + assert result == {"id": "alice", "role": "admin"} + + def test_setdefault_missing_key_inserts_and_returns( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + default: JSONObject = {"id": "alice", "role": "admin"} + + result = table.setdefault("alice", default) + + assert result == default + assert table.get("alice") == default + + def test_update_with_mapping(self, make_table: "Callable[..., Table]") -> None: + table = make_table() + mapping: dict[Key, JSONObject] = { + "alice": {"id": "alice", "role": "admin"}, + "bob": {"id": "bob", "role": "user"}, + } + + table.update(mapping) + + assert table.get("alice") == {"id": "alice", "role": "admin"} + assert table.get("bob") == {"id": "bob", "role": "user"} + + def test_update_with_iterable(self, make_table: "Callable[..., Table]") -> None: + table = make_table() + items: list[tuple[str, JSONObject]] = [ + ("alice", {"id": "alice", "role": "admin"}), + ("bob", {"id": "bob", "role": "user"}), + ] + + table.update(items) + + assert table.get("alice") == {"id": "alice", "role": "admin"} + assert table.get("bob") == {"id": "bob", "role": "user"} + + def test_update_with_kwargs(self, make_table: "Callable[..., Table]") -> None: + table = make_table() + + table.update(alice={"id": "alice", "role": "admin"}) + + assert table.get("alice") == {"id": "alice", "role": "admin"} + + def test_update_with_none_is_noop(self, make_table: "Callable[..., Table]") -> None: + table = make_table() + + table.update(None) + + assert table.count() == 0 diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 89cc08d..f7a9a7f 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -1,3 +1,4 @@ +from collections.abc import MutableMapping from pathlib import Path from typing import TYPE_CHECKING @@ -17,6 +18,7 @@ from os import stat_result from jsonlt._json import JSONObject + from jsonlt._keys import Key class TestTransactionCreation: @@ -820,19 +822,19 @@ def test_contains_with_invalid_tuple_returns_false(self, tmp_path: "Path") -> No assert (1, 3.14) not in tx assert (None, "x") not in tx - def test_iter_yields_records_in_key_order(self, tmp_path: "Path") -> None: + def test_iter_yields_keys_in_key_order(self, tmp_path: "Path") -> None: table_path = tmp_path / "test.jsonlt" # Write in reverse order _ = table_path.write_text('{"id": "c"}\n{"id": "a"}\n{"id": "b"}\n') table = Table(table_path, key="id") with table.transaction() as tx: - records = list(tx) + keys = list(tx) - assert len(records) == 3 - assert records[0] == {"id": "a"} - assert records[1] == {"id": "b"} - assert records[2] == {"id": "c"} + assert len(keys) == 3 + assert keys[0] == "a" + assert keys[1] == "b" + assert keys[2] == "c" def test_iter_on_empty_transaction( self, make_table: "Callable[..., Table]" @@ -1091,3 +1093,212 @@ def test_conflict_error_repr_with_tuple_key(self, tmp_path: "Path") -> None: result = repr(exc_info.value) assert "ConflictError(" in result assert "key=('acme', 1)" in result + + +class TestTransactionMutableMapping: + def test_getitem_existing_key(self, tmp_path: Path) -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "alice", "role": "admin"}\n') + table = Table(table_path, key="id") + + with table.transaction() as tx: + result = tx["alice"] + assert result == {"id": "alice", "role": "admin"} + + def test_getitem_missing_key_raises_keyerror( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with table.transaction() as tx: + with pytest.raises(KeyError) as exc_info: + _ = tx["nonexistent"] + assert exc_info.value.args[0] == "nonexistent" + + def test_setitem_with_matching_key( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with table.transaction() as tx: + tx["alice"] = {"id": "alice", "role": "admin"} + assert tx.get("alice") == {"id": "alice", "role": "admin"} + + assert table.get("alice") == {"id": "alice", "role": "admin"} + + def test_setitem_with_mismatched_key_raises( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with ( + table.transaction() as tx, + pytest.raises(InvalidKeyError, match="key mismatch"), + ): + tx["alice"] = {"id": "bob", "role": "admin"} + + def test_delitem_existing_key(self, tmp_path: Path) -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "alice", "role": "admin"}\n') + table = Table(table_path, key="id") + + with table.transaction() as tx: + del tx["alice"] + assert tx.get("alice") is None + + assert table.get("alice") is None + + def test_delitem_missing_key_raises_keyerror( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with table.transaction() as tx: + with pytest.raises(KeyError) as exc_info: + del tx["nonexistent"] + assert exc_info.value.args[0] == "nonexistent" + + def test_isinstance_mutablemapping( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with table.transaction() as tx: + assert isinstance(tx, MutableMapping) + + def test_values_returns_records_in_key_order(self, tmp_path: Path) -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "bob", "v": 2}\n{"id": "alice", "v": 1}\n') + table = Table(table_path, key="id") + + with table.transaction() as tx: + result = tx.values() + assert result == [{"id": "alice", "v": 1}, {"id": "bob", "v": 2}] + + def test_pop_existing_key(self, tmp_path: Path) -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "alice", "role": "admin"}\n') + table = Table(table_path, key="id") + + with table.transaction() as tx: + result = tx.pop("alice") + assert result == {"id": "alice", "role": "admin"} + assert "alice" not in tx + + assert "alice" not in table + + def test_pop_missing_key_with_default( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + default: JSONObject = {"id": "default", "role": "none"} + + with table.transaction() as tx: + result = tx.pop("nonexistent", default) + assert result == default + + def test_pop_missing_key_without_default_raises( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with table.transaction() as tx, pytest.raises(KeyError): + _ = tx.pop("nonexistent") + + def test_pop_too_many_arguments_raises( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with ( + table.transaction() as tx, + pytest.raises(TypeError, match="pop expected at most 2 arguments"), + ): + _ = tx.pop("key", {}, {}) + + def test_popitem_returns_first_key_value_pair(self, tmp_path: Path) -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "bob", "v": 2}\n{"id": "alice", "v": 1}\n') + table = Table(table_path, key="id") + + with table.transaction() as tx: + result = tx.popitem() + # Returns first in sorted order (alice comes before bob) + assert result == ("alice", {"id": "alice", "v": 1}) + assert "alice" not in tx + assert "bob" in tx + + def test_popitem_empty_table_raises( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + + with ( + table.transaction() as tx, + pytest.raises(KeyError, match="table is empty"), + ): + _ = tx.popitem() + + def test_setdefault_existing_key_returns_existing(self, tmp_path: Path) -> None: + table_path = tmp_path / "test.jsonlt" + _ = table_path.write_text('{"id": "alice", "role": "admin"}\n') + table = Table(table_path, key="id") + default: JSONObject = {"id": "alice", "role": "user"} + + with table.transaction() as tx: + result = tx.setdefault("alice", default) + assert result == {"id": "alice", "role": "admin"} + + def test_setdefault_missing_key_inserts_and_returns( + self, make_table: "Callable[..., Table]" + ) -> None: + table = make_table() + default: JSONObject = {"id": "alice", "role": "admin"} + + with table.transaction() as tx: + result = tx.setdefault("alice", default) + assert result == default + assert tx.get("alice") == default + + assert table.get("alice") == default + + def test_update_with_mapping(self, make_table: "Callable[..., Table]") -> None: + table = make_table() + mapping: dict[Key, JSONObject] = { + "alice": {"id": "alice", "role": "admin"}, + "bob": {"id": "bob", "role": "user"}, + } + + with table.transaction() as tx: + tx.update(mapping) + assert tx.get("alice") == {"id": "alice", "role": "admin"} + assert tx.get("bob") == {"id": "bob", "role": "user"} + + assert table.get("alice") == {"id": "alice", "role": "admin"} + + def test_update_with_iterable(self, make_table: "Callable[..., Table]") -> None: + table = make_table() + items: list[tuple[str, JSONObject]] = [ + ("alice", {"id": "alice", "role": "admin"}), + ("bob", {"id": "bob", "role": "user"}), + ] + + with table.transaction() as tx: + tx.update(items) + assert tx.get("alice") == {"id": "alice", "role": "admin"} + + assert table.get("bob") == {"id": "bob", "role": "user"} + + def test_update_with_kwargs(self, make_table: "Callable[..., Table]") -> None: + table = make_table() + + with table.transaction() as tx: + tx.update(alice={"id": "alice", "role": "admin"}) + assert tx.get("alice") == {"id": "alice", "role": "admin"} + + def test_update_with_none_is_noop(self, make_table: "Callable[..., Table]") -> None: + table = make_table() + + with table.transaction() as tx: + tx.update(None) + assert tx.count() == 0