Openstack TaskFlow 分析

引言

简介

OpenStack TaskFlow是一个用于编排和协调OpenStack任务的框架。它提供了一种简单的方式来定义和管理任务流程,以及处理任务之间的依赖关系。TaskFlow支持任务的状态管理、任务重试、任务失败处理、任务取消等功能,可以帮助开发人员更好地管理和控制任务的执行过程。TaskFlow整体架构如下:

image.png

TaskFlow 分为提供同步执行能力的基础部分(上图左半部分)和依赖外部支撑提供异步运行的可选部分(右半部分),目前 OpenStack 的组件主要用的还是基础部分的功能。上图右半部分可以用于编排运行在分布式集群环境的工作流,例如通过worker架构安装一个一主二从架构的mysql集群。

另外TaskFlow提供了一些Openstack用到的场景和代码用例,例如创建虚拟机,创建数据卷等。具体可参考:
docs.openstack.org/taskflow/la…

优势

Taskflow 以一个python依赖包的方式进行发布,方便其他python程序进行集成;TaskFlow内部很多模块都先定义抽象接口并后提供多种的实现,方便用户根据自己业务场景进行选择或者自定义扩展。

框架入口

框架入口是源码文件 taskflow.helpers.py 定义的 load() 和 run() 函数:

# 创建一个engine对象,engine类型通过参数 engine= 来指定
def load(flow, store=None, flow_detail=None, book=None,
         backend=None, namespace=ENGINES_NAMESPACE,
         engine=ENGINE_DEFAULT, **kwargs):
         
# 创建一个engine对象并执行engine,返回 flow的执行结果。engine类型通过参数 engine= 来指定
def run(flow, store=None, flow_detail=None, book=None,
        backend=None, namespace=ENGINES_NAMESPACE,
        engine=ENGINE_DEFAULT, **kwargs):

下面demo代码执行一个简单的taskflow:

class Task1(task.Task):
    def execute(self, *args, **kwargs):
        print("执行Task1 ......")
        time.sleep(15)
        return True




    def revert(self, *args, **kwargs):
        print("Task1 执行失败,开始回滚...")



class Task2(task.Task):
    def execute(self, *args, **kwargs):
        print("执行Task2...")
        time.sleep(10)
        return True




flow = gf.Flow("taskflow demo").add(*[Task1(), Task2()])
engine = taskflow.engines.load(flow)
engine.run()

核心概念

TaskFlow涉及较多的概念,本节内容将分别对它的的核心的念进行解说。

Atom

Atom是指一个原子操作,它是TaskFlow中的基本执行单元。Atom操作是不可分割的,要么全部执行成功,要么全部回滚。这意味着,如果在执行Atom操作时发生错误,TaskFlow会自动回滚到操作之前的状态,以确保数据的一致性和完整性。

Atom操作通常用于执行一些简单的任务,例如创建或删除一个资源,或者执行一些简单的计算。

Atom定义了与任务执行以及回滚相关的函数接口:

# 执行任务前调用
def pre_execute(self):







# 执行任务后调用
def post_execute(self):




# 任务执行的逻辑
def execute(self, *args, **kwargs):



# 回滚操作,execute函数执行失败时调用
def revert(self, *args, **kwargs):

# 回滚函数执行之后调用
def post_revert(self):

Task

Task继承了Atom:

class Task(atom.Atom, metaclass=abc.ABCMeta):
    """An abstraction that defines a potential piece of work.







    This potential piece of work is expected to be able to contain
    functionality that defines what can be executed to accomplish that work
    as well as a way of defining what can be executed to reverted/undo that
    same piece of work.
    """

每个Task都有一个唯一的名称和一组输入和输出。输入是Task执行所需的数据,输出是Task执行后产生的数据。Task可以依赖于其他Task的输出,这样就可以构建复杂的任务流。使用TaskFlow时可以通过继承Task类来创建具体的业务任务。从源码上看相比于Atom,Task多了一个 _notifier 属性,用于监听事件。当前已经定义的Event有:

TASK_EVENTS = (EVENT_UPDATE_PROGRESS,)

Retry

Retry 也继承了Atom:

