From 2976c02eb37bf5c2673bc9db59304b82ad0e5ddc Mon Sep 17 00:00:00 2001 From: Luis Gonzalez Date: Wed, 11 Feb 2026 20:28:01 -0500 Subject: [PATCH] added new task to dag that gets remaining api call --- airflow/dags/run_queue.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/airflow/dags/run_queue.py b/airflow/dags/run_queue.py index 4e09b19..dfccf63 100644 --- a/airflow/dags/run_queue.py +++ b/airflow/dags/run_queue.py @@ -1,7 +1,13 @@ +import os from airflow.sdk import dag, task from pendulum import datetime from celery import Celery -import os +from github import Auth, Github, GithubException + + +api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT") +auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two) +gh, gh_two = Github(auth=auth), Github(auth=auth_two) app = Celery( 'airflow_client', @@ -9,6 +15,7 @@ backend = os.getenv('CELERY_BACKEND_URL') ) + @dag( schedule="@hourly", start_date=datetime(2026, 2, 3), @@ -17,14 +24,28 @@ tags=["celery_queue"], max_consecutive_failed_dag_runs=3, ) -def run_queue(): +def run_github_data_queue(): + + @task + def check_rate_limit(): + rate_limit = gh.rate_limiting + print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total") + + return { + "remaining": rate_limit[0], + "total": rate_limit[1] + } + @task - def run_the_queue(): - app.send_task("worker.get_data_from_queue", args=[100, 500]) + def run_the_queue(rate_limit: str): + print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}') + + if rate_limit["remaining"] > 4900: + app.send_task("worker.get_data_from_queue", args=[100, 500]) - - run_the_queue() + val = check_rate_limit() + run_the_queue(rate_limit=val) -run_queue() \ No newline at end of file +run_github_data_queue() \ No newline at end of file