From e6b971fc0463864fc08b51b3e263984271e0e6d4 Mon Sep 17 00:00:00 2001 From: Luis Gonzalez Date: Sat, 31 Jan 2026 18:35:09 -0500 Subject: [PATCH] testing airflow and fixed the data type of the day when the data was extracted --- airflow/.astro/config.yaml | 2 ++ airflow/Dockerfile | 1 + .../dags/.airflowignore | 0 airflow/dags/my_dag.py | 17 +++++++++++++++++ celery_logging_config.py | 0 worker.py | 14 +++++++------- 6 files changed, 27 insertions(+), 7 deletions(-) create mode 100644 airflow/.astro/config.yaml create mode 100644 airflow/Dockerfile rename c_logging_config.py => airflow/dags/.airflowignore (100%) create mode 100644 airflow/dags/my_dag.py create mode 100644 celery_logging_config.py diff --git a/airflow/.astro/config.yaml b/airflow/.astro/config.yaml new file mode 100644 index 0000000..ba92922 --- /dev/null +++ b/airflow/.astro/config.yaml @@ -0,0 +1,2 @@ +project: + name: airflow diff --git a/airflow/Dockerfile b/airflow/Dockerfile new file mode 100644 index 0000000..0eeab8c --- /dev/null +++ b/airflow/Dockerfile @@ -0,0 +1 @@ +FROM astrocrpublic.azurecr.io/runtime:3.1-11 diff --git a/c_logging_config.py b/airflow/dags/.airflowignore similarity index 100% rename from c_logging_config.py rename to airflow/dags/.airflowignore diff --git a/airflow/dags/my_dag.py b/airflow/dags/my_dag.py new file mode 100644 index 0000000..295f07b --- /dev/null +++ b/airflow/dags/my_dag.py @@ -0,0 +1,17 @@ +from airflow.sdk import dag, task +from pendulum import datetime + +@dag( + schedule="@daily", + start_date=datetime(2026, 1, 31), + description="test dag", + tags=["first dag"], + max_consecutive_failed_dag_runs=3, +) +def my_dag(): + + @task + def _task_a(): + print("hello") + + _task_a() \ No newline at end of file diff --git a/celery_logging_config.py b/celery_logging_config.py new file mode 100644 index 0000000..e69de29 diff --git a/worker.py b/worker.py index aae7a5d..55506ea 100644 --- a/worker.py +++ b/worker.py @@ -15,7 +15,7 @@ logger = get_task_logger(__name__) -todays_date = datetime.now().strftime("%m-%d-%Y") +todays_date = datetime.now().isoformat() S3_BUCKET_NAME = "github-etl-data-bucket" @@ -129,12 +129,13 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git remaining_api_calls = github_instance.rate_limiting remaining = remaining_api_calls[0] - if remaining == 2: - print(f"Reached batch size limit of {batch_size}") + if counter >= batch_size: + print(f"Reached batch size of {batch_size}") break - # # start_in_repo_num = counter - # # github_instance = gh_two + if remaining < 100: + print(f"Rate limit approaching ({remaining}). Stopping worker.") + break # # raise self.retry(countdown=3600) # break @@ -191,5 +192,4 @@ def build_repo_chord(total: int = 5000, batch_size: int = 500): # for start in range(0, 5000, 500) # ]) -# return chord(jobs)() - \ No newline at end of file +# return chord(jobs)() \ No newline at end of file