class Retry(atom.Atom, metaclass=abc.ABCMeta):
    """A class that can decide how to resolve execution failures.







    This abstract base class is used to inherit from and provide different
    strategies that will be activated upon execution failures. Since a retry
    object is an atom it may also provide :meth:`~taskflow.retry.Retry.execute`
    and :meth:`~taskflow.retry.Retry.revert` methods to alter the inputs of
    connected atoms (depending on the desired strategy to be used this can be
    quite useful).



    NOTE(harlowja): the :meth:`~taskflow.retry.Retry.execute` and
    :meth:`~taskflow.retry.Retry.revert` and
    :meth:`~taskflow.retry.Retry.on_failure` will automatically be given
    a ``history`` parameter, which contains information about the past
    decisions and outcomes that have occurred (if available).
    """

Flow

Flow表示一个任务流程,定义了一组任务及其之间的依赖关系,同时可以定义控制流程,例如循环、条件分支。Flow可以看作是一个有向无环图(DAG),其中节点表示任务,边表示任务之间的依赖关系。

Flow 模式(pattern)

  • liner_flow : 线性模式,任务按照指定的顺序依次执行,每个任务的输出作为下一个任务的输入,这是最简单的模式。
  • graph_flow: 图模式,任务与任务之间以有向无环图(DAG)的模式来组织。任务之间可以并行执行,一个任务之间可以传递数据。

Engine

engine是一个flow运行的入口和核心控制器。负责管理任务的状态、执行任务、处理任务之间的依赖关系以及处理任务执行过程中的异常情况。Taskflow内部很多的模块组件都是以插件的方式进行加载的,Engine也不例外,从源码的 setup.cfg 配置文件可知道目前提供以下几种实现:

taskflow.engines =
    default = taskflow.engines.action_engine.engine:SerialActionEngine
    serial = taskflow.engines.action_engine.engine:SerialActionEngine
    parallel = taskflow.engines.action_engine.engine:ParallelActionEngine
    worker-based = taskflow.engines.worker_based.engine:WorkerBasedActionEngine
    workers = taskflow.engines.worker_based.engine:WorkerBasedActionEngine

SerialActionEngine

SerialActionEngine为默认的engine,它是一种串行执行引擎,可以按照指定的顺序依次执行任务。它内部有一个 _task_executor,负责Task的执行:

class SerialActionEngine(ActionEngine):
    """Engine that runs tasks in serial manner."""







    def __init__(self, flow, flow_detail, backend, options):
        super(SerialActionEngine, self).__init__(flow, flow_detail,
                                                 backend, options)
        self._task_executor = executor.SerialTaskExecutor()

SerialTaskExecutor是一个串行执行器,可以在源码找到它的定义:

class SerialTaskExecutor(TaskExecutor):
    """Executes tasks one after another."""







    def __init__(self):
        self._executor = futurist.SynchronousExecutor()




    def start(self):
        self._executor.restart()



    def stop(self):
        self._executor.shutdown()

所以即使是Flow定义了多分支并行的Task,它们在engine内部也是串行执行的。

ParallelActionEngine

并行执行任务的engine,提供了多线程和多进程两种并行方式。

class ParallelActionEngine(ActionEngine):
    def __init__(self, flow, flow_detail, backend, options):
        super(ParallelActionEngine, self).__init__(flow, flow_detail,
                                                   backend, options)
        # This ensures that any provided executor will be validated before
        # we get to far in the compilation/execution pipeline...
        self._task_executor = self._fetch_task_executor(self._options)

启动一个ParallelActionEngine时,通过指定option来选择_task_executor的类型,从而选择不同的任务并行运行方式:

