fix xcom
This commit is contained in:
		| @@ -1,5 +1,9 @@ | |||||||
| # Apache Airflow Investigation | # Apache Airflow Investigation | ||||||
|  |  | ||||||
|  | ## How to run | ||||||
|  | in airflow directory: | ||||||
|  | `docker-compose up -d` | ||||||
|  |  | ||||||
| login: | login: | ||||||
| airflow:airflow | airflow:airflow | ||||||
|  |  | ||||||
| @@ -22,3 +26,8 @@ https://airflow.apache.org/docs/apache-airflow-providers-openfaas/stable/index.h | |||||||
|  |  | ||||||
| https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/index.html | https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/index.html | ||||||
|  |  | ||||||
|  |  | ||||||
|  | Lessons learned | ||||||
|  | Airflow stack takes A LOT of RAM: ~ 9GB | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
| @@ -12,7 +12,7 @@ def extract_draft_data(): | |||||||
|     return drafts.download_draft_json(marcel_vs_canuck) |     return drafts.download_draft_json(marcel_vs_canuck) | ||||||
|  |  | ||||||
| def transform_draft(ti): | def transform_draft(ti): | ||||||
|     draft_json=ti.xcom_pull(task_ids=['extract_draft1']) |     draft_json=ti.xcom_pull(task_ids='extract_draft1', key='return_value')  | ||||||
|     if not draft_json: |     if not draft_json: | ||||||
|         raise ValueError("No value stored in XComs") |         raise ValueError("No value stored in XComs") | ||||||
|     draft_picks=drafts.get_picked_maps(draft_json) |     draft_picks=drafts.get_picked_maps(draft_json) | ||||||
| @@ -32,13 +32,16 @@ with DAG("aoe_dag", start_date=datetime(2023, 1, 1), schedule_interval="@daily", | |||||||
|     #T |     #T | ||||||
|     transform_data = PythonOperator( |     transform_data = PythonOperator( | ||||||
|         task_id="transform_draft1", |         task_id="transform_draft1", | ||||||
|         python_callable=transform_draft |         python_callable=transform_draft, | ||||||
|  |         do_xcom_push=True | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
|     #L |     #L | ||||||
|  |     string_to_print = "{{ task_instance.xcom_pull(task_ids='transform_draft1', key='return_value') }}" | ||||||
|     print_output = BashOperator( |     print_output = BashOperator( | ||||||
|         task_id="print1", |         task_id="print1", | ||||||
|         bash_command="echo 'bla'" |         bash_command="echo ${STRING_TO_PRINT}", | ||||||
|  |         env={"STRING_TO_PRINT": string_to_print} | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
| extract_data >> transform_data >> print_output | extract_data >> transform_data >> print_output | ||||||
		Reference in New Issue
	
	Block a user