摘要:調度和監控工作流的平臺,用于用來創建監控和調整。安裝以及方式啟動重要說明使用需要安裝配置說明上篇在中配置的。負責調度,只支持單節點,多節點啟動可能會掛掉負責執行具體中的。輪詢查詢狀態是成功失敗。如是則繼續輪詢,成功失敗操作相應后續操作。
airflow是一個 Airbnb 的 Workflow 開源項目,在Github 上已經有超過兩千星。data pipeline調度和監控工作流的平臺,用于用來創建、監控和調整data pipeline。類似的產品有:Azkaban、ooziepip方式安裝
默認已經安裝python >= 2.7 以及 pip
安裝可以參考這篇,比較詳細。airflow安裝以及celery方式啟動
python 2 : pip install MySQL-python python 3 : pip install PyMySQLAIRFLOW_HOME配置說明
上篇在.bashrc中配置的export AIRFLOW_HOME=/home/airflow/airflow01。AIRFLOW_HOME設置目錄在airflow initdb的時候初始化,存放airflow的配置文件airflow.cfg及相關文件。
DAG說明-管理建議默認$AIRFLOW_HOME/dags存放定義的dag,可以分目錄管理dag。常用管理dag做法,dag存放另一個目錄通過git管理,并設置軟連接映射到$AIRFLOW_HOME/dag。好處方便dag編輯變更,同時dag變更不會出現編輯到一半的時候就加載到airflow中。
plugins說明-算子定義默認$AIRFLOW_HOME/plugins存放定義的plugins,自定義組件。可以自定義operator,hook等等。我們希望可以直接使用這種模式定義機器學習的一個算子。下面定義了一個簡單的加法算子。
# -*- coding: UTF-8 -*- # !/usr/bin/env python from airflow.plugins_manager import AirflowPlugin from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults # Will show up under airflow.operators.plus_plugin.PluginOperator class PlusOperator(BaseOperator): @apply_defaults def __init__(self, op_args=None, params=None, provide_context=False, set_context=False, *args, **kwargs): super(PlusOperator, self).__init__(*args, **kwargs) self.params = params or {} self.set_context = set_context def execute(self, context): if self.provide_context: context.update(self.op_kwargs) self.op_kwargs = context puls = self.op_kwargs["a"] + self.op_kwargs["b"] print "a =", self.op_kwargs["a"], ". b=", self.op_kwargs["a"] return_value = self.main() context[self.task_id].xcom_push(key="return_value", value=return_value) return puls # Defining the plugin class class PlusPlugin(AirflowPlugin): name = "plus_plugin" operators = [PlusOperator]
在dag中使用案例如下
from airflow.operators.plus_plugin import PlusOperator plus_task = PlusOperator(task_id="plus_task", provide_context=True, params={"a": 1,"b":2},dag=dag)一些命令說明
命令 | 說明 |
---|---|
airflow webserver -p 8091 | 8091啟動webserver,通過頁面查詢不需要可以不啟動 |
airflow scheduler | 調度器,必須啟動,不然dag沒法run起來(使用CeleryExecutor、LocalExecutor時) |
airflow run dagid [time] | run task instance |
airflow backfill [dagid] -s[startTime] -e [endTime] | run a backfill over 2 days |
run的demo # run your first task instance airflow run example_bash_operator runme_0 2018-01-11 # run a backfill over 2 days airflow backfill example_bash_operator -s 2018-01-10 -e 2018-01-11基于CeleryExecutor方式的系統架構
使用celery方式的系統架構圖(官方推薦使用這種方式,同時支持mesos方式部署)。turing為外部系統,GDags服務幫助拼接成dag,可以忽略。
1.master節點webui管理dags、日志等信息。scheduler負責調度,只支持單節點,多節點啟動scheduler可能會掛掉
2.worker負責執行具體dag中的task。這樣不同的task可以在不同的環境中執行。
基于LocalExecutor方式的系統架構圖另一種啟動方式的思考,一個dag分配到1臺機器上執行。如果task不復雜同時task環境相同,可以采用這種方式,方便擴容、管理,同時沒有master單點問題。
基于源碼的啟動以及二次開發很多情況airflow是不滿足我們需求,就需要自己二次開發,這時候就需要基于源碼方式啟動。比如日志我們期望通過http的方式提供出來,同其他系統查看。airflow自動的webserver只提供頁面查詢的方式。
下載源碼github源碼地址 : [https://github.com/apache/inc...]
git clone git@github.com:apache/incubator-airflow.git
master分支的表初始化有坑,mysql設置的sql校驗安全級別過高一直建表不成功。這個坑被整的有點慘。v1-8-stable或者v1-9-stable分支都可以。
git checkout v1-8-stable
進入incubator-airflow,python setup.py install (沒啥文檔說明,又是一個坑。找了半天)
初始化直接輸入airflow initdb(python setup.py install這個命令會將airflow安裝進去)
修改配置進入$AIRFLOE_HOME (默認在~/airflow),修改airflow.cfg,修改mysql配置??梢圆榭瓷厦嫱扑]的文章以及上面的[使用mysql需要安裝]
啟動airflow webserver -p 8085
airflow scheduler
1.進入incubator-airflow/airflow/www/
2.修改views.py
在 class Airflow(BaseView)中添加下面代碼
@expose("/logs") @login_required @wwwutils.action_logging def logs(self): BASE_LOG_FOLDER = os.path.expanduser( conf.get("core", "BASE_LOG_FOLDER")) dag_id = request.args.get("dag_id") task_id = request.args.get("task_id") execution_date = request.args.get("execution_date") dag = dagbag.get_dag(dag_id) log_relative = "{dag_id}/{task_id}/{execution_date}".format( **locals()) loc = os.path.join(BASE_LOG_FOLDER, log_relative) loc = loc.format(**locals()) log = "" TI = models.TaskInstance session = Session() dttm = dateutil.parser.parse(execution_date) ti = session.query(TI).filter( TI.dag_id == dag_id, TI.task_id == task_id, TI.execution_date == dttm).first() dttm = dateutil.parser.parse(execution_date) form = DateTimeForm(data={"execution_date": dttm}) if ti: host = ti.hostname log_loaded = False if os.path.exists(loc): try: f = open(loc) log += "".join(f.readlines()) f.close() log_loaded = True except: log = "*** Failed to load local log file: {0}. ".format(loc) else: WORKER_LOG_SERVER_PORT = conf.get("celery", "WORKER_LOG_SERVER_PORT") url = os.path.join( "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative ).format(**locals()) log += "*** Log file isn"t local. " log += "*** Fetching here: {url} ".format(**locals()) try: import requests timeout = None # No timeout try: timeout = conf.getint("webserver", "log_fetch_timeout_sec") except (AirflowConfigException, ValueError): pass response = requests.get(url, timeout=timeout) response.raise_for_status() log += " " + response.text log_loaded = True except: log += "*** Failed to fetch log file from worker. ".format( **locals()) if not log_loaded: # load remote logs remote_log_base = conf.get("core", "REMOTE_BASE_LOG_FOLDER") remote_log = os.path.join(remote_log_base, log_relative) log += " *** Reading remote logs... " # S3 if remote_log.startswith("s3:/"): log += log_utils.S3Log().read(remote_log, return_error=True) # GCS elif remote_log.startswith("gs:/"): log += log_utils.GCSLog().read(remote_log, return_error=True) # unsupported elif remote_log: log += "*** Unsupported remote log location." session.commit() session.close() if PY2 and not isinstance(log, unicode): log = log.decode("utf-8") title = "Log" return wwwutils.json_response(log)
3.重啟服務,訪問url如:
http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11
就可以拿到這個任務在execution_date=2018-01-11的日志
異步任務思考案例:task通過http請求大數據操作,拆分一些數據,存入一些臨時表。
方案:
1.新建一張task實例的狀態表如:task_instance_state。
2.擴展一個plugins,如:AsyncHttpOperator。AsyncHttpOperator實現邏輯:
在task_instance_state插入一條running狀態記錄running。
發送http請求給大數據平臺,操作數據。
輪詢查詢task_instance_state狀態是成功、失敗、running。如是running則繼續輪詢,成功、失敗操作相應后續操作。
3.提供一個restful api update task_instance_state,供大數據平臺回調,修改任務實例狀態。
不錯的文章推薦瓜子云的任務調度系統
Get started developing workflows with Apache Airflow
官網地址
生產環境使用可能遇到的坑
初探airflow
焦油坑
系統研究Airbnb開源項目airflow
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/41221.html
摘要:數據科學項目的完整流程通常是這樣的五步驟需求定義數據獲取數據治理數據分析數據可視化一需求定義需求定義是數據科學項目和數據科學比賽的最大不同之處,在真實情景下,我們往往對目標函數自變量約束條件都并不清晰。 概述 和那些數據科學比賽不同,在真實的數據科學中,我們可能更多的時間不是在做算法的開發,而是對需求的定義和數據的治理。所以,如何更好的結合現實業務,讓數據真正產生價值成了一個更有意義的...
摘要:概述是一個我們正在用的工作流調度器,相對于傳統的任務管理,很好的為我們理清了復雜的任務依賴關系監控任務執行的情況。步驟三修改默認數據庫找到配置文件修改配置注意到,之前使用的的方式是行不通的。微信號商業使用請聯系作者。 showImg(https://segmentfault.com/img/remote/1460000006760428?w=1918&h=1556); 概述 Airfl...
摘要:有了自己的系統我覺得就很安心了,以后能夠做數據處理和機器學習方面就相對方便一些。隆重推薦的工具是我很喜歡的公司,他們有很多開源的工具,我覺得是最實用的代表。是,在很多機器學習里有應用,也就是所謂的有向非循環。 最近在Prettyyes一直想建立起非常專業的data pipeline系統,然后沒有很多時間,這幾個禮拜正好app上線,有時間開始建立自己的 data pipeline,能夠很...
摘要:顯然,這單獨執行不起作用這將通過子操作符被作為像是自己的調度任務中那樣運行。子也必須有個可用調度即使子作為其父的一部分被觸發子也必須有一個調度如果他們的調度是設成,這個子操作符將不會觸發任何任務。這兩個例子都是緣起子操作符被當做了回填工作。 showImg(https://segmentfault.com/img/remote/1460000006768714); 前言 Airbnb的...
閱讀 2943·2023-04-25 19:20
閱讀 786·2021-11-24 09:38
閱讀 2040·2021-09-26 09:55
閱讀 2430·2021-09-02 15:11
閱讀 2015·2019-08-30 15:55
閱讀 3610·2019-08-30 15:54
閱讀 3148·2019-08-30 14:03
閱讀 2962·2019-08-29 17:11