摘要:顯然,這多帶帶執行不起作用這將通過子操作符被作為像是自己的調度任務中那樣運行。子也必須有個可用調度即使子作為其父的一部分被觸發子也必須有一個調度如果他們的調度是設成,這個子操作符將不會觸發任何任務。這兩個例子都是緣起子操作符被當做了回填工作。
前言
Airbnb的數據工程師 Maxime Beauchemin 激動地表示道:Airflow 是一個我們正在用的工作流調度器,現在的版本已經更新到1.6.1了,并且引入了一些列調度引擎的改革。我們喜歡它是因為它寫代碼太容易了,也便于調試和維護。我們也喜歡全都用他來寫代碼,而不是像xml那樣的配置文件用來描述DAG。更不用說,我們顯然不用再學習太多東西。
任務隔離在一個分布式環境中,宕機是時有發生的。Airflow通過自動重啟任務來適應這一變化。到目前為止一切安好。當我們有一系列你想去重置狀態的任務時,你就會發現這個功能簡直是救世主。為了解決這個問題,我們的策略是建立子DAG。這個子DAG任務將自動重試自己的那一部分,因此,如果你以子DAG設置任務為永不重試,那么憑借子DAG操作你就可以得到整個DAG成敗的結果。如果這個重置是DAG的第一個任務設置子DAG的策略就會非常有效,對于有一個相對復雜的依賴關系結構設置子DAG是非常棒的做法。注意到子DAG操作任務不會正確地標記失敗任務,除非你從GitHub用了最新版本的Airflow。解決這個問題的另外一個策略是使用重試柄:
def make_spooq_exporter(table, schema, task_id, dag): return SpooqExportOperator( jdbc_url=("jdbc:mysql://%s/%s?user=user&password=pasta" % (TARGET_DB_HOST,TARGET_DB_NAME)), target_table=table, hive_table="%s.%s" % (schema, table), dag=dag, on_retry_callback=truncate_db, task_id=task_id) def truncate_db(context): hook = MySqlHook("clean_db_export") hook.run( "truncate `%s`"%context["task_instance"].task.target_table, autocommit=False, parameters=None)
這樣你的重試柄就可以將任務隔離,每次執行某個特定的任務。
代碼定義任務這在執行一個特定的可重復的任務時非常管用。用代碼來定義工作流是這個系統最強大之處是你可以以編碼的方式產生DAG。這在在沒有人工干預的情況下自動接入新的數據源的時候非常有用。
我們借助現有的日志目錄將檢查HDFS日志融入DAG,并且在每次融入這些數據的時候在每個目錄下產生一個任務。示例代碼如下:
lognames = list( hdfs.list_filenames(conf.get("incoming_log_path"), full_path=False)) for logname in lognames: # TODO 使用適當的正則表達式來過濾掉不良日志名,使得Airflow 能用符合特定的字符找出相應任務的名字 if logname not in excluded_logs and "%" not in logname and "@" not in logname: ingest = LogIngesterOperator( # 因為log_name以作為unicode返回值,所以需要用str()包裝task_id task_id=str("ingest_%s" % logname), db=conf.get("hive_db"), logname=logname, on_success_callback=datadog_api.check_data_lag, dag=dp_dag ) ingest.set_upstream(transfer_from_incoming) ingest.set_downstream(transform_hive)今日事,今日畢
在每天結束的時候執行每日任務,而不是在當天工作開始的時候去執行這些任務。你不能將子DAG放在DAG文件夾下,換句話說除非你保管一類DAG,否則你不可以將子DAG放在自己的模塊中。
子DAG與主DAG不能嵌套或者更具體地說就是,雖然你也可以將子DAG放在DAG文件夾下,但是接著子DAG將先主DAG一樣運行自己的調度。這里是一個兩個DAG的例子(假設他們同時在DAG文件夾下,也就是所謂的差DAG)這里的子DAG將在主DAG中通過調度器被多帶帶調度。
from airflow.models import DAG from airflow.operators import PythonOperator, SubDagOperator from bad_dags.subdag import hive_dag from datetime import timedelta, datetime main_dag = DAG( dag_id="main_dag", schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21) ) # 顯然,這多帶帶執行不起作用 transform_hive = SubDagOperator( subdag=hive_dag, task_id="hive_transform", dag=main_dag, trigger_rule=TriggerRule.ALL_DONE )
from airflow.models import DAG from airflow.operators import HiveOperator from datetime import timedelta, datetime # 這將通過子DAG操作符被作為像是自己的調度任務中那樣運行。 hive_dag = DAG("main_dag.hive_transform", # 注意到這里的重復迭代 schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21)) hive_transform = HiveOperator(task_id="flatten_tables", hql=send_charge_hql, dag=dag)
除非你真的想這個子DAG被主DAG調度。
我們通過使用工廠函數解決這個問題。這是一個優勢那就是 主DAG可以傳遞一些必要的參數到子DAG,因此他們在調度的時候其他參數也自動賦值了。當你的主DAG發生變化時,我們不需要去跟蹤參數。
在下面的例子中,假設DAG是所謂的好DAG:
from airflow.models import DAG from airflow.operators import PythonOperator, SubDagOperator from good_dags.subdag import hive_dag from datetime import timedelta, datetime main_dag = DAG( dag_id="main_dag", schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21) ) # 顯然,這多帶帶執行不起作用 transform_hive = SubDagOperator( subdag=hive_dag(main_dag.start_date, main_dag.schedule_interval), task_id="hive_transform", dag=main_dag, trigger_rule=TriggerRule.ALL_DONE )
from airflow.models import DAG from airflow.operators import HiveOperator # 對調度程序來說,沒有Dag的頂層模塊就不起作用了 def hive_dag(start_date, schedule_interval): # you might like to make the name a parameter too dag = DAG("main_dag.hive_transform", # 注意這里的設置 schedule_interval=schedule_interval, start_date=start_date) hive_transform = HiveOperator(task_id="flatten_tables", hql=send_charge_hql, dag=dag) return dag
使用工廠類使得子DAG在保障調度器從開始運行時就可維護就更強。
另一種模式是將主DAG和子DAG之間的共享設為默認參數,然后傳遞到工廠函數中去,(感謝 Maxime 的建議)。
子DAG也必須有個可用調度即使子DAG作為其父DAG的一部分被觸發子DAG也必須有一個調度,如果他們的調度是設成None,這個子DAG操作符將不會觸發任何任務。
更糟糕的是,如果你對子DAG被禁用,接著你又去運行子DAG操作,而且還沒運行完,那么以后你的子DAG就再也運行不起來了。
這將快速導致你的主DAG同時運行的任務數量一下就達到上限(默認一次寫入是16個)并且這將導致調度器形同虛設。
這兩個例子都是緣起子DAG操作符被當做了回填工作。這里可以看到這個
什么是DagRun:遲到的禮物Airflow1.6的最大更新是引入了DagRun?,F在,任務調度實例是由DagRun對象來創建的。
相應地,如果你想跑一個DAG而不是回填工作,你可能就需要用到DagRun。
你可以在代碼里寫一些airflow trigger_dag命令,或者也可以通過DagRun頁面來操作。
這個巨大的優勢就是調度器的行為可以被很好的理解,就像它可以遍歷DagRun一樣,基于正在運行的DagRun來調度任務實例。
這個服務器現在可以向我們顯示每一個DagRun的狀態,并且將任務實例的狀態與之關聯。
DagRun是怎樣被調度的新的模型也提供了一個控制調度器的方法。下一個DagRun會基于數據庫里上一個DagRun的實例來調度。
除了服務峰值的例外之外,大多數實例是處于運行還是結束狀態都不會影響整體任務的運行。
這意味著如果你想返回一個在現有和歷史上不連續集合的部分DagRun ,你可以簡單刪掉這個DagRun任務實例,并且設置DagRun的狀態為正在運行。
按照我們的經驗,一個需要占用很長時間運行的調度器至少是個最終沒有安排任務的CeleryExcecutor。很不幸,我們仍然不知道具體的原因。不過慶幸的是,Airflow 內建了一個以num_runs形式作標記的權宜之計。它為調度器確認了許多迭代器來在它退出之前確保執行這個循環。我們運行了10個迭代,Airbnb一般運行5個。注意到這里如果用LocalExecutor將會引發一些問題。我們現在使用chef來重啟executor;我們正計劃轉移到supervisor上來自動重啟。
操作符的依賴于依賴包這個airflow.operators包有一些魔法,它讓我們只能使用正確導入的操作符。這意味著如果你沒有安裝必要的依賴,你的操作符就會失效。
這是所有的 Fork! (現在)Airflow 是正在快速迭代中,而且不只是Airbnb自己在做貢獻。Airflow將會繼續演化,而我也將寫更多有關Airflow的技巧供大家學習使用。
如果你也對解決這些問題感興趣,那就加入我們吧!
參考資料Airflow官方文檔
docker-airflow
Airflow 的GitHub地址
Designing workflow with Airflow
Airflow Demo
pandastrike:Airflow
Airflow review
Airflow and Hive
Youtube: Airflow An open source platform to author and monitor data pipelines
Hackenews: Airflow by airbnb is a nice alternative to luigi
Luigi vs Airflow vs Pinball
Existing Workflow systems
Jonathan Dinu: Scalable Pipelines with Luigi or: I’ll have the Data Engineering, hold the Java!
AirFlow Joins Apache Incubator
Managing Containerized Data Pipeline Dependencies With Luigi
Petabyte-Scale Data Pipelines with Docker, Luigi and Elastic Spot Instances
工作流調研 oozie vs azkaban
日拱一卒
Existing Workflow systems
Awesome Pipeline
rediit: Azkaban vs Oozie vs Airflow
推薦閱讀董老師在硅谷:[硅谷熱門公司技術巡禮]1.Airbnb基礎數據架構
董老師在硅谷:DAG、Workflow 系統設計、Airflow 與開源的那些事兒
[原]數據流編程教程:如何使用Airflow構建數據科學工作流
原作者:Marcin Tustin 翻譯:Harry Zhu
英文原文地址:Airflow: Tips, Tricks, and Pitfalls作為分享主義者(sharism),本人所有互聯網發布的圖文均遵從CC版權,轉載請保留作者信息并注明作者 Harry Zhu 的 FinanceR專欄:https://segmentfault.com/blog/harryprince,如果涉及源代碼請注明GitHub地址:https://github.com/harryprince。微信號: harryzhustudio
商業使用請聯系作者。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/37915.html
摘要:概述是一個我們正在用的工作流調度器,相對于傳統的任務管理,很好的為我們理清了復雜的任務依賴關系監控任務執行的情況。步驟三修改默認數據庫找到配置文件修改配置注意到,之前使用的的方式是行不通的。微信號商業使用請聯系作者。 showImg(https://segmentfault.com/img/remote/1460000006760428?w=1918&h=1556); 概述 Airfl...
摘要:在同行評議上,我們檢查方法論的改進現有工作的關聯性以及準確的解釋性聲明。學習價值通過之前一系列的工作,現在數據科學家可以分享自己的新方法論代碼技術并且加快品牌化推廣,讓團隊之外的人可以快速了解自己的領域。 頑疾 Airbnb的數據團隊很重要的一個職責就是傳播基于數據的決策方法。我們將數據的獲取民主化,使得每一個Airbnb的成員都可以量化他們基于數據的決策影響力并且借此洞察用戶偏好,提...
摘要:面試如何防騙一份優秀的前端開發工程師簡歷是怎么樣的作為,有哪些一般人我都告訴他,但是他都不聽的忠告如何面試前端工程師 更多資源請Star:https://github.com/maidishike... 文章轉自:https://github.com/jsfront/mo... 3月份前端資源分享 1. Javascript 使用judge.js做信息判斷 javascript...
摘要:蠎周刊年度最贊親俺們又來回顧又一個偉大的年份兒包去年最受歡迎的文章和項目如果你錯過了幾期就這一期不會丟失最好的嗯哼還為你和你的準備了一批紀念裇從這兒獲取任何時候如果想分享好物給大家在這兒提交喜歡我們收集的任何意見建議通過來吧原文 Title: 蠎周刊 2015 年度最贊Date: 2016-01-09 Tags: Weekly,Pycoder,Zh Slug: issue-198-to...
摘要:概述我非常認同前百度數據工程師現神策分析創始人桑老師最近談到的數據分析三重境界統計計數多維分析機器學習數據分析的統計計數和多維分析,我們通常稱之為數據探索式分析,這個步驟旨在了解數據的特性,有助于我們進一步挖掘數據的價值。 showImg(https://camo.githubusercontent.com/f98421e503a81176b003ddd310d97e1e1214625...
閱讀 1887·2021-11-15 11:46
閱讀 1077·2021-10-26 09:49
閱讀 1819·2021-10-14 09:42
閱讀 3374·2021-09-26 09:55
閱讀 827·2019-08-30 13:58
閱讀 1024·2019-08-29 16:40
閱讀 3462·2019-08-26 10:27
閱讀 601·2019-08-23 18:18