=========================  ===============================================
Type provided              Executor used
=========================  ===============================================
|cft|.ThreadPoolExecutor   :class:`~.executor.ParallelThreadTaskExecutor`
|cfp|.ProcessPoolExecutor  :class:`~.|pe|.ParallelProcessTaskExecutor`
|cf|._base.Executor        :class:`~.executor.ParallelThreadTaskExecutor`
=========================  ===============================================
# One of these types should match when a object (non-string) is provided
    # for the 'executor' option.
    #
    # NOTE(harlowja): the reason we use the library/built-in futures is to
    # allow for instances of that to be detected and handled correctly, instead
    # of forcing everyone to use our derivatives (futurist or other)...
    _executor_cls_matchers = [
        _ExecutorTypeMatch((futures.ThreadPoolExecutor,),
                           executor.ParallelThreadTaskExecutor),
        _ExecutorTypeMatch((futures.ProcessPoolExecutor,),
                           process_executor.ParallelProcessTaskExecutor),
        _ExecutorTypeMatch((futures.Executor,),
                           executor.ParallelThreadTaskExecutor),
    ]




    # One of these should match when a string/text is provided for the
    # 'executor' option (a mixed case equivalent is allowed since the match
    # will be lower-cased before checking).
    _executor_str_matchers = [
        # 多进程并发模型,每个task都有自己独立的python解析器
        _ExecutorTextMatch(frozenset(['processes', 'process']),
                           process_executor.ParallelProcessTaskExecutor),
        # 多线程并发模型
        _ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']),
                           executor.ParallelThreadTaskExecutor),
        # 基于协程并发模型的executor,实现单线程的并发执行
        _ExecutorTextMatch(frozenset(['greenthread', 'greenthreads',
                                      'greenthreaded']),
                           executor.ParallelGreenThreadTaskExecutor),
    ]

    # Used when no executor is provided (either a string or object)...
    _default_executor_cls = executor.ParallelThreadTaskExecutor

WorkerBasedActionEngine

用于分布式执行环境中, 使用WorkerBaseActionEngine执行后,执行模型变成以下形式:

image.png

server: 负责接收网络请求;将Request转换为一个work并通过work生成一个endpoint。Server定义了两个路由:

pr.NOTIFY 处理任务状态变化
pr.REQUEST 处理执行任务请求

参考源码文件 taskflow.engines.worker_based.server.py

class Server(object):
    """Server implementation that waits for incoming tasks requests."""







    def __init__(self, topic, exchange, executor, endpoints,
                 url=None, transport=None, transport_options=None,
                 retry_options=None):
        # 定义两个路由
        type_handlers = {
            pr.NOTIFY: dispatcher.Handler(
                self._delayed_process(self._process_notify),
                validator=functools.partial(pr.Notify.validate,
                                            response=False)),
            pr.REQUEST: dispatcher.Handler(
                self._delayed_process(self._process_request),
                validator=pr.Request.validate),
        }
        self._executor = executor
        self._proxy = proxy.Proxy(topic, exchange,
                                  type_handlers=type_handlers,
                                  url=url, transport=transport,
                                  transport_options=transport_options,
                                  retry_options=retry_options)
        self._topic = topic
        self._endpoints = dict([(endpoint.name, endpoint)
                                for endpoint in endpoints])


endpoint:task在worker上的执行入口。通过调用Executor的接口来负责task的生成,执行和回滚:

lass Endpoint(object):
    """Represents a single task with execute/revert methods."""







    ... 省略部分代码

    def generate(self, name=None):
        # NOTE(skudriashev): Note that task is created here with the `name`
        # argument passed to its constructor. This will be a problem when
        # task's constructor requires any other arguments.
        return self._task_cls(name=name)


    def execute(self, task, **kwargs):
        event, result = self._executor.execute_task(task, **kwargs).result()
        return result




    def revert(self, task, **kwargs):
        event, result = self._executor.revert_task(task, **kwargs).result()
        return result

Executor: 真正执行任务的资源池,与另外两种模式(SeriesActionEngine和ParallelActionEngine)的Executor是一样的角色,只是它部署在了Worker进程中,默认实现是WorkerTaskExecutor。

Compile

编译是taskflow启动engine执行Flow前的一个关键步骤,它将用户定义的Flow转换为引擎内部可识别的执行图 _execute_graph,execute_graph 的底层存储直接复用了开源项目 networkx 对图结构的操作能力。编译的过程中会产生一些辅助对象(help objects)并存储到engine对象中去,help object用于辅助后面engine执行task任务,分析Flow以及自己一些内部的活动。

compile() 函数执行后,会将 graph以及第一个node封装到自己的变量 _compilation 中去

