龙空技术网

你不再需要两个独立的系统来进行批处理和流处理

闻数起舞 1102

前言:

现时各位老铁们对“工作流python”大致比较关切,大家都需要分析一些“工作流python”的相关内容。那么小编也在网摘上汇集了一些对于“工作流python””的相关内容,希望看官们能喜欢,同学们快快来了解一下吧!

一个更灵活的工作流协调系统能否处理这两种模式?

安娜-盖勒 7分钟 阅读

许多人认为,流处理技术是实现实时分析的唯一途径。由于现在的大多数数据工作负载是通过批处理平台协调的,实时性要求迫使数据团队采用一套新的工具。然而,为批处理和实时流媒体维护两个独立的系统会带来额外的负担和成本。

这篇文章提供了一种替代方法,允许你从一个协调平台上同时处理批处理和实时流管道。

了解问题

过去,数据行业正试图通过引入新的架构来解决这个问题,将流媒体层与批处理和服务层分开(想想:Lambda和Kappa架构)。但是,这些批处理和流处理范式在本质上是不同的吗?在实践中,两者都收集和处理数据供下游应用使用。批处理和流处理之间唯一真正的区别是,它们的操作时间间隔略有不同。长期以来,批处理调度器和调度器的不灵活性一直迫使数据团队维护一套单独的系统来满足现代事件驱动的实时数据工作负载。但它不一定是这样的。

在Prefect,我们致力于始终了解我们的用户所面临的问题。 Orion是第一个工作流协调引擎,它允许你在任何时间以任何理由运行任何代码。它灵活的API使您能够启动新的流程运行,以响应来自流媒体系统或消息队列的事件,以及定期运行您的工作流程或作为长期运行的过程。你可以从一个单一的协调平台管理各种数据访问模式。

一个基本的流处理数据管道

Orion API将协调引擎与被协调的代码分开。你不需要把整个工作流代码重写成一个有向无环图(DAG)。事实上,你根本不需要一个DAG。你可以自由地协调无限长的流程运行,有多少循环和条件语句都是你需要的。

为了说明Orion的这一能力,让我们用Python实现一个实时流管道的例子。

作为一个流式数据源的例子,这里有一些API的代码,可以根据请求生成数据。这个API模仿了通过电子商务系统创建的实时订单。

from datetime import datetimefrom fastapi import FastAPIimport randomimport uuidapp = FastAPI(title="Sample Real-Time Orders")@app.get("/orders")def get_data(min_value: int = 1, max_value: int = 20):    sample_order = dict(        timestamp=datetime.utcnow(),        order_id=str(uuid.uuid4()),        amount=random.randint(min_value, max_value),        price=round(random.uniform(5, 200), 2),        channel=random.choice(["shop", "reseller", "invalid_value_to_make_flow_fail"]),    )    return random.choice([sample_order, {}])

通过运行以下程序启动API uvicorn服务器。

uvicorn main:app

下面是一个使用该数据的最小工作流程例子。

# 01_no_flow_yet.pyimport requestsimport timedef get_real_time_data():    data = requests.get(";).json()    print(data)def main():    get_real_time_data()if __name__ == "__main__":    while True:        main()        time.sleep(5)

如果你启动API并运行该脚本,你应该看到随机生成的流数据出现在你的标准输出中。

{    "amount": 10,    "channel": "shop",    "order_id": "52e1d692-0dd1-4e62-947d-fb419ccf9d93",    "price": 119.59,    "timestamp": "2021-11-22T21:13:20.273579"}

该流程可能包含一个长期运行的消费者进程,该进程从分布式发布-订阅消息系统中持续接收新的事件,而不是从REST API中请求实时数据。

用Orion改进管道

这个简单的数据管道运作良好,但如果我们。

想知道它何时失败并得到相关通知吗? 需要观察到该管道的哪些部分发生了故障? 需要在失败时重试失败的组件吗? 希望在某个特定的任务成功完成后,运行一些下游的程序? 想阅读管道的日志以确保系统按预期工作?

这就是Orion可以帮助的地方。修改之前的脚本以使其适用于Orion,就像导入Prefect并在主函数中添加一个装饰器@flow一样简单。

