42 changes: 29 additions & 13 deletions airflow/dags/run_queue.py
@@ -3,12 +3,11 @@
from pendulum import datetime
from celery import Celery
from github import Auth, Github, GithubException
from client import get_data_from_queue
from datetime import timedelta
import time


api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)

app = Celery(
    'airflow_client',
    broker = os.getenv('CELERY_BROKER_URL'),
@@ -22,12 +21,16 @@
    description="Run Celery queue with RabbitMQ as the broker \
        in order to get GitHub data from the GitHub API",
    tags=["celery_queue"],
    max_consecutive_failed_dag_runs=3,
    max_consecutive_failed_dag_runs=3
)
def run_github_data_queue():

    @task
    def check_rate_limit():
    @task(do_xcom_push=True, multiple_outputs=True)
    def check_rate_limit(**context):
        api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
        auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
        gh, gh_two = Github(auth=auth), Github(auth=auth_two)
Comment on lines +30 to +32
medium

The GitHub client gh_two and its related authentication variables (api_token_two, auth_two) are initialized but never used within this task. This adds unnecessary overhead and should be removed to improve code clarity.

Suggested change
api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
gh, gh_two = Github(auth=auth), Github(auth=auth_two)
api_token = os.getenv("GITHUB_API_TOKEN")
auth = Auth.Token(api_token)
gh = Github(auth=auth)


        rate_limit = gh.rate_limiting
        print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")

@@ -36,16 +39,29 @@ def check_rate_limit():
            "total": rate_limit[1]
        }

    @task
    def run_queue(**context):
        rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining")
        max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total")

        if rate_limit > 100:
            print("IT WORKS")
            print(rate_limit)

            celery_worker = app.send_task("worker.get_github_data")

            print(celery_worker)

            time.sleep(500)
Comment on lines +43 to +55
high

The run_queue task has several significant issues:

  • Blocking Sleep: time.sleep(500) is an Airflow anti-pattern. It blocks a worker slot for over 8 minutes without doing useful work, which can lead to worker starvation. For delays, please use Airflow native concepts like Sensors or Deferrable Operators.
  • Ineffective Logic: The rate limit check inside the if statement does not prevent the downstream save_data_from_queue task from running, which appears to trigger the main workload. This makes the rate limit check ineffective.
  • Unused Variable: max_total_api_calls is fetched from XCom but is never used.
  • Debug Prints: The print() statements should be replaced with proper logging for use in production.

This task's logic is unclear and inefficient. It should be refactored to properly gate the data fetching pipeline based on the rate limit, perhaps using a BranchPythonOperator.
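For illustration, here is a minimal sketch of that gating pattern, written with the TaskFlow @task.branch decorator (the decorator form of BranchPythonOperator). It assumes Airflow 2.4+; the DAG name, task IDs, and the 100-call threshold are illustrative, not code from this PR:

import logging
from pendulum import datetime
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator

log = logging.getLogger(__name__)

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def gated_github_queue():

    @task
    def check_rate_limit() -> dict:
        # Placeholder values; the real task would query the GitHub API.
        return {"remaining": 4500, "total": 5000}

    @task.branch
    def gate(rate_limit: dict) -> str:
        # Return the task_id of the branch to follow; the other is skipped.
        if rate_limit["remaining"] > 100:
            return "run_queue"
        return "skip"

    @task
    def run_queue():
        log.info("Dispatching Celery task")  # logging instead of print()

    skip = EmptyOperator(task_id="skip")

    gate(check_rate_limit()) >> [run_queue(), skip]

gated_github_queue()

With this shape, the fetch task only runs when the branch task selects it, so the rate-limit check actually gates the workload.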

security-medium medium

The run_queue task includes a time.sleep(500) call, which blocks an Airflow worker slot for over 8 minutes. This can quickly exhaust the worker pool if the DAG is triggered multiple times, posing a resource-exhaustion Denial of Service (DoS) risk. The pattern is also highly inefficient, and the task itself appears redundant: the result of the Celery task it triggers is never used, and subsequent tasks appear to cover the same functionality. The logic and necessity of this task should be reconsidered.
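If a fixed delay is ever genuinely required, a deferrable sensor hands the wait to the triggerer process instead of occupying a worker slot. A minimal sketch, assuming an Airflow version that ships TimeDeltaSensorAsync; the DAG and task names are illustrative:

from datetime import timedelta
from pendulum import datetime
from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG("deferred_wait_example", start_date=datetime(2024, 1, 1), schedule=None):
    # Defers for ~8 minutes via the triggerer; no worker slot is held
    # while waiting, unlike time.sleep(500) inside a task.
    wait_for_api = TimeDeltaSensorAsync(
        task_id="wait_for_api",
        delta=timedelta(seconds=500),
    )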


    @task
    def run_the_queue(rate_limit: str):
        print(f'rate limit: {rate_limit["total"]}, remaining {rate_limit["remaining"]}')
    def save_data_from_queue():

        if rate_limit["remaining"] > 4900:
            app.send_task("worker.get_data_from_queue", args=[100, 500])
        get_data_from_queue()
critical

The called function get_data_from_queue() contains a critical bug. In client.py, if response.get() raises an exception (e.g., a timeout), the the_data variable will not be assigned. The except block catches the exception, but execution then proceeds to return save_to_parquet(the_data), which will raise an UnboundLocalError and crash the task. The error handling in get_data_from_queue must be improved to handle this case gracefully.

Here is a suggested fix for client.py:

# In client.py
def get_data_from_queue():
    the_data = None
    try:
        print("Getting the result")
        response = build_repo_chord(total=5000, batch_size=500)
        the_data = response.get(timeout=3600)
        print(f"Result: {the_data}")
    except Exception as e:
        print(f"Error: {e}")

    if the_data is not None:
        return save_to_parquet(the_data)

Comment on lines 57 to +60
high

The save_data_from_queue task calls get_data_from_queue(), which contains a blocking call response.get(timeout=3600). This will occupy an Airflow worker for up to an hour, which is inefficient and can lead to worker starvation. For long-running asynchronous operations like this, it's better to use deferrable operators or sensors to free up worker slots while waiting.
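One possible shape for that polling approach, sketched with the @task.sensor decorator (Airflow 2.5+) in reschedule mode so the worker slot is released between pokes. The broker/backend environment variables and task names here are assumptions, not code from this PR:

import os
from celery import Celery
from celery.result import AsyncResult
from airflow.decorators import task
from airflow.sensors.base import PokeReturnValue

# Assumed to match the project's Celery configuration.
celery_app = Celery(
    "airflow_client",
    broker=os.getenv("CELERY_BROKER_URL"),
    backend=os.getenv("CELERY_RESULT_BACKEND"),
)

@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_chord(result_id: str) -> PokeReturnValue:
    # Poll the chord result instead of blocking on response.get();
    # in reschedule mode the slot is freed between pokes.
    result = AsyncResult(result_id, app=celery_app)
    return PokeReturnValue(is_done=result.ready(), xcom_value=result_id)

A downstream task can then pull the result id from XCom and fetch the finished payload without any long block.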




Comment on lines +58 to 63
medium

This function definition includes unnecessary blank lines and incorrect indentation, which harms readability. Please remove them.

Suggested change
def save_data_from_queue():
    if rate_limit["remaining"] > 4900:
        app.send_task("worker.get_data_from_queue", args=[100, 500])
    get_data_from_queue()
def save_data_from_queue():
    get_data_from_queue()

    val = check_rate_limit()
    run_the_queue(rate_limit=val)
    check_rate_limit() >> run_queue() >> save_data_from_queue()


run_github_data_queue()
31 changes: 25 additions & 6 deletions airflow/docker-compose.override.yml
@@ -1,13 +1,32 @@
services:
  api-server:
    networks:
      - etl-shared
    volumes:
      - /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project
security-medium medium

A medium-severity vulnerability exists due to a hardcoded absolute path to a local user directory (/Users/luisgonzalez/...) in docker-compose.override.yml. This makes the configuration non-portable, causes failures in other environments, and leaks local filesystem information. This issue affects all services defined in this file (api-server, dag-processor, scheduler, triggerer) and should be replaced with a relative path.

security-medium medium

The volume mount in docker-compose.override.yml uses a hardcoded absolute path to a local directory, which leaks sensitive PII (e.g., developer's username and local file system structure). This also severely impacts portability, causing services to fail in other environments. This issue is repeated for dag-processor, scheduler, and triggerer services. It is recommended to use a relative path.

       - ./:/usr/local/airflow/project

    environment:
      PYTHONPATH: /usr/local/airflow/project
    networks: [etl-shared]

  dag-processor:
    volumes:
      - /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project
security-medium medium

The docker-compose.override.yml file contains a hardcoded absolute path to a local user directory (/Users/luisgonzalez/...). This makes the configuration non-portable and will cause it to fail on any other developer's machine or in a CI/CD environment. It also leaks information about the developer's local filesystem.

    environment:
      PYTHONPATH: /usr/local/airflow/project
    networks: [etl-shared]

  scheduler:
    networks:
      - etl-shared
    volumes:
      - /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project
security-medium medium

The docker-compose.override.yml file contains a hardcoded absolute path to a local user directory (/Users/luisgonzalez/...). This makes the configuration non-portable and will cause it to fail on any other developer's machine or in a CI/CD environment. It also leaks information about the developer's local filesystem.

    environment:
      PYTHONPATH: /usr/local/airflow/project
    networks: [etl-shared]

  triggerer:
    networks:
      - etl-shared
    volumes:
      - /Users/luisgonzalez/Documents/code/etl pipeline/github_etl:/usr/local/airflow/project
security-medium medium

The docker-compose.override.yml file contains a hardcoded absolute path to a local user directory (/Users/luisgonzalez/...). This makes the configuration non-portable and will cause it to fail on any other developer's machine or in a CI/CD environment. It also leaks information about the developer's local filesystem.

    environment:
      PYTHONPATH: /usr/local/airflow/project
    networks: [etl-shared]


networks:
  etl-shared:
14 changes: 10 additions & 4 deletions client.py
@@ -1,29 +1,34 @@
import logging
from pathlib import Path
from datetime import datetime, timezone
from datetime import datetime
from celery.result import AsyncResult
from worker import build_repo_chord
from rb_queue.rabbitmq import consume_repos
import polars as pl

DIRECT = Path("/usr/local/airflow/project/data")
high

The data directory path is hardcoded. This makes the application less flexible and harder to configure for different environments. It is a best practice to source this kind of configuration from an environment variable, for example by using os.getenv(). This would require importing the os module at the top of the file.

high

The DIRECT constant is defined with a hardcoded absolute path. This reduces portability. Consider making this path configurable, for example, by reading it from an environment variable. You will also need to add import os at the top of the file.

Suggested change
DIRECT = Path("/usr/local/airflow/project/data")
DIRECT = Path(os.getenv("PROJECT_DATA_DIR", "/usr/local/airflow/project/data"))


def save_to_parquet(the_data):
    today = datetime.now().strftime("%Y-%m-%d")

    if not Path(f"data/{today}/").exists():
        Path(f"data/{today}").mkdir(parents=True, exist_ok=True)
    # if not Path(f"data/{today}/").exists():
    # Path(f"data/{today}").mkdir(parents=True, exist_ok=True)
Comment on lines +14 to +15
medium

This commented-out code should be removed to keep the codebase clean.

Comment on lines +14 to +15
medium

This block of code is commented out. If it's no longer needed, it should be removed to keep the codebase clean.


    fi_direct = DIRECT / today
    fi_direct.mkdir(parents=True, exist_ok=True)

    print("This is the else of the client")
    print(the_data)

    df = pl.DataFrame(the_data)
    df.write_parquet(f"data/{today}/github_data.parquet", compression="zstd")
    df.write_parquet(f"{fi_direct}/github_data.parquet", compression="zstd")
    print("Valid Parquet data")


def get_data_from_queue():
    try:
        print("Getting the result")

        response = build_repo_chord(total=5000, batch_size=500)
        the_data = response.get(timeout=3600)  # 1 hour timeout
        print(f"Result: {the_data}")
@@ -36,3 +41,4 @@ def get_data_from_queue():

if __name__ == "__main__":
    get_data_from_queue()

10 changes: 1 addition & 9 deletions worker.py
@@ -8,7 +8,6 @@
from datetime import datetime
from github import Auth, Github, GithubException
from dotenv import load_dotenv
from client import get_data_from_queue
from pydantic_models.github import RabbitMQ_Data_Validation
from rb_queue.rabbitmq import get_connection, QUEUE_NAME
load_dotenv()
@@ -133,7 +132,7 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
print(f"Reached batch size of {batch_size}")
break

if remaining < 100:
if remaining < 20:
print(f"Rate limit approaching ({remaining}). Stopping worker.")
break

@@ -183,13 +182,6 @@ def build_repo_chord(total: int = 5000, batch_size: int = 500):
    return chord(header)(aggregate_results.s())


@app.task
def run_queue_and_save(total: int = 5000, batch_size: int = 500):
    return get_data_from_queue(total=total, batch_size=batch_size)




# old code that did not work
# @app.task
# def distribute_tasks():