摘要:如果沒(méi)有指定,則沒(méi)有期限。取消當(dāng)前正在運(yùn)行的,然后新建來(lái)替換。和這兩個(gè)字段也是可選的。設(shè)置限制值為,相關(guān)類(lèi)型的完成后將不會(huì)被保留。列出所有的列出所有的遍歷所有的根據(jù)字段確定該是否由所創(chuàng)建。
k8s version: v1.11.0源碼流程圖 概述author: lbl167612@alibaba-inc.com
cronJob controller 的實(shí)現(xiàn)比較簡(jiǎn)單,使用 Cron - Wikipedia 的方法,確定調(diào)度規(guī)則,底層的調(diào)度對(duì)象就是依賴(lài)了 job,它不會(huì)去檢查任何 Pod。
該 controller 也沒(méi)有依賴(lài)各種 informer,就簡(jiǎn)單創(chuàng)建了一個(gè)循環(huán)運(yùn)行的協(xié)程,每次遍歷現(xiàn)有的 jobs & cronJobs,整理它們的關(guān)系并進(jìn)行管理。
注意:kubernetes version >= 1.4 (ScheduledJob),>= 1.5(CronJob),需要給 apiserver 傳遞 --runtime-config=batch/v2alpha1=true 開(kāi)啟 batch/v2alpha1 API 才可用。spec 關(guān)鍵字段
.spec.schedule 是 cronJob 的必填字段,該值是 Cron - Wikipedia 格式的字符串,例如:0 * * * *,或者 @hourly,來(lái)確定調(diào)度策略。
.spec.startingDeadlineSeconds 是可選字段,表示啟動(dòng) Job 的期限(秒級(jí)別),如果因?yàn)槿魏卧蚨e(cuò)過(guò)了被調(diào)度的時(shí)間,那么錯(cuò)誤執(zhí)行時(shí)間的 Job 被認(rèn)為是失敗的。如果沒(méi)有指定,則沒(méi)有期限。
.spec.concurrencyPolicy 也是可選字段,指定了 cronJob 創(chuàng)建 Job 的并發(fā)執(zhí)行策略:
Allow(默認(rèn)):允許并發(fā)運(yùn)行 Job。
Forbid:禁止并發(fā)運(yùn)行,如果前一個(gè)還沒(méi)有完成,則直接跳過(guò)。
Replace:取消當(dāng)前正在運(yùn)行的 Jobs,然后新建 Job 來(lái)替換。
.spec.suspend 也是可選字段,如果設(shè)置為 true,則后續(xù)所有的執(zhí)行都會(huì)被過(guò)濾掉,但是對(duì)當(dāng)前已經(jīng)在運(yùn)行的 Job 不影響。默認(rèn)為false。
.spec.successfulJobsHistoryLimit 和 .spec.failedJobsHistoryLimit 這兩個(gè)字段也是可選的。它們指定了可以保留完成和失敗 Job 數(shù)量的限制。
默認(rèn)沒(méi)有限制,所有成功和失敗的 Job 都會(huì)被保留。然而,當(dāng)運(yùn)行一個(gè) Cron Job 時(shí),很快就會(huì)堆積很多 Job,推薦設(shè)置這兩個(gè)字段的值。設(shè)置限制值為 0,相關(guān)類(lèi)型的 Job 完成后將不會(huì)被保留。
路徑:pkg/controller/cronjob/cronjob_controller.go
type CronJobController struct { // 訪問(wèn) kube-apiserver 的 client. kubeClient clientset.Interface // job 控制器,用于創(chuàng)建和刪除 job. jobControl jobControlInterface // cronJob 控制器,用于更新?tīng)顟B(tài). sjControl sjControlInterface // pod 控制器,用于list & delete pods // 在刪除 job 時(shí),同時(shí)也清理 job 創(chuàng)建的 pods. podControl podControlInterface // cronJob 相關(guān)的events, 通過(guò)該 recorder 進(jìn)行廣播 recorder record.EventRecorder }
注意:代碼中有很多sj,因?yàn)橐郧安唤?cronJob,叫 scheduled jobs。startCronJobController()
路徑:cmd/kube-controller-manager/app/batch.go
startCronJobController() 是啟動(dòng) cronJob controller 的入口函數(shù)。它會(huì)初始化 CronJobController 對(duì)象,并Run().
func startCronJobController(ctx ControllerContext) (bool, error) { // 在啟動(dòng) cronJob controller 之前,判斷下 cronJob 是否有配置生效 // 用戶(hù)可以在創(chuàng)建k8s clusters時(shí),通過(guò)修改kube-apiserver --runtime-config配置想要生效的 resource if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] { return false, nil } // 初始化 CronJobController 對(duì)象 cjc, err := cronjob.NewCronJobController( ctx.ClientBuilder.ClientOrDie("cronjob-controller"), ) if err != nil { return true, fmt.Errorf("error creating CronJob controller: %v", err) } // Run go cjc.Run(ctx.Stop) return true, nil }syncAll()
CronJobController Run() 方法比較簡(jiǎn)單,就是每10s 循環(huán)調(diào)用 syncAll() 函數(shù)。
syncAll() 邏輯也比較清楚,根據(jù)初始化的 kubeClient, 獲取所有的 jobs 和 cronJobs,并遍歷所有 Jobs, 根據(jù)ObjectMeta.OwnerReferences 字段匹配是否由 cronJob controller 所創(chuàng)建。最后基于 cronJob 的UUID 進(jìn)行整理。
最后處理所有的 cronJobs,確認(rèn)需要調(diào)度的時(shí)間并根據(jù)并行策略創(chuàng)建 jobs,同步完后再清理所有已經(jīng) finished jobs。
func (jm *CronJobController) syncAll() { // 列出所有的 jobs jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{}) if err != nil { utilruntime.HandleError(fmt.Errorf("can"t list Jobs: %v", err)) return } js := jl.Items glog.V(4).Infof("Found %d jobs", len(js)) // 列出所有的 cronJobs sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}) if err != nil { utilruntime.HandleError(fmt.Errorf("can"t list CronJobs: %v", err)) return } sjs := sjl.Items glog.V(4).Infof("Found %d cronjobs", len(sjs)) // 遍歷所有的 jobs, 根據(jù) ObjectMeta.OwnerReferences 字段確定該 job 是否由 cronJob 所創(chuàng)建。 // 然后根據(jù) cronJob uuid 進(jìn)行排列 jobsBySj := groupJobsByParent(js) glog.V(4).Infof("Found %d groups", len(jobsBySj)) // 遍歷所有的 cronJobs for _, sj := range sjs { // 進(jìn)行同步 // 確定需要調(diào)度的時(shí)間,并根據(jù) Spec.ConcurrencyPolicy 策略,確認(rèn)如何來(lái)創(chuàng)建 jobs // 并更新 cronJob.Status syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder) // 清理所有已經(jīng)完成的 jobs cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder) } }syncOne()
該接口就是 cronJob controller 中實(shí)現(xiàn)同步的關(guān)鍵部分。
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) { nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name) // 遍歷所有獲取到的 jobs // 1.記錄到 childrenJobs 中,表示當(dāng)前屬于該 cronJob 的所有 Jobs,便于后面清理 cronJob 中記錄的 active Jobs // 2.查看該 job 是否在 cronJob.Status.Active 的列表中 // - 如果在的話(huà),且該 Job 已經(jīng) finished,則將該 job 從 active list 中刪除 // - 如果不在,且該 Job 還沒(méi)有 finished,則發(fā)送異常事件 childrenJobs := make(map[types.UID]bool) for _, j := range js { childrenJobs[j.ObjectMeta.UID] = true found := inActiveList(*sj, j.ObjectMeta.UID) if !found && !IsJobFinished(&j) { recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name) } else if found && IsJobFinished(&j) { deleteFromActiveList(sj, j.ObjectMeta.UID) // TODO: event to call out failure vs success. recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name) } } // 遍歷 cronJob 所有的 active jobs, 根據(jù)前面的 childrenJobs 來(lái)判斷該繼續(xù)的 active job 是否還存在,如果不存在的話(huà),也從 active list 中刪除。 for _, j := range sj.Status.Active { if found := childrenJobs[j.UID]; !found { recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name) deleteFromActiveList(sj, j.UID) } } // 上面更新了 cronJob.Status.Active 字段,所以需要更新一把 cronJob updatedSJ, err := sjc.UpdateStatus(sj) if err != nil { glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err) return } *sj = *updatedSJ // 如果 cronJob 已經(jīng)被用戶(hù)刪除,則直接 return if sj.DeletionTimestamp != nil { return } // 如果 cronJob 已經(jīng)被 suspend,也直接 return if sj.Spec.Suspend != nil && *sj.Spec.Suspend { glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) return } // 根據(jù) cronJob 的創(chuàng)建時(shí)間或最近一次的調(diào)度時(shí)間,和 cronJob.Spec.Schedule 配置,計(jì)算出到現(xiàn)在為止所有應(yīng)該調(diào)度的時(shí)間點(diǎn)。 times, err := getRecentUnmetScheduleTimes(*sj, now) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err) glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err) return } // 如果返回的時(shí)間點(diǎn)列表為空,則表示該 cronJob 暫時(shí)還不需要調(diào)度,直接 return if len(times) == 0 { glog.V(4).Infof("No unmet start times for %s", nameForLog) return } // 有多次未滿(mǎn)足的調(diào)度時(shí)間 if len(times) > 1 { glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog) } // scheduledTime 取列表中的最后一次時(shí)間 scheduledTime := times[len(times)-1] tooLate := false // 如果用戶(hù)配置了 Spec.StartingDeadlineSeconds,則需要判斷 scheduledTime 是否滿(mǎn)足條件 // 如果 now - scheduledTime > Spec.StartingDeadlineSeconds,則直接 return if sj.Spec.StartingDeadlineSeconds != nil { tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now) } if tooLate { glog.V(4).Infof("Missed starting window for %s", nameForLog) return } // scheduledTime 滿(mǎn)足各種條件的情況下,就需要查看 cronJob 配置的并發(fā)策略 // 如果 ForbidConcurrent,且 active jobs > 0, 則直接 return; // 否則繼續(xù)往下創(chuàng)建; if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 { glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog) return } // 如果 ReplaceConcurrent,則刪除所有的 active jobs, 等后面重新創(chuàng)建 if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { for _, j := range sj.Status.Active { glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog) job, err := jc.GetJob(j.Namespace, j.Name) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err) return } if !deleteJob(sj, job, jc, pc, recorder, "") { return } } } // 根據(jù) cronJob.spec.JobTemplate,填充 job 的完整結(jié)構(gòu) // 比如 name, labels, OwnerReferences 等等。 jobReq, err := getJobFromTemplate(sj, scheduledTime) if err != nil { glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err) return } // 創(chuàng)建 job jobResp, err := jc.CreateJob(sj.Namespace, jobReq) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) return } glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog) recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) // 根據(jù)創(chuàng)建 job 返回的 response,獲取 ObjectReference 結(jié)構(gòu) // 用于記錄到 cronJob.Status.Active 中 ref, err := getRef(jobResp) if err != nil { glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog) } else { sj.Status.Active = append(sj.Status.Active, *ref) } // 設(shè)置最近一次的調(diào)度時(shí)間 sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} // 更新 cronJob if _, err := sjc.UpdateStatus(sj); err != nil { glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err) } return }參考資料
Running automated tasks with cron jobs - Kubernetes
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/32721.html
摘要:用于批量處理短暫的一次性任務(wù),并保證指定數(shù)量的成功結(jié)束。一旦有一個(gè)成功結(jié)束,其他都會(huì)準(zhǔn)備退出。默認(rèn)值指定可運(yùn)行的時(shí)間期限,超過(guò)時(shí)間還未結(jié)束,系統(tǒng)將會(huì)嘗試進(jìn)行終止。已知問(wèn)題設(shè)置為時(shí),會(huì)與沖突,可以暫時(shí)將設(shè)置為進(jìn)行規(guī)避。 介紹 Kubernetes有兩個(gè)概念跟job有關(guān): Job: 負(fù)責(zé)批量處理短暫的一次性任務(wù),僅執(zhí)行一次,并保證處理的一個(gè)或者多個(gè)Pod成功結(jié)束。 CronJob: ...
摘要:用于獲取元數(shù)據(jù)及根據(jù)的來(lái)匹配該會(huì)使用到的接口如下用于根據(jù)反推根據(jù)獲取元數(shù)據(jù)提供了接口用于獲取指定下管理的所有通過(guò)的數(shù)據(jù)變更,比如,來(lái)操作該。 k8s version: v1.11.0author: lbl167612@alibaba-inc.com 源碼流程圖 showImg(https://segmentfault.com/img/remote/1460000016496285?w...
摘要:標(biāo)識(shí)是與操作對(duì)象間的紐帶。集群為每個(gè)對(duì)象維護(hù)三類(lèi)信息對(duì)象元數(shù)據(jù)期望狀態(tài)與實(shí)際狀態(tài)元數(shù)據(jù)指對(duì)象的基本信息,比如命名標(biāo)簽注釋等等,用于識(shí)別對(duì)象期望狀態(tài)一般由用戶(hù)配置來(lái)描述的實(shí)際狀態(tài)是由集群各個(gè)組件上報(bào)的集群實(shí)際的運(yùn)行情況。 綜述 學(xué)習(xí)Kubernetes時(shí),發(fā)現(xiàn)它的概念和術(shù)語(yǔ)還是比較多的,光靠啃官方文檔比較晦澀。所以邊學(xué)習(xí)邊整理,對(duì)主要的概念和術(shù)語(yǔ)做一下分類(lèi)及簡(jiǎn)要說(shuō)明。感覺(jué)把重要概念都理解...
摘要:功能提供的指標(biāo),按照階段分為三種類(lèi)別實(shí)驗(yàn)性質(zhì)的中階段的或者的字段。穩(wěn)定版本的中不向后兼容的主要版本的更新被廢棄的已經(jīng)不在維護(hù)的。通過(guò)比較來(lái)保證的順序并不保證包含所有資源本文為容器監(jiān)控實(shí)踐系列文章,完整內(nèi)容見(jiàn) 概述 已經(jīng)有了cadvisor、heapster、metric-server,幾乎容器運(yùn)行的所有指標(biāo)都能拿到,但是下面這種情況卻無(wú)能為力: 我調(diào)度了多少個(gè)replicas?現(xiàn)在可...
摘要:功能提供的指標(biāo),按照階段分為三種類(lèi)別實(shí)驗(yàn)性質(zhì)的中階段的或者的字段。穩(wěn)定版本的中不向后兼容的主要版本的更新被廢棄的已經(jīng)不在維護(hù)的。通過(guò)比較來(lái)保證的順序并不保證包含所有資源本文為容器監(jiān)控實(shí)踐系列文章,完整內(nèi)容見(jiàn) 概述 已經(jīng)有了cadvisor、heapster、metric-server,幾乎容器運(yùn)行的所有指標(biāo)都能拿到,但是下面這種情況卻無(wú)能為力: 我調(diào)度了多少個(gè)replicas?現(xiàn)在可...
閱讀 2294·2021-09-22 15:27
閱讀 3166·2021-09-03 10:32
閱讀 3491·2021-09-01 11:38
閱讀 2493·2019-08-30 15:56
閱讀 2206·2019-08-30 13:01
閱讀 1531·2019-08-29 12:13
閱讀 1410·2019-08-26 13:33
閱讀 885·2019-08-26 13:30