# 02_initial_flow.pyimport requestsimport time# Import the Prefect flow module.from prefect import flowdef get_real_time_data():    data = requests.get(";).json()    print(data)# Decorate the main function with @flow.@flow(name="Streaming Pipeline")def main():    get_real_time_data()if __name__ == "__main__":    while True:        main()        time.sleep(5)

要运行上述流程,你需要 install Orion.

只增加了两行代码,但注意到脚本运行时输出的不同。

21:14:13.046 | Beginning flow run 'stirring-caiman' for flow 'Streaming Pipeline'...21:14:13.046 | Starting executor `SequentialExecutor`...{'timestamp': '2021-11-22T20:14:13.074502', 'order_id': '0161abd1-e188-4152-8a21-3c897c117c99',  'amount': 10, 'price': 95.28, 'channel': 'reseller'}21:14:13.076 | Shutting down executor `SequentialExecutor`...21:14:13.107 | Flow run 'stirring-caiman' finished in state Completed(message=None, type=COMPLETED)

通过简单地添加@flow装饰器,Orion自动注册了该流程,并将其第一个流程运行到后端。它还为其分配了一个执行器。

让我们看一下用户界面。

探索Orion组件

当你启动Orion服务器时,它将启动一个用户界面,你可以在其中探索你的流程、部署,以及流程运行和任务运行。

prefect orion start

# INFO: Uvicorn running on <; (Press CTRL+C to quit)

Orion UI

流处理流程及其相应的第一次运行已经自动在后台注册,你可以检查所有的流媒体运行和它们的状态。

更进一步:增加任务

将整个逻辑封装在一个流程中是一个简单的第一步。但要想对流程内发生的事情有更多的了解,你可以添加一些 tasks.

当我们的工作流程的某些部分成功时,任务是有益的。 例如,如果API没有生成任何订单(例如,它返回了一个空字典),你可以跳过将订单加载到分析数据存储的步骤。

要将这些函数标记为任务,你需要做的就是在每个函数中添加@任务装饰器。

# 03_flow_with_tasks.pyimport requestsimport timefrom prefect import flow, task# The @task decorator is all you need to turn a function into a task.@taskdef get_real_time_data():    data = requests.get(";).json()    print(data)    return data@taskdef load_to_dwh(order):    print(f"Loading a new order to DWH: {order}")    # implement your load logic here@flow(name="Streaming Pipeline with Tasks")def main():    orders = get_real_time_data()    load_to_dwh(orders)if __name__ == "__main__":    while True:        main()        time.sleep(5)

现在输出显示这两项任务都成功完成了。

