From f7c8cbce2f1a0e9f35beaf3a34dcd6ccedfbfce9 Mon Sep 17 00:00:00 2001 From: Luis Gonzalez Date: Wed, 4 Feb 2026 22:17:52 -0500 Subject: [PATCH] trying to run celery queue with airflow --- airflow/dags/run_queue.py | 3 ++- client.py | 32 +++++++++++++++++++------------- worker.py | 9 ++++++++- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/airflow/dags/run_queue.py b/airflow/dags/run_queue.py index f78c8e1..4e09b19 100644 --- a/airflow/dags/run_queue.py +++ b/airflow/dags/run_queue.py @@ -21,8 +21,9 @@ def run_queue(): @task def run_the_queue(): - app.send_task("worker.get_github_data", args=[0, 500]) + app.send_task("worker.run_queue_and_save", args=[100, 500]) + run_the_queue() diff --git a/client.py b/client.py index f9d99ae..8636440 100644 --- a/client.py +++ b/client.py @@ -7,20 +7,9 @@ import polars as pl -today = datetime.now().strftime("%Y-%m-%d") +def save_to_parquet(the_data): + today = datetime.now().strftime("%Y-%m-%d") -print("Waiting for Celery task to complete") - -try: - print("Getting the result") - response = build_repo_chord(total=5000, batch_size=500) - the_data = response.get(timeout=3600) # 1 hour timeout - print(f"Result: {the_data}") - -except Exception as e: - print(f"Error: {e}") - -else: if not Path(f"data/{today}/").exists(): Path(f"data/{today}").mkdir(parents=True, exist_ok=True) @@ -30,3 +19,20 @@ df = pl.DataFrame(the_data) df.write_parquet(f"data/{today}/github_data.parquet", compression="zstd") print("Valid Parquet data") + + +def get_data_from_queue(total: int = 5000, batch_size: int = 500): + try: + print("Getting the result") + response = build_repo_chord(total=total, batch_size=batch_size) + the_data = response.get(timeout=3600)  # 1 hour timeout + print(f"Result: {the_data}") + + except Exception as e: + print(f"Error: {e}") + raise + return save_to_parquet(the_data) + + +if __name__ == "__main__": + get_data_from_queue() diff --git a/worker.py b/worker.py index 8e37ee3..d9e24ea 100644 --- a/worker.py +++ b/worker.py @@ -3,12 +3,12 @@ from pathlib import Path
import json import boto3 -import time from celery import Celery, group, chord from celery.utils.log import get_task_logger from datetime import datetime from github import Auth, Github, GithubException from dotenv import load_dotenv +# NOTE: client.py imports build_repo_chord from this module, so client is imported lazily inside run_queue_and_save to avoid a circular import from pydantic_models.github import RabbitMQ_Data_Validation from rb_queue.rabbitmq import get_connection, QUEUE_NAME load_dotenv() @@ -183,6 +183,13 @@ def build_repo_chord(total: int = 5000, batch_size: int = 500): return chord(header)(aggregate_results.s()) +@app.task +def run_queue_and_save(total: int = 5000, batch_size: int = 500): +    from client import get_data_from_queue  # lazy import avoids the worker<->client circular import +    return get_data_from_queue(total=total, batch_size=batch_size) + + + # old code that did not work # @app.task # def distribute_tasks():