SamSam
3 min readDec 14, 2024

Airflow using guide for dag task built

# airflow 操作步驟

1. provider 安裝

$ pip install apache-airflow-providers-microsoft-mssql[common.sql]

2. connection 建立

3. task 建立

  • airflow 是以 dag內有task做為workflow的執行單位。1個dag內能有多個task 平行或是依序運行。
    * 我們拆分兩種sample演示

a. task 依照順序執行

b. task 平行分流執行

執行結果

4. task code sample
a. task 依序執行 task code sample

 start = DummyOperator(task_id = f'start')

mssql_test_sam = MsSqlOperator(
task_id='mssql_test_sam',
mssql_conn_id='admin_mssql_connect',
sql='/sql_file/mssql_test_sam.sql',
autocommit = True
)

mssql_test_anna = MsSqlOperator(
task_id='mssql_test_anna',
mssql_conn_id='admin_mssql_connect',
sql='/sql_file/mssql_test_anna.sql',
autocommit = True
)

end = DummyOperator(task_id = f'end')
start >> mssql_test_sam >> mssql_test_anna >> end

b. task 平行分流執行 task code sample

 start = DummyOperator(task_id = f'start')

mssql_test_anna = MsSqlOperator(
task_id='mssql_test_anna',
mssql_conn_id='admin_mssql_connect',
sql='/sql_file/mssql_test_anna.sql',
autocommit = True
)
mssql_test_sam = MsSqlOperator(
task_id='mssql_test_sam',
mssql_conn_id='admin_mssql_connect',
sql='/sql_file/mssql_test_sam.sql',
autocommit = True
)

end = DummyOperator(task_id = f'end')
start >> [mssql_test_anna, mssql_test_sam]

5. dag code sample

* 單純dag code sample

# task 依序執行 task code sample
#!/usr/bin/env python
# coding: utf-8
from airflow.utils.state import State
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow import DAG
from airflow.utils.dates import days_ago
# from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.exceptions import AirflowSkipException
from airflow.utils.task_group import TaskGroup
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
import datetime
default_args = {
'owner': 'admin',
'start_date': datetime.datetime(2024,12,12,14,00),
'retries': 3,
'retry_delay': datetime.timedelta(seconds=60)
}
with DAG('mssql_test_by_order', default_args=default_args, schedule_interval='5 6 * * 1', template_searchpath=['C:\\Users\\sam.xiao\\Documents\\airflow\\dags\\sql_file']) as dag:

start = DummyOperator(task_id = f'start')

mssql_test_sam = MsSqlOperator(
task_id='mssql_test_sam',
mssql_conn_id='admin_mssql_connect',
sql='/sql_file/mssql_test_sam.sql',
autocommit = True
)

mssql_test_anna = MsSqlOperator(
task_id='mssql_test_anna',
mssql_conn_id='admin_mssql_connect',
sql='/sql_file/mssql_test_anna.sql',
autocommit = True
)

end = DummyOperator(task_id = f'end')
start >> mssql_test_sam >> mssql_test_anna >> end
  • dag 中仍有共用的部分,且也能有dag之間的相依(dependency)
    當兩個 DAG 具有相依性關係時,值得考慮將它們組合成單個 DAG,這通常更容易理解。 Airflow 還為同一個 DAG 上的任務提供了更好的相依性視覺化表示。 但是,有時將所有相關任務都放在同一個 DAG 上並不實際。 例如

兩個 DAG 可能有不同的排程。 例如,每週 DAG 可能有任務依賴於每日 DAG 上的其他任務。

不同的團隊負責不同的 DAG,但這些 DAG 有一些跨 DAG 相依性。

任務可能依賴於同一個 DAG 上的另一個任務,但針對不同的 execution_date(資料間隔的開始)。

對於在不同時間執行的任務,請使用 execution_delta,例如 execution_delta=timedelta(hours=1) 來檢查 1 小時前執行的任務。

* 我們假設情境是,先執行完依序dag,再執行平行dag。此時,依序dag為parent; 平行dag為child

**sample**

 child_task1 = ExternalTaskSensor(
task_id="child_task1",
external_dag_id=parent_dag.dag_id,
external_task_id=parent_task.task_id,
timeout=600,
allowed_states=["success"],
failed_states=["failed", "skipped"],
mode="reschedule",
)
  • *sample in reality**
 mssql_test_anna = ExternalTaskSensor(
task_id="mssql_test_anna",
external_dag_id='mssql_test_by_order',
external_task_id='mssql_test_sam',
timeout=600,
allowed_states=["success"],
failed_states=["failed", "skipped"],
mode="reschedule",
)

ref
— -

https://shu-ting.medium.com/data-feat-programming-day-12-415db3b59514
https://medium.com/accredian/setting-up-apache-airflow-in-windows-using-wsl-dbb910c04ae0

SamSam
SamSam

Written by SamSam

用有限的資料知識探索無限的世界

No responses yet