@fasteners.locked
    def compile(self):
        """Compiles the contained item into a compiled equivalent."""
        if self._compilation is None:
            self._pre_compile()
            try:
                # node : first node for current graph
                graph, node = self._compile(self._root, parent=None)
            except Exception:
                with excutils.save_and_reraise_exception():
                    # Always clear the history, to avoid retaining junk
                    # in memory that isn't needed to be in memory if
                    # compilation fails...
                    self._history.clear()
            else:
                self._post_compile(graph, node)
                if self._freeze:
                    graph.freeze()
                    node.freeze()
                self._compilation = Compilation(graph, node)
        return self._compilation

Backend 和 Storage

  1. Storage是位于engine和persistence两层之间的接口层,它定义了访问flow和task等元数据信息的接口。engine通过storage层来获取和更新任务状态。
  1. Backend可以是任何支持持久化存储的系统,例如数据库、消息队列或文件系统。它负责存储任务的状态信息,例如任务的状态、输入和输出数据、依赖关系等。当任务执行时,TaskFlow会将任务状态存储到Backend中,并在需要时从Backend中检索任务状态。

Backend和Storage在TaskFlow架构中的位置如下:

image.png

TaskFlow提供了多种Backend实现,包括SQLAlchemy、Zookeeper、Redis等,如果需要高可用性和分布式支持,可以选择Zookeeper或Redis作为Backend;如果需要支持多种数据库,可以选择SQLAlchemy作为Backend。

在run一个flow或者load一个engine的时候可以指定一个backend,也就是说在一个应用中用户可以很方面选择不同的backend来存储TaskFlow的状态。

通过taskflow源码文件 setup.cfg 可以知道目前有以下几种实现:

taskflow.persistence =
    dir = taskflow.persistence.backends.impl_dir:DirBackend
    file = taskflow.persistence.backends.impl_dir:DirBackend
    memory = taskflow.persistence.backends.impl_memory:MemoryBackend
    mysql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
    postgresql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
    sqlite = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
    zookeeper = taskflow.persistence.backends.impl_zookeeper:ZkBackend

Provides

Provides是指任务所提供的输出,即任务执行后产生的结果。Provides可以是一个值、一个对象或者一个状态,它们可以被其他任务所依赖和使用。

Provides的作用是让任务之间能够协同工作,提高任务执行的效率和可靠性。通过Provides,任务可以将自己的输出传递给其他任务,从而实现任务之间的协作。provides 在任务之间传递消息的使用方式如下:

image.png

核心流程

TaskFlow的核心流程分为3步:定义Flow –> 组件初始化 –> 执行Flow。下面分别细讲这三个步骤。

定义Flow

直接使用代码定义Flow对象,下面是定义一个flow的示例:

class InitEnvTask(task.Task):
    """定义环境初始化任务"""
    def __init__(self, ip_addr):
        super(InitEnvTask, self).__init__(name="InitEnvTask")
        self.ip_addr = ip_addr




    def execute(self, *args, **kwargs):
        print("对机器 %s 做环境初始化操作。" % self.ip_addr)
        time.sleep(15)
        return True


    def revert(self, *args, **kwargs):
        print("执行失败,开始回滚...")





class InstallSoftwareTask(task.Task):
    """定义软件安装任务:"""
    def __init__(self, ip_addr, sorftware):
        super(InstallSoftwareTask, self).__init__(name="InstallSoftwareTask")
        self.ip_addr = ip_addr

    def execute(self, *args, **kwargs):
        print("开始在机器 %s 上安装软件 %s ." % (self.ip_addr, sorftware))
        time.sleep(10)
        return True


