# 操作符 欢迎回来!在[数据流](./dataflow)中,我们学习了 `dora` 应用程序的整体蓝图以及它如何连接各个部分。在[节点](./node)中,我们深入探讨了这些节点部分,并了解到每个节点通常作为一个独立的进程运行,执行特定的任务,并通过数据流 `YAML` 中定义的输入和输出进行通信。 现在,让我们探索一种在 `Node` 内部添加更多结构和可重用性的方法: **`Operator 操作符`** 的概念。 ## 节点内部更小、可重复使用的步骤 假设你有一个负责处理图像的节点。该节点可能需要执行几个不同的步骤: 1. 将图像转换为灰度。 2. 将图像调整为特定尺寸。 3. 对图像应用过滤器。 4. 在过滤后的图像中寻找边缘。 你可以在一个节点的单一代码实现中编写所有这些逻辑。但是,如果你之后想在另一个节点或不同的数据流中重用“调整图像大小”或“应用滤镜”逻辑,你就必须复制粘贴代码,这并不理想。 这时, **`Operator`** 就派上用场了。`Operator` 是一种较小的模块化逻辑,设计用于在一种名为 `Dora Runtime Node` 的特殊节点内运行。`Operator` 允许你将单个节点的工作分解为可重用的独立步骤。 可以将节点想象成工厂车间的工作站(一栋独立的建筑),而操作符则是该工作站内的专用工具或机器。工作站(节点)是独立的单元,但其内部的工作由其工具集(操作符)完成。 ## 操作符与自定义节点 让我们快速比较一下: | 特征 | 自定义节点 | Dora 运行时节点和操作符 | | :----------- | :------------------------------------------ | :------------------------------------------------------------- | | **运行方式** | 独立节点 | 单进程 | | **包含** | 一段自定义的代码 | 一个或多个操作符 | | **逻辑单元** | 整个节点代码 | 单个操作符 | | **通讯** | 使用 `dora Node API` 直接与 `dora` 系统交互 | 节点运行时使用 `dora Node API`; 操作符使用 `dora Operator API` | | **可重用性** | 重用整个 `Node` 进程 | 重复使用单个操作符逻辑模块 | 因此,虽然自定义节点是其自己独立的程序( `.py` 、 `.rs` 可执行文件等),但 `Dora 运行时` 节点是一个运行 `dora-runtime` 程序的进程,然后加载和管理数据流 `YAML` 中指定的一个或多个操作员的代码。 ## 在数据流 `YAML` 中定义操作符 运算符是在 `Dataflow YAML` 中节点的定义内定义的。您无需为节点的实现指定单个 `path` 或 `custom` 块,而是使用 `operators` 符块。 让我们想象一下之前的图像处理节点,现在使用操作符来实现。 ```py title="dataflow.yml" nodes: # ... other nodes like camera ... # Define our image processing Node that uses Operators - id: image-processor-node # This is the ID of the Node process inputs: image: camera/image # This Node takes image data from the camera Node outputs: - processed_image # This Node outputs the final processed image # This node is a Runtime Node and will load operators operators: # First Operator: Convert to Grayscale - id: grayscale-converter # ID of the operator within THIS node source: python # How to load this operator's code path: operators/grayscale_op.py # The code file for this operator inputs: input_image: image-processor-node/image # Takes input from the NODE's image input outputs: - gray_image # Outputs grayscale image # Second Operator: Resize - id: resizer # ID of the second operator source: python path: operators/resize_op.py inputs: input_image: grayscale-converter/gray_image # Takes input from the 'gray_image' output of 'grayscale-converter' OPERATOR outputs: - resized_image # Outputs resized image # Third Operator: Apply Filter - id: filter source: python path: operators/filter_op.py inputs: input_image: resizer/resized_image # Takes input from the 'resized_image' output of 'resizer' OPERATOR outputs: - filtered_image # Outputs filtered image # Fourth Operator: Edge Detector - id: edge-detector source: python path: operators/edge_op.py inputs: input_image: filter/filtered_image # Takes input from the 'filtered_image' output of 'filter' OPERATOR outputs: - processed_image # Outputs the final processed image # Notice: the 'processed_image' output of the 'edge-detector' operator # matches the 'processed_image' output of the 'image-processor-node'. # The runtime routes this automatically. ``` 需要注意的关键事项: - `nodes` 下的 `id` ( `image-processor-node` ) 是 `Node` 进程的 `ID`,和之前一样。 - 这里有一个 `operators` 列表,而不是 `path` 或 `custom` 。 - `operators` 列表中的每个项目定义了在此节点内运行的操作员 。 - 每个 `Operator` 都有自己的 `id` ( 在此节点内唯一)、 `source` (例如 `python` 、 `shared-library` )和指向其代码的 `path` 。 - 操作员也有 `inputs` 和 `outputs` 。 - 连接语法仍然是 `source_id/stream_id` 。 - 节点自己的 `ID`( `image-processor-node/image` ):从外部数据流进入节点的数据被路由到操作员的输入。 - 同一节点内另一个 `Operator` 的 `ID`( `grayscale-converter/gray_image` ):数据在同一节点进程内的 `Operator` 之间直接流动。 - 如果 `Operator` 的输出 `id` 与 `Node` 的某个 `outputs` 匹配,则该输出将自动成为该 `Node` 到外部 `Dataflow` 的输出( `edge-detector/processed_image` 变为 `image-processor-node/processed_image` )。 这允许您在单个 `Node` 进程中定义一个迷你数据流。 ## 操作员如何工作(操作员 API) 就像 `Node` 的代码使用 `dora Node API` 一样,`Operator` 的代码也使用 `dora Operator API` 。这个 `API` 更简单,因为 `Operator` 不需要担心网络连接或进程管理之类的事情;`Runtime Node` 会处理这些事情。 操作员的主要工作是对事件做出反应。最常见的事件是接收输入数据。 下面是我们的一个示例运算符(`grayscale-converter` 的 `Python` 代码的简化概念图,代码片段非常简短): ```py title="operators/grayscale_op.py" # operators/grayscale_op.py from dora import DoraStatus class Operator: """Converts incoming image to grayscale.""" def on_event( self, dora_event, # The event received by the operator send_output, # Function to send outputs ) -> DoraStatus: if dora_event["type"] == "INPUT": # Check if the input is the one we expect if dora_event["id"] == "input_image": image_data = dora_event["value"] # Get the incoming image data print(f"Received image on input_image.") # --- Operator's specific logic --- # (Actual image processing code goes here) grayscale_image_data = convert_to_grayscale(image_data) # --------------------------------- # Send the result to the operator's output send_output("gray_image", grayscale_image_data, dora_event["metadata"]) return DoraStatus.CONTINUE # Keep running ``` 在此 `Python` 代码片段中: - `Operator` 代码的结构为 `Python` 类(或 `Rust` 中的函数)。 - 它有一个 `on_event` 方法(或函数),每当发生与该操作符有关的事件(例如接收输入或停止信号)时, `dora-runtime` 就会调用该方法。 - `dora_event` 字典包含有关事件的信息(其 `type` 、 `id` 、 `value` 等。我们将在下一章介绍**事件流** )。 - `send_output` 函数(或方法)由运行时提供,供 `Operator` 在其定义的输出上发送数据。它接受输出 `id` 、 `data` 以及可选的 `metadata` 。 这个模式( `on_event -> process -> send_output` )是 `Operator` 逻辑的核心。 ## 底层:运行时节点和操作符 当您使用 `dora run` 和定义了带有操作符的运行时节点的 `Dataflow YAML` 时,下面是发生情况的简化视图: 1. `dora CLI` 和 `Dora Daemon / Dora Coordinator` 读取数据流 `YAML`。 2. 他们将 `image-processor-node` 标识为一个节点。 3. 他们看到它有一个 `operators` 块,表明它是一个运行时节点。 4. 它们为该节点启动一个单独的进程 。该进程运行 `dora-runtime` 可执行文件。 5. `dora-runtime` 读取 `image-processor-node` 块中的 Operator 定义。 6. 它根据 `source` 和 `path` 加载 `grayscale-converter` 、 `resizer` 、 `filter` 和 `edge-detector` 的代码。 7. 它建立了沟通渠道: - 从外部数据流的 `camera/image` 到 `image-processor-node` 的 image 输入。 - 内部,从运行时节点的 `image` 输入到 `grayscale-converter` 操作器的 input_image 输入。 - 在内部,从 `grayscale-converter/gray_image` 输出到 `resizer/input_image` 输入。 - 内部,从 `resizer/resized_image` 输出到 `filter/input_image` 输入。 - 内部,从 `filter/filtered_image` 输出到 `edge-detector/input_image` 输入。 - 从 `edge-detector/processed_image` 输出到外部 `Dataflow` 的 `image-processor-node/processed_image` 输出。 8. 然后, `dora-runtime` 进入事件循环,从外部 `Dataflow` 接收事件(例如新图片),并根据 YAML 中定义的内部连接,将它们分发给相关的 `Operator`。当一个 `Operator` 生成输出时,运行时会将其内部路由到另一个 `Operator`,或者将其作为 `Node` 输出发送出去。 **这是一个简化的序列图:** ![序列图](/operator01.png) 这显示了数据如何流入单个运行时节点进程,通过运行时在操作员之间内部传递,然后作为节点输出流出。 ## 总结 在本章中,我们了解到 `Operator` 是运行在 `Dora Runtime` 节点进程中的模块化逻辑。它们在 `Dataflow YAML` 的 `Node` 块中定义,允许您在单个节点内构建复杂的处理流水线。`Operator` 使用 `Operator API` 接收输入并发送输出, `dora-runtime` 负责加载它们、在它们之间建立内部通信,以及在节点的外部输入/输出和 `Operator` 的输入/输出之间路由数据。这提供了一种创建可复用处理步骤的方法。 现在我们了解了节点和操作符,让我们看看数据在系统中移动的基本概念: **`Event Stream 事件流`** 。 内容编写 : **`李扬`**