Architecture Overview


Workloads
@task: 파이썬에서 패키지로 지원하는 데코레이터Control flow
DAGs
Task들이 의존성을 기반으로 조직된 Airflow의 핵심 개념
어떤 task가 의존성을 가지는지와 얼마나 자주 DAG가 실행되는 지를 정의할 수 있다.
Declaring a DAG
with DAG("dag_name") as dag:
t = DummyOperator(task_id="task")
my_dag = DAG("dag_name")
op = DummyOperator(task_id="task", dag=my_dag)
Task dependencies
chan(*args)를 사용args[0] >> args[1] >> ... >> args[n-1]Loading DAGs
DAG_FOLDER를 기반으로 로딩Running DAGs
schedule_interval: 시간을 베이스로 실행이된다.Tasks
none → scheduled → queued → running → successTasks-Operators
Tasks-Sensors
Scheduler
airflow scheduler 명령어를 통해 제어를 할 수 있다.
airflow.cfg에서 진행한다.Template
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
{{ ds }}의 경우는 macro로 date time을 의미한다.Tutorial - Airflow Documentation
Module import
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
Default Arguments
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
airflow.models.BaseOperator을 참고하자Instantiate a DAG
with DAG(
'tutorial', # setting dag_id
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
) as dag:
dag_id로 문자열을 그대로 넘겨준다.Tasks
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
task_id가 구분 인자로 필수적으로 필요하다.task_id 혹은 owner가 반드시 필요하다.Templating with Jinja
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
)
{% %} block{{ }} blockt3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
)
bash_command에 필요 및 실행할 bash 스크립트를 넣어주며 → templated_command.shparams를 이용해서 dict 객체를 전달한다.Adding DAG and Tssks documentation
Setting up Dependencies