diff --git a/airflow/dags/run_queue.py b/airflow/dags/run_queue.py
index 75400c4..3c8deff 100644
--- a/airflow/dags/run_queue.py
+++ b/airflow/dags/run_queue.py
@@ -1,4 +1,5 @@
 import os
+from socket import timeout
 from airflow.sdk import dag, task
 from pendulum import datetime
 from celery import Celery
@@ -27,9 +28,9 @@ def run_github_data_queue():
 
     @task(do_xcom_push=True, multiple_outputs=True)
     def check_rate_limit(**context):
-        api_token, api_token_two = os.getenv("GITHUB_API_TOKEN"), os.getenv("GITHUB_API_TOKEN_SECOND_ACCOUNT")
-        auth, auth_two = Auth.Token(api_token), Auth.Token(api_token_two)
-        gh, gh_two = Github(auth=auth), Github(auth=auth_two)
+        api_token = os.getenv("GITHUB_API_TOKEN")
+        auth = Auth.Token(api_token)
+        gh = Github(auth=auth)
 
         rate_limit = gh.rate_limiting
         print(f"Rate limit: {rate_limit[0]} remaining / {rate_limit[1]} total")
@@ -44,15 +45,10 @@ def run_queue(**context):
 
         rate_limit = context["ti"].xcom_pull(task_ids="check_rate_limit", key="remaining")
         max_total_api_calls = context["ti"].xcom_pull(task_ids="check_rate_limit", key="total")
 
-        if rate_limit > 100:
-            print("IT WORKS")
-            print(rate_limit)
-            celery_worker = app.send_task("worker.get_github_data")
+        app.send_task("worker.get_github_data")
 
-        print(celery_worker)
-
-        time.sleep(500)
+        print("celery_worker")
 
     @task
     def save_data_from_queue():
diff --git a/fluent-bit/fluent-bit.yml b/fluent-bit/fluent-bit.yml
new file mode 100644
index 0000000..cbc889a
--- /dev/null
+++ b/fluent-bit/fluent-bit.yml
@@ -0,0 +1,23 @@
+pipeline:
+  inputs:
+    - name: dummy
+      tag: workshop.info
+      dummy: '{"message": "INFO message"}'
+
+    - name: dummy
+      tag: workshop.error
+      dummy: '{"message": "ERROR message"}'
+
+  outputs:
+    - name: stdout
+      match: '*'
+      format: json_lines
+
+    - name: file
+      path: /data
+      match: '*.info'
+      file: workshop-INFO.log
+      format: json_lines
+
+
+