# 事件流处理 欢迎回到 `dora` 教程!在[节点](./node)中,我们了解到节点是数据流中独立的工作单元。在[操作符](./operator)中,我们了解了操作符如何在一种特殊的节点内部构建工作。 `节点`和`操作符`都需要一种方法来感知需要关注的事件发生——例如新数据到达或停止命令。它们是如何接收这些重要通知的? 这就是 **`Event Stream` 事件流** 发挥作用的地方。 ## 节点(或操作符)的收件箱 想象一下,一个节点或操作符就像一个坐在办公桌前准备处理任务的微型办公室职员。他们不会随意开始工作;他们需要被告知新任务何时到来以及任务是什么 。 `事件流`就像是节点或操作符的收件箱或通知源 。它是一个连续的消息流,告知工蜂完成工作所需的一切信息。 无论您的代码是作为独立节点进程运行还是作为运行时节点内的操作符运行,它都会通过其专用`事件流`接收信息。 ## 事件流中包含哪些类型的消息? 事件流不仅仅用于传入数据。它包含不同类型的消息, `dora` 称之为 **“事件”** 。这些事件会告诉您的节点或操作符数据流和系统中正在发生的事情。 以下是您通常会在信息流中发现的主要事件类型: - **INPUT:** 这是最常见的事件。它表示新数据已到达节点(或算子)定义的某个输入。该事件将包含数据本身,并告知您数据来自哪个输入。这就是节点和算子接收需要处理的数据的方式。 - **InputClosed:** 此事件表示节点(或算子)的某个输入源已关闭或断开连接。如果正在为此输入生成数据的节点/算子完成其工作或崩溃,则可能会发生这种情况。您的代码可能需要对此做出反应,例如在关键输入丢失时优雅地关闭。 - **STOP:** 此事件是来自 `dora 运行时`的命令,告知 `Node` 或 `Operator` 关闭。当您使用 `dora CLI` 停止数据流,或者数据流自然结束时(例如,所有输入都关闭,没有其他工作要做)时,就会发生这种情况。您的代码应该监听此事件并干净地退出。 - **RELOAD:** 此事件表示节点或操作符应重新加载其配置或逻辑(如果支持热重载)。这是一项高级功能,允许在不完全重启的情况下更改数据流。 - **ERROR:** 表示事件流本身发生了不可恢复的错误。 ## 节点和操作符如何监听流 您的节点或操作符代码使用您选择的语言的 `dora API 绑定`与事件流进行交互(我们将在第 6 章:API 绑定中介绍 API)。 典型的模式是进入一个循环,反复向事件流询问“下一个”事件。然后,代码检查事件的类型及其详细信息,以决定采取什么操作。 下面是一个简化的 `Python` 示例,展示了节点如何监听其事件流: ```py from dora import Node # Initialize the node - connects to the dora runtime node = Node() print("Node initialized. Waiting for events...") # Enter the loop to process events from the stream for event in node: event_type = event["type"] event_id = event["id"] # Often the input ID print(f"Received event: Type={event_type}, ID={event_id}") if event_type == "INPUT": # Handle incoming data print(f" -> Data received on input '{event_id}'") data = event["value"] # Now you would process the 'data'... # Example: If it's from an input named 'camera_image' if event_id == "camera_image": print(" -> Processing camera image...") # process_image(data) # node.send_output(...) # Send results if needed pass # Placeholder for actual processing elif event_type == "STOP": # Handle stop command print(" -> Received STOP command. Exiting loop.") break # Exit the main loop elif event_type == "InputClosed": # Handle input closing print(f" -> Input '{event_id}' closed.") # Decide if the node should continue or stop elif event_type == "RELOAD": # Handle reload command print(" -> Received RELOAD command.") # Reload configuration if implemented elif event_type == "ERROR": # Handle a stream error print(f" -> Received ERROR: {event['error']}") # Log or react to the error print("Node stopping gracefully.") ``` 这个简单的循环是许多 `dora Node` 和 `Operator` 实现的核心。`dora API`( `for event in node`: 处理了等待流中下一个事件的复杂任务,并将其呈现为一个简单的类似字典的对象( `event` ) `dora` 以便您的代码可以轻松理解和响应。 `Operator` 响应事件的代码结构非常相似,但它使用 `Operator API` 的 `on_event` 函数,而不是直接循环遍历 `Node` 对象(如`操作符` 中所示)。原理相同:接收事件,检查其类型和 ID,并做出相应的响应。 ## 底层:`Stream` 的工作原理 当 `dora 运行时`( `Dora Daemon` 或 `Dora Coordinator` )基于您的 `Dataflow YAML` 启动 `Node` 进程时,它还会设置形成该特定 `Node` 的事件流的通信通道。 回想一下,数据流就像管道图一样。当你在 `YAML` 中将 `camera/image` 连接到 `object-detection/image` 时,运行时并不会神奇地传输数据。它会在 `camera` 节点的输出机制和 `object-detection` 节点的事件流输入机制之间建立专用通道。 当 `camera` 头节点生成 `image` 输出时, `dora 运行时`会获取该数据,并将 `INPUT` 事件放置到事件流中 ,专门用于 `object-detection` 节点 。 `object-detection` 节点的代码在其 `for event in node` 循环中等待,然后接收此 `INPUT` 事件,从中获取图像数据,并开始处理。 `STOP` 和 `RELOAD` 命令的工作原理类似。当您从 `CLI` 停止数据流时,运行时会向所有正在运行的节点和算子的事件流发送 `STOP` 事件。 以下是此交互的简化视图: ![event-stream](/event-stream01.png) 该图显示,`事件流`充当中介,由 `dora 运行时`管理,将各种类型的事件传递给正在主动监听它们的 `Node` 或 `Operator` 进程。`dora` 优化该流的内部机制(通常使用共享内存来存储数据,我们将会看到),以提高性能。 ## 总结 `事件流` 是 `dora` 中的一个基本概念。它是节点和操作符接收其运行所需的所有信息的重要通信渠道,包括传入数据 ( `INPUT` )、输入源关闭信号 ( `InputClosed` ) 以及系统命令( `STOP` 、 `RELOAD` )。通过监听此事件流,您的节点和操作符代码可以动态地响应数据流的状态。`dora` 运行管理这些事件流,并根据数据流蓝图将事件传递给相应的节点和操作符。 现在您已经了解了节点和操作符如何接收通知,让我们仔细看看 `INPUT` 事件中经常包含的 **`DataMessage/Arrow Data 数据`** 本身 。