21:17:35.186 | Beginning flow run 'caped-kagu' for flow 'Streaming Pipeline with Tasks'...21:17:35.186 | Starting executor `SequentialExecutor`...21:17:35.276 | Submitting task run 'get_real_time_data-d88483db-0' to executor...{'timestamp': '2021-11-22T20:17:35.328103', 'order_id': 'd514836e-978d-447a-b44f-138535071b87', 'amount': 1, 'price': 189.21, 'channel': 'shop'}21:17:35.425 | Task run 'get_real_time_data-d88483db-0' finished in state Completed(message=None, type=COMPLETED)21:17:35.470 | Submitting task run 'load_to_dwh-70978bd5-0' to executor...Loading a new order to DWH: {'timestamp': '2021-11-22T20:17:35.328103', 'order_id': 'd514836e-978d-447a-b44f-138535071b87', 'amount': 1, 'price': 189.21, 'channel': 'shop'}21:17:35.557 | Task run 'load_to_dwh-70978bd5-0' finished in state Completed(message=None, type=COMPLETED)21:17:35.558 | Shutting down executor `SequentialExecutor`...21:17:35.593 | Flow run 'caped-kagu' finished in state Completed(message='All
在流程中添加条件逻辑

如前所述,如果API返回一个空的字典,流程应该跳过加载步骤。为了在流程中的任务之间启用这种条件逻辑,你需要添加一个条件,检查第一个任务的输出,只有当返回的数据不是空字典时才执行第二个任务。

有了Orion,你所需要的只是本地Python的if-else语句。

有一个注意事项:由于Orion支持并行和异步执行,订单对象是一个PrefectFuture,它代表了在执行器中发生的任务计算的状态和结果。

为了获得一项任务的结果。

通过使用orders_state = orders.wait() 来等待,直到任务执行完毕。 通过使用orders_state.result() 获得任务的结果。

下面是一个完整的例子。

# 04_flow_with_tasks_and_conditional_logic.pyimport requestsimport timefrom prefect import flow, task@taskdef get_real_time_data():    data = requests.get(";).json()    print(data)    return data@taskdef load_to_dwh(order):    print(f"Loading a new order to DWH: {order}")@flow(name="Streaming Pipeline with Tasks and Conditional Logic")def main():    orders = get_real_time_data()    # Wait for the get_real_time_data() task to complete.    orders_state = orders.wait()    # Load the result if it's not empty.    if orders_state.result():        load_to_dwh(orders)    else:        print("Skipping the load task...")if __name__ == "__main__":    while True:        main()        time.sleep(5)

有订单时的输出。

21:24:36.639 | Beginning flow run 'tested-barracuda' for flow 'Streaming Pipeline with Tasks'...21:24:36.639 | Starting executor `SequentialExecutor`...21:24:36.707 | Submitting task run 'get_real_time_data-d88483db-0' to executor...{'timestamp': '2021-11-22T20:24:36.759188', 'order_id': '045eef59-894f-4292-bdb1-54b0225ec105', 'amount': 4, 'price': 102.04, 'channel': 'reseller'}21:24:36.852 | Task run 'get_real_time_data-d88483db-0' finished in state Completed(message=None, type=COMPLETED)21:24:36.891 | Submitting task run 'load_to_dwh-70978bd5-0' to executor...Loading a new order to DWH: {'timestamp': '2021-11-22T20:24:36.759188', 'order_id': '045eef59-894f-4292-bdb1-54b0225ec105', 'amount': 4, 'price': 102.04, 'channel': 'reseller'}21:24:36.970 | Task run 'load_to_dwh-70978bd5-0' finished in state Completed(message=None, type=COMPLETED)21:24:36.971 | Shutting down executor `SequentialExecutor`...21:24:37.000 | Flow run 'tested-barracuda' finished in state Completed(message='All states completed.', type=COMPLETED)

实时API没有返回数据时的输出。

22:09:21.016 | Beginning flow run 'lilac-turkey' for flow 'Streaming Pipeline with Tasks'...22:09:21.016 | Starting executor `SequentialExecutor`...22:09:21.096 | Submitting task run 'get_real_time_data-d88483db-3' to executor...{}22:09:21.192 | Task run 'get_real_time_data-d88483db-3' finished in state Completed(message=None, type=COMPLETED)Skipping the load task...22:09:21.193 | Shutting down executor `SequentialExecutor`...22:09:21.218 | Flow run 'lilac-turkey' finished in state Completed(message='All states completed.', type=COMPLETED)
如果出了问题会怎样?重试!

当你的代码进行API调用和执行数据库操作时,有很多东西都可能出错。

API可能暂时无法访问或遇到其他服务器端的问题。 在进行API调用时,可能存在客户端的网络问题。 数据库连接可能被破坏。

这样的操作问题往往可以通过重试一个任务来解决。任务 natively support retries因此,你不必编写你自己的重试逻辑。

在工作流程中添加自动重试,例如,在@task装饰器上,最多可重试5次,重试之间有10秒的延迟。

# 05_flow_with_tasks_and_conditional_logic_and_retries.pyimport requestsimport timefrom prefect import flow, task@task(retries=5, retry_delay_seconds=10)def get_real_time_data():    data = requests.get(";).json()    print(data)    return data@task(retries=5, retry_delay_seconds=10)def load_to_dwh(order):    print(f"Loading a new order to DWH: {order}")@flow(name="Streaming Pipeline with Tasks, Conditional Logic and Retries")def main():    orders = get_real_time_data()    orders_state = orders.wait()    if orders_state.result():        load_to_dwh(orders)    else:        print("Skipping the load task...")if __name__ == "__main__":    while True:        main()        time.sleep(5)

在Orion的实时数据管道中添加重试的简单性掩盖了它所解决的许多问题。在分布式消息系统中实现类似的功能,可能需要配置额外的组件,如死信队列,并在一个单独的过程中管理其状态。

如果你需要对脚本进行参数化处理怎么办?

由于Orion允许你运行任何Python代码,给Orion flow设置参数的方式与给任何Python脚本传递参数的方式相同。此外,如果你在用@flow装饰的函数上为你的参数提供类型注解,Orion会用 Pydantic来验证参数值。

在这个例子中,指定max_value: int = 20作为流的函数的参数,定义了输入应该是一个整数。因此,代码不再需要明确地转换输入参数。

# 06_flow_with_tasks_conditional_logic_retries_parameters.pyimport requestsimport timefrom prefect import flow, taskimport sys@task(retries=5, retry_delay_seconds=10)def get_real_time_data(max_value: int = 20):    data = requests.get(        ";, params=dict(max_value=max_value)    ).json()    print(data)    return data@task(retries=5, retry_delay_seconds=10)def load_to_dwh(order):    print(f"Loading a new order to DWH: {order}")@flow(name="Streaming Pipeline with Tasks, Conditional Logic, Retries and Parameters")def main(max_value: int = 20):    orders = get_real_time_data(max_value)    orders_state = orders.wait()    if orders_state.result():        load_to_dwh(orders)    else:        print("Skipping the load task...")if __name__ == "__main__":    max_order_amount = 20  # default value    if len(sys.argv) > 1:        max_order_amount = sys.argv[1]    while True:        main(max_order_amount)        time.sleep(5)

例如,你现在可以在运行这个流程时将max_value设置为一个更大的数字,如400。

python 06_flow_with_tasks_conditional_logic_retries_parameters.py 400

输出

21:32:37.609 | Beginning flow run 'fair-mustang' for flow 'Streaming pipeline with tasks, conditional logic, retries & parameters'...21:32:37.609 | Starting executor `SequentialExecutor`...21:32:37.677 | Submitting task run 'get_real_time_data-d88483db-3' to executor...{'timestamp': '2021-11-22T20:32:37.717727', 'order_id': 'c5707def-d577-4739-892d-bd0ed61acee5', 'amount': 297, 'price': 83.71, 'channel': 'invalid_value_to_make_flow_fail'}21:32:37.756 | Task run 'get_real_time_data-d88483db-3' finished in state Completed(message=None, type=COMPLETED)21:32:37.796 | Submitting task run 'load_to_dwh-70978bd5-0' to executor...Loading a new order to DWH: {'timestamp': '2021-11-22T20:32:37.717727', 'order_id': 'c5707def-d577-4739-892d-bd0ed61acee5', 'amount': 297, 'price': 83.71, 'channel': 'invalid_value_to_make_flow_fail'}21:32:37.868 | Task run 'load_to_dwh-70978bd5-0' finished in state Completed(message=None, type=COMPLETED)21:32:37.869 | Shutting down executor `SequentialExecutor`...21:32:37.898 | Flow run 'fair-mustang' finished in state Completed(message='All states completed.', type=COMPLETED)

统一的协调器对实时和批量处理的好处

有了像Orion这样的统一协调器。

你获得了对整个数据平台健康状况的可见性--你不再需要猜测在哪里可以找到关于你的数据管道的关键信息。一切都可以在一个灵活的系统中得到。 你可以通过将现有的函数装饰成任务和流程,并将你的工作流逻辑(包括异步、重试、条件分支和参数化)定义为纯Python代码来建立你的工作流。 你可以快速确定特定工作流程的依赖性是什么。 如果任何关键进程失败,你可以得到通知。 你可以迅速了解出错的原因,并在必要时部署修复措施。 你可以建立数据管道和应用程序,同时需要实时事件流和从运营系统中批量提取。 你有信心知道,如果某些实时或批量数据没有及时到达 ,哪些确切的下游工作流程会受到影响。

标签: #工作流python