Merged
12 changes: 10 additions & 2 deletions airflow/dags/run_queue.py
@@ -1,9 +1,17 @@
 from airflow.sdk import dag, task
 from pendulum import datetime
+from celery import Celery
+import os
+
+app = Celery(
+    'airflow_client',
+    broker=os.getenv('CELERY_BROKER_URL'),
+    backend=os.getenv('CELERY_BACKEND_URL')
Comment on lines +8 to +9

medium

Using os.getenv to fetch connection details directly inside a DAG is not recommended: it tightly couples the DAG to the execution environment's variables, making it less portable and harder to manage. A more robust and secure approach is to use Airflow Connections: store the broker and backend URLs as a connection in the Airflow UI and retrieve them in the DAG through an Airflow hook. This centralizes connection management and improves security.
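
For example, a minimal sketch, assuming a connection registered in the Airflow UI under the illustrative conn_id "celery_broker" (airflow.sdk re-exports BaseHook on Airflow 3; on Airflow 2 it lives in airflow.hooks.base):

from airflow.sdk import BaseHook
from celery import Celery

# "celery_broker" is a hypothetical conn_id; create a matching
# Connection in the Airflow UI (Admin -> Connections) first.
conn = BaseHook.get_connection("celery_broker")

app = Celery('airflow_client', broker=conn.get_uri())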

+)

 @dag(
     schedule="@hourly",
-    start_date=datetime(2026, 2, 2),
+    start_date=datetime(2026, 2, 3),
     description="Run Celery queue with RabbitMQ as the broker \
         in order to get GitHub data from the GitHub API",
     tags=["celery_queue"],
@@ -13,7 +21,7 @@ def run_queue():

     @task
     def run_the_queue():
-        print("hello")
+        app.send_task("worker.get_github_data", args=[0, 500])

medium

The arguments 0 and 500 are hardcoded in the send_task call. This limits the DAG's flexibility and reusability. To make this task more dynamic, consider using Airflow's params feature. You can define default values in the @dag decorator and override them when triggering the DAG manually.

For example:

@dag(
    # ... other arguments
    params={"start_in_repo_num": 0, "batch_size": 500},
)

You would then access these parameters within your task via the task context.
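
A minimal sketch of that lookup, assuming the param names defined above (TaskFlow passes params in through the keyword context):

@task
def run_the_queue(**context):
    p = context["params"]
    app.send_task("worker.get_github_data",
                  args=[p["start_in_repo_num"], p["batch_size"]])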


     run_the_queue()

14 changes: 14 additions & 0 deletions airflow/docker-compose.override.yml
@@ -0,0 +1,14 @@
+services:
+  api-server:
+    networks:
+      - etl-shared
+  scheduler:
+    networks:
+      - etl-shared
+  triggerer:
+    networks:
+      - etl-shared
+
+networks:
+  etl-shared:
+    external: true

medium

This file is missing a final newline character. It's a POSIX standard for text files to end with a newline, and some tools and editors may not handle files correctly without it.

    external: true

1 change: 0 additions & 1 deletion client.py
@@ -11,7 +11,6 @@

print("Waiting for Celery task to complete")

# test
try:
print("Getting the result")
response = build_repo_chord(total=5000, batch_size=500)
16 changes: 15 additions & 1 deletion docker-compose.yml
@@ -18,6 +18,8 @@ services:
       timeout: 10s
       retries: 5
       start_period: 60s
+    networks:
+      - etl-shared

   redis:
     image: redis:7-alpine
@@ -31,6 +33,8 @@
       retries: 5
       start_period: 30s
     restart: on-failure
+    networks:
+      - etl-shared
Comment on lines +36 to +37

security-medium medium

The Redis service is configured without authentication and is being added to an external shared network (etl-shared). This allows any other container on the same host that joins this network to access the Redis instance without a password. Since Redis is used as a Celery backend, an attacker on the same network could read or manipulate task results, potentially leading to data corruption or unauthorized access to processed information. It is highly recommended to secure Redis with a password and ensure that the shared network is restricted to only necessary services.
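
A minimal hardening sketch, assuming the password is supplied through an illustrative REDIS_PASSWORD variable in a .env file (the Celery backend URL must then carry it, e.g. redis://:${REDIS_PASSWORD}@redis:6379/0):

redis:
  image: redis:7-alpine
  # REDIS_PASSWORD is a placeholder; load it from .env or a secret store.
  command: ["redis-server", "--requirepass", "${REDIS_PASSWORD}"]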


   celery_worker:
     build: .
@@ -56,6 +60,8 @@
       redis:
         condition: service_healthy
     restart: on-failure
+    networks:
+      - etl-shared

   flower:
     image: mher/flower
@@ -69,6 +75,8 @@
         condition: service_healthy
       redis:
         condition: service_healthy
+    networks:
+      - etl-shared

   client:
     build: .
@@ -83,6 +91,8 @@
         condition: service_healthy
       celery_worker:
         condition: service_healthy
+    networks:
+      - etl-shared


   # airflow-init:
@@ -130,4 +140,8 @@

 volumes:
   rabbitmq_data:
-  redis_data:
+  redis_data:
+
+networks:
+  etl-shared:
+    external: true
Comment on lines +145 to +147

medium

The etl-shared network is defined as external in both docker-compose.yml and airflow/docker-compose.override.yml. This is redundant. To adhere to the DRY (Don't Repeat Yourself) principle, it's best to define the network in a single location. Since docker-compose.yml serves as the base configuration, I recommend keeping the definition here and removing the networks block from airflow/docker-compose.override.yml.


medium

This file is missing a final newline character. It's a POSIX standard for text files to end with a newline, and some tools and editors may not handle files correctly without it.

    external: true
