Merged
16 changes: 6 additions & 10 deletions airflow/dags/run_queue.py
@@ -1,4 +1,5 @@
import os
+from socket import timeout
Contributor (medium):
The timeout object from the socket module is imported but never used in this file. It's best practice to remove unused imports to keep the code clean and avoid potential confusion.

Contributor (medium):
The timeout import from socket is unused in this file and should be removed to improve code clarity and reduce clutter.

from airflow.sdk import dag, task
from pendulum import datetime
from celery import Celery
@@ -27,9 +28,9 @@ def run_github_data_queue():

@task(do_xcom_push=True, multiple_outputs=True)
def check_rate_limit(**context):
-api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
-auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
-gh, gh_two = Github(auth=auth), Github(auth=auth_two)
+api_token = os.getenv("GITHUB_API_TOKEN")
+auth = Auth.Token(api_token)
+gh = Github(auth=auth)

rate_limit = gh.rate_limiting
print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")
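check_rate_limit pushes the pair returned by gh.rate_limiting (remaining, total) via XCom, and the downstream task gates on the remaining count with `if rate_limit > 100`. That gate can be factored into a small pure helper so the threshold logic is testable in isolation — a sketch; the helper name `should_dispatch` is illustrative, not part of this PR:

```python
def should_dispatch(remaining: int, threshold: int = 100) -> bool:
    """Return True when enough GitHub API calls remain to enqueue a worker run."""
    return remaining > threshold

# The DAG's gate `if rate_limit > 100` is equivalent to should_dispatch(rate_limit).
print(should_dispatch(4200))  # well above the 100-call floor
print(should_dispatch(42))    # below the floor, skip the run
```

Keeping the threshold as a parameter (rather than a literal inside the task) also makes it easy to tune per environment.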
@@ -44,15 +45,10 @@ def run_queue(**context):
rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining")
max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total")
Comment on lines 45 to 46
Contributor (medium):
The variables rate_limit and max_total_api_calls are assigned values from XComs but are not used anywhere in the run_queue task. This is dead code and should be removed to improve clarity and maintainability.


if rate_limit > 100:
print("IT WORKS")
print(rate_limit)

-celery_worker = app.send_task("worker.get_github_data")
+app.send_task("worker.get_github_data")
Contributor (medium):
With the removal of the if condition that used rate_limit, the variables rate_limit and max_total_api_calls (fetched on lines 45-46) are no longer used in this task. Please remove the XCom pulls for these variables to clean up the code and avoid unnecessary operations.


print(celery_worker)

time.sleep(500)
print("celery_worker")
Comment on lines +49 to +51
Contributor (medium):
The statement print("celery_worker") is not very informative for logging or debugging. Previously, the Celery task object was printed. Consider capturing the result of app.send_task and logging the task ID for better traceability. Also, it's recommended to use Airflow's logging mechanism instead of print for better integration with the Airflow UI.

Suggested change:
-app.send_task("worker.get_github_data")
-print(celery_worker)
-time.sleep(500)
-print("celery_worker")
+celery_task = app.send_task("worker.get_github_data")
+print(f"Celery task sent with ID: {celery_task.id}")

Comment on lines 50 to +51
Contributor (medium):
This print statement and the preceding blank line appear to be for debugging. For production code, it's better to use Airflow's logging mechanism. Please remove these lines if they are not needed for production.
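The reviewers' print-vs-logging point can be sketched in isolation: Airflow attaches its own handlers to the standard logging module, so a task that logs through a module-level logger gets its output into the task log UI with levels and timestamps. A minimal, self-contained sketch — the logger name, the StringIO capture, and report_task_sent are all illustrative, not part of this PR:

```python
import io
import logging

# Hypothetical stand-in for the task body: log the dispatched Celery task ID
# through `logging` instead of print(), as the review suggests. The StringIO
# handler only exists here so the sketch is runnable without Airflow.
logger = logging.getLogger("run_queue_sketch")
logger.setLevel(logging.INFO)
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logger.addHandler(handler)

def report_task_sent(task_id: str) -> None:
    # %s-style lazy formatting defers string building until the record is emitted.
    logger.info("Celery task sent with ID: %s", task_id)

report_task_sent("d3adb33f")
print(buffer.getvalue().strip())  # INFO Celery task sent with ID: d3adb33f
```

Inside a real Airflow task no handler setup is needed: `logging.getLogger(__name__)` (or the task instance's `log` attribute) already routes to the task log.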


@task
def save_data_from_queue():
23 changes: 23 additions & 0 deletions fluent-bit/fluent-bit.yml
@@ -0,0 +1,23 @@
pipeline:
inputs:
- name: dummy
tag: workshop.info
dummy: '{"message": "INFO message"}'

- name: dummy
tag: workshop.info
Contributor (medium):
This dummy input generates an 'ERROR message' (on line 9) but uses the tag workshop.info. To distinguish between message types for routing and processing, it's best to use a distinct tag, such as workshop.error.

Suggested change:
      tag: workshop.error

dummy: '{"message": "ERROR message"}'
Comment on lines +7 to +9
Contributor (medium):
This dummy input generates a record with the message 'ERROR message' but tags it as workshop.info. This will cause it to be routed to the workshop-INFO.log file, which is likely not the intended behavior for an error message. Consider changing the tag to something like workshop.error to allow for separate routing of error messages.

Suggested change:
    - name: dummy
      tag: workshop.error
      dummy: '{"message": "ERROR message"}'


outputs:
- name: stdout
match: '*'
format: json_lines

- name: file
file: /data
match: '*.info'
file: workshop-INFO.log
Comment on lines +17 to +19
Contributor (critical):
There is a duplicate file key in this output configuration. The first instance on line 17 should be path to specify the directory for the log file. With the current configuration, file: /data will be overwritten by the subsequent file key, causing the log file to be created in Fluent Bit's current working directory instead of the intended /data directory.

Suggested change:
      path: /data
      match: '*.info'
      file: workshop-INFO.log

Comment on lines +17 to +19
Contributor (critical):
This output block has a duplicate file key. To specify the output directory, you should use path. The current configuration is incorrect as the key on line 19 will override line 17.

Suggested change:
      path: /data
      match: '*.info'
      file: workshop-INFO.log

format: json_lines
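Taken together, the two review threads point at a config along these lines — a sketch, assuming Fluent Bit's YAML pipeline format, where `path` names the output directory and `file` the file name; the extra `*.error` output block is an illustration of the separate error routing the reviewers mention, not part of this PR:

```yaml
pipeline:
  inputs:
    - name: dummy
      tag: workshop.info
      dummy: '{"message": "INFO message"}'

    - name: dummy
      tag: workshop.error   # distinct tag so errors can be matched separately
      dummy: '{"message": "ERROR message"}'

  outputs:
    - name: stdout
      match: '*'
      format: json_lines

    - name: file
      path: /data           # directory; `file` below is the file name
      match: '*.info'
      file: workshop-INFO.log
      format: json_lines

    - name: file
      path: /data
      match: '*.error'
      file: workshop-ERROR.log
      format: json_lines
```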


