大家好,我叫高楚枫,来自阿里云EMR团队的开发工程师,同时也是ApacheDolphinScheduler的PMC成员之一。
在现代的数据处理和调度过程中,工作流的依赖管理变得越来越复杂,尤其是当涉及多个工作流的依赖关系时。ApacheDolphinScheduler为此提供了强大的跨工作流依赖功能,帮助开发者更高效地管理和调度复杂任务。
接下来,我将为大家逐一讲解这些内容。
首先,给大家介绍一下ApacheDolphinScheduler中跨工作流依赖的实现方式,大家可以看下面这个截图,主要通过一个名为dependenttask的任务插件来实现。
对于那些熟悉ApacheDolphinScheduler的朋友来说,可能会觉得奇怪,因为它看起来像是一个普通的任务插件。
那么,这其中有什么特别之处?为什么跨工作流依赖功能显得复杂呢?实际上,这背后有以下三个关键点:
在ApacheDolphinScheduler中,插件有两种形式:
大家可以看看下面这张图片,当你在DolphinScheduler中创建一个dependenttask后,你可以选择项目名称、工作流名称和任务名称,从而指定跨工作流的任务依赖。甚至在跨项目的工作流中,也可以通过这种方式进行配置。
有些人可能会问:为什么我们不在同一个工作流里解决依赖关系?
这是因为,在像ApacheDolphinScheduler或Airflow这样的开源调度工具中,调度的属性是在工作流级别上定义的,而不是任务本身。
如果两个任务之间存在依赖关系,但它们的调度周期不同,就需要通过跨工作流依赖来解决。
这里已经涉及到了一些概念,任务的调度周期、依赖的周期以及依赖失败的处理策略都是非常重要的概念。这些将在后续部分中跟大家详细讲解。
例如,dependenttype可以指定为dependentontask,表示依赖于跨工作流中的具体任务。还有另一种形式是alltask,即依赖于整个工作流的执行结果。
当前DolphinScheduler中的跨工作流依赖管理还存在一些复杂性。这边详细去讲的话,又会有其他的情况。
比如在生产环境中,某些用户希望判断工作流实例中每个任务的成功与否,这需要通过手动组合多个依赖项来实现。虽然目前还没有一键依赖所有任务的功能,但通过dependenttask中的逻辑组合按钮,我们可以实现复杂依赖关系的配置。
在跨工作流依赖的实现中,ApacheDolphinScheduler通过多个唯一标识符来管理项目、工作流和任务。
这些标识符包括projectcode、definitioncode和taskcode,它们分别对应项目名称、工作流名称和任务名称。这些唯一标识符是由后台自动生成的,用于识别跨工作流的依赖关系,确保各项任务之间的依赖能够准确生效。
除了ApacheDolphinScheduler,ApacheAirflow中也可以通过Sensor来实现跨工作流依赖实现。
其中一个常见的配置是ExternalTaskSensor,用于监控外部工作流的任务状态。和DolphinScheduler的taskcode类似,Airflow的external_task_id和external_dag_id提供了对应的任务和工作流标识符,帮助不同的工作流之间建立依赖关系。
Airflow的Sensor通常会占用worker的slot,在早期版本中,这可能会导致资源浪费,因为当依赖的外部任务还未完成时,Sensor会持续等待占用资源。
不过,Airflow在较新版本中引入了一个名为Trigger的机制,优化了这一流程。
它允许异步检查依赖状态。一旦依赖条件达成,Trigger会重新唤醒任务,从而避免占用过多的workerslot,实现更高效的资源利用。
当然,无论是在Airflow还是Dolphin中也有一些技巧可以用来处理跨工作流依赖的场景。
例如,Airflow中有个TriggerDagRunOperator。这个operator可以触发另一个外部工作流中的任务运行。
DolphinScheduler也支持使用子工作流的方式来实现跨工作流依赖。这种方法是将子工作流嵌入到主工作流中,并通过主工作流触发其他外部任务。
虽然这种方式在处理简单场景时效果良好,但它并不适用于复杂场景。特别是在上游和下游工作流具有不同周期,或存在依赖周期需求的情况下,这种方法会有很多局限性:
所在这里不建议使用子工作流的方式处理跨工作流依赖,大家可以看到它存在以下缺陷:
无法应对复杂场景:当上游工作流和下游工作流的周期不一致,或者存在复杂的依赖关系时,子工作流的方式显得力不从心。
强耦合问题:子工作流会使得上游和下游工作流紧密耦合,导致当需要修改依赖关系时,不得不去修改上游工作流中的节点。这不仅增加了维护的复杂度,也降低了灵活性。
因此,虽然子工作流在某些简单场景下可以使用,但对于依赖关系复杂、调度周期不一致的场景,不推荐使用这种方法。接下来,我将深入讲解为什么跨工作流依赖会变得如此复杂,以及背后的挑战。
在ApacheDolphinScheduler中,dependenttask是一个相对复杂的功能模块。
首先,dependenttask的判定逻辑本身就非常复杂,不仅在代码实现上有相当多的逻辑分支,而且在语言层面和功能的理解上,用户往往也会遇到困难。
它涉及到不同任务之间的依赖关系、调度逻辑,以及依赖满足与否的多种判断情况。
不同的工作流可能有着不同的执行频率和周期,而在跨工作流依赖的场景中,调度周期和依赖周期如何匹配,以及如何协调这些周期之间的关系,都是容易让人感到困惑的地方。
另外一个复杂场景是补数(Backfill),这是指在历史数据的场景下,重新运行某个实例以补充之前执行失败的任务。在这种情况下,依赖关系可能非常复杂,涉及多层次的任务和工作流依赖,还包括对同周期工作流的重新部署,很多用户对此理解有误,容易混淆。
此外,dependenttask还支持不同的失败策略,每个依赖任务可以配置专门的失败处理逻辑。这种灵活性带来了更多的复杂情况,不同的失败策略会导致任务在执行时表现出不同的行为。
为了更好地理解dependenttask的执行逻辑,我们需要通过它的端到端流程来分析。在ApacheDolphinScheduler中,dependenttask的提交与执行流程和其他任务插件类似,但在判定任务是否成功的过程中有其不同的操作。
当提交一个dependenttask时,首先会进行实例化。系统会从数据库中提取任务的配置参数,生成一个任务实例,并将其提交到dependenttask的执行流程中。
在任务提交后,它会被注册到一个名为StateWheelExecuteThread的线程中。该线程会定期轮询,监控任务状态的变化。如果依赖的上游任务还没有完成,它会保持任务的running状态,并等待上游任务的依赖条件满足。
ApacheDolphinScheduler的核心是基于事件驱动的工作流调度系统。当dependenttask接收到任务状态变化的事件时,才会触发下一步的判断。通过这种事件驱动的方式,可以在不占用多余资源的情况下实现等待依赖完成的目的。
dependenttask的执行流程中有一个TaskStateEventHandler,负责对依赖任务的状态进行判定。如果依赖任务满足条件,dependenttask会结束执行;如果依赖未满足,则会继续轮询,直到依赖完成。
下面标注了相应的代码,通过查看源码,可以更清晰地理解dependenttask的执行逻辑。
DependentTaskProcessor.runTaskDependentTaskProcessor.getTaskDependResultDependentTaskProcessor.getTaskDependResultDependentExecute.getDependentResultForItemDependentExecute.findLastProcessIntervalDependentExecute.calculateResultForTasksDependentExecute.getDependTaskResultDependentExecute.getDependTaskResultDependentExecute.getDependTaskResultDependentExecute.getDependTaskResult
然而,在3.2.0版本中,DolphinScheduler对依赖功能进行了较大的重构,可能从代码结构上看会有所不同。尽管如此,其核心思想和主要实现方式仍然保持一致,差异主要体现在细节上。
依赖任务的判断逻辑相对复杂,涉及多个步骤和条件的处理。
以下是该判断过程的详细解释:
依赖项的结束条件
首先,一个依赖任务可能依赖多个其他任务或工作流。这些依赖项必须全部结束,依赖任务才会继续往下执行。如果有任何一个依赖项尚未结束,依赖任务将会被阻塞,继续等待。
遍历依赖项
当所有依赖项结束时,系统会对每个依赖项进行遍历。这是一个循环过程,依赖任务会逐一检查每个依赖项的状态,确保其执行结果满足预期。
依赖周期的处理
依赖周期是跨工作流依赖中的一个核心概念。不同的任务可能具有不同的周期性调度,因此需要系统在每个周期内计算依赖的满足情况。
对于ScheduleTime,由不同触发方式所产生的工作流实例情况有所不同:
依赖任务可以分为两种情况:依赖整个工作流:在这种情况下,系统会检查整个工作流是否成功执行。如果工作流成功,则依赖任务也会成功;如果工作流失败,则依赖任务也会失败。如果工作流还在执行中,任务将继续等待,不会立即做出结论。
依赖具体的Task实例:系统需要先判断指定的Task实例是否存在。如果工作流实例已生成,但尚未执行到该Task,任务同样会继续等待,直到找到相应的Task实例,并根据Task实例的执行结果来判定依赖是否满足。
在dependenttask中,用户可以通过配置逻辑表达式来处理多个依赖项。
系统提供了"且"(AND)和"或"(OR)两种逻辑操作符,用于组合多个依赖项。如果你有多个跨工作流的依赖任务,通过这些逻辑表达式,可以将它们组合起来,形成复杂的依赖关系。
最终,系统会根据你设置的逻辑表达式计算依赖项的整体状态。只有在所有组合条件满足的情况下,任务依赖才会被认为成功。
这张图的是代码入口,如果大家想去看具体的实现,或者是想要去推敲细节的话,可以去根据这个代码入口去看。
假设有一个子工作流在每月内需要检查父工作流的执行情况。那么,系统会在子工作流执行时,检查父工作流是否在本月内生成并完成。如果发现父工作流实例满足条件,则任务依赖成功。
例如:
实际上,系统是从整点开始计算周期的,即从4:00到5:00。
这里直接给大家贴了代码,通过查看该函数的实现,可以更明确地理解如何处理跨工作流的依赖周期,避免产生误解。
在之前的介绍中,我们已经了解到跨工作流依赖的复杂性。
然而,补数(backfill)操作进一步增加了复杂度,尤其是在生产环境中处理多层级的工作流依赖时,如果不加以仔细规划,很容易导致生产问题。
接下来我们讨论一下在补数操作中,跨工作流依赖会带来哪些挑战,并通过具体例子进行说明。
跨工作流的依赖可能涉及多个层级。
一个典型场景是有三个工作流:
A、B和C,每个工作流中包含多个任务。各个任务之间存在跨工作流的依赖关系。例如,任务B2依赖于A中的任务A2,而C1依赖于B2。这个层级关系在实际场景中可能会更加复杂,甚至可能涉及三到四个层级的工作流。
在补数操作中,我们要考虑的问题是:当上游工作流开始补数时,下游工作流将会如何受到影响?
假设当前有三个工作流,A、B和C,A正处于上线状态,而B和C处于下线状态。
在这种情况下,如果我们触发工作流A的补数操作,哪些工作流和任务会受到影响?
根据补数的执行逻辑,只有工作流A中的任务会受到影响。系统在补数时会检查下游工作流的状态。
如果下游工作流未上线或未被挂起,那么它们将不会受到补数操作的影响。因此,在这个场景下,补数操作只会影响工作流A。
假设现在工作流A、B和C都处于上线状态,如果我们触发工作流A的补数操作,那么哪些任务会受到影响?
这个问题的答案取决于使用的DolphinScheduler版本:
在某些情况下,工作流A和工作流B的调度周期不一致。
比如,工作流A每天执行一次,而工作流B每小时执行一次。那么,当我们对工作流A进行补数操作时,B的调度周期如何处理?是按照A的周期,还是B的周期来执行?
答案是:以父工作流的周期为准。
在补数操作的底层逻辑中,系统会根据父工作流的周期生成实例。在生成补数实例时,DolphinScheduler会向数据库中插入相应的指令(command),该指令的周期属性是基于父工作流的调度周期。
因此,在这种场景下,补数的周期将以工作流A的调度周期为准。
在跨工作流依赖的实际使用过程中,用户经常会遇到一些问题,很多问题涉及概念上的模糊理解。
这些问题虽然看似常见,但往往会导致实际的调度问题。我们不一一详细讲解每个问题,而是挑选了一些典型的例子进行说明。
没有实例的情况下,它不会被认为是失败的依赖,这与用户的直观感受不同,因此容易产生误解。
另一个常见的问题是,用户将依赖周期配置为"月"或"小时"后,发现依赖节点一直卡在运行状态。
这通常是因为用户对依赖周期的理解存在偏差。很多用户会误认为"月"是指绝对的30天,而不是从当月的开始到月底。
因此,用户在理解依赖周期时,可能会混淆其实际含义,从而导致依赖节点无法正常工作。
有些用户尝试将两个依赖节点串联在同一个工作流中,他们通过创建多个dependenttask,并将这些任务串联起来。
更推荐的做法是:在一个dependenttask中,通过逻辑操作符(AND/OR)来配置多个依赖项,而不是通过串联多个dependenttask来实现。
我今天穿的这件衣服就是在第一次参加ApacheDolphinSchedulerMeetup社区送的,每次参加ApacheConAsia时我都会穿它。
如果大家有任何问题,欢迎随时交流讨论。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。