Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions tests/test_configure_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _invoke(runner, extra_args: list[str], side_effect=None):
):
mock_exec = _executor_mock()
if side_effect:
mock_exec.stream.side_effect = side_effect
mock_exec.watch_logs.side_effect = side_effect
MockExecutor.return_value = mock_exec
result = runner.invoke(
app,
Expand All @@ -48,18 +48,19 @@ def test_watch_streams_journalctl_after_configure(runner):
result, mock_exec = _invoke(runner, ["--watch"])

assert result.exit_code == 0
mock_exec.stream.assert_called_once_with("journalctl --user -u service-myapp-production -f")
mock_exec.watch_logs.assert_called_once_with("service", "service-myapp-production")


def test_no_watch_does_not_stream(runner):
result, mock_exec = _invoke(runner, [])

assert result.exit_code == 0
mock_exec.stream.assert_not_called()
mock_exec.watch_logs.assert_not_called()


def test_watch_handles_keyboard_interrupt(runner):
result, mock_exec = _invoke(runner, ["--watch"], side_effect=KeyboardInterrupt)
# KeyboardInterrupt is handled inside Executor.watch_logs; command exits cleanly
result, mock_exec = _invoke(runner, ["--watch"])

assert result.exit_code == 0
mock_exec.stream.assert_called_once()
mock_exec.watch_logs.assert_called_once()
78 changes: 78 additions & 0 deletions tests/test_executor_watch_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import annotations

from typing import Any
from unittest.mock import MagicMock, patch

from trobz_deploy.utils.executor import Executor, ExecutorError


def _executor() -> Any:
executor = Executor(None)
executor.capture = MagicMock() # type: ignore[method-assign]
executor.run = MagicMock() # type: ignore[method-assign]
executor.stream = MagicMock() # type: ignore[method-assign]
return executor


def _streamed_command(executor: Any) -> str:
return executor.stream.call_args[0][0]


def test_non_odoo_type_streams_journalctl_only_in_its_color():
executor = _executor()

executor.watch_logs("python", "service-myapp-production")

cmd = _streamed_command(executor)
assert cmd == (
'journalctl --user -u service-myapp-production -f -o short-iso | stdbuf -oL sed "s/.*/\x1b[36m&\x1b[0m/"'
)
executor.capture.assert_not_called()
executor.run.assert_not_called()


def test_odoo_type_merges_each_log_in_its_own_color():
executor = _executor()
executor.capture.side_effect = [
"/home/deploy", # echo $HOME
"/home/deploy/odoo-foo-staging/log/odoo.log", # grep logfile from odoo.conf
]
executor.run.side_effect = None # `test -f log/upgrade.log` succeeds

executor.watch_logs("odoo", "odoo-foo-staging")

cmd = _streamed_command(executor)
assert cmd == (
"( journalctl --user -u odoo-foo-staging -f -o short-iso"
' | stdbuf -oL sed "s/.*/\x1b[36m&\x1b[0m/"'
" & tail -f /home/deploy/odoo-foo-staging/log/odoo.log"
' | stdbuf -oL sed "s/.*/\x1b[32m&\x1b[0m/"'
" & tail -f /home/deploy/odoo-foo-staging/log/upgrade.log"
' | stdbuf -oL sed "s/.*/\x1b[33m&\x1b[0m/"'
" & wait )"
)


def test_odoo_type_without_log_files_streams_journalctl_only():
executor = _executor()
executor.capture.side_effect = [
"/home/deploy", # echo $HOME
"", # no logfile configured
]
executor.run.side_effect = ExecutorError("not found") # `test -f log/upgrade.log` fails

executor.watch_logs("odoo", "odoo-foo-staging")

cmd = _streamed_command(executor)
assert cmd == ('journalctl --user -u odoo-foo-staging -f -o short-iso | stdbuf -oL sed "s/.*/\x1b[36m&\x1b[0m/"')


def test_keyboard_interrupt_is_swallowed():
executor = _executor()
executor.capture.side_effect = ExecutorError("unreachable")
executor.stream.side_effect = KeyboardInterrupt

with patch("trobz_deploy.utils.executor.typer.echo") as mock_echo:
executor.watch_logs("python", "service-myapp-production")

mock_echo.assert_called_once_with()
11 changes: 6 additions & 5 deletions tests/test_restart_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def _invoke(runner, extra_args: list[str], side_effect=None):
):
mock_exec = _executor_mock()
if side_effect:
mock_exec.stream.side_effect = side_effect
mock_exec.watch_logs.side_effect = side_effect
MockExecutor.return_value = mock_exec
result = runner.invoke(
app,
Expand All @@ -45,18 +45,19 @@ def test_watch_streams_journalctl_after_restart(runner):
result, mock_exec = _invoke(runner, ["--watch"])

assert result.exit_code == 0
mock_exec.stream.assert_called_once_with("journalctl --user -u service-myapp-production -f")
mock_exec.watch_logs.assert_called_once_with("python", "service-myapp-production")


def test_no_watch_does_not_stream(runner):
result, mock_exec = _invoke(runner, [])

assert result.exit_code == 0
mock_exec.stream.assert_not_called()
mock_exec.watch_logs.assert_not_called()


def test_watch_handles_keyboard_interrupt(runner):
result, mock_exec = _invoke(runner, ["--watch"], side_effect=KeyboardInterrupt)
# KeyboardInterrupt is handled inside Executor.watch_logs; command exits cleanly
result, mock_exec = _invoke(runner, ["--watch"])

assert result.exit_code == 0
mock_exec.stream.assert_called_once()
mock_exec.watch_logs.assert_called_once()
11 changes: 6 additions & 5 deletions tests/test_status_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def _invoke(runner, extra_args: list[str], side_effect=None):
):
mock_exec = _executor_mock()
if side_effect:
mock_exec.stream.side_effect = side_effect
mock_exec.watch_logs.side_effect = side_effect
MockExecutor.return_value = mock_exec
result = runner.invoke(
app,
Expand All @@ -44,18 +44,19 @@ def test_watch_streams_journalctl_after_status(runner):
result, mock_exec = _invoke(runner, ["--watch"])

assert result.exit_code == 0
mock_exec.stream.assert_called_once_with("journalctl --user -u service-myapp-production -f")
mock_exec.watch_logs.assert_called_once_with("python", "service-myapp-production")


def test_no_watch_does_not_stream(runner):
result, mock_exec = _invoke(runner, [])

assert result.exit_code == 0
mock_exec.stream.assert_not_called()
mock_exec.watch_logs.assert_not_called()


def test_watch_handles_keyboard_interrupt(runner):
result, mock_exec = _invoke(runner, ["--watch"], side_effect=KeyboardInterrupt)
# KeyboardInterrupt is handled inside Executor.watch_logs; command exits cleanly
result, mock_exec = _invoke(runner, ["--watch"])

assert result.exit_code == 0
mock_exec.stream.assert_called_once()
mock_exec.watch_logs.assert_called_once()
11 changes: 6 additions & 5 deletions tests/test_update_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def _invoke(runner, extra_args: list[str], side_effect=None):
):
mock_exec = _executor_mock()
if side_effect:
mock_exec.stream.side_effect = side_effect
mock_exec.watch_logs.side_effect = side_effect
MockExecutor.return_value = mock_exec
result = runner.invoke(
app,
Expand All @@ -39,18 +39,19 @@ def test_watch_streams_journalctl_after_success(runner):
result, mock_exec = _invoke(runner, ["--watch"])

assert result.exit_code == 0
mock_exec.stream.assert_called_once_with("journalctl --user -u service-myapp-production -f")
mock_exec.watch_logs.assert_called_once_with("service", "service-myapp-production")


def test_no_watch_does_not_stream(runner):
result, mock_exec = _invoke(runner, [])

assert result.exit_code == 0
mock_exec.stream.assert_not_called()
mock_exec.watch_logs.assert_not_called()


def test_watch_handles_keyboard_interrupt(runner):
result, mock_exec = _invoke(runner, ["--watch"], side_effect=KeyboardInterrupt)
# KeyboardInterrupt is handled inside Executor.watch_logs; command exits cleanly
result, mock_exec = _invoke(runner, ["--watch"])

assert result.exit_code == 0
mock_exec.stream.assert_called_once()
mock_exec.watch_logs.assert_called_once()
15 changes: 2 additions & 13 deletions trobz_deploy/command/configure.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
from __future__ import annotations

from enum import Enum
from typing import Annotated, Any

import typer

from trobz_deploy.utils.config import load_config, resolve_options
from trobz_deploy.utils.config import DeployType, load_config, resolve_options
from trobz_deploy.utils.executor import Executor, ExecutorError
from trobz_deploy.utils.render import render_unit
from trobz_deploy.utils.venv import setup_odoo_venv, setup_package_venv, setup_python_venv


class DeployType(str, Enum):
odoo = "odoo"
python = "python"
service = "service"


def _is_git_repo(executor: Executor, path: str) -> bool:
try:
executor.run(f"test -d {path}/.git")
Expand Down Expand Up @@ -212,8 +205,4 @@ def configure( # noqa: C901
typer.secho(f"\nInstance {instance_name!r} configured successfully.", fg="green")

if watch:
typer.secho("\nWatching service logs (Ctrl+C to stop)…", fg="cyan")
try:
executor.stream(f"journalctl --user -u {instance_name} -f")
except KeyboardInterrupt:
typer.echo()
executor.watch_logs(eff_type, instance_name)
28 changes: 20 additions & 8 deletions trobz_deploy/command/restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@

import typer

from trobz_deploy.utils.config import load_config
from trobz_deploy.utils.config import DeployType, load_config, resolve_options
from trobz_deploy.utils.executor import Executor, ExecutorError


def restart(
ctx: typer.Context,
instance_name: Annotated[str, typer.Argument()],
ssh_host: Annotated[str | None, typer.Argument()] = None,
deploy_type: Annotated[
DeployType | None,
typer.Option("--type", help="Deployment type (auto-detected from instance name prefix if omitted)."),
] = None,
ssh_port: Annotated[int | None, typer.Option("-p", "--port", help="SSH port on the remote host.")] = None,
watch: Annotated[
bool,
Expand All @@ -20,9 +24,21 @@ def restart(
) -> None:
"""Restart the systemd unit for a deployment instance."""
cfg = load_config(ctx.obj["config"], instance_name)
try:
opts = resolve_options(
cfg,
instance_name,
ssh_host=ssh_host,
ssh_port=ssh_port,
deploy_type=deploy_type.value if deploy_type else None,
)
except ValueError as exc:
typer.echo(typer.style(str(exc), fg="red"), err=True)
raise typer.Exit(code=1) from exc

eff_ssh_host: str | None = ssh_host if ssh_host is not None else cfg.get("ssh_host")
eff_ssh_port: int | None = ssh_port if ssh_port is not None else cfg.get("ssh_port")
eff_ssh_host: str | None = opts.get("ssh_host")
eff_ssh_port: int | None = opts.get("ssh_port")
eff_type: str = opts["type"]

executor = Executor(eff_ssh_host, ctx.obj["verbose"], ssh_port=eff_ssh_port)

Expand All @@ -36,8 +52,4 @@ def restart(
typer.secho(f"\nInstance {instance_name!r} restarted.", fg="green")

if watch:
typer.secho("\nWatching service logs (Ctrl+C to stop)…", fg="cyan")
try:
executor.stream(f"journalctl --user -u {instance_name} -f")
except KeyboardInterrupt:
typer.echo()
executor.watch_logs(eff_type, instance_name)
32 changes: 22 additions & 10 deletions trobz_deploy/command/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import typer

from trobz_deploy.utils.config import load_config
from trobz_deploy.utils.config import DeployType, load_config, resolve_options
from trobz_deploy.utils.executor import Executor, ExecutorError


Expand Down Expand Up @@ -42,6 +42,10 @@ def status(
ctx: typer.Context,
instance_name: Annotated[str, typer.Argument()],
ssh_host: Annotated[str | None, typer.Argument()] = None,
deploy_type: Annotated[
DeployType | None,
typer.Option("--type", help="Deployment type (auto-detected from instance name prefix if omitted)."),
] = None,
ssh_port: Annotated[int | None, typer.Option("-p", "--port", help="SSH port on the remote host.")] = None,
watch: Annotated[
bool,
Expand All @@ -50,13 +54,25 @@ def status(
) -> None:
"""Show status of a deployment instance."""
cfg = load_config(ctx.obj["config"], instance_name)
try:
opts = resolve_options(
cfg,
instance_name,
ssh_host=ssh_host,
ssh_port=ssh_port,
deploy_type=deploy_type.value if deploy_type else None,
)
except ValueError as exc:
typer.echo(typer.style(str(exc), fg="red"), err=True)
raise typer.Exit(code=1) from exc

# Resolve ssh_host/ssh_port: CLI arg > config value
eff_ssh_host: str | None = ssh_host if ssh_host is not None else cfg.get("ssh_host")
eff_ssh_port: int | None = ssh_port if ssh_port is not None else cfg.get("ssh_port")
eff_ssh_host: str | None = opts.get("ssh_host")
eff_ssh_port: int | None = opts.get("ssh_port")
eff_type: str = opts["type"]

executor = Executor(eff_ssh_host, ctx.obj["verbose"], ssh_port=eff_ssh_port)
instance_path = f"$HOME/{instance_name}"
home_dir = executor.capture("echo $HOME")
instance_path = f"{home_dir}/{instance_name}"

# Step 2: Verify instance directory exists
try:
Expand Down Expand Up @@ -87,8 +103,4 @@ def status(
typer.echo(f"Unit: {unit_line}")

if watch:
typer.secho("\nWatching service logs (Ctrl+C to stop)…", fg="cyan")
try:
executor.stream(f"journalctl --user -u {instance_name} -f")
except KeyboardInterrupt:
typer.echo()
executor.watch_logs(eff_type, instance_name)
Loading