diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..af760d9 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +extend-ignore = E203 +line-length = 79 \ No newline at end of file diff --git a/src/docker_volume_analyzer/docker_client.py b/src/docker_volume_analyzer/docker_client.py index 2db36a8..ea45a94 100644 --- a/src/docker_volume_analyzer/docker_client.py +++ b/src/docker_volume_analyzer/docker_client.py @@ -94,3 +94,31 @@ def remove_volume(self, volume_name: str) -> None: raise docker.errors.APIError( f"Failed to remove volume '{volume_name}': {e}" ) from e + + def get_directory_informations_with_find( + self, volume_name: str, directory: str | None = None + ) -> Union[str, None]: + """ + Gets directory information using 'find' command. + + Args: + volume_name (str): Docker volume name. + directory (str): Directory path inside the volume. + + Returns: + str | None: Output of the 'find' with stat command + or None if failed. + """ + path = ( + f"/mnt/docker_volume/{directory}" + if directory + else "/mnt/docker_volume" + ) + + command = [ + "sh", + "-c", + f"find {path} -exec stat -c '%F|%n|%s|%A|%U|%G|%Y' {{}} \\;", + ] + output = self._run_in_container(command, volume_name) + return output if output else None diff --git a/src/docker_volume_analyzer/filesystem.py b/src/docker_volume_analyzer/filesystem.py new file mode 100644 index 0000000..b7cd23e --- /dev/null +++ b/src/docker_volume_analyzer/filesystem.py @@ -0,0 +1,147 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Dict, Optional + + +@dataclass +class FileNode: + """ + Represents a file or directory in the file system. + + Attributes: + name (str): The name of the file or directory. + path (str): The full path of the file or directory. + size (int): The size of the file in bytes. + mtime (datetime): The last modified time of the file or directory. + mode (str): The file mode (permissions) as a string. + user (str): The owner of the file or directory. + group (str): The group owner of the file or directory. + is_directory (bool): True if the node is a directory, + False if it is a file. + childrens (Dict[str, "FileNode"]): A dictionary of child nodes + where the key is the child's name. + parent (Optional["FileNode"]): A reference to the parent node + or None if it is the root. + """ + + name: str + path: str + size: int + mtime: datetime + mode: str + user: str + group: str + is_directory: bool + childrens: Dict[str, "FileNode"] = field(default_factory=dict) + parent: Optional["FileNode"] = None + + +class FileSystem: + """ + Represents a file system structure, allowing for the addition + and retrieval of file nodes. + + Attributes: + root (FileNode): The root node of the file system. + index (Dict[str, FileNode]): A dictionary mapping relative + paths to their corresponding FileNode. + """ + + def __init__(self): + self.root = FileNode( + name="", + path="", + size=0, + mtime=datetime.now(), + mode="", + user="", + group="", + is_directory=True, + ) + self.index = {"": self.root} + + def add_node(self, node: FileNode): + parts = node.path.strip("/").split("/") + current = self.root + full_path = "" + + for i, part in enumerate(parts): + full_path = "/".join(parts[: i + 1]) + if full_path not in self.index: + is_last = i == len(parts) - 1 + n = FileNode( + name=part, + path=full_path, + size=node.size if is_last else 0, + mtime=node.mtime, + mode=node.mode, + user=node.user, + group=node.group, + is_directory=node.is_directory if is_last else True, + parent=current, + ) + current.childrens[part] = n + self.index[full_path] = n + current = self.index[full_path] + + def compute_directory_sizes(self) -> "FileSystem": + """ + Compute the total size of each directory by summing the sizes + of its files, subdirectories, + and the directory's own size (e.g., 4 KB for metadata). + """ + for path in sorted( + self.index.keys(), key=lambda x: x.count("/"), reverse=True + ): + node = self.index[path] + if node.is_directory: + total_size = node.size + + for child in node.childrens.values(): + total_size += child.size + + node.size = total_size + + return self + + +def parse_find_output( + output: str, strip_prefix: str = "/mnt/docker_volume" +) -> FileSystem: + """ + Parses the output of the 'find' command with stat + and builds a FileSystem object. + + Args: + output (str): The output string from the 'find' command, + strip_prefix (str): A prefix to strip from the path in the output + default is '/mnt/docker_volume'. + + Returns: + FileSystem: An instance of FileSystem containing the parsed file nodes. + """ + fs = FileSystem() + for line in output.strip().split("\n"): + try: + type_str, path, size, mode, user, group, mtime = line.split("|") + + path = ( + path[len(strip_prefix) :].lstrip("/") + if path.startswith(strip_prefix) + else path.lstrip("/") + ) + + node = FileNode( + name=path.split("/")[-1], + path=path, + size=int(size), + mtime=datetime.fromtimestamp(int(mtime)), + mode=mode, + user=user, + group=group, + is_directory=(type_str == "directory"), + ) + fs.add_node(node) + except Exception as e: + print(f"Skipping malformed line: {line} ({e})") + return fs diff --git a/src/docker_volume_analyzer/tui.py b/src/docker_volume_analyzer/tui.py index 19a9fd1..5f7a8ab 100644 --- a/src/docker_volume_analyzer/tui.py +++ b/src/docker_volume_analyzer/tui.py @@ -1,7 +1,9 @@ import os from textual.app import App, ComposeResult +from textual.binding import Binding from textual.containers import Container, Horizontal, Vertical +from textual.events import Key from textual.screen import ModalScreen from textual.widgets import Button, DataTable, Footer, Header, Static @@ -19,6 +21,7 @@ class DockerTUI(App): ("ctrl+q", "quit", "Quit"), ("i", "information", "Show information"), ("d", "delete_volume", "Delete volume"), + ("b", "browse", "Browse volume"), ] CSS_PATH = "tui.tcss" @@ -139,6 +142,18 @@ def delete_callback(confirmed: bool) -> None: ) ) + def action_browse(self): + """ + An action to browse the contents of the selected volume. + """ + + table = self.query_one(DataTable) + selected_row = table.cursor_row + + volume_row = table.get_row_at(selected_row) + + self.push_screen(VolumeBrowserScreen(self.app.manager, volume_row[0])) + class VolumeDetailScreen(ModalScreen): """ @@ -242,19 +257,126 @@ def __init__(self, message: str, callback): self.callback = callback def compose(self) -> ComposeResult: - yield Header() with Container(id="dialog"): yield Static(self.message, classes="question") with Horizontal(classes="buttons"): yield Button("No", variant="error", id="no_button") yield Button("Yes", variant="success", id="yes_button") - yield Footer() def on_button_pressed(self, event): - if event.button.id == "yes_button": - self.callback(True) - else: - self.callback(False) + self.callback(event.button.id == "yes_button") + + +class VolumeBrowserScreen(ModalScreen): + """ + A modal screen to browse the contents of a Docker volume. + """ + + BINDINGS = [ + ("b", "back", "Back"), + Binding("enter", "select_cursor", "Select", show=True), + ] + + ICON_DIRECTORY = "📁 " + ICON_FILE = "📄 " + + def __init__(self, volume_manager: VolumeManager, volume_name: str): + super().__init__() + self.volume_name = volume_name + self.volume_manager = volume_manager + self.volume_tree = self.volume_manager.get_volume_tree(volume_name) + self.current_path = "" + + def compose(self) -> ComposeResult: + with Container(id="dialog"): + + yield Static( + f"[b]Browsing volume:[/b] {self.volume_name}", + classes="title", + ) + yield Static( + f"[b]Current path:[/b] {self.current_path}", + id="current_path", + ) + yield DataTable( + id="file_tree", + cursor_type="row", + show_cursor=True, + classes="file-tree", + zebra_stripes=True, + ) + with Container(id="shortcuts", classes="shortcuts-container"): + with Horizontal(classes="shortcuts-list"): + yield Static("[b]Enter:[/b]", classes="shortcut shortcut-key") + yield Static( + "Open selected directory", classes="shortcut shortcut-desc" + ) + yield Static( + "[b]Backspace:[/b]", classes="shortcut shortcut-key" + ) + yield Static( + "Open parent directory", classes="shortcut shortcut-desc" + ) + + def on_mount(self) -> None: + """Load the file tree when the screen is mounted.""" + table = self.query_one(DataTable) + table.add_columns("", "Name", "Size", "Last Modified") + self.load_data() + + def load_data(self) -> None: + + table = self.query_one(DataTable) + table.clear() + directory_informations = self.volume_tree.index.get( + self.current_path, {} + ) + if not directory_informations: + self.query_one(DataTable).add_row( + "No files found in this directory." + ) + return () + + for name, node in directory_informations.childrens.items(): + table.add_row( + ( + f"{self.ICON_DIRECTORY}" + if node.is_directory + else f"{self.ICON_FILE}" + ), + name, + f"{node.size} bytes", + node.mtime.strftime("%Y-%m-%d %H:%M:%S"), + ) + + self.query_one("#current_path").update( + f"[b]Current path:[/b] {self.current_path}/" + ) + + def action_back(self) -> None: + """An action to go back to the previous screen.""" + self.app.pop_screen() + self.app.refresh() + + def on_key(self, event: Key) -> None: + table = self.query_one(DataTable) + if event.key == "enter": + selected = table.cursor_row + row_data = table.get_row_at(selected) + selected_name = row_data[1] + selected_node = self.volume_tree.index.get( + self.current_path, {} + ).childrens.get(selected_name) + + if selected_node and selected_node.is_directory: + self.current_path = ( + f"{self.current_path}/{selected_name}".strip("/") + ) + self.load_data() + elif event.key == "backspace": + self.current_path = os.path.dirname(self.current_path) + table = self.query_one(DataTable) + self.load_data() if __name__ == "__main__": # pragma: no cover diff --git a/src/docker_volume_analyzer/tui.tcss b/src/docker_volume_analyzer/tui.tcss index 5cd6667..efea5f0 100644 --- a/src/docker_volume_analyzer/tui.tcss +++ b/src/docker_volume_analyzer/tui.tcss @@ -17,7 +17,7 @@ Screen { } #dialog { - height: 70%; + height: 90%; background: $panel; color: $text; border: tall $background; @@ -55,3 +55,29 @@ Button { height: auto; color: white; } + +#file_tree{ + height: 100%; +} + + +/* Liste des raccourcis en ligne */ +.shortcuts-list { + align: left middle; + padding: 0 1; +} + +/* Élément individuel de raccourci */ +.shortcut-desc { + color: $footer-description-foreground; + padding-left: 1; + padding-right: 2; +} + +.shortcut{ + width: auto; +} + +.shortcut-key{ + color: $footer-key-foreground; +} diff --git a/src/docker_volume_analyzer/volume_manager.py b/src/docker_volume_analyzer/volume_manager.py index d380660..b1c5f3d 100644 --- a/src/docker_volume_analyzer/volume_manager.py +++ b/src/docker_volume_analyzer/volume_manager.py @@ -1,4 +1,5 @@ from docker_volume_analyzer.docker_client import DockerClient +from docker_volume_analyzer.filesystem import parse_find_output class VolumeManager: @@ -32,7 +33,7 @@ def get_containers_by_volume(self) -> dict: Return all Docker containers and their volumes. Returns: - dict: Dictionary with volume names as keys and + dict: Dictionary with volume names as keys and parse_find_output container information (name, mountpoint, etc.) as values. """ containers_by_volumes = {} @@ -80,3 +81,21 @@ def delete_volume(self, volume_name: str) -> bool: return True except Exception: return False + + def get_volume_tree(self, volume_name: str) -> dict: + """ + Get a tree structure of the files in a Docker volume. + + Args: + volume_name (str): Name of the Docker volume. + + Returns: + dict: A dictionary representing the file tree structure. + """ + find_result = self.client.get_directory_informations_with_find( + volume_name, directory=None + ) + if not find_result: + return {} + + return parse_find_output(find_result).compute_directory_sizes() diff --git a/tests/test_docker_client.py b/tests/test_docker_client.py index 70d3c38..de7432a 100644 --- a/tests/test_docker_client.py +++ b/tests/test_docker_client.py @@ -105,9 +105,7 @@ def test_get_volume_size(): def test_docker_not_available_error(): - # Mock docker.from_env to raise DockerException with patch("docker.from_env", side_effect=docker.errors.DockerException): - # Assert that DockerNotAvailableError is raised with pytest.raises(DockerNotAvailableError): DockerClient() @@ -170,3 +168,122 @@ def test_remove_volume_api_error(): mock_client.volumes.get.assert_called_with("test_volume") mock_volume.remove.assert_called_with(force=True) + + +def test_get_directory_informations_with_find(): + """ + Test the get_directory_informations_with_find method of DockerClient. + """ + mock_client = MagicMock() + mock_client.containers.run.return_value = ( + b"directory|/mnt/docker_volume/test_dir|4096" + b"|drwxr-xr-x|user|group|1633024800\n" + b"file|/mnt/docker_volume/test_dir/file.txt|1024" + b"|-rw-r--r--|user|group|1633024800" + ) + + docker_client = DockerClient() + docker_client.client = mock_client + + output = docker_client.get_directory_informations_with_find( + "test_volume", "test_dir" + ) + + docker_client.client.containers.run.assert_called_with( + image="alpine", + command=[ + "sh", + "-c", + "find /mnt/docker_volume/test_dir -exec stat " + "-c '%F|%n|%s|%A|%U|%G|%Y' {} \\;", + ], + volumes={"test_volume": {"bind": "/mnt/docker_volume", "mode": "ro"}}, + remove=True, + stdout=True, + stderr=False, + ) + + assert output == ( + "directory|/mnt/docker_volume/test_dir|4096" + "|drwxr-xr-x|user|group|1633024800\n" + "file|/mnt/docker_volume/test_dir/file.txt|1024" + "|-rw-r--r--|user|group|1633024800" + ) + + +def test_get_directory_informations_with_find_no_directory(): + """ + Test the get_directory_informations_with_find method of DockerClient + when no directory is specified. + """ + mock_client = MagicMock() + mock_client.containers.run.return_value = ( + b"directory|/mnt/docker_volume|4096" + b"|drwxr-xr-x|user|group|1633024800\n" + b"file|/mnt/docker_volume/file.txt|1024" + b"|-rw-r--r--|user|group|1633024800" + ) + + docker_client = DockerClient() + docker_client.client = mock_client + + output = docker_client.get_directory_informations_with_find("test_volume") + + docker_client.client.containers.run.assert_called_with( + image="alpine", + command=[ + "sh", + "-c", + "find /mnt/docker_volume -exec stat " + "-c '%F|%n|%s|%A|%U|%G|%Y' {} \\;", + ], + volumes={"test_volume": {"bind": "/mnt/docker_volume", "mode": "ro"}}, + remove=True, + stdout=True, + stderr=False, + ) + + assert output == ( + "directory|/mnt/docker_volume|4096" + "|drwxr-xr-x|user|group|1633024800\n" + "file|/mnt/docker_volume/file.txt|1024" + "|-rw-r--r--|user|group|1633024800" + ) + + +def test_get_directory_informations_with_find_error(): + """ + Test the get_directory_informations_with_find method of DockerClient + when an error occurs. + """ + mock_client = MagicMock() + mock_client.containers.run.side_effect = docker.errors.ContainerError( + container=MagicMock(), + exit_status=1, + command="find", + image="alpine", + stderr="error", + ) + + docker_client = DockerClient() + docker_client.client = mock_client + + output = docker_client.get_directory_informations_with_find( + "test_volume", "test_dir" + ) + + docker_client.client.containers.run.assert_called_with( + image="alpine", + command=[ + "sh", + "-c", + "find /mnt/docker_volume/test_dir -exec stat " + "-c '%F|%n|%s|%A|%U|%G|%Y' {} \\;", + ], + volumes={"test_volume": {"bind": "/mnt/docker_volume", "mode": "ro"}}, + remove=True, + stdout=True, + stderr=False, + ) + + assert output is None diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py new file mode 100644 index 0000000..fe6239b --- /dev/null +++ b/tests/test_filesystem.py @@ -0,0 +1,137 @@ +from datetime import datetime + +import pytest + +from docker_volume_analyzer.filesystem import ( + FileNode, + FileSystem, + parse_find_output, +) + + +@pytest.fixture +def now(): + """Fixture for the current datetime.""" + return datetime.now() + + +@pytest.fixture +def make_file(now): + """Fixture to create a FileNode.""" + + def _make(name, path, size): + return FileNode( + name=name, + path=path, + size=size, + mtime=now, + mode="-rw-r--r--", + user="user", + group="group", + is_directory=False, + ) + + return _make + + +@pytest.fixture +def fs(): + """Fixture to create an empty FileSystem.""" + return FileSystem() + + +@pytest.fixture +def sample_output(): + """Sample output from the 'find' command.""" + return ( + "directory|/mnt/docker_volume/dir1|4096" + "|drwxr-xr-x|user|group|1633024800\n" + "file|/mnt/docker_volume/dir1/file1.txt|1024" + "|-rw-r--r--|user|group|1633024800\n" + "file|/mnt/docker_volume/dir1/file2.txt|2048" + "|-rw-r--r--|user|group|1633024800" + ) + + +def test_add_node_to_filesystem(fs, make_file): + """Test adding a node to the FileSystem.""" + node = make_file("file.txt", "dir1/file.txt", 1024) + + fs.add_node(node) + + assert "dir1" in fs.index + assert "dir1/file.txt" in fs.index + assert fs.index["dir1"].is_directory + assert fs.index["dir1/file.txt"].size == 1024 + + +def test_compute_directory_sizes(fs, make_file): + """Test computing directory sizes in the FileSystem.""" + fs.add_node(make_file("file1.txt", "dir1/file1.txt", 1024)) + fs.add_node(make_file("file2.txt", "dir1/file2.txt", 2048)) + + fs.compute_directory_sizes() + + assert fs.index["dir1"].size == 3072 + + +def test_parse_find_output(sample_output): + """Test parsing valid output from the 'find' command.""" + fs = parse_find_output(sample_output) + + assert "dir1" in fs.index + assert "dir1/file1.txt" in fs.index + assert "dir1/file2.txt" in fs.index + assert fs.index["dir1"].is_directory + assert fs.index["dir1/file1.txt"].size == 1024 + assert fs.index["dir1/file2.txt"].size == 2048 + + +@pytest.mark.parametrize( + "output,strip_prefix,expected_keys", + [ + ( + "directory|/mnt/docker_volume/dir1|4096" + "|drwxr-xr-x|user|group|1633024800\n" + "file|/mnt/docker_volume/dir1/file1.txt|1024" + "|-rw-r--r--|user|group|1633024800", + "/mnt/docker_volume", + {"dir1", "dir1/file1.txt"}, + ), + ( + "directory|/mnt/docker_volume/dir2|4096" + "|drwxr-xr-x|user|group|1633024800", + "/mnt/docker_volume", + {"dir2"}, + ), + ], +) +def test_parse_find_output_with_strip_prefix( + output, strip_prefix, expected_keys +): + """Test parsing output with a strip prefix.""" + fs = parse_find_output(output, strip_prefix=strip_prefix) + + non_root_keys = {key for key in fs.index.keys() if key != ""} + assert non_root_keys == expected_keys + + +def test_parse_find_output_malformed_line(): + """Test parsing output with a malformed line.""" + output = ( + "directory|/mnt/docker_volume/dir1|4096" + "|drwxr-xr-x|user|group|1633024800\n" + "malformed_line" + ) + + fs = parse_find_output(output) + + assert "dir1" in fs.index + assert len(fs.index) == 2 + + +def test_parse_find_output_empty(): + """Test parsing an empty output.""" + fs = parse_find_output("") + + assert len(fs.index) == 1 diff --git a/tests/test_tui.py b/tests/test_tui.py index 435b440..aff7ad5 100644 --- a/tests/test_tui.py +++ b/tests/test_tui.py @@ -1,12 +1,14 @@ from unittest.mock import MagicMock, PropertyMock, patch import pytest -from textual.widgets import Button, DataTable +from textual.events import Key +from textual.widgets import Button, DataTable, Static from docker_volume_analyzer.tui import ( ConfirmationScreen, DockerTUI, ErrorScreen, + VolumeBrowserScreen, VolumeDetailScreen, ) @@ -37,7 +39,7 @@ async def test_on_mount(): "docker_volume_analyzer.tui.VolumeManager", return_value=mock_manager ): async with DockerTUI().run_test() as pilot: - await pilot.pause() # Wait for the interface to initialize + await pilot.pause() app = pilot.app table: DataTable = app.query_one( @@ -100,7 +102,7 @@ async def test_action_information(): "#volumes_table", expect_type=DataTable ) table.add_row("volume1", "10GB", 1, "2023-01-01T00:00:00Z") - table.cursor_type = 0 # Move the cursor to the first row + table.cursor_type = 0 app.action_information() assert isinstance(app.screen_stack[-1], VolumeDetailScreen) @@ -151,7 +153,6 @@ async def test_action_information_no_selection(): "#volumes_table", expect_type=DataTable ) - # Patch cursor_row to simulate no row being selected with patch.object( type(table), "cursor_row", new_callable=PropertyMock ) as mock_cursor_row: @@ -159,7 +160,6 @@ async def test_action_information_no_selection(): app.action_information() - # Assertions: no new screen was pushed assert len(app.screen_stack) == 1 assert not isinstance(app.screen_stack[-1], VolumeDetailScreen) @@ -176,10 +176,8 @@ async def test_action_previous_removes_screen_and_refreshes(): screen = VolumeDetailScreen(volume_info) - # Création d'un mock pour app avec les méthodes pop_screen et refresh mock_app = MagicMock() - # Patch la propriété 'app' du screen pour retourner mock_app with patch.object( type(screen), "app", new_callable=PropertyMock ) as mock_app_prop: @@ -218,7 +216,6 @@ async def test_action_delete_volume_with_no_selection(): "#volumes_table", expect_type=DataTable ) - # Patch cursor_row to simulate no row being selected with patch.object( type(table), "cursor_row", new_callable=PropertyMock ) as mock_cursor_row: @@ -226,7 +223,6 @@ async def test_action_delete_volume_with_no_selection(): app.action_delete_volume() - # Assertions: no new screen was pushed assert len(app.screen_stack) == 1 @@ -260,13 +256,10 @@ async def test_action_delete_volume_with_attached_containers(): with patch.object( type(table), "cursor_row", new_callable=PropertyMock ) as mock_cursor_row: - mock_cursor_row.return_value = ( - 0 # Simulate the cursor on the first row - ) + mock_cursor_row.return_value = 0 app.action_delete_volume() - # Assertions: an error screen was pushed assert isinstance(app.screen_stack[-1], ErrorScreen) assert ( app.screen_stack[-1].message @@ -302,19 +295,16 @@ async def test_action_delete_volume_success(): ) table.cursor_type = 0 - # Simulate user confirming the deletion app.action_delete_volume() confirmation_screen = app.screen_stack[-1] assert isinstance(confirmation_screen, ConfirmationScreen) - # Simulate pressing "Yes" on the confirmation screen confirmation_screen.on_button_pressed( Button.Pressed(Button("Yes", id="yes_button")) ) - # Assertions mock_manager.delete_volume.assert_called_once_with("volume1") - assert len(table.rows) == 0 # Volume was removed from the table + assert len(table.rows) == 0 @pytest.mark.asyncio @@ -345,17 +335,14 @@ async def test_action_delete_volume_not_confirmed(): ) table.cursor_type = 0 - # Simulate user confirming the deletion app.action_delete_volume() confirmation_screen = app.screen_stack[-1] assert isinstance(confirmation_screen, ConfirmationScreen) - # Simulate pressing "No" on the confirmation screen confirmation_screen.on_button_pressed( Button.Pressed(Button("No", id="no_button")) ) - # Assertions mock_manager.delete_volume.assert_not_called() assert len(table.rows) == 1 @@ -389,17 +376,14 @@ async def test_action_delete_volume_throw_exception(): ) table.cursor_type = 0 - # Simulate user confirming the deletion app.action_delete_volume() confirmation_screen = app.screen_stack[-1] assert isinstance(confirmation_screen, ConfirmationScreen) - # Simulate pressing "Yes" on the confirmation screen confirmation_screen.on_button_pressed( Button.Pressed(Button("Yes", id="yes_button")) ) - # Assertions: an error screen was pushed assert isinstance(app.screen_stack[-1], ErrorScreen) assert ( app.screen_stack[-1].message @@ -415,17 +399,269 @@ async def test_error_screen_action_back(): error_message = "This is an error message." screen = ErrorScreen(error_message) - # Create a mock app to test screen behavior mock_app = MagicMock() - # Patch the 'app' property of the screen to return the mock app with patch.object( type(screen), "app", new_callable=PropertyMock ) as mock_app_prop: mock_app_prop.return_value = mock_app - # Call the action_back method screen.action_back() - # Assert that the screen was popped mock_app.pop_screen.assert_called_once() + + +@pytest.mark.asyncio +async def test_action_browse(): + """ + Test that the action_browse method pushes the VolumeBrowserScreen + with the correct volume name. + """ + mock_manager = MagicMock() + mock_manager.get_volumes.return_value = { + "volume1": { + "name": "volume1", + "size": "10GB", + "containers": [], + "created_at": "2023-01-01T00:00:00Z", + } + } + + with patch( + "docker_volume_analyzer.tui.VolumeManager", return_value=mock_manager + ): + async with DockerTUI().run_test() as pilot: + app = pilot.app + + app.query_one("#volumes_table", expect_type=DataTable).add_row( + "volume1", "10GB", 0, "2023-01-01T00:00:00Z" + ) + app.action_browse() + await pilot.pause() + + screen = app.screen_stack[-1] + assert isinstance(screen, VolumeBrowserScreen) + assert screen.volume_name == "volume1" + assert screen.volume_manager == mock_manager + assert ( + screen.query_one( + "#current_path", expect_type=Static + ).renderable + == "[b]Current path:[/b] /" + ) + + +@pytest.mark.asyncio +async def test_volume_browser_screen_initialization(): + """ + Test that the VolumeBrowserScreen initializes correctly + with the given volume name and manager. + """ + mock_manager = MagicMock() + mock_manager.get_volume_tree.return_value = MagicMock(index={}) + + screen = VolumeBrowserScreen(mock_manager, "test_volume") + + assert screen.volume_name == "test_volume" + assert screen.volume_manager == mock_manager + assert screen.current_path == "" + assert screen.volume_tree == mock_manager.get_volume_tree.return_value + + +@pytest.mark.asyncio +async def test_volume_browser_screen_load_data_empty_directory(): + """ + Test that the VolumeBrowserScreen displays + a message when the directory is empty. + """ + mock_manager = MagicMock() + mock_manager.get_volume_tree.return_value = MagicMock(index={}) + + screen = VolumeBrowserScreen(mock_manager, "test_volume") + + with patch.object( + screen, "query_one", return_value=MagicMock() + ) as mock_query: + screen.compose() + screen.load_data() + + mock_query.assert_called_with(DataTable) + mock_query.return_value.clear.assert_called_once() + mock_query.return_value.add_row.assert_called_once_with( + "No files found in this directory." + ) + + +@pytest.mark.asyncio +async def test_volume_browser_screen_load_data_with_files(): + """ + Test that the VolumeBrowserScreen loads + and displays files and directories correctly. + """ + mock_manager = MagicMock() + mock_node = MagicMock() + mock_node.is_directory = True + mock_node.size = 0 + mock_node.mtime.strftime.return_value = "2023-01-01 00:00:00" + + mock_manager.get_volume_tree.return_value = MagicMock( + index={ + "": MagicMock( + childrens={ + "folder1": mock_node, + "file1.txt": MagicMock( + is_directory=False, + size=1024, + mtime=MagicMock( + strftime=lambda fmt: "2023-01-01 00:00:00" + ), + ), + } + ) + } + ) + + screen = VolumeBrowserScreen(mock_manager, "test_volume") + + mock_table = MagicMock() + mock_current_path = MagicMock() + with patch.object( + screen, + "query_one", + side_effect=lambda selector: ( + mock_table if selector == DataTable else mock_current_path + ), + ) as mock_query: + screen.load_data() + + mock_query.assert_any_call(DataTable) + mock_table.clear.assert_called_once() + mock_table.add_row.assert_any_call( + "📁 ", "folder1", "0 bytes", "2023-01-01 00:00:00" + ) + mock_table.add_row.assert_any_call( + "📄 ", "file1.txt", "1024 bytes", "2023-01-01 00:00:00" + ) + + mock_query.assert_any_call("#current_path") + mock_current_path.update.assert_called_once_with( + "[b]Current path:[/b] /" + ) + + +@pytest.mark.asyncio +async def test_volume_browser_screen_action_back(): + """ + Test that the action_back method pops the screen and refreshes the app. + """ + mock_app = MagicMock() + screen = VolumeBrowserScreen(MagicMock(), "test_volume") + + with patch.object( + type(screen), "app", new_callable=PropertyMock + ) as mock_app_prop: + mock_app_prop.return_value = mock_app + + screen.action_back() + + mock_app.pop_screen.assert_called_once() + mock_app.refresh.assert_called_once() + + +@pytest.mark.asyncio +async def test_volume_browser_screen_on_key_enter_directory(): + """ + Test that pressing 'enter' navigates into a directory. + """ + mock_manager = MagicMock() + mock_node = MagicMock(is_directory=True) + mock_manager.get_volume_tree.return_value = MagicMock( + index={ + "": MagicMock(childrens={"folder1": mock_node}), + "folder1": MagicMock(childrens={}), + } + ) + + screen = VolumeBrowserScreen(mock_manager, "test_volume") + screen.current_path = "" + + with patch.object( + screen, "query_one", return_value=MagicMock() + ) as mock_query: + mock_query.return_value.cursor_row = 0 + mock_query.return_value.get_row_at.return_value = ["📁 ", "folder1"] + + screen.on_key(Key("enter", None)) + + assert screen.current_path == "folder1" + mock_query.return_value.clear.assert_called_once() + + +@pytest.mark.asyncio +async def test_volume_browser_screen_on_key_backspace(): + """ + Test that pressing 'backspace' navigates to the parent directory. + """ + mock_manager = MagicMock() + mock_manager.get_volume_tree.return_value = MagicMock(index={}) + + screen = VolumeBrowserScreen(mock_manager, "test_volume") + screen.current_path = "folder1/subfolder" + + with patch.object( + screen, "query_one", return_value=MagicMock() + ) as mock_query: + screen.on_key(Key("backspace", None)) + + assert screen.current_path == "folder1" + mock_query.return_value.clear.assert_called_once() + + +@pytest.mark.asyncio +async def test_on_key_enter_invalid_selection(): + """ + Test that pressing 'enter' does nothing if the selected item is invalid. + """ + mock_manager = MagicMock() + mock_manager.get_volume_tree.return_value = MagicMock(index={}) + + screen = VolumeBrowserScreen(mock_manager, "test_volume") + screen.current_path = "" + + mock_node = MagicMock() + mock_node.childrens = {} + screen.volume_tree.index = {"": mock_node} + + with patch.object( + screen, "query_one", return_value=MagicMock() + ) as mock_query: + mock_query.return_value.cursor_row = 0 + mock_query.return_value.get_row_at.return_value = [ + "📄 ", + "invalid_item", + ] + + screen.on_key(Key("enter", None)) + + assert screen.current_path == "" + mock_query.return_value.clear.assert_not_called() + + +@pytest.mark.asyncio +async def test_on_key_other_key(): + """ + Test that pressing a key other than 'enter' or 'backspace' does nothing. + """ + mock_manager = MagicMock() + mock_manager.get_volume_tree.return_value = MagicMock(index={}) + + screen = VolumeBrowserScreen(mock_manager, "test_volume") + screen.current_path = "folder1/subfolder" + + with patch.object( + screen, "query_one", return_value=MagicMock() + ) as mock_query: + screen.on_key(Key("space", None)) + + assert screen.current_path == "folder1/subfolder" + mock_query.return_value.clear.assert_not_called() diff --git a/tests/test_volume_manager.py b/tests/test_volume_manager.py index f6929d9..5109dd5 100644 --- a/tests/test_volume_manager.py +++ b/tests/test_volume_manager.py @@ -1,6 +1,6 @@ import random from typing import List -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from docker_volume_analyzer.volume_manager import VolumeManager @@ -95,7 +95,6 @@ def test_get_volumes() -> None: lambda name: int(name.replace("volume", "")) * 10 + 10 ) - # Injecte dans VolumeManager volume_manager = VolumeManager() volume_manager.client = mock_client @@ -108,7 +107,6 @@ def test_delete_volume_success() -> None: mock_client = MagicMock() mock_client.remove_volume.return_value = True - # Injecte dans VolumeManager volume_manager = VolumeManager() volume_manager.client = mock_client @@ -123,9 +121,78 @@ def test_delete_volume_failure() -> None: mock_client.remove_volume.side_effect = Exception("Volume not found") - # Injecte dans VolumeManager volume_manager = VolumeManager() volume_manager.client = mock_client assert volume_manager.delete_volume(volume_name) is False mock_client.remove_volume.assert_called_once_with(volume_name) + + +def test_get_volume_tree_success() -> None: + volume_name = "test_volume" + mock_find_output = [ + {"path": "/file1.txt", "size": 100}, + {"path": "/dir1/file2.txt", "size": 200}, + {"path": "/dir1/file3.txt", "size": 300}, + {"path": "/dir2/file4.txt", "size": 400}, + ] + expected_tree = { + "name": "/", + "size": 1000, + "childrens": [ + {"name": "file1.txt", "size": 100, "childrens": []}, + { + "name": "dir1", + "size": 500, + "childrens": [ + {"name": "file2.txt", "size": 200, "childrens": []}, + {"name": "file3.txt", "size": 300, "childrens": []}, + ], + }, + { + "name": "dir2", + "size": 400, + "childrens": [ + {"name": "file4.txt", "size": 400, "childrens": []}, + ], + }, + ], + } + + mock_client = MagicMock() + (mock_client.get_directory_informations_with_find).return_value = ( + mock_find_output + ) + + mock_parse_find_output = MagicMock() + ( + mock_parse_find_output.return_value.compute_directory_sizes + ).return_value = expected_tree + + volume_manager = VolumeManager() + volume_manager.client = mock_client + + with patch( + "docker_volume_analyzer.volume_manager.parse_find_output", + mock_parse_find_output, + ): + assert volume_manager.get_volume_tree(volume_name) == expected_tree + ( + mock_client.get_directory_informations_with_find + ).assert_called_once_with(volume_name, directory=None) + mock_parse_find_output.assert_called_once_with(mock_find_output) + + +def test_get_volume_tree_empty() -> None: + volume_name = "empty_volume" + + mock_client = MagicMock() + mock_client.get_directory_informations_with_find.return_value = [] + + volume_manager = VolumeManager() + volume_manager.client = mock_client + + assert volume_manager.get_volume_tree(volume_name) == {} + mock_client.get_directory_informations_with_find.assert_called_once_with( + volume_name, directory=None + )