def gen_install_taskflow(ip_addr, flow_name=None):
    """"生成一个Flow:
      
      
                   |---> 安装mysql  ---|
       初始化环境--->|                  |---> 安装springboot app应用
                   |---> 安装redis  ---|
                  
    """
    init_env_job = InitEnvTask(ip_addr)
    install_mysql_job = InstallSoftwareTask(ip_addr, 'mysql')
    install_redis_job = InstallSoftwareTask(ip_addr, 'redis')
    install_springboot_app_job = InstallSoftwareTask(ip_addr, 'springboot-app')
   
    if flow_name is None:
        flow_name = 'workflow for springboot app installation'
    # 加入所有的节点
    flow = gf.Flow(flow_name) \
        .add(*[init_env_job,  install_mysql_job, install_redis_job, install_springboot_app_job])
        
    # 定义task依赖关系:
    flow.link(u=init_env_job, v=install_mysql_job)
    flow.link(u=init_env_job, v=install_redis_job)
    flow.link(u=install_mysql_job, v=install_springboot_app_job)
    flow.link(u=install_redis_job, v=install_springboot_app_job)
    
    return flow

如下图所示:

image.png

环境初始化

调用load() 函数, 根据配置信息初始化storage,persistence,engine等组件并返回engine对象。

image.png

执行Flow

整个Flow的执行过程包括几个核心步骤: compile –> prepare –> validate –> build state machine –> start

compile 负责把Flow编译为内部可执行的ExecuteGraph;

prepare 负责将为准备好storage用于存储engine,flow以及task的状态数据;

build state machine 负责定义好Engine的状态机转换规则以及注册状态机变更钩子函数;

start 负责启动Engine状态机,开始执行任务,入口是 taskflow.engines.action_engine.engine.py 的 run_iter(self, timeout=None) : 函数:

def run_iter(self, timeout=None):
    # 将用户定义的Flow转换为引擎内部使用的执行图 execute_graph
    self.compile()
    # 准备好storage用于存储engine,flow以及task的状态数据
    self.prepare()
    # 启动前做一次校验操作
    self.validate()
    ... 省略部分代码 ...
    with _start_stop(self._task_executor, self._retry_executor):
        self._change_state(states.RUNNING)
        .... 省略部分代码 ....
        try:
            closed = False
            # 构建有限状态机,是驱动整个DAG调度的入口
            machine, memory = self._runtime.builder.build(
                self._statistics, timeout=timeout,
                gather_statistics=self._gather_statistics)
            # 使用状态机的FiniteRunner组件作为调度入口
            r = runners.FiniteRunner(machine)
            # 启动engine的有限状态机,开始执行调度DAG任务
            for transition in r.run_iter(builder.START):
                last_transitions.append(transition)
                _prior_state, new_state = transition
                ... 省略部分代码 ....
        ... 省略部分代码 ...
    finally:
            if w is not None:
                w.stop()
                self._statistics['active_for'] = w.elapsed()

上面代码中,构建有限状态机是核心,源码文件 taskflow.engines.action_engine.builder 的 build(self, statistics, timeout=None, gather_statistics=True) 函数定义了engine的状态列表,状态转换规则以及状态发生变更时触发的钩子函数:

build(self, statistics, timeout=None, gather_statistics=True):
        ... 省略部分代码...
        # FiniteMachine 是有限状态机库automaton的一种实现,另一种是HierarchicalFiniteMachine
        m = machines.FiniteMachine()
        # 定义所有的状态列表
        m.add_state(GAME_OVER, **state_kwargs)
        m.add_state(UNDEFINED, **state_kwargs)
        m.add_state(st.ANALYZING, **state_kwargs)
        m.add_state(st.RESUMING, **state_kwargs)
        m.add_state(st.REVERTED, terminal=True, **state_kwargs)
        m.add_state(st.SCHEDULING, **state_kwargs)
        m.add_state(st.SUCCESS, terminal=True, **state_kwargs)
        m.add_state(st.SUSPENDED, terminal=True, **state_kwargs)
        m.add_state(st.WAITING, **state_kwargs)
        m.add_state(st.FAILURE, terminal=True, **state_kwargs)
        m.default_start_state = UNDEFINED
        
        # 定义状态转换规则以及能够接受的事件类型。
        m.add_transition(GAME_OVER, st.REVERTED, REVERTED)
        m.add_transition(GAME_OVER, st.SUCCESS, SUCCESS)
        m.add_transition(GAME_OVER, st.SUSPENDED, SUSPENDED)
        m.add_transition(GAME_OVER, st.FAILURE, FAILED)
        # START事件是整个Flow调度的入口事件
        m.add_transition(UNDEFINED, st.RESUMING, START)
        m.add_transition(st.ANALYZING, GAME_OVER, FINISH)
        m.add_transition(st.ANALYZING, st.SCHEDULING, SCHEDULE)
        m.add_transition(st.ANALYZING, st.WAITING, WAIT)
        m.add_transition(st.RESUMING, st.SCHEDULING, SCHEDULE)
        m.add_transition(st.SCHEDULING, st.WAITING, WAIT)
        m.add_transition(st.WAITING, st.ANALYZING, ANALYZE)
        
        # 注册状态发生变更时(接收到的事件)需要调用钩子函数
        m.add_reaction(GAME_OVER, FINISH, game_over)
        m.add_reaction(st.ANALYZING, ANALYZE, analyze)
        m.add_reaction(st.RESUMING, START, resume)
        # 钩子函数schedule是启动整个Flow调度的入口,选择第一批次task来执行,并驱动整个Flow向后执行
        m.add_reaction(st.SCHEDULING, SCHEDULE, schedule)
        m.add_reaction(st.WAITING, WAIT, wait)

        m.freeze()
        return (m, memory)

