From 38e4d086cff3d09b9f947b3f2eb3cd732c95a4b2 Mon Sep 17 00:00:00 2001 From: Marcel D Date: Thu, 30 Mar 2023 16:54:11 +0200 Subject: [PATCH] fix xcom --- airflow/Readme.md | 9 +++++++++ airflow/dags/aoe_draft_dag.py | 9 ++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/airflow/Readme.md b/airflow/Readme.md index 34cf526..061a0d7 100644 --- a/airflow/Readme.md +++ b/airflow/Readme.md @@ -1,5 +1,9 @@ # Apache Airflow Investigation +## How to run +in airflow directory: +`docker-compose up -d` + login: airflow:airflow @@ -22,3 +26,8 @@ https://airflow.apache.org/docs/apache-airflow-providers-openfaas/stable/index.h https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/index.html + +Lessons learned +Airflow stack takes A LOT of RAM: ~ 9GB + + diff --git a/airflow/dags/aoe_draft_dag.py b/airflow/dags/aoe_draft_dag.py index 8ed2953..72ba0f6 100644 --- a/airflow/dags/aoe_draft_dag.py +++ b/airflow/dags/aoe_draft_dag.py @@ -12,7 +12,7 @@ def extract_draft_data(): return drafts.download_draft_json(marcel_vs_canuck) def transform_draft(ti): - draft_json=ti.xcom_pull(task_ids=['extract_draft1']) + draft_json=ti.xcom_pull(task_ids='extract_draft1', key='return_value') if not draft_json: raise ValueError("No value stored in XComs") draft_picks=drafts.get_picked_maps(draft_json) @@ -32,13 +32,16 @@ with DAG("aoe_dag", start_date=datetime(2023, 1, 1), schedule_interval="@daily", #T transform_data = PythonOperator( task_id="transform_draft1", - python_callable=transform_draft + python_callable=transform_draft, + do_xcom_push=True ) #L + string_to_print = "{{ task_instance.xcom_pull(task_ids='transform_draft1', key='return_value') }}" print_output = BashOperator( task_id="print1", - bash_command="echo 'bla'" + bash_command="echo ${STRING_TO_PRINT}", + env={"STRING_TO_PRINT": string_to_print} ) extract_data >> transform_data >> print_output \ No newline at end of file