From 9a354f006bf75a3af240881e31029ca3a8b8bd4f Mon Sep 17 00:00:00 2001
From: Luis Gonzalez
Date: Tue, 17 Feb 2026 11:05:32 -0500
Subject: [PATCH] Enhance Airflow and Celery integration with improved logging and Docker configuration

Rename the DAG factory and its dispatch task for clarity, pass explicit
batch parameters to worker.get_github_data, route Celery logging through
the after_setup_logger/after_setup_task_logger signals, rotate worker
container logs with Docker's json-file driver, and drop the now-unused
fluent-bit configuration. GitHub client construction moves from import
time into the task body, so the worker module can be imported without
API tokens present.

---
 airflow/dags/run_queue.py  | 15 ++++-----
 celery_logging_config.py   | 22 ++++++++++++
 docker-compose.yml         |  5 +++
 fluent-bit/fluent-bit.conf |  0
 rb_queue/rabbitmq.py       |  2 +-
 worker.py                  | 64 +++++++++++++++++-----------------
 6 files changed, 60 insertions(+), 48 deletions(-)
 delete mode 100644 fluent-bit/fluent-bit.conf

diff --git a/airflow/dags/run_queue.py b/airflow/dags/run_queue.py
index 3c8deff..7f5d36f 100644
--- a/airflow/dags/run_queue.py
+++ b/airflow/dags/run_queue.py
@@ -24,8 +24,8 @@
     tags=["celery_queue"],
     max_consecutive_failed_dag_runs=3
 )
-def run_github_data_queue():
-
+def run_queue():
+
     @task(do_xcom_push=True, multiple_outputs=True)
     def check_rate_limit(**context):
         api_token = os.getenv("GITHUB_API_TOKEN")
@@ -38,15 +38,14 @@ def check_rate_limit(**context):
         return {
             "remaining": rate_limit[0],
             "total": rate_limit[1]
-    }
+        }
 
     @task
-    def run_queue(**context):
+    def send_task_to_celery_worker(**context):
         rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining")
         max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total")
-
-        app.send_task("worker.get_github_data")
+        app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": 1000, "batch_size": 500})
 
         print("celery_worker")
 
 
@@ -57,7 +56,7 @@ def save_data_from_queue():
 
 
 
-    check_rate_limit() >> run_queue() >> save_data_from_queue()
+    check_rate_limit() >> send_task_to_celery_worker() >> save_data_from_queue()
 
 
-run_github_data_queue()
\ No newline at end of file
+run_queue()
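Because app.send_task dispatches by task name, nothing checks the kwargs above against get_github_data's signature until the worker actually executes the task; a mistyped key fails at runtime, not at DAG parse time. For reference, a minimal sketch of the equivalent direct call, assuming (hypothetically) that the scheduler environment could import worker.py, which is exactly the coupling send_task avoids:

    # Sketch only: assumes worker.py and its dependencies are importable here.
    from worker import get_github_data

    # apply_async binds kwargs to the task parameters the same way send_task does.
    get_github_data.apply_async(kwargs={"start_in_repo_num": 1000, "batch_size": 500})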
diff --git a/celery_logging_config.py b/celery_logging_config.py
index e69de29..3753932 100644
--- a/celery_logging_config.py
+++ b/celery_logging_config.py
@@ -0,0 +1,22 @@
+import logging
+import os
+from celery.signals import after_setup_logger, after_setup_task_logger
+
+def _configure(logger: logging.Logger) -> None:
+    logger.handlers.clear()
+    logger.setLevel(os.getenv("CELERY_LOG_LEVEL", "INFO").upper())
+    logger.propagate = False
+
+    handler = logging.StreamHandler()
+    handler.setFormatter(logging.Formatter(
+        "%(asctime)s %(levelname)s %(name)s %(message)s"
+    ))
+    logger.addHandler(handler)
+
+@after_setup_logger.connect
+def setup_root_logger(logger, *args, **kwargs):
+    _configure(logger)
+
+@after_setup_task_logger.connect
+def setup_task_logger(logger, *args, **kwargs):
+    _configure(logger)
diff --git a/docker-compose.yml b/docker-compose.yml
index d8e5d0f..8f24488 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -42,6 +42,11 @@ services:
       - ./data:/app/data
     command: celery -A worker worker --loglevel=info --concurrency=8
     env_file: ./config/.env
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "10m"
+        max-file: "5"
     deploy:
       resources:
         limits:
diff --git a/fluent-bit/fluent-bit.conf b/fluent-bit/fluent-bit.conf
deleted file mode 100644
index e69de29..0000000
diff --git a/rb_queue/rabbitmq.py b/rb_queue/rabbitmq.py
index e5b4643..de005cb 100644
--- a/rb_queue/rabbitmq.py
+++ b/rb_queue/rabbitmq.py
@@ -27,7 +27,7 @@ def consume_repos(callback):
     channel = connection.channel()
     channel.queue_declare(queue=QUEUE_NAME, durable=True)
 
-    # properties is needed becayse channel.basic_consume expects 4 parameters, properties does not have
+    # properties is needed because the on_message callback for channel.basic_consume must accept 4 parameters; properties does not have
     # any value but it is required to meet the 4 parameters requirement
     def on_message(ch, method, properties, body):
         try:
diff --git a/worker.py b/worker.py
index e18c6e8..33153bb 100644
--- a/worker.py
+++ b/worker.py
@@ -1,3 +1,4 @@
+import logging
 import os
 import pika
 from pathlib import Path
@@ -10,6 +11,7 @@
 from dotenv import load_dotenv
 from pydantic_models.github import RabbitMQ_Data_Validation
 from rb_queue.rabbitmq import get_connection, QUEUE_NAME
+import celery_logging_config  # registers Celery logging signal handlers
 
 load_dotenv()
 
@@ -25,7 +27,6 @@
     region_name=os.getenv("AWS_REGION", "us-east-1")
 )
 
-
 def save_to_s3(data, file_directory):
     s3_client.put_object(
         Bucket=S3_BUCKET_NAME,
@@ -33,7 +34,6 @@ def save_to_s3(data, file_directory):
         Body=json.dumps(data, default=str)
     )
 
-
 app = Celery(
     'github_repos',
     broker = os.getenv('CELERY_BROKER_URL'),
@@ -41,22 +41,21 @@ def save_to_s3(data, file_directory):
 )
 
 
-api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
-auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
-gh, gh_two = Github(auth=auth), Github(auth=auth_two)
-
-
 
 # bind = True allows to get task data, like task id
 @app.task(bind=True)
-def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, github_instance: Github = gh):
+def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500):
     counter = 0
     connection = None
     channel = None
     mylist = []
-    repositories = github_instance.get_repos(since=start_in_repo_num)
-    rate_limit = github_instance.rate_limiting
-    print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")
+    api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
+    auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
+    gh, gh_two = Github(auth=auth), Github(auth=auth_two)
+
+    repositories = gh.get_repos(since=start_in_repo_num)
+    rate_limit = gh.rate_limiting
+    logger.info(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")
 
     try:
         connection = get_connection()
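The hunk above also builds a second authenticated client, gh_two, that nothing uses yet; the commented-out retry block later in the task suggests switching accounts once the primary token runs low. A minimal sketch of that idea, where pick_client is a hypothetical helper and not part of this patch:

    # Hypothetical helper: keep the primary client until its remaining quota
    # drops below a threshold, then switch to the second account.
    def pick_client(primary, secondary, threshold=20):
        remaining, _limit = primary.rate_limiting  # PyGithub returns (remaining, limit)
        return secondary if remaining < threshold else primary

    # gh = pick_client(gh, gh_two)  # would replace the bare gh used above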
@@ -64,7 +63,7 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
         channel.queue_declare(queue=QUEUE_NAME, durable=True)
 
         for repo in repositories:
-            print(f"This is the repo printing: {repo}")
+            logger.info(f"This is the repo printing: {repo}")
 
             try:
                 github_data_points = {
@@ -99,12 +98,12 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
 
             except GithubException as ge:
                 if ge.status == 403:
-                    print(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")
+                    logging.exception(f"Skipping blocked repo (403): {repo.full_name if hasattr(repo, 'full_name') else 'unknown'}")
                 else:
-                    print(f"GitHub API error {ge.status} for repo, skipping...")
+                    logging.exception(f"GitHub API error {ge.status} for repo, skipping...")
                 continue
             except Exception as e:
-                print(f"Error accessing repo data: {e}, skipping...")
+                logging.exception(f"Error accessing repo data: {e}, skipping...")
                 continue
 
             try:
@@ -118,22 +117,22 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
                     )
 
                     counter += 1
-                    print(github_data_points)
+                    logger.info(github_data_points)
 
             except Exception as validation_error:
                 print(f"Validation error for repo {github_data_points.get('full_name')}: {validation_error}")
                 print("Skipping this repo and continuing")
                 continue
 
-            remaining_api_calls = github_instance.rate_limiting
+            remaining_api_calls = gh.rate_limiting
             remaining = remaining_api_calls[0]
 
             if counter >= batch_size:
-                print(f"Reached batch size of {batch_size}")
+                logger.info(f"Reached batch size of {batch_size}")
                 break
 
             if remaining < 20:
-                print(f"Rate limit approaching ({remaining}). Stopping worker.")
+                logger.info(f"Rate limit approaching ({remaining}). Stopping worker.")
                 break
 
                 # # raise self.retry(countdown=3600)
@@ -144,27 +143,26 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
 
 # # github account and it's credentials to have 1000 more API calls
 
             else:
-                print("Remaining api calls")
-                print(remaining)
+                logger.info("Remaining api calls")
+                logger.info(remaining)
 
-        print("Count")
-        print(counter)
+        logger.info("Count")
+        logger.info(counter)
 
     except Exception as e:
-        print("Error", e)
+        logger.exception("Error: %s", e)
 
     finally:
         if connection:
             connection.close()
         else:
-            print("The connection does not exist")
+            logger.info("The connection does not exist")
 
     # s3_url = save_to_s3(data=repo_collection, file_directory="github_repos/test.json")
 
     logger.info(f"Processed {counter} repositories")
 
     return mylist
-
 @app.task
 def aggregate_results(results):
@@ -179,16 +177,4 @@ def build_repo_chord(total: int = 5000, batch_size: int = 500):
     header = [
         get_github_data.s(start, batch_size) for start in range(0, total, batch_size)
     ]
-    return chord(header)(aggregate_results.s())
-
-
-# old code that did not work
-# @app.task
-# def distribute_tasks():
-
-#     jobs = group([
-#         get_github_data.s(start, 500)
-#         for start in range(0, 5000, 500)
-#     ])
-
-#     return chord(jobs)()
\ No newline at end of file
+    return chord(header)(aggregate_results.s())
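build_repo_chord fans the crawl out as a Celery chord: the header holds one get_github_data signature per batch window, and aggregate_results runs once after every batch finishes. A usage sketch, assuming the app has a result backend configured (chords require one):

    # Enqueues ten header tasks: get_github_data.s(0, 500), get_github_data.s(500, 500), ...
    async_result = build_repo_chord(total=5000, batch_size=500)

    # Blocks until all batches and the aggregate_results callback complete.
    print(async_result.get(timeout=3600))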