整个DAG的驱动通过automaton的有限状态机来实现,而启动则从函数 run_iter(self, timeout=None) 开始,最终整个engine的状态转移过程可总结为下图:

Action engine state transitions

参考:docs.openstack.org/taskflow/la…

整个执行过程各种组价之间的调用关系,官方提供的图解如下:

image.png

状态机 automaton

本节内容从 telnetning-notebook.readthedocs.io/zh/latest/p… 处引用,由于taskflow的DAG的执行是依赖automaton来驱动的,因此automaton是理解taskflow内部执行调度的基础,下面内容直接从 链接地址拷贝而来。

automaton 是一个状态转换机库。OpenStack 的 taksflow 库就是基于 automaton 实现的。

在状态转换机中比较常见的是有限状态机,理解起来就是定义一个系统的几个状态,给出各个状态之间装换的条件,然后状态加触发条件就决定了系统的下一个状态。我们还可以在状态转换的各个阶段插入自己想要实现的逻辑。

因此,一个有限状态机中可以提取出几个重要的元素:

  • 状态列表,系统可能存在的状态
  • 转换条件,状态转换的条件
  • 钩子函数,状态转换前后调用函数

一个最简单的有限状态机定义如下:

from automaton import machines



m = machines.FiniteMachine()  #FiniteMachine 有限状态机
m.add_state('up')
m.add_state('down')
m.add_transition('down', 'up', 'jump')
m.add_transition('up', 'down', 'fall')
m.defualt_start_state = 'down'
print(m.pformat())



# output:
# +-------+-------+------+----------+---------+
# | Start | Event | End  | On Enter | On Exit |
# +-------+-------+------+----------+---------+
# |  down |  jump |  up  |    .     |    .    |
# |   up  |  fall | down |    .     |    .    |
# +-------+-------+------+----------+---------+

当要使用状态机时,需要先初始化状态机,然后可以调用 process_event 来触发转换条件:

from automaton import machines



m = machines.FiniteMachine()

m.add_state('up')
m.add_state('down')
m.add_transition('down', 'up', 'jump')
m.add_transition('up', 'down', 'fall')
m.default_start_state = 'down'



m.initialize()  # 初始化状态机
m.process_event('jump')
print(m.pformat())
        # +---------+-------+------+----------+---------+
        # |  Start  | Event | End  | On Enter | On Exit |
        # +---------+-------+------+----------+---------+
        # | down[^] |  jump |  up  |    .     |    .    |
        # |   @up   |  fall | down |    .     |    .    |
        # +---------+-------+------+----------+---------+
print(m.current_state)  # up
print(m.terminated)     # False
m.process_event('fall')
print(m.pformat())
        # +----------+-------+------+----------+---------+
        # |  Start   | Event | End  | On Enter | On Exit |
        # +----------+-------+------+----------+---------+
        # | @down[^] |  jump |  up  |    .     |    .    |
        # |    up    |  fall | down |    .     |    .    |
        # +----------+-------+------+----------+---------+
print(m.current_state) # down
print(m.terminated)    # False

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYRcoa6f' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片