From 360a87e27740935c428f13f9711573f37eb5e09a Mon Sep 17 00:00:00 2001 From: Luis Gonzalez Date: Wed, 18 Feb 2026 19:47:29 -0500 Subject: [PATCH] working on airflow dag --- airflow/dags/run_queue.py | 25 +++++++++++++++---------- worker.py | 8 +++++--- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/airflow/dags/run_queue.py b/airflow/dags/run_queue.py index 7f5d36f..66337c4 100644 --- a/airflow/dags/run_queue.py +++ b/airflow/dags/run_queue.py @@ -1,5 +1,7 @@ +from email.policy import default import os from socket import timeout +from tracemalloc import start from airflow.sdk import dag, task from pendulum import datetime from celery import Celery @@ -7,14 +9,14 @@ from client import get_data_from_queue from datetime import timedelta import time +from airflow.models import Variable +from worker import app - -app = Celery( - 'airflow_client', - broker = os.getenv('CELERY_BROKER_URL'), - backend = os.getenv('CELERY_BACKEND_URL') -) - +# app = Celery( +# 'airflow_client', +# broker = os.getenv('CELERY_BROKER_URL'), +# backend = os.getenv('CELERY_BACKEND_URL') +# ) @dag( schedule="@hourly", @@ -45,16 +47,19 @@ def send_task_to_celery_worker(**context): rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining") max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total") - app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": 1000, "batch_size": 500}) + start_with_repo_number = int(Variable.get("github_repo_number", default_var = "0")) + start_with_repo_number += 500 - print("celery_worker") + app.send_task("worker.get_github_data", kwargs={"start_in_repo_num": start_with_repo_number, "batch_size": 500}) + + Variable.set(key= "github_repo_number", value= str(start_with_repo_number)) + print("celery_worker") @task def save_data_from_queue(): get_data_from_queue() - check_rate_limit() >> send_task_to_celery_worker() >> save_data_from_queue() diff --git a/worker.py b/worker.py index 33153bb..b7f16b2 100644 --- a/worker.py +++ b/worker.py @@ -116,7 +116,6 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500): properties=pika.BasicProperties(delivery_mode=2) ) - counter += 1 logger.info(github_data_points) except Exception as validation_error: @@ -127,8 +126,6 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500): remaining_api_calls = gh.rate_limiting remaining = remaining_api_calls[0] - if counter >= 5: - break if counter >= batch_size: logger.info(f"Reached batch size of {batch_size}") @@ -152,6 +149,11 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500): logger.info("Count") logger.info(counter) + counter += 1 + + if counter >= 25: + break + except Exception as e: logger.exception("Error", e)