aoe2-data/airflow/dags/aoe_draft_dag.py
2023-03-30 16:54:11 +02:00

47 lines
1.4 KiB
Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
import drafts
def extract_draft_data():
marcel_vs_canuck = 'lipeX'
return drafts.download_draft_json(marcel_vs_canuck)
def transform_draft(ti):
draft_json=ti.xcom_pull(task_ids='extract_draft1', key='return_value')
if not draft_json:
raise ValueError("No value stored in XComs")
draft_picks=drafts.get_picked_maps(draft_json)
return drafts.neutral_map_as_string(draft_picks)
# catchup: avoid running non-triggered dag-runs since start_date
with DAG("aoe_dag", start_date=datetime(2023, 1, 1), schedule_interval="@daily", catchup=False) as dag:
#E
extract_data = PythonOperator(
task_id="extract_draft1",
python_callable=extract_draft_data,
do_xcom_push=True
)
#T
transform_data = PythonOperator(
task_id="transform_draft1",
python_callable=transform_draft,
do_xcom_push=True
)
#L
string_to_print = "{{ task_instance.xcom_pull(task_ids='transform_draft1', key='return_value') }}"
print_output = BashOperator(
task_id="print1",
bash_command="echo ${STRING_TO_PRINT}",
env={"STRING_TO_PRINT": string_to_print}
)
extract_data >> transform_data >> print_output