This commit is contained in:
dechert
2023-03-30 15:36:02 +02:00
parent add7a566d5
commit 4332a35f4a
20 changed files with 649 additions and 368 deletions

View File

@@ -0,0 +1,44 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
import drafts
def extract_draft_data():
marcel_vs_canuck = 'lipeX'
return drafts.download_draft_json(marcel_vs_canuck)
def transform_draft(ti):
draft_json=ti.xcom_pull(task_ids=['extract_draft1'])
if not draft_json:
raise ValueError("No value stored in XComs")
draft_picks=drafts.get_picked_maps(draft_json)
return drafts.neutral_map_as_string(draft_picks)
# catchup: avoid running non-triggered dag-runs since start_date
with DAG("aoe_dag", start_date=datetime(2023, 1, 1), schedule_interval="@daily", catchup=False) as dag:
#E
extract_data = PythonOperator(
task_id="extract_draft1",
python_callable=extract_draft_data,
do_xcom_push=True
)
#T
transform_data = PythonOperator(
task_id="transform_draft1",
python_callable=transform_draft
)
#L
print_output = BashOperator(
task_id="print1",
bash_command="echo 'bla'"
)
extract_data >> transform_data >> print_output

27
airflow/dags/drafts.py Normal file
View File

@@ -0,0 +1,27 @@
import requests
import json
import pandas as pd
def download_draft_json(draft_id):
url = "https://aoe2cm.net/api/draft/{}".format(draft_id)
payload={}
headers = {}
response = requests.request("GET", url, headers=headers, data=payload)
# print(response.text)
draft_json_string = response.text
draft_dict = json.loads(draft_json_string)
return draft_dict
def get_picked_maps(draft_json):
draft_events = draft_json['events']
draft_events_df = pd.DataFrame.from_dict(draft_events)
draft_picks = draft_events_df[draft_events_df['actionType'] == 'pick']
return draft_picks
# the last pick is the neutral map
def neutral_map_as_string(draft_picks):
# print(draft_picks.tail(1)['chosenOptionId'])
return str(draft_picks.tail(1)['chosenOptionId'][8])