diff --git a/README.md b/README.md
index e8fcf8c..0fa550c 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,14 @@ This repo was created for Sourcegraph Implementation Engineering deployments, an
 - Other version control systems are left up to the customer to convert to Git
 - This project builds a framework to convert repos from other VCSes to Git
 
+## Versioning
+
+- This project uses semantic versioning, where x.y.z is major.minor.patch
+- The minor version is incremented if a breaking change has been made, usually to one of:
+  - Configuration files
+  - Environment variables
+  - Storage locations on the host
+
 ## Deployment
 
 Sourcegraph Cloud customers will need to run the repo-converter, src serve-git, and the Sourcegraph Cloud Private Access Agent on a container platform with connectivity to both their Sourcegraph Cloud instance and their code hosts. This can be done quite securely, as the src serve-git API endpoint does not need any ports exposed outside of the container network. Running src serve-git and the agent together on the same container network allows the agent to use the container platform's local DNS service to reach src serve-git, and prevents src serve-git's unauthenticated HTTP endpoint from needing to be opened outside of the container network.
diff --git a/deploy/docker-compose/customer1/docker-compose.override.yaml b/deploy/docker-compose/customer1/docker-compose.override.yaml
index 1e52fd0..db37481 100644
--- a/deploy/docker-compose/customer1/docker-compose.override.yaml
+++ b/deploy/docker-compose/customer1/docker-compose.override.yaml
@@ -6,7 +6,7 @@ services:
 
   repo-converter:
-    # image: ghcr.io/sourcegraph/repo-converter:v0.5.1
+    image: ghcr.io/sourcegraph/repo-converter:${DOCKER_TAG}
     environment:
       - LOG_LEVEL=DEBUG
       - REPO_CONVERTER_INTERVAL_SECONDS=600
diff --git a/deploy/docker-compose/customer1/pull-start.sh b/deploy/docker-compose/customer1/pull-start.sh
index 900cf3f..c8d741a 100644
--- a/deploy/docker-compose/customer1/pull-start.sh
+++ b/deploy/docker-compose/customer1/pull-start.sh
@@ -10,13 +10,54 @@
 
 ## Get script args
 
-# If an f is passed into the script args, then try to fix the ownership and permissions of files in the src-serve-git directory
-if [[ "$1" == *"f"* ]]
-then
-    fix_perms="true"
-else
-    fix_perms="false"
-fi
+# If a -f is passed into the script args, then try to fix the ownership and permissions of files in the src-serve-git directory
+fix_perms="false"
+
+# If a -dt or --docker-tag is passed in, then use it in the Docker Compose up command for the repo-converter
+# DOCKER_TAG="latest"
+DOCKER_TAG="stable"
+
+# Create the arg to allow disabling git reset and pull
+NO_GIT=""
+
+POSITIONAL_ARGS=()
+
+while [[ $# -gt 0 ]]; do
+    case $1 in
+        -f|--fix-perms)
+            fix_perms="true"
+            shift # past argument
+            ;;
+        -l|--latest)
+            DOCKER_TAG="latest"
+            shift # past argument
+            ;;
+        -s|--stable)
+            DOCKER_TAG="stable"
+            shift # past argument
+            ;;
+        -n|--no-git)
+            NO_GIT="true"
+            shift # past argument
+            ;;
+        -dt|--docker-tag)
+            DOCKER_TAG="$2"
+            shift # past argument
+            shift # past value
+            ;;
+        -*|--*)
+            echo "Unknown option $1"
+            exit 1
+            ;;
+        *)
+            POSITIONAL_ARGS+=("$1") # save positional arg, so the loop always consumes $1 and can't spin forever
+            shift # past argument
+            ;;
+    esac
+done
+
+set -- "${POSITIONAL_ARGS[@]}" # restore positional parameters
+
 
 ## Setup
 
 # Define file paths
@@ -111,16 +152,34 @@ fi
 
 log "On branch before git pull:"
 $git_branch_cmd
 
+## Formulate Git and Docker commands
+git_commands="\
+    $git_cmd reset --hard &&\
+    $git_cmd pull --force &&\
+"
+
+if [[ -n "$NO_GIT" ]]
+then
+    git_commands=""
+fi
+
+
+export DOCKER_TAG=$DOCKER_TAG
+docker_commands="\
+    
$docker_cmd pull &&\ + DOCKER_TAG=$DOCKER_TAG CURRENT_UID_GID=$CURRENT_UID_GID $docker_cmd up -d --remove-orphans +" + +command="\ + $git_commands \ + $docker_commands \ + " + + log "Docker compose file: $docker_compose_full_file_path" log "docker ps before:" $docker_cmd ps -command="\ - $git_cmd reset --hard &&\ - $git_cmd pull --force &&\ - $docker_cmd pull &&\ - CURRENT_UID_GID=$CURRENT_UID_GID $docker_cmd up -d --remove-orphans \ - " log "Running command in a sub shell:" # awk command to print the command nicely with newlines diff --git a/dev/TODO.md b/dev/TODO.md index b974602..f5afead 100644 --- a/dev/TODO.md +++ b/dev/TODO.md @@ -181,31 +181,46 @@ - Add a timeout in run_subprocess() for hanging svn info ~~and svn log~~ commands, if data isn't transferring - Does the svn cli not have a timeout built in for this command? -- PID layers, from `docker exec -it repo-converter top` - - This output was captured 14 hours into converting a repo that's up to 2 GB on disk so far, with 6 years of history left to catch up on - - This is after removing our batch processing bubble-wrap, and just lettin'er buck +- Process tree + - Copied from the output of `docker exec -it repo-converter top` ``` - PID PPID nTH S CODE USED SWAP RES %MEM nMaj nMin nDRT OOMa OOMs %CPU TIME+ COMMAND - 1 0 2 S 2.7m 37.4m 1.5m 35.9m 0.5 991 8.7m 0 0 668 0.0 2:44.22 /usr/bin/python3 /sg/repo-converter/src/main.py - 85 1 1 S 2.7m 40.8m 11.6m 29.2m 0.4 0 20k 0 0 669 0.0 0:05.82 `- /usr/bin/python3 /sg/repo-converter/src/main.py - 330 85 1 S 2.7m 1.4m 0.2m 1.2m 0.0 0 364 0 0 666 0.0 0:00.00 `- git -C /sg/src-serve-root/org/repo svn fetch --quiet --username user --log-window-size 100 - 331 330 1 S 1.6m 115.6m 17.8m 97.8m 1.2 56 534m 0 0 674 13.6 66:17.92 `- /usr/bin/perl /usr/lib/git-core/git-svn fetch --quiet --username user --log-window-size 100 - 376 331 1 S 2.7m 1.1g 0.1m 1.1g 14.6 18k 1.4m 0 0 744 0.3 1:18.22 `- git cat-file --batch - 34015 331 1 S 2.7m 10.0m 0.0m 10.0m 0.1 17 889k 0 0 667 2.0 4:36.38 `- git hash-object -w --stdin-paths --no-filters - 1850259 331 1 S 2.7m 5.1m 0.0m 5.1m 0.1 0 499 0 0 666 0.0 0:00.00 `- git update-index -z --index-info + top - 03:39:24 up 6 days, 4:37, 0 users, load average: 0.89, 1.60, 1.83 + Tasks: 14 total, 1 running, 13 sleeping, 0 stopped, 0 zombie + %Cpu(s): 8.7 us, 10.7 sy, 0.0 ni, 80.2 id, 0.2 wa, 0.0 hi, 0.3 si, 0.0 st + GiB Mem : 7.8 total, 2.0 free, 1.2 used, 4.5 buff/cache + GiB Swap: 2.0 total, 1.9 free, 0.1 used. 
6.3 avail Mem + + SID PGRP PID PPID VIRT RES SHR %MEM OOMs %CPU TIME+ COMMAND + 1 1 1 0 0.1g 0.0g 0.0g 0.4 668 0.0 0:02.41 /usr/bin/python3 /sg/repo-converter/src/main.py + 1 1 7 1 0.5g 0.0g 0.0g 0.2 668 0.0 0:00.45 `- /usr/bin/python3 /sg/repo-converter/src/main.py + 81 81 81 1 0.1g 0.0g 0.0g 0.3 668 0.0 0:00.40 `- /usr/bin/python3 /sg/repo-converter/src/main.py + 81 81 527 81 0.0g 0.0g 0.0g 0.0 666 0.0 0:00.00 `- git -C /sg/src-serve-root/repo1 svn fetch --quiet --username user --log-window-size 100 + 81 81 529 527 0.0g 0.0g 0.0g 0.5 668 0.3 2:52.51 `- /usr/bin/perl /usr/lib/git-core/git-svn fetch --quiet --username user --log-window-size 100 + 81 81 880 529 2.1g 0.2g 0.1g 2.6 680 0.0 0:02.23 `- git cat-file --batch + 81 81 6267 529 0.0g 0.0g 0.0g 0.1 667 0.0 0:09.38 `- git hash-object -w --stdin-paths --no-filters + 81 81 305238 529 0.0g 0.0g 0.0g 0.1 666 0.0 0:00.00 `- git update-index -z --index-info + 144 144 144 1 0.1g 0.0g 0.0g 0.4 668 0.0 0:00.92 `- /usr/bin/python3 /sg/repo-converter/src/main.py + 144 144 478 144 0.0g 0.0g 0.0g 0.0 666 0.0 0:00.00 `- git -C /sg/src-serve-root/repo2 -c http.sslVerify=false svn fetch --quiet --username user --log-window-size 100 + 144 144 479 478 0.2g 0.1g 0.0g 1.8 676 12.3 7:21.52 `- /usr/bin/perl /usr/lib/git-core/git-svn fetch --quiet --username user --log-window-size 100 + 144 144 709 479 0.0g 0.0g 0.0g 0.4 668 0.0 0:02.79 `- git cat-file --batch + 144 144 1015 479 0.0g 0.0g 0.0g 0.1 666 0.0 0:08.89 `- git hash-object -w --stdin-paths --no-filters ``` - PID 1 - Docker container entrypoint - - PID 85 - - Spawned by `multiprocessing.Process().start()` in `convert_repos.start()` - - PID 330 - - Spawned by `psutil.Popen()` in `cmd.run_subprocess()` + - PID 7 + - Probably the `status_monitor.start` function + - PIDs 81 and 144 + - Notice that the SID (Session ID) and PGRP (Process Group) match the PID 81 and 144 numbers, i.e. 
this process is its own session and group leader, as a result of the `os.setsid()` call in `fork_conversion_processes.py`. This makes it much easier to find PGRP values in the container's logs, to track which processes are getting cleaned up as they finish
+    - Spawned by `multiprocessing.Process().start()` in `fork_conversion_processes.start()`
+  - PIDs 527 and 478
     - `git svn fetch` command, called from `_git_svn_fetch()` in `svn.convert()`
-  - PID 331
+    - Spawned by `psutil.Popen()` in `cmd.run_subprocess()`
+  - PIDs 529 and 479
     - `git-svn` perl script, which runs the `git svn fetch` workload in [sub fetch, in SVN.pm](https://github.com/git/git/blob/v2.50.1/perl/Git/SVN.pm#L2052)
-    - This script is quite naive, no retries, always exits 0, even on failures
-  - PID 376
+    - This perl script is quite naive: no retries, and it always exits 0, even on failures
+  - PIDs 880 and 709
     - Long-running `git cat-file` process, which stores converted content in memory
+    - This process usually has a higher than average OOMs (OOM kill score)
     - It seems quite likely that this process doesn't free up memory after each commit, so memory requirements for this process alone would be some large portion of a repo's size
     - The minimum memory requirements for this process would be the contents of the largest commit in the repo's history, otherwise the conversion would never progress beyond this commit
     - This process' CPU state is usually Sleeping, because it spends almost all of its time receiving content from the subversion server
@@ -386,6 +401,8 @@
 
 ## Old Doc
 
+- Need to clean this up, and put it somewhere
+
 ```yaml
 xmlbeans:
     # Usage: This key is used as the converted Git repo's name
diff --git a/src/config/load_repos.py b/src/config/load_repos.py
index 97e2f19..e4ca35a 100644
--- a/src/config/load_repos.py
+++ b/src/config/load_repos.py
@@ -4,10 +4,9 @@
 # Import repo-converter modules
 from utils import secret
 from utils.context import Context
-from utils.log import log
+from utils.logging import log
 
 # Import Python standard modules
-from sys import exit
 from urllib.parse import urlparse
 
 # Import third party modules
@@ -29,20 +28,14 @@ def load_from_file(ctx: Context) -> None:
         # This should return a dict
         repos = yaml.safe_load(repos_to_convert_file)
 
-    except IsADirectoryError:
+    except IsADirectoryError as e:
+        log(ctx, f"File not found at {repos_to_convert_file_path}, but found a directory, likely created by the Docker mount. Please stop the container, delete the directory, and create the yaml file.", "critical", exception=e)
 
-        log(ctx, f"File not found at {repos_to_convert_file_path}, but found a directory, likely created by the Docker mount. Please stop the container, delete the directory, and create the yaml file.", "critical")
-        exit(1)
+    except FileNotFoundError as e:
+        log(ctx, f"File not found at {repos_to_convert_file_path}", "critical", exception=e)
 
-    except FileNotFoundError:
-
-        log(ctx, f"File not found at {repos_to_convert_file_path}", "critical")
-        exit(2)
-
-    except (AttributeError, yaml.scanner.ScannerError) as exception: # type: ignore
-
-        log(ctx, f"YAML syntax error in {repos_to_convert_file_path}, please lint it. 
Exception: {type(exception)}, {exception.args}, {exception}", "critical") - exit(3) + except (AttributeError, yaml.scanner.ScannerError) as e: # type: ignore + log(ctx, f"YAML syntax error in {repos_to_convert_file_path}, please lint it", "critical", exception=e) repos = check_types(ctx, repos) repos = reformat_repos_dict(ctx, repos) @@ -439,7 +432,7 @@ def validate_inputs(ctx: Context, repos_input: dict) -> dict: break except Exception as e: - log(ctx, f"urlparse failed to parse URL {url}: {e}", "warning") + log(ctx, f"urlparse failed to parse URL {url}", "warning", exception=e) # Fallback to code-host-name if provided if not server_name: diff --git a/src/config/validate_env.py b/src/config/validate_env.py index f78315c..59b39fe 100644 --- a/src/config/validate_env.py +++ b/src/config/validate_env.py @@ -5,8 +5,7 @@ # Import repo-converter modules from utils.context import Context -from utils.log import log - +from utils.logging import log def validate_env_vars(ctx: Context) -> None: """Validate inputs here, now that the logger is instantiated, instead of throughout the code""" @@ -15,10 +14,10 @@ def validate_env_vars(ctx: Context) -> None: # Validate concurrency limits if ctx.env_vars["MAX_CONCURRENT_CONVERSIONS_PER_SERVER"] <= 0: - raise ValueError("MAX_CONCURRENT_CONVERSIONS_PER_SERVER must be greater than 0") + log(ctx, "MAX_CONCURRENT_CONVERSIONS_PER_SERVER must be greater than 0", "critical") if ctx.env_vars["MAX_CONCURRENT_CONVERSIONS_GLOBAL"] <= 0: - raise ValueError("MAX_CONCURRENT_CONVERSIONS_GLOBAL must be greater than 0") + log(ctx, "MAX_CONCURRENT_CONVERSIONS_GLOBAL must be greater than 0", "critical") if ctx.env_vars["MAX_CONCURRENT_CONVERSIONS_PER_SERVER"] > ctx.env_vars["MAX_CONCURRENT_CONVERSIONS_GLOBAL"]: diff --git a/src/main.py b/src/main.py index e61a38f..1ad3b34 100644 --- a/src/main.py +++ b/src/main.py @@ -5,7 +5,7 @@ from config import load_env, load_repos, validate_env from utils import concurrency_manager, fork_conversion_processes, git, logger, signal_handler, status_monitor from utils.context import Context -from utils.log import log +from utils.logging import log # Import Python standard modules # import sysconfig @@ -24,7 +24,7 @@ def main(): ) # Configure logging - logger.configure_logger(ctx.env_vars["LOG_LEVEL"]) + logger.configure(ctx.env_vars["LOG_LEVEL"]) # Validate env vars, now that we have logging available validate_env.validate_env_vars(ctx) diff --git a/src/source_repo/svn.py b/src/source_repo/svn.py index 9cbebeb..ebedef7 100644 --- a/src/source_repo/svn.py +++ b/src/source_repo/svn.py @@ -3,8 +3,8 @@ # Import repo-converter modules from utils.context import Context -from utils.log import log, set_job_result -from utils import cmd, git, lockfiles +from utils.logging import log +from utils import cmd, git, lockfiles, logging # Import Python standard modules from datetime import datetime @@ -300,15 +300,15 @@ def _check_if_conversion_is_already_running_in_another_process( log_failure_message += f"with command: {args}; " if log_failure_message: - set_job_result(ctx, "skipped", log_failure_message, False) + logging.set_job_result(ctx, "skipped", log_failure_message, False) log(ctx, f"Skipping repo conversion job", "info") return True else: # No processes running, can proceed, break out of the max_retries loop break - except Exception as exception: - log(ctx, f"Failed check {i} of {max_retries} if fetching process is already running. 
Exception: {type(exception)}: {exception}", "warning") + except Exception as e: + log(ctx, f"Failed check {i} of {max_retries} if fetching process is already running", "warning", exception=e) return False @@ -325,6 +325,10 @@ def _initialize_pysvn(ctx: Context) -> pysvn.Client: password = job_config.get("password") pysvn_client = pysvn.Client() + # Set the exception style for more verbose output + # https://pysvn.sourceforge.io/Docs/pysvn_prog_ref.html#pysvn_clienterror + pysvn_client.exception_style = 1 + # Handle the untrusted TLS certificate prompt if disable_tls_verification: @@ -397,8 +401,9 @@ def _test_connection_and_credentials( repo_url = job_config.get("repo_url") # Variables to track execution and break out of the retry loop - tries_attempted = 1 + svn_info = {} svn_info_success = False + tries_attempted = 1 while True: @@ -460,11 +465,23 @@ def _test_connection_and_credentials( ): svn_info_success = True - except: - log_failure_message = "pysvn exception" - set_job_result(ctx, "skipped", log_failure_message, False) - log(ctx, log_failure_message, "error", {"svn_info": svn_info}) - raise + except pysvn.ClientError as e: + + # Handle the pysvn module's custom exception type + if hasattr(e, "args") and len(e.args) >= 2: + + errors_list = [] + + for message, code in e.args[1]: + errors_list.append(f"{code}: {message}") + + svn_info["errors"] = errors_list + + else: + svn_info["errors"] = logging.breakup_lists_and_strings(str(e)) + + except Exception as e: + log(ctx, "pysvn exception", "error", {"svn_info": svn_info}, exception=e) # If the command exited successfully, return here @@ -473,13 +490,16 @@ def _test_connection_and_credentials( if tries_attempted > 1: log(ctx, f"Successfully connected to repo remote after {tries_attempted} tries", "warning") + if svn_info.get("errors"): + svn_info.pop("errors") + return True # If we've hit the max_retries limit, return here elif tries_attempted >= max_retries: log_failure_message = f"svn info failed to connect to repo remote, reached max retries {max_retries}" - set_job_result(ctx, "skipped", log_failure_message, False) + logging.set_job_result(ctx, "skipped", log_failure_message, False) log(ctx, f"{log_failure_message}", "error", {"svn_info": svn_info}) return False @@ -536,20 +556,20 @@ def _check_if_repo_exists_locally(ctx: Context, event: str = "") -> bool: if urls_match and has_commits: - set_job_result(ctx, "fetching", "valid repo found on disk, with matching URL, and some commits") + logging.set_job_result(ctx, "fetching", "valid repo found on disk, with matching URL, and some commits") return True else: - set_job_result(ctx, "creating", "valid repo not found on disk") + logging.set_job_result(ctx, "creating", "valid repo not found on disk") elif "end" in event: if urls_match and has_commits: - set_job_result(ctx, f"{job_result_action} succeeded", "valid repo found on disk after fetch, with matching URL, and some commits") + logging.set_job_result(ctx, f"{job_result_action} succeeded", "valid repo found on disk after fetch, with matching URL, and some commits") return True else: - set_job_result(ctx, f"{job_result_action} failed", "repo not valid after fetch", False) + logging.set_job_result(ctx, f"{job_result_action} failed", "repo not valid after fetch", False) else: if urls_match and has_commits: @@ -767,11 +787,11 @@ def _check_if_repo_up_to_date(ctx: Context) -> bool: last_changed_rev and git_latest_commit_rev_begin == last_changed_rev ): - set_job_result(ctx, "skipped", "repo up to date", True) + logging.set_job_result(ctx, 
"skipped", "repo up to date", True) return True else: - set_job_result(ctx, "fetching", "repo out of date") + logging.set_job_result(ctx, "fetching", "repo out of date") return False @@ -1076,7 +1096,7 @@ def _check_git_svn_fetch_success(ctx: Context, git_svn_fetch_result: dict) -> bo success = True log_level = "info" - set_job_result(ctx, action, reason, success) + logging.set_job_result(ctx, action, reason, success) log(ctx, f"{reason}", log_level, structured_log_dict) return success @@ -1131,7 +1151,7 @@ def _remove_non_errors_from_git_svn_fetch_output(ctx: Context, git_svn_fetch_out for ignore_line in ignore_lines: - compiled_regex = re.compile(ignore_line) + compiled_regex = re.compile(ignore_line, re.IGNORECASE) ignore_lines_compiled_regexes.append((ignore_line, compiled_regex)) # Remove the ignored lines, and empty lines from the output list @@ -1224,7 +1244,7 @@ def _find_errors_in_svn_output(ctx: Context, svn_output: list = []) -> list: for error_category, patterns in error_message_regex_patterns_dict.items(): for pattern in patterns: regex_pattern = rf".*{pattern}.*" - compiled_regex = re.compile(regex_pattern) + compiled_regex = re.compile(regex_pattern, re.IGNORECASE) compiled_patterns.append((error_category, compiled_regex)) # Single pass through output lines - O(lines x patterns) diff --git a/src/utils/cmd.py b/src/utils/cmd.py index e26edbd..8f4c408 100644 --- a/src/utils/cmd.py +++ b/src/utils/cmd.py @@ -2,7 +2,7 @@ # Utility functions to execute external binaries, fork child processes, and track / cleanup child processes # Import repo-converter modules -from utils.log import log +from utils.logging import log from utils.context import Context from utils import lockfiles @@ -97,10 +97,11 @@ def _get_process_metadata(ctx: Context, process: psutil.Process) -> Dict: def log_process_status( - ctx: Context, - subprocess_psutils_dict: Dict = {}, - subprocess_dict: Dict = {}, - log_level: str = "", + ctx: Context, + subprocess_psutils_dict: Dict = {}, + subprocess_dict: Dict = {}, + log_level: str = "", + exception = None, ) -> None: """ Log detailed process status information including PID, runtime, and process metadata. 
@@ -117,11 +118,10 @@ def log_process_status( Logs process status information via the logging system """ - log_process_status_dict = { - "subprocess_dict": subprocess_dict, - "subprocess_psutils_dict": subprocess_psutils_dict, - } - + # log_process_status_dict = { + # "subprocess_dict": subprocess_dict, + # "subprocess_psutils_dict": subprocess_psutils_dict, + # } # log(ctx, "log_process_status", "debug", log_process_status_dict) # Take shallow copies of the dicts, so we can modify top level keys here to reduce duplicates in log output, without affecting the dict in the calling function; if we need to modify nested objects, then we'll need to switch to deep copies @@ -219,7 +219,7 @@ def log_process_status( structured_log_dict["psutils"] = psutils_dict_output # Log the event - log(ctx, status_message, log_level, structured_log_dict) + log(ctx, status_message, log_level, structured_log_dict, exception=exception) def run_subprocess( @@ -229,7 +229,6 @@ def run_subprocess( quiet: Optional[bool] = False, name: Optional[str] = "", stderr: Optional[str] = "stdout", - expect: Union[tuple[str,str], List[tuple[str,str]], None] = "", ) -> Dict[str, Any]: """ Middleware function to @@ -249,15 +248,13 @@ def run_subprocess( "stdout" (default), stderr stream is redirected to stdout stream "ignore", stderr stream is redirected to /dev/null "stderr", stderr stream is stored at return_dict["stderr"] - expect: - (prompt:str, response:str), if prompt is found in the stdout stream, then insert response into stdin stream Notes: Use log_process_status() in this function, to format and print process stats """ # Dict for psutils to fill with .as_dict() function - subprocess_psutils_dict = {} + subprocess_psutils_dict = {} # Dict for anything other functions need to consume, # which isn't set in subprocess_psutils_dict @@ -271,6 +268,7 @@ def run_subprocess( subprocess_dict["success"] = None # true / false; if false, the reason field should have a value subprocess_dict["truncated_output"] = None # For logging + # Normalize args as a string for log output if isinstance(args, list): subprocess_dict["args"] = " ".join(args) @@ -292,17 +290,6 @@ def run_subprocess( stderr_int = subprocess.STDOUT - - # Set the needed Popen args, which are different if expect is provided - bufsize = -1 - shell = False - - if expect: - # args = " ".join(args) - bufsize = 1 - # shell = True - - # Which log level to emit log events at, # so we can increase the log_level depending on process success / fail / quiet # so events are only logged if this level his higher than the LOG_LEVEL the container is running at @@ -321,8 +308,6 @@ def run_subprocess( # TODO: Disable text = True, and handle stdin / out / err pipes as byte streams, so that stdout can be checked without waiting for a newline sub_process = psutil.Popen( args = args, - bufsize = bufsize, - shell = shell, stderr = stderr_int, stdin = subprocess.PIPE, stdout = subprocess.PIPE, @@ -334,6 +319,9 @@ def run_subprocess( # Try to read process metadata, and catch exceptions when the process finishes faster than its metadata can be read try: + # Exception object, to pass to log function, if an exception is raised and needs to be logged + e = None + # Immediately capture basic process info, # in case the process exits faster than psutils can get the full .as_dict() # before it can finish, or SIGCHLD can reap it @@ -351,7 +339,7 @@ def run_subprocess( subprocess_dict["pgid"] = os.getpgid(subprocess_dict["pid"]) # Process finished before we could get detailed info - except 
(psutil.NoSuchProcess, ProcessLookupError, FileNotFoundError) as exception: + except (psutil.NoSuchProcess, ProcessLookupError, FileNotFoundError) as e: subprocess_dict["status_message"] = "finished" subprocess_dict["status_message_reason"] = "before getting process metadata" @@ -363,89 +351,18 @@ def run_subprocess( # Either started, with psutil dict, # or finished before getting process metadata, with pid if not quiet: - log_process_status(ctx, subprocess_psutils_dict, subprocess_dict) + log_process_status(ctx, subprocess_psutils_dict, subprocess_dict, exception=e) + + # Reset exception object, so a prior exception isn't printed more than once + e = None + # Capture stdout and stderr std_err_list = [] std_err_string = "" std_out_list = [] std_out_string = "" - if expect: - pass - # # On exit of this with block, standard file descriptors are closed, and the process is waited / returncode attribute set. - # with sub_process: - - # log(ctx, "with sub_process") - - # if sub_process.stderr: - - # log(ctx, "if sub_process.stderr") - - # for std_err_line in sub_process.stderr: - - # log(ctx, f"for std_err_line {std_err_line} in sub_process.stderr {sub_process.stderr}") - - # # Remove whitespaces, and skip empty lines - # std_err_line = std_err_line.strip() - # if not std_err_line: - # continue - - # std_err_list.append(std_err_line) - # log(ctx, f"std_err_list {std_err_list}.append(std_err_line {std_err_line})") - - # # Loop through the list of tuples passed in to the expect parameter - # for prompt, response in expect: - - # log(ctx, f"for prompt {prompt}, response {response} in expect {expect}") - - # # If the first part of the tuple is found in the output line - # if prompt in std_err_line: - - # log(ctx, f"prompt {prompt} is in std_err_line {std_err_line}") - - # # Send the second part into stdin - # sub_process.stdin.write(f"{response}\n") - - # # And flush the buffer - # sub_process.stdin.flush() - - # else: - # log(ctx, f"prompt {prompt} is NOT in std_err_line {std_err_line}") - - - # for std_out_line in sub_process.stdout: - - # log(ctx, f"std_out_line: {std_out_line}") - - # # Remove whitespaces, and skip empty lines - # std_out_line = std_out_line.strip() - # if not std_out_line: - # continue - - # std_out_list.append(std_out_line) - # log(ctx, f"std_out_list: {std_out_list}") - - # # Loop through the list of tuples passed in to the expect parameter - # for prompt, response in expect: - - # log(ctx, f"for prompt {prompt}, response {response} in expect {expect}") - - # # If the first part of the tuple is found in the output line - # if prompt in std_out_line: - - # log(ctx, f"prompt {prompt} is in std_out_line {std_out_line}") - - # # Send the second part into stdin - # sub_process.stdin.write(f"{response}\n") - - # # And flush the buffer - # sub_process.stdin.flush() - - # else: - # log(ctx, f"prompt {prompt} is NOT in std_out_line {std_out_line}") - - - elif password: + if password: # If password is provided to this function, # feed the password string into the subprocess' stdin pipe; @@ -492,12 +409,12 @@ def run_subprocess( subprocess_dict["log_level"] = "error" # Catching the CalledProcessError exception, - # only to catch in case that the subprocess' sub_process _itself_ raised an exception - # not necessarily any below processes the subprocess createdsubprocess_dict["command"] - except subprocess.CalledProcessError as exception: + # only catches exceptions that the subprocess _itself_ raises, + # not necessarily any child processes + except subprocess.CalledProcessError as e: 
subprocess_dict["status_message"] = "finished" - subprocess_dict["status_message_reason"] = f"raised an exception: {type(exception)}, {exception.args}, {exception}" + subprocess_dict["status_message_reason"] = "raised an exception" subprocess_dict["success"] = False if not quiet: @@ -524,7 +441,7 @@ def run_subprocess( subprocess_dict["status_message_reason"] = "failed due to a lock file" if not (quiet and subprocess_dict["log_level"] == "debug"): - log_process_status(ctx, subprocess_psutils_dict, subprocess_dict) + log_process_status(ctx, subprocess_psutils_dict, subprocess_dict, exception=e) return subprocess_dict @@ -576,9 +493,9 @@ def status_update_and_cleanup_zombie_processes(ctx: Context) -> None: process_pids_to_wait_for.add(process_parents_pid) - except psutil.NoSuchProcess as exception: + except psutil.NoSuchProcess as e: - log(ctx, f"Caught an exception when listing parents of processes: {exception}", "debug") + log(ctx, f"Caught an exception when listing parents of processes: {e}", "debug", exception=e) # Remove this script's PID so it's not waiting on itself process_pids_to_wait_for.discard(os_this_pid) @@ -615,11 +532,11 @@ def status_update_and_cleanup_zombie_processes(ctx: Context) -> None: subprocess_dict["status_message"] = "finished" subprocess_dict["status_message_reason"] = "on cleanup" - except psutil.NoSuchProcess as exception: + except psutil.NoSuchProcess as e: subprocess_dict["status_message"] = "finished" subprocess_dict["status_message_reason"] = "on wait" - except psutil.TimeoutExpired as exception: + except psutil.TimeoutExpired as e: # Ignore logging main function processes which are still running if "cmdline" in subprocess_psutils_dict.keys() and subprocess_psutils_dict["cmdline"] == ["/usr/bin/python3", "/sg/repo-converter/src/main.py"]: @@ -639,9 +556,9 @@ def status_update_and_cleanup_zombie_processes(ctx: Context) -> None: # Get latest output subprocess_dict["status_message_reason"] = f"" - except Exception as exception: + except Exception as e: subprocess_dict["status_message"] = "raised an exception while waiting" - subprocess_dict["status_message_reason"] = f"{type(exception)}, {exception.args}, {exception}" + subprocess_dict["status_message_reason"] = f"Exception: {type(e)}, {e.args}, {e}" if "pid" not in subprocess_psutils_dict.keys() and "pid" not in subprocess_dict.keys(): subprocess_dict["pid"] = process_pid_to_wait_for diff --git a/src/utils/concurrency_manager.py b/src/utils/concurrency_manager.py index 33b7535..8e1c8b8 100644 --- a/src/utils/concurrency_manager.py +++ b/src/utils/concurrency_manager.py @@ -6,7 +6,8 @@ # Import repo-converter modules from utils.context import Context -from utils.log import log, set_job_result +from utils.logging import log +from utils import logging # Import Python standard modules from datetime import datetime @@ -91,7 +92,7 @@ def acquire_job_slot(self, ctx: Context, job: dict) -> bool: for active_job_trace, active_job_repo, active_job_timestamp in self.active_jobs[server_name]: if active_job_repo == this_job_repo: - set_job_result(ctx, "skipped", "Repo job already in progress", False) + logging.set_job_result(ctx, "skipped", "Repo job already in progress", False) log(ctx, f"Skipping; Repo job already in progress; started at: {active_job_timestamp}; trace: {active_job_trace}; running for: {int(time.time() - active_job_timestamp)} seconds", "info", log_job) return False @@ -293,12 +294,12 @@ def get_status(self, ctx: Context) -> dict: status["active_jobs"][server_name] = status_active_jobs_list except 
Exception as e: - log(ctx, f"Error processing active jobs for {server_name}: {e}", "warning") + log(ctx, f"Error processing active jobs for {server_name}", "warning", exception=e) finally: self.active_jobs_lock.release() except Exception as e: - log(ctx, f"Error in get_status() processing servers: {e}", "warning") + log(ctx, f"Error in get_status() processing servers", "warning", exception=e) finally: self.per_server_semaphores_lock.release() @@ -339,7 +340,7 @@ def get_status(self, ctx: Context) -> dict: status["queued_jobs"][server_name] = status_queued_jobs_list except Exception as e: - log(ctx, f"Error in get_status() processing queued jobs: {e}", "warning") + log(ctx, f"Error in get_status() processing queued jobs", "warning", exception=e) finally: self.queued_jobs_lock.release() @@ -398,4 +399,4 @@ def release_job_slot(self, ctx: Context, job: dict) -> None: # log(ctx, f"Released job slot", "debug", log_job) except ValueError as e: - log(ctx, f"Error releasing job slot: {e}", "error", log_job) + log(ctx, f"Error releasing job slot", "error", log_job, exception=e) diff --git a/src/utils/fork_conversion_processes.py b/src/utils/fork_conversion_processes.py index f70df99..8b093c6 100644 --- a/src/utils/fork_conversion_processes.py +++ b/src/utils/fork_conversion_processes.py @@ -14,7 +14,7 @@ from source_repo import svn from utils.concurrency_manager import ConcurrencyManager from utils.context import Context -from utils.log import log +from utils.logging import log # Import Python standard modules import multiprocessing diff --git a/src/utils/git.py b/src/utils/git.py index 04c4c05..a77161e 100644 --- a/src/utils/git.py +++ b/src/utils/git.py @@ -5,7 +5,7 @@ # Import repo-converter modules from utils import cmd from utils.context import Context -from utils.log import log +from utils.logging import log # Import standard libraries import os @@ -122,9 +122,9 @@ def cleanup_branches_and_tags(ctx: Context) -> None: continue - except Exception as exception: + except Exception as e: - log(ctx, f"Exception while cleaning branches and tags: {exception}", "error") + log(ctx, f"Exception while cleaning branches and tags", "error", exception=e) continue # If the path is a local tag, then delete it diff --git a/src/utils/lockfiles.py b/src/utils/lockfiles.py index d665966..25a0b14 100644 --- a/src/utils/lockfiles.py +++ b/src/utils/lockfiles.py @@ -5,7 +5,7 @@ # Import repo-converter modules from utils import cmd from utils.context import Context -from utils.log import log +from utils.logging import log # Import Python standard modules import os @@ -70,8 +70,8 @@ def clear_lock_files(ctx: Context) -> bool: with open(found_lock_file, "r") as lock_file_object: lock_file_content = lock_file_object.read() - except UnicodeDecodeError as exception: - lock_file_content = exception + except UnicodeDecodeError as e: + lock_file_content = e log(ctx, f"Process failed to start due to a lock file in the repo at {found_lock_file}, but no other process is running with {command} for this repo; deleting the lock file so it'll try again on the next run; lock file content: {lock_file_content}", "warning") @@ -80,8 +80,8 @@ def clear_lock_files(ctx: Context) -> bool: lock_file_deleted = True - except subprocess.CalledProcessError as exception: - log(ctx, f"Failed to delete lock file at {found_lock_file} with exception: {type(exception)}, {exception.args}, {exception}", "error") + except subprocess.CalledProcessError as e: + log(ctx, f"Failed to delete lock file at {found_lock_file} with exception", "error", 
exception=e) except FileNotFoundError: log(ctx, f"Lock file found at {found_lock_file}, but didn't exist at the time of deletion", "error") diff --git a/src/utils/logger.py b/src/utils/logger.py index a836791..47b38a1 100644 --- a/src/utils/logger.py +++ b/src/utils/logger.py @@ -18,7 +18,7 @@ import json -def configure_logger(log_level: str) -> None: +def configure(log_level: str) -> None: """ Configure structured logging with JSON Lines output format using structlog """ diff --git a/src/utils/log.py b/src/utils/logging.py similarity index 85% rename from src/utils/log.py rename to src/utils/logging.py index ff22408..f67df7c 100644 --- a/src/utils/log.py +++ b/src/utils/logging.py @@ -7,9 +7,11 @@ # Import Python standard modules from datetime import datetime +from sys import exit import inspect import os import time +import traceback # Import third party modules import structlog @@ -23,6 +25,7 @@ def log( correlation_id: str = "", log_env_vars: bool = False, log_concurrency_status: bool = False, + exception = None ) -> None: """ Enhanced logging function with structured data support. @@ -46,10 +49,12 @@ def log( # Build structured data payload structured_payload = _build_structured_payload( ctx, + event_log_level_name, structured_data, correlation_id, log_env_vars, log_concurrency_status, + exception ) # Apply redaction to the entire payload @@ -58,13 +63,19 @@ def log( # Log using structlog's logging commands, where the command is the log level's name getattr(logger, event_log_level_name.lower())(message, **redacted_payload) + # Exit the container for critical log events + if "CRITICAL" in event_log_level_name: + exit(1) + def _build_structured_payload( ctx: Context, + event_log_level_name: str, structured_data: dict = {}, correlation_id: str = "", log_env_vars: bool = False, log_concurrency_status: bool = False, + exception = None ) -> dict: """Build the complete structured data payload for logging""" @@ -102,7 +113,10 @@ def _build_structured_payload( except TypeError: pass - if ctx.env_vars.get("LOG_LEVEL") == "DEBUG": + if ( + ctx.env_vars.get("LOG_LEVEL") == "DEBUG" or + event_log_level_name in ["CRITICAL", "ERROR", "WARNING"] + ): pid = os.getpid() @@ -176,6 +190,9 @@ def _build_structured_payload( if log_concurrency_status: payload["concurrency"] = ctx.concurrency_manager.get_status(ctx) + if exception: + payload["exception"] = _get_exception_data(exception) + # Remove any null values payload = _remove_null_values(payload) @@ -260,6 +277,44 @@ def _format_uptime(uptime_seconds: float) -> str: return " ".join(parts) +def _get_exception_data(exception) -> dict: + """ + Parse attributes from the provided exception, and return them as a dict to be printed in structured logs + """ + + return_dict = {} + + if exception: + + return_dict["type"] = type(exception) + return_dict["args"] = breakup_lists_and_strings(exception.args) + + traceback_original = traceback.format_exception(exception) + return_dict["traceback"] = breakup_lists_and_strings(traceback_original) + + return return_dict + + +def breakup_lists_and_strings(input) -> list[str]: + """ + Parse individual attributes from exceptions + Take in either a string or a list of strings + Break up strings with newlines + Return a list of strings + """ + + return_list = [] + + if isinstance(input, str): + input = list([input]) + + if isinstance(input, list): + for line in input: + return_list += line.splitlines() + + return return_list + + def _remove_null_values(payload: dict) -> dict: """ Recursive function to remove keys from payload 
where values are null, or empty strings, diff --git a/src/utils/signal_handler.py b/src/utils/signal_handler.py index 28b5a30..da67cc2 100644 --- a/src/utils/signal_handler.py +++ b/src/utils/signal_handler.py @@ -2,7 +2,7 @@ # Utility functions to handle signals # Import repo-converter modules -from utils.log import log +from utils.logging import log from utils.context import Context from utils import cmd @@ -21,9 +21,9 @@ def register_signal_handler(ctx: Context): # log(ctx, f"Registered signal handlers","debug") - except Exception as exception: + except Exception as e: - log(ctx, f"Registering signal handlers failed with exception: {type(exception)}, {exception.args}, {exception}","critical") + log(ctx, f"Registering signal handlers failed with exception","critical", exception=e) def signal_handler(ctx: Context, incoming_signal, frame) -> None: @@ -43,14 +43,14 @@ def signal_handler(ctx: Context, incoming_signal, frame) -> None: log(ctx, "No process group to terminate", "debug") except OSError as e: - log(ctx, f"Error terminating process group: {e}", "error") + log(ctx, f"Exception terminating process group", "error", exception=e) # Terminate any active multiprocessing jobs try: terminate_multiprocessing_jobs_on_shutdown(ctx, timeout=15) # Shorter timeout during shutdown except Exception as e: - log(ctx, f"Error during multiprocessing job termination: {e}", "error") + log(ctx, f"Exception during multiprocessing job termination", "error", exception=e) # Clean up any remaining zombie processes cmd.status_update_and_cleanup_zombie_processes(ctx) @@ -89,7 +89,7 @@ def sigchld_handler(ctx: Context, incoming_signal, frame) -> None: break except Exception as e: - log(ctx, f"Error in SIGCHLD handler: {e}", "debug") + log(ctx, f"Exception in SIGCHLD handler", "debug", exception=e) break @@ -131,4 +131,4 @@ def terminate_multiprocessing_jobs_on_shutdown(ctx: Context, timeout: int = 30) ctx.active_repo_conversion_processes.remove((process, repo_key, server_name)) except Exception as e: - log(ctx, f"Error terminating multiprocessing job: {e}", "error") + log(ctx, f"Exception terminating multiprocessing job", "error", exception=e) diff --git a/src/utils/status_monitor.py b/src/utils/status_monitor.py index e5a6e34..b71a111 100644 --- a/src/utils/status_monitor.py +++ b/src/utils/status_monitor.py @@ -4,7 +4,7 @@ # Import repo-converter modules from utils.context import Context -from utils.log import log +from utils.logging import log from utils import cmd # Import Python standard modules @@ -38,15 +38,14 @@ def status_monitor_loop() -> None: log(ctx, "Concurrency status", "debug", log_concurrency_status=True) - except (BrokenPipeError, ConnectionResetError) as exception: + except (BrokenPipeError, ConnectionResetError) as e: + # These errors occur during shutdown when manager connections are closed - log(ctx, f"Connection error in concurrency monitor (likely during shutdown): {exception}", "debug") + log(ctx, f"Connection error in concurrency monitor (likely during shutdown)", "debug", exception=e) break - except Exception as exception: - log(ctx, f"Error in concurrency monitor: {exception}", "error") - - raise exception + except Exception as e: + log(ctx, f"Exception in concurrency monitor", "error", exception=e) time.sleep(interval)
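
A note on the `exception=` parameter these changes thread through `log()` and `log_process_status()`: Python unbinds the name bound by `except ... as e` as soon as the handler block exits, so code that pre-initializes `e = None` and then logs `exception=e` *after* the block (as `run_subprocess()` does) only works if the caught object is first copied into a separately assigned variable. A minimal, self-contained sketch of that pattern follows; the helper names below are illustrative stand-ins, not this repo's API:

```python
from typing import Optional

def log_process_status(exception: Optional[BaseException] = None) -> None:
    # Stand-in for cmd.log_process_status(); just shows what it received
    print("exception:", repr(exception))

def read_process_metadata() -> dict:
    # Hypothetical stand-in for psutil.Process(pid).as_dict()
    raise ProcessLookupError("process exited before metadata could be read")

caught: Optional[BaseException] = None  # pre-initialize, for the happy path

try:
    metadata = read_process_metadata()
except (ProcessLookupError, FileNotFoundError) as e:
    caught = e  # copy before "e" is unbound at the end of this block

# "caught" is still bound here; referring to "e" instead would raise
# NameError on the exception path, due to the interpreter's implicit "del e"
log_process_status(exception=caught)
```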
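For reference, here is a standalone sketch of roughly what `_get_exception_data()` in `src/utils/logging.py` adds to the structured payload. It is simplified: `print` stands in for structlog, and the helper accepts tuples as well as lists, since `exception.args` is a tuple:

```python
import traceback

def breakup_lists_and_strings(data) -> list[str]:
    # Accept a string, list, or tuple; coerce items to str and split on newlines
    if isinstance(data, str):
        data = [data]
    lines: list[str] = []
    for item in data:
        lines += str(item).splitlines()
    return lines

def get_exception_data(exception: BaseException) -> dict:
    # Mirrors the payload shape: exception type, args, and traceback lines
    return {
        "type": str(type(exception)),
        "args": breakup_lists_and_strings(exception.args),
        "traceback": breakup_lists_and_strings(traceback.format_exception(exception)),
    }

try:
    raise ConnectionResetError("manager connection closed during shutdown")
except ConnectionResetError as e:
    print(get_exception_data(e))
```

The single-argument form of `traceback.format_exception(e)` requires Python 3.10 or newer; on older interpreters, the equivalent call is `traceback.format_exception(type(e), e, e.__traceback__)`.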