diff --git a/airflow/dags/run_queue.py b/airflow/dags/run_queue.py index dfccf63..75400c4 100644 --- a/airflow/dags/run_queue.py +++ b/airflow/dags/run_queue.py @@ -3,12 +3,11 @@ from pendulum import datetime from celery import Celery from github import Auth, Github, GithubException +from client import get_data_from_queue +from datetime import timedelta +import time -api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT") -auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two) -gh, gh_two = Github(auth=auth), Github(auth=auth_two) - app = Celery( 'airflow_client', broker = os.getenv('CELERY_BROKER_URL'), @@ -22,12 +21,16 @@ description="Run Celery queue with RabbitMQ as the broker \ in order to get GitHub data from the GitHub API", tags=["celery_queue"], - max_consecutive_failed_dag_runs=3, + max_consecutive_failed_dag_runs=3 ) def run_github_data_queue(): - @task - def check_rate_limit(): + @task(do_xcom_push=True, multiple_outputs=True) + def check_rate_limit(**context): + api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT") + auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two) + gh, gh_two = Github(auth=auth), Github(auth=auth_two) + rate_limit = gh.rate_limiting print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total") @@ -36,16 +39,29 @@ def check_rate_limit(): "total": rate_limit[1] } + @task + def run_queue(**context): + rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining") + max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total") + + if rate_limit > 100: + print("IT WORKS") + print(rate_limit) + + celery_worker = app.send_task("worker.get_github_data") + + print(celery_worker) + + time.sleep(500) @task - def run_the_queue(rate_limit: str): - print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}') + def save_data_from_queue(): - if rate_limit["remaining"] > 4900: - app.send_task("worker.get_data_from_queue", args=[100, 500]) + get_data_from_queue() + + - val = check_rate_limit() - run_the_queue(rate_limit=val) + check_rate_limit() >> run_queue() >> save_data_from_queue() run_github_data_queue() \ No newline at end of file diff --git a/airflow/docker-compose.override.yml b/airflow/docker-compose.override.yml index 0cf80ef..4c415b8 100644 --- a/airflow/docker-compose.override.yml +++ b/airflow/docker-compose.override.yml @@ -1,13 +1,32 @@ services: api-server: - networks: - - etl-shared + volumes: + - /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project + environment: + PYTHONPATH: /usr/local/airflow/project + networks: [etl-shared] + + dag-processor: + volumes: + - /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project + environment: + PYTHONPATH: /usr/local/airflow/project + networks: [etl-shared] + scheduler: - networks: - - etl-shared + volumes: + - /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project + environment: + PYTHONPATH: /usr/local/airflow/project + networks: [etl-shared] + triggerer: - networks: - - etl-shared + volumes: + - /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project + environment: + PYTHONPATH: /usr/local/airflow/project + networks: [etl-shared] + networks: etl-shared: diff --git a/client.py b/client.py index 8636440..1846479 100644 --- a/client.py +++ b/client.py @@ -1,29 +1,34 @@ import logging from pathlib import Path -from datetime import datetime, timezone +from datetime import datetime from celery.result import AsyncResult from worker import build_repo_chord from rb_queue.rabbitmq import consume_repos import polars as pl +DIRECT = Path("/usr/local/airflow/project/data") def save_to_parquet(the_data): today = datetime.now().strftime("%Y-%m-%d") - if not Path(f"data/{today}/").exists(): - Path(f"data/{today}").mkdir(parents=True, exist_ok=True) + # if not Path(f"data/{today}/").exists(): + # Path(f"data/{today}").mkdir(parents=True, exist_ok=True) + + fi_direct = DIRECT / today + fi_direct.mkdir(parents=True, exist_ok=True) print("This is the else of the client") print(the_data) df = pl.DataFrame(the_data) - df.write_parquet(f"data/{today}/github_data.parquet", compression="zstd") + df.write_parquet(f"{fi_direct}/github_data.parquet", compression="zstd") print("Valid Parquet data") def get_data_from_queue(): try: print("Getting the result") + response = build_repo_chord(total=5000, batch_size=500) the_data = response.get(timeout=3600) # 1 hour timeout print(f"Result: {the_data}") @@ -36,3 +41,4 @@ def get_data_from_queue(): if __name__ == "__main__": get_data_from_queue() + diff --git a/worker.py b/worker.py index d9e24ea..e18c6e8 100644 --- a/worker.py +++ b/worker.py @@ -8,7 +8,6 @@ from datetime import datetime from github import Auth, Github, GithubException from dotenv import load_dotenv -from client import get_data_from_queue from pydantic_models.github import RabbitMQ_Data_Validation from rb_queue.rabbitmq import get_connection, QUEUE_NAME load_dotenv() @@ -133,7 +132,7 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git print(f"Reached batch size of {batch_size}") break - if remaining < 100: + if remaining < 20: print(f"Rate limit approaching ({remaining}). Stopping worker.") break @@ -183,13 +182,6 @@ def build_repo_chord(total: int = 5000, batch_size: int = 500): return chord(header)(aggregate_results.s()) -@app.task -def run_queue_and_save(total: int = 5000, batch_size: int = 500): - return get_data_from_queue(total=total, batch_size=batch_size) - - - - # old code that did not work # @app.task # def distribute_tasks():