Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions docs/how-to/sqlalchemy-repository.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,119 @@ def _make_repo() -> SqlAlchemyBookRepository:
ensure_schema(executor)
return SqlAlchemyBookRepository(executor)
```

---

## 6. Atomic multi-write operations with `transactional()`

When a UseCase needs to write to multiple tables atomically, use `SqlAlchemyTransactionManager.transactional()` together with `_in_tx` repository methods.

### Define `_in_tx` methods on the interface

Add dedicated methods that accept an explicit `executor` parameter. These are called only inside a `transactional()` callback — never outside one.

```python
from nene2.database import DatabaseQueryExecutorInterface
from abc import ABC, abstractmethod

class AccountRepositoryInterface(ABC):
@abstractmethod
def find_by_id(self, account_id: int) -> Account | None: ...

# _in_tx variants — executor is provided by the transactional() callback
@abstractmethod
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None: ...

@abstractmethod
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None: ...
```

### Implement `_in_tx` methods in the SQLAlchemy repository

The `_in_tx` methods use the passed-in `executor` instead of `self._executor`, so they share the same connection and participate in the same transaction.

```python
class SqlAlchemyAccountRepository(AccountRepositoryInterface):
def __init__(self, executor: SqlAlchemyQueryExecutor) -> None:
self._executor = executor

def find_by_id(self, account_id: int) -> Account | None:
row = self._executor.fetch_one(
"SELECT id, name, balance_cents FROM accounts WHERE id = :id",
{"id": account_id},
)
return self._to_entity(row) if row else None

def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None:
row = executor.fetch_one(
"SELECT id, name, balance_cents FROM accounts WHERE id = :id",
{"id": account_id},
)
return self._to_entity(row) if row else None

def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None:
executor.write(
"UPDATE accounts SET balance_cents = balance_cents + :delta WHERE id = :id",
{"delta": delta_cents, "id": account_id},
)
```

### Wire the UseCase with `SqlAlchemyTransactionManager`

```python
from nene2.database import SqlAlchemyTransactionManager

engine = create_engine(cfg.db_url, connect_args={"check_same_thread": False})
transaction_manager = SqlAlchemyTransactionManager(engine)

transfer_use_case = TransferUseCase(transaction_manager, account_repo, transfer_repo)
```

### Implement InMemory `_in_tx` for unit tests

The InMemory implementation ignores the executor — operations go directly to the in-memory store. `InMemoryTransactionManager` calls the callback immediately with a no-op executor.

```python
from nene2.database import DatabaseQueryExecutorInterface, DatabaseTransactionManagerInterface
from collections.abc import Callable

class InMemoryAccountRepository(AccountRepositoryInterface):
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None:
return self._accounts.get(account_id)

def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None:
account = self._accounts[account_id]
self._accounts[account_id] = Account(
id=account.id, name=account.name, balance_cents=account.balance_cents + delta_cents
)

class _NoOpExecutor(DatabaseQueryExecutorInterface):
def fetch_all(self, sql: str, params: dict[str, object] | None = None) -> list[dict[str, object]]:
return []
def fetch_one(self, sql: str, params: dict[str, object] | None = None) -> dict[str, object] | None:
return None
def write(self, sql: str, params: dict[str, object] | None = None) -> int:
return 0

class InMemoryTransactionManager(DatabaseTransactionManagerInterface):
def transactional[T](self, callback: Callable[[DatabaseQueryExecutorInterface], T]) -> T:
return callback(_NoOpExecutor())
def begin(self) -> None: pass
def commit(self) -> None: pass
def rollback(self) -> None: pass
```

> **Rollback on exception**: `SqlAlchemyTransactionManager.transactional()` uses `engine.begin()` — any exception inside the callback triggers an automatic rollback. Domain exceptions (`AccountNotFoundException`, etc.) propagate normally after rollback.
```
52 changes: 52 additions & 0 deletions docs/ja/reference/framework-modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,58 @@ result = mgr.transactional(
)
```

#### `transactional()` とリポジトリパターンの組み合わせ(`_in_tx` パターン)

複数テーブルへの書き込みを原子的に行う UseCase では、リポジトリインターフェースに `_in_tx` サフィックスのメソッドを定義し、`transactional()` コールバックから渡された `executor` を受け取ります。

**リポジトリインターフェース:**

```python
from nene2.database import DatabaseQueryExecutorInterface
from abc import ABC, abstractmethod

class AccountRepositoryInterface(ABC):
# 通常メソッド — self._executor を使う(自動コミット)
@abstractmethod
def find_by_id(self, account_id: int) -> Account | None: ...

# _in_tx バリアント — transactional() コールバック内からのみ呼ぶ
@abstractmethod
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None: ...

@abstractmethod
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta: int
) -> None: ...
```

**UseCase(送金の例):**

```python
from nene2.database import DatabaseQueryExecutorInterface, DatabaseTransactionManagerInterface

class TransferUseCase:
def execute(self, input_: TransferInput) -> Transfer:
def _run(executor: DatabaseQueryExecutorInterface) -> Transfer:
source = self._accounts.find_by_id_in_tx(executor, input_.from_account_id)
if source is None:
raise AccountNotFoundException(input_.from_account_id)
if source.balance_cents < input_.amount_cents:
raise InsufficientBalanceException(...)

self._accounts.update_balance_in_tx(executor, input_.from_account_id, -input_.amount_cents)
self._accounts.update_balance_in_tx(executor, input_.to_account_id, input_.amount_cents)
return self._transfers.create_in_tx(executor, ...)

return self._tx.transactional(_run)
```

`transactional()` は内部で `engine.begin()` を使用します — コールバック内で例外が発生した場合、自動的にロールバックされます。

詳細なパターンと InMemory テスト実装は [sqlalchemy-repository.md](../how-to/sqlalchemy-repository.md) を参照してください。

---

## nene2.mcp
Expand Down
62 changes: 62 additions & 0 deletions docs/reference/framework-modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,68 @@ result = mgr.transactional(
)
```

#### Combining `transactional()` with the Repository pattern

When a UseCase needs to perform multiple writes atomically, define `_in_tx` variants on the repository interface that accept an explicit `executor`. The UseCase passes the transaction-bound executor from the callback to each `_in_tx` method.

**Repository interface:**

```python
from nene2.database import DatabaseQueryExecutorInterface
from abc import ABC, abstractmethod

class AccountRepositoryInterface(ABC):
# Standard methods — use self._executor (auto-commit)
@abstractmethod
def find_by_id(self, account_id: int) -> Account | None: ...

# _in_tx variants — call only inside a transactional() callback
@abstractmethod
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None: ...

@abstractmethod
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta: int
) -> None: ...
```

**UseCase (atomic transfer example):**

```python
from nene2.database import DatabaseQueryExecutorInterface, DatabaseTransactionManagerInterface

class TransferUseCase:
def __init__(
self,
transaction_manager: DatabaseTransactionManagerInterface,
account_repo: AccountRepositoryInterface,
transfer_repo: TransferRepositoryInterface,
) -> None:
self._tx = transaction_manager
self._accounts = account_repo
self._transfers = transfer_repo

def execute(self, input_: TransferInput) -> Transfer:
def _run(executor: DatabaseQueryExecutorInterface) -> Transfer:
source = self._accounts.find_by_id_in_tx(executor, input_.from_account_id)
if source is None:
raise AccountNotFoundException(input_.from_account_id)
if source.balance_cents < input_.amount_cents:
raise InsufficientBalanceException(...)

self._accounts.update_balance_in_tx(executor, input_.from_account_id, -input_.amount_cents)
self._accounts.update_balance_in_tx(executor, input_.to_account_id, input_.amount_cents)
return self._transfers.create_in_tx(executor, input_.from_account_id, input_.to_account_id, input_.amount_cents)

return self._tx.transactional(_run)
```

`transactional()` uses `engine.begin()` internally — any exception inside the callback triggers an automatic rollback.

**Testing with InMemory:** Implement `DatabaseTransactionManagerInterface` with a no-op executor that calls the callback directly. The `_in_tx` methods on the InMemory repository ignore the executor and operate on their in-memory store.

---

## nene2.mcp
Expand Down
Loading