diff --git a/.npmignore b/.npmignore deleted file mode 100644 index 68b5234..0000000 --- a/.npmignore +++ /dev/null @@ -1,15 +0,0 @@ -# Exclude Python bytecode and caches from npm bundles -__pycache__/ -*.pyc -*.pyo -*.pyd - -# Exclude development metadata and tests from runtime package -tests/ -.pytest_cache/ -poetry.lock -package-lock.json -pyproject.toml -.gitignore -.npmignore -.github/ diff --git a/README.md b/README.md index 8221363..7dfab77 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,14 @@ cd trushell pip install -e ".[dev]" ``` +After installation, the `trushell` console script is added to your shell `PATH` and can be run directly. + +If you are working from the source tree without installing, run: + +```bash +PYTHONPATH=. python -m trushell +``` + ## Quick Start ```bash diff --git a/package-lock.json b/package-lock.json index 41f5a2a..9c5a53b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7,8 +7,10 @@ "": { "name": "@truos/trushell", "version": "1.0.2", + "hasInstallScript": true, "license": "Apache-2.0", "bin": { + "atoffice-shell": "bin/index.js", "truos": "bin/index.js", "TruOS": "bin/index.js", "trushell": "bin/index.js" diff --git a/tests/test_cli.py b/tests/test_cli.py index 5d54025..8a1e32d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -11,7 +11,11 @@ _handle_edit_command, _handle_local_command, _handle_os_fallback, + _prompt_command, _run_external_command, + _split_command, + is_dangerous_command, + parse_and_execute_command, ) @@ -33,7 +37,7 @@ def test_help_shows_usage() -> None: def test_unknown_command_uses_os_fallback(monkeypatch) -> None: calls = {} - def fake_run(command: str, shell: bool, check: bool, cwd: str) -> subprocess.CompletedProcess[str]: + def fake_run(command: list[str], shell: bool, check: bool, cwd: str) -> subprocess.CompletedProcess[str]: calls["command"] = command calls["shell"] = shell calls["check"] = check @@ -43,7 +47,131 @@ def fake_run(command: str, shell: bool, check: bool, cwd: str) -> subprocess.Com monkeypatch.setattr("trushell.project._run_external_command", fake_run) assert _handle_os_fallback("pwd") is True - assert calls == {"command": "pwd", "shell": True, "check": False, "cwd": os.getcwd()} + assert calls == { + "command": ["pwd"], + "shell": False, + "check": False, + "cwd": os.getcwd(), + } + + +def test_is_dangerous_command_blocks_shell_meta() -> None: + assert is_dangerous_command("ls | grep foo") is True + assert is_dangerous_command("echo $HOME") is True + assert is_dangerous_command("rm file; echo hi") is True + assert is_dangerous_command("echo hello") is False + assert is_dangerous_command('echo "hello world"') is False + + +def test_handle_local_command_prioritizes_todo(monkeypatch) -> None: + calls = {} + + def fake_addtask(task: str, category: str) -> None: + calls["task"] = task + calls["category"] = category + + monkeypatch.setattr("trushell.project.addtask", fake_addtask) + + command, arguments = _split_command('addtask "Review PR" "Work"') + result = _handle_local_command(command, arguments) + + assert result == "handled" + assert calls == {"task": "Review PR", "category": "Work"} + + +def test_handle_local_command_addtask_escaped_quotes(monkeypatch) -> None: + calls = {} + + def fake_addtask(task: str, category: str) -> None: + calls["task"] = task + calls["category"] = category + + monkeypatch.setattr("trushell.project.addtask", fake_addtask) + + command, arguments = _split_command( + 'addtask "Finish report with \\"quotes\\" inside" "Work"' + ) + result = _handle_local_command(command, arguments) + + assert result == "handled" + assert calls == { + "task": 'Finish report with "quotes" inside', + "category": "Work", + } + + +def test_split_command_returns_empty_on_unclosed_quotes() -> None: + command, arguments = _split_command('addtask "Unclosed quote') + + assert command == "" + assert arguments == [] + + +def test_prompt_command_warns_on_multiline_input(monkeypatch) -> None: + messages = [] + + monkeypatch.setattr("builtins.input", lambda prompt: "echo hi\nls") + monkeypatch.setattr( + "trushell.project.typer.secho", + lambda message, fg=None: messages.append((message, fg)), + ) + + raw, command, arguments = _prompt_command() + + assert raw == "" + assert command == "" + assert arguments == [] + assert messages and "Please enter commands one at a time" in messages[0][0] + + +def test_prompt_command_warns_on_invalid_syntax(monkeypatch) -> None: + messages = [] + + monkeypatch.setattr("builtins.input", lambda prompt: 'addtask "Unclosed quote') + monkeypatch.setattr( + "trushell.project.typer.secho", + lambda message, fg=None: messages.append((message, fg)), + ) + + raw, command, arguments = _prompt_command() + + assert raw == "" + assert command == "" + assert arguments == [] + assert messages and "Invalid syntax" in messages[0][0] + + +def test_parse_and_execute_command_handles_unclosed_quotes(monkeypatch) -> None: + messages = [] + + monkeypatch.setattr( + "trushell.project.typer.secho", + lambda message, fg=None: messages.append((message, fg)), + ) + + assert parse_and_execute_command('addtask "Unclosed quote') is True + assert messages and "Invalid syntax" in messages[0][0] + + +def test_parse_and_execute_command_runs_simple_external(monkeypatch) -> None: + calls = {} + + def fake_run(command: list[str], shell: bool, check: bool, cwd: str) -> subprocess.CompletedProcess[str]: + calls["command"] = command + calls["shell"] = shell + calls["check"] = check + calls["cwd"] = cwd + return subprocess.CompletedProcess(args=command, returncode=0) + + monkeypatch.setattr("trushell.project._run_external_command", fake_run) + + assert parse_and_execute_command("echo hi") is True + assert calls == { + "command": ["echo", "hi"], + "shell": False, + "check": False, + "cwd": os.getcwd(), + } def test_run_external_command_profiles_resources(monkeypatch) -> None: @@ -98,7 +226,7 @@ def test_cd_command_changes_directory_and_runs_ls(monkeypatch) -> None: def fake_chdir(path: str) -> None: calls["chdir"] = path - def fake_run(command: str, shell: bool, check: bool, cwd: str) -> SimpleNamespace: + def fake_run(command: list[str], shell: bool, check: bool, cwd: str) -> SimpleNamespace: calls["ls_command"] = command calls["ls_shell"] = shell calls["ls_check"] = check @@ -108,8 +236,15 @@ def fake_run(command: str, shell: bool, check: bool, cwd: str) -> SimpleNamespac monkeypatch.setattr("trushell.project.os.chdir", fake_chdir) monkeypatch.setattr("trushell.project._run_external_command", fake_run) - assert _handle_cd_command("cd /tmp") is True - assert calls == {"chdir": "/tmp", "ls_command": "ls", "ls_shell": True, "ls_check": False, "ls_cwd": os.getcwd()} + command, arguments = _split_command("cd /tmp") + assert _handle_cd_command(command, arguments) is True + assert calls == { + "chdir": "/tmp", + "ls_command": ["ls"], + "ls_shell": False, + "ls_check": False, + "ls_cwd": os.getcwd(), + } def test_cd_without_target_prints_syntax_hint(monkeypatch) -> None: @@ -127,7 +262,8 @@ def fake_run(command: str, shell: bool, check: bool, cwd: str) -> SimpleNamespac monkeypatch.setattr("trushell.project.os.chdir", fake_chdir) monkeypatch.setattr("trushell.project.subprocess.run", fake_run) - assert _handle_cd_command("cd") is True + command, arguments = _split_command("cd") + assert _handle_cd_command(command, arguments) is True assert calls == {} @@ -139,7 +275,8 @@ def test_addtask_missing_arguments_is_blocked(monkeypatch) -> None: lambda message, fg=None: messages.append((message, fg)), ) - assert _handle_local_command("addtask", "") == "handled" + command, arguments = _split_command("addtask") + assert _handle_local_command(command, arguments) == "handled" def test_edit_requires_filename(monkeypatch) -> None: @@ -150,7 +287,8 @@ def test_edit_requires_filename(monkeypatch) -> None: lambda message, fg=None: messages.append((message, fg)), ) - assert _handle_edit_command("edit") is True + command, arguments = _split_command("edit") + assert _handle_edit_command(command, arguments) is True assert messages and "Syntax: edit " in messages[0][0] @@ -170,7 +308,8 @@ def run(self) -> None: monkeypatch.setattr("trushell.project.TruShellEditor", FakeEditor) - assert _handle_edit_command(f"edit {file_path}") is True + command, arguments = _split_command(f"edit {file_path}") + assert _handle_edit_command(command, arguments) is True assert calls == {"filename": str(file_path), "initial_text": "hello", "ran": True} diff --git a/trushell/project.py b/trushell/project.py index c9fb100..c3e6b2f 100644 --- a/trushell/project.py +++ b/trushell/project.py @@ -29,27 +29,56 @@ ) -def _split_command(user_input: str) -> tuple[str, str]: - parts = user_input.strip().split(maxsplit=1) +def _split_command(user_input: str) -> tuple[str, list[str]]: + stripped = user_input.strip() + if not stripped: + return "", [] + + try: + parts = shlex.split(stripped) + except ValueError: + return "", [] + if not parts: - return "", "" + return "", [] + command = parts[0].lower() - argument = parts[1] if len(parts) > 1 else "" - return command, argument + arguments = parts[1:] + return command, arguments + + +def _prompt_command() -> tuple[str, str, list[str]]: + raw_input = input("trushell ❯ ") + if "\n" in raw_input: + typer.secho( + "⚠️ Please enter commands one at a time.", + fg=typer.colors.YELLOW, + ) + return "", "", [] + command, arguments = _split_command(raw_input) + if not command: + if raw_input.strip(): + typer.secho( + "⚠️ Invalid syntax: Check your quotes and escapes.", + fg=typer.colors.YELLOW, + ) + return "", "", [] -def _prompt_command() -> tuple[str, str, str]: - raw_command = input("trushell ❯ ").strip() - command, argument = _split_command(raw_command) - return raw_command, command, argument + return raw_input, command, arguments def _run_external_command( - command: str, + command: str | list[str], shell: bool = True, check: bool = False, cwd: str | None = None, ) -> subprocess.CompletedProcess[str]: + """Run an external command and optionally profile resource usage. + + When shell=False, the command must be provided as a list of arguments so it is + executed directly without shell interpretation. + """ if psutil is None: return subprocess.run(command, shell=shell, check=check, cwd=cwd) @@ -152,44 +181,39 @@ def _handle_joke_command(command: str) -> bool: return False -def _handle_todo_command(command: str) -> bool: - delete_match = re.match(r"deletetask\s+(\d+)", command) - if delete_match: - delete_todo(int(delete_match.group(1)) - 1) +def _handle_todo_command(command: str, arguments: list[str]) -> bool: + if command == "deletetask" and len(arguments) == 1 and arguments[0].isdigit(): + delete_todo(int(arguments[0]) - 1) return True - add_match = re.match(r'addtask\s+"([^"]+)"\s+"([^"]+)"', command) - if add_match: - addtask(add_match.group(1), add_match.group(2)) + if command == "addtask" and len(arguments) == 2: + addtask(arguments[0], arguments[1]) return True - update_match = re.match(r'updatetask\s+(\d+)\s+"([^"]+)"\s+"([^"]+)"', command) - if update_match: - update_todo(int(update_match.group(1)), update_match.group(2), update_match.group(3)) + if command == "updatetask" and len(arguments) == 3 and arguments[0].isdigit(): + update_todo(int(arguments[0]), arguments[1], arguments[2]) return True - complete_match = re.match(r'completetask\s+(\d+)', command) - if complete_match: - complete_todo(int(complete_match.group(1)) - 1) + if command == "completetask" and len(arguments) == 1 and arguments[0].isdigit(): + complete_todo(int(arguments[0]) - 1) return True - if command == "showtasks": + if command == "showtasks" and not arguments: showtask() return True return False -def _handle_edit_command(raw_command: str) -> bool: - command, argument = _split_command(raw_command) +def _handle_edit_command(command: str, arguments: list[str]) -> bool: if command != "edit": return False - if not argument.strip(): + if not arguments: typer.secho("⚠️ Syntax: edit ", fg=typer.colors.YELLOW) return True - file_path = Path(argument.strip()) + file_path = Path(arguments[0]) initial_text = file_path.read_text(encoding="utf-8") if file_path.exists() else "" try: @@ -200,8 +224,8 @@ def _handle_edit_command(raw_command: str) -> bool: return True -def _handle_local_command(command: str, argument: str) -> str: - if command == "addtask" and not argument: +def _handle_local_command(command: str, arguments: list[str]) -> str: + if command == "addtask" and len(arguments) < 2: typer.secho( '⚠️ Missing arguments. Syntax: addtask "task-name" "category"', fg=typer.colors.YELLOW, @@ -212,7 +236,7 @@ def _handle_local_command(command: str, argument: str) -> str: return "exit" if _handle_joke_command(command): return "handled" - if _handle_todo_command(command): + if _handle_todo_command(command, arguments): return "handled" if command == "settings": launch_settings() @@ -234,21 +258,20 @@ def _handle_chronoterm_command(raw_command: str, normalized_command: str) -> boo return True -def _handle_cd_command(raw_command: str) -> bool: +def _handle_cd_command(command: str, arguments: list[str]) -> bool: """Handle cd natively so the shell's working directory changes permanently.""" - command, argument = _split_command(raw_command) if command != "cd": return False - if not argument.strip(): + if not arguments: typer.secho("Syntax: cd ", fg=typer.colors.YELLOW) return True - target = os.path.expanduser(argument) + target = os.path.expanduser(arguments[0]) try: os.chdir(target) - _run_external_command("ls", shell=True, check=False, cwd=os.getcwd()) + _run_external_command(["ls"], shell=False, check=False, cwd=os.getcwd()) except (FileNotFoundError, NotADirectoryError, PermissionError) as error: typer.secho(f"❌ Cannot navigate: {error}", fg=typer.colors.RED) except OSError as error: @@ -257,47 +280,111 @@ def _handle_cd_command(raw_command: str) -> bool: return True +def is_dangerous_command(command: str) -> bool: + """Detect unsafe shell operators or expansions in user input. + + Quotes, spaces, and escaped quote sequences are allowed, but shell + metacharacters are blocked before external execution. + """ + if not command.strip(): + return False + + try: + shlex.split(command) + except ValueError: + return True + + dangerous_pattern = re.compile(r"\|\||&&|>>|[|<>;`$&{}()]") + return bool(dangerous_pattern.search(command)) + + def _handle_os_fallback(raw_command: str) -> bool: - """Pass unrecognized commands to the host OS shell.""" + """Pass unrecognized commands to the host OS safely using shell=False.""" command = raw_command.strip() if not command: return False + if is_dangerous_command(command): + typer.secho( + "⚠️ TruShell blocks shell operators and expansions for safety.", + fg=typer.colors.YELLOW, + ) + typer.secho( + "Use simple external commands without |, >, <, ;, &&, ||, $, `, or $().", + fg=typer.colors.YELLOW, + ) + return True + try: - completed = _run_external_command(command, shell=True, check=False, cwd=os.getcwd()) + parsed_command = shlex.split(command) + except ValueError: + typer.secho( + "❌ Could not parse command. Check your quoting and try again.", + fg=typer.colors.RED, + ) + return True + + if not parsed_command: + return False + + try: + completed = _run_external_command(parsed_command, shell=False, check=False, cwd=os.getcwd()) except (OSError, subprocess.SubprocessError) as error: typer.secho("❓ Command not recognized by TruShell or your host OS.", fg=typer.colors.YELLOW) typer.secho(f"OS fallback error: {error}", fg=typer.colors.RED) return True if completed.returncode != 0: - typer.secho("❓ Command not recognized by TruShell or your host OS.", fg=typer.colors.YELLOW) + typer.secho( + f"⚠️ External command exited with status {completed.returncode}.", + fg=typer.colors.YELLOW, + ) return True +def parse_and_execute_command(raw_command: str) -> bool: + """Parse a command and execute TruShell built-ins or safe external commands.""" + stripped = raw_command.strip() + if not stripped: + return True + + command, arguments = _split_command(stripped) + if not command: + typer.secho( + "⚠️ Invalid syntax: Check your quotes and escapes.", + fg=typer.colors.YELLOW, + ) + return True + + local_result = _handle_local_command(command, arguments) + if local_result == "exit": + return False + if local_result == "handled": + return True + + if _handle_chronoterm_command(stripped, command): + return True + if _handle_cd_command(command, arguments): + return True + if _handle_edit_command(command, arguments): + return True + + return _handle_os_fallback(stripped) + + def run_interactive_shell() -> None: """Persistent REPL loop for the TruShell core.""" typer.secho("Entering TruShell. Type 'exit' to quit.", fg=typer.colors.CYAN) while True: try: - raw_command, command, argument = _prompt_command() + raw_command, command, arguments = _prompt_command() except (KeyboardInterrupt, EOFError): typer.echo("") break - local_result = _handle_local_command(command, argument) - if local_result == "exit": - break - if local_result == "handled": - continue - if _handle_chronoterm_command(raw_command, command): - continue - if _handle_cd_command(raw_command): - continue - if _handle_edit_command(raw_command): - continue - if _handle_os_fallback(raw_command): + if not command: continue - typer.secho(f"Unknown command: {command}", fg=typer.colors.RED) + if not parse_and_execute_command(raw_command): + break