Architecture Overview
Workloads
@task
: 파이썬에서 패키지로 지원하는 데코레이터Control flow
DAGs
Task들이 의존성을 기반으로 조직된 Airflow의 핵심 개념
어떤 task가 의존성을 가지는지와 얼마나 자주 DAG가 실행되는 지를 정의할 수 있다.
Declaring a DAG
with DAG("dag_name") as dag:
t = DummyOperator(task_id="task")
my_dag = DAG("dag_name")
op = DummyOperator(task_id="task", dag=my_dag)
Task dependencies
chan(*args)
를 사용args[0] >> args[1] >> ... >> args[n-1]
Loading DAGs
DAG_FOLDER
를 기반으로 로딩Running DAGs
schedule_interval
: 시간을 베이스로 실행이된다.Tasks
none
→ scheduled
→ queued
→ running
→ success
Tasks-Operators
Tasks-Sensors
Scheduler
airflow scheduler
명령어를 통해 제어를 할 수 있다.
airflow.cfg
에서 진행한다.Template
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
{{ ds }}
의 경우는 macro로 date time을 의미한다.Tutorial - Airflow Documentation
Module import
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
Default Arguments
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
airflow.models.BaseOperator
을 참고하자Instantiate a DAG
with DAG(
'tutorial', # setting dag_id
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
) as dag:
dag_id
로 문자열을 그대로 넘겨준다.Tasks
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
task_id
가 구분 인자로 필수적으로 필요하다.task_id
혹은 owner
가 반드시 필요하다.Templating with Jinja
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
)
{% %}
block{{ }}
blockt3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
)
bash_command
에 필요 및 실행할 bash 스크립트를 넣어주며 → templated_command.sh
params
를 이용해서 dict 객체를 전달한다.dict(hello=lambda name: 'Hello %s' % name)
를 정의하면{{ 'world' | hello }}
과 같이 쓰일 수 있다.Adding DAG and Tssks documentation
하나의 task에 documentation을 넣을 수 있다.
DAG docs는 markdown을 지원하며 task docs는 plaintext, markdown 별걸 다 지원한다.
t1.doc_md = dedent(
"""\\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](<http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png>)
"""
)
Setting up Dependencies
t2
가 정상적으로 작동하기 위해서는 t1
에 의존한다.t2.set_upstream(t1)
t1 >> t2
t1 >> [t2, t3]
도 가능하다.t1
이 제대로 완료돼야 t2
, t3
가 완료된다.