You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

api-binding.mdx 11 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. # 第六章:API 绑定
  2. **Author** : **`Leon 李扬`**
  3. 欢迎回到 `dora` 教程!到目前为止,我们已经学习了数据流 ( 第 1 章 )作为应用程序蓝图, 节点 ( 第 2 章 )和操作符 ( 第 3 章 )作为构建块, 事件流 ( 第 4 章 )作为其通信通道,以及数据消息/箭头数据 ( 第 5 章 )作为高效数据传输的格式。
  4. 现在,我们来谈谈如何实际编写这些节点和操作符的代码 ,让它们完成各自的工作。你用 `Python` 或 `Rust` 编写的自定义逻辑如何连接到 `dora 运行时`来接收输入并发送输出?这时, **`API 绑定`** 就派上用场了。
  5. ## 代码的翻译器和工具包
  6. 假设你用 `Python` 设计了一个很棒的物体检测算法。你想把它包装在一个 `dora` 节点中,这样它就可以从摄像头节点接收图像,并将检测结果发送到绘图节点,所有这些都按照你的 `Dataflow YAML` 文件中的定义进行。
  7. 您的 `Python` 代码需要特定的工具来:
  8. 1. 告诉 `dora` 运行时它已准备好启动。
  9. 2. 监听来自事件流的传入 `INPUT` 事件( 第 4 章 )。
  10. 3. 从事件中提取实际图像数据消息 ( 第 5 章 )。
  11. 4. 将计算出的边界框数据作为新的数据消息 ( 第 5 章 )发送回运行时,并指定它应该转到哪个输出。
  12. 5. 如果 `dora` 运行时发送了 `STOP` 信号,则知道何时停止( 第 4 章 )。
  13. 这些工具以库或模块的形式提供给不同的编程语言,它们被称为 `API 绑定` 。它们是连接自定义应用程序逻辑和 `dora 运行时`环境的软件层。
  14. 将 `API 绑定`视为:
  15. - **翻译器:** 它们将代码中的高级指令(如“发送此数据”)转换为 `dora` 运行时可以理解的低级消息和协议。
  16. - **工具包:** 它们提供现成的功能和结构( `send_output` 、 `next` 、 `on_event` 等),简化与 `dora` 核心机制(如事件流和数据消息处理)的交互。
  17. `dora` 为多种语言提供了 `API 绑定`,包括 `Rust`、`Python`、`C` 和 `C++`。因此,您可以在同一个数据流中混合搭配使用不同语言编写的节点,只要它们使用各自语言的 `dora API 绑定`进行通信即可。
  18. ## `API 绑定`提供的核心功能
  19. 无论使用哪种语言, `dora API 绑定`通常都会提供用于以下基本任务的函数或方法:
  20. 1. **初始化:** 用于将 `Node/Operator` 代码连接到正在运行的 `dora` 数据流实例的函数。此步骤对于运行时识别组件并设置其通信通道至关重要。
  21. 2. **接收事件:** 监听并从组件的事件流中获取事件的机制。正如我们所见,这通常涉及一个循环或一个回调函数来处理传入的事件( `INPUT` 、 `STOP` 、 `InputClosed` 等)。
  22. 3. **发送输出数据:** 用于在组件的输出流上发布数据的函数。您可以指定要发送数据的输出 ID,并提供数据负载(通常使用 `Apache Arrow` 进行结构化)。
  23. 4. **访问数据/元数据:** 辅助函数可轻松访问 INPUT 事件中包含的数据( `event["value"]` )和可选 `metadata` ( `event["metadata"]` )。
  24. 5. **处理控制信号:** 针对 `STOP` 等控制事件的内置处理或清晰的通知机制。
  25. ## 使用 `API 绑定`:代码示例
  26. 让我们看一下使用 `Python API` 绑定( `dora-rs` 库)的简化示例,这是 `AI` 和数据处理任务的常见选择。
  27. ### 示例 1:使用 `API` 的简单 `Python` 节点
  28. 这与我们在第二章:`Node` 中看到的结构类似,但现在重点关注 `API` 调用本身:
  29. ```py
  30. from dora import Node # Import the Node class from the API binding
  31. # 1. Initialize the node
  32. node = Node()
  33. print("Python Node initialized. Waiting for events...")
  34. # 2. Enter the event loop to receive events
  35. for event in node:
  36. event_type = event["type"]
  37. event_id = event["id"] # Often the input ID
  38. print(f"Received event: Type={event_type}, ID={event_id}")
  39. if event_type == "INPUT":
  40. # 3. Access input data and process
  41. if event_id == "my_input":
  42. input_data = event["value"]
  43. print(f" -> Processing data from input '{event_id}'...")
  44. # --- Your custom processing logic here ---
  45. processed_data = process_data(input_data)
  46. # ---------------------------------------
  47. # 4. Send output data
  48. print(" -> Sending processed data on output 'my_output'")
  49. node.send_output("my_output", processed_data)
  50. elif event_type == "STOP":
  51. # 5. Handle stop signal
  52. print(" -> Received STOP command. Exiting loop.")
  53. break
  54. print("Python Node stopping gracefully.")
  55. # Dummy processing function for illustration
  56. def process_data(data):
  57. # In a real node, this would transform the data
  58. print(" (Doing dummy processing...)")
  59. # Often data is an Arrow Array, need to convert/process it
  60. # For simplicity, let's just return a dummy byte array
  61. return b"processed_output_data"
  62. ```
  63. - `from dora import Node`:从 `API 绑定`导入必要的类。
  64. - `node = Node()`:初始化与 `dora` 运行时的连接。运行时使用启动期间设置的环境变量(基于你的 `Dataflow YAML` )来识别此节点。
  65. - `for event in node`: 这是接收事件的核心。`API` 会处理阻塞,直到下一个事件在事件流中可用,并以结构化格式(类似 `Python` 字典)提供。
  66. - `event["type"]` , `event["id"]` , `event["value"]`:访问已接收事件的详细信息。`API` 提供以下标准键。
  67. - `node.send_output("my_output", processed_data)`:调用 `API` 函数发送数据。您需要指定 `Dataflow YAML` 中定义的输出 ID `和数据负载。API` 负责格式化数据(如果需要,例如,格式化为 `Arrow` 格式),可能还会使用共享内存 ,并通知运行时。
  68. ### 示例 2:使用 `API` 的简单 `Python` 运算符
  69. 操作员使用稍微不同的 `API` 结构,专门设计用于 `dora-runtime` 进程( 第 3 章:操作员 )。
  70. ```py
  71. # operators/my_operator.py
  72. from dora import DoraStatus # Import necessary status enum
  73. # The Operator logic is typically implemented in a class with specific methods
  74. class Operator:
  75. def on_event(self, dora_event, send_output):
  76. """
  77. This method is called by the dora-runtime whenever an event occurs
  78. for this operator (like receiving an input).
  79. """
  80. event_type = dora_event["type"]
  81. event_id = dora_event["id"] # Often the input ID
  82. print(f"Operator received event: Type={event_type}, ID={event_id}")
  83. if event_type == "INPUT":
  84. if event_id == "operator_input":
  85. input_data = dora_event["value"]
  86. print(f" -> Processing data from operator input '{event_id}'")
  87. # --- Your custom processing logic here ---
  88. processed_data = process_operator_data(input_data)
  89. # ---------------------------------------
  90. # Send output data using the provided send_output function
  91. print(" -> Sending processed data on operator output 'operator_output'")
  92. send_output("operator_output", processed_data, dora_event["metadata"]) # Operators also pass metadata
  93. elif event_type == "STOP":
  94. print(" -> Received STOP command. Operator stopping.")
  95. return DoraStatus.STOP # Return STOP status to signal shutdown
  96. # By default, continue running
  97. return DoraStatus.CONTINUE
  98. # Dummy processing function for illustration
  99. def process_operator_data(data):
  100. print(" (Operator doing dummy processing...)")
  101. return b"processed_operator_data"
  102. ```
  103. - `from dora import DoraStatus`:导入必要的状态值来向运行时发出信号(例如, `DoraStatus.CONTINUE` , `DoraStatus.STOP` )。
  104. - `class Operator`: 在 `Python` 中定义运算符的标准方法。
  105. - `on_event(self, dora_event, send_output)`: `dora-runtime` 期望此特定的方法签名。运行时调用此方法,并传递事件详情( `dora_event` )以及特定于此运算符实例的 `send_output` 函数。
  106. - `dora_event["type"] 、 dora_event["id"] 、 dora_event["value"]`:访问事件详细信息,类似于 `Node API`,但通常直接作为参数提供或嵌套在事件对象中。
  107. - `send_output("operator_output", processed_data, dora_event["metadata"])`:调用提供的函数来从此特定运算符发送输出数据。然后,运行时会根据数据流 YAML 将此输出路由到同一节点内的其他运算符,或者路由到节点外的其他运算符。
  108. - `return DoraStatus.CONTINUE / DoraStatus.STOP`:操作符在处理事件后明确向运行时返回状态,指示它们是否应该继续或停止。
  109. 您可以在 `Rust` 绑定( `dora-node-api` 、 `dora-operator-api` )和 `C/C++` 绑定(通过 `CXX` 和原始 `C FFI` 公开,参见 `apis/rust/node/src/lib.rs` 、 `apis/c++/node/src/lib.rs` 、 `apis/c/node/src/lib.rs` )中看到类似的 `API` 结构和概念。它们提供了针对特定语言范式定制的相同基本功能(初始化、接收、发送、处理信号)。
  110. ## 底层:与运行时交互的 `API` 绑定
  111. 那么,当您调用 `node.send_output(...)` 或 `send_output(...)` 时会发生什么?
  112. `API 绑定库`本身并不实现整个 `dora` 通信逻辑。相反,它与启动节点或操作员的 `dora` 运行时进程( `Dora Daemon/Coordinator` )进行通信。
  113. 当你的 `Node/Operator` 进程启动时, `dora 运行时`会设置高效的通信渠道:
  114. 1. **事件通道:** 这是 `dora 运行时`向你的 `Node/Operator` 发送事件的地方(即事件流 )。`API` 绑定的事件循环( f`or event in node` )正在监听此通道。
  115. 2. **控制通道:** 这是你的节点/操作员向 `dora 运行时`发送命令的地方(例如“发送此输出”、“我已完成此共享内存块的操作”)。`API` 绑定的 `send_output` 函数使用此通道。
  116. 3. **共享内存:** 如第 5 章所述,对于大数据,数据本身通过共享内存传输,并通过控制通道上的消息进行协调。
  117. 这是一个简化的序列图:
  118. ![](/api01.png)
  119. **`API 绑定`** 隐藏了这些通道和协议的复杂性,让您可以专注于应用程序逻辑。它将您语言的数据类型转换为适合 `dora` 的格式(例如 `Arrow` ),并管理高效通信所需的低级交互。
  120. `Rust` 和 `C++ API 绑定`通常会暴露底层细节或允许更细粒度的控制以提高性能,而 `Python` 绑定则优先考虑易用性。但它们的根本目的都是为你的自定义代码提供一种结构化的方式,以便与 `dora` 运行时进行通信。
  121. # 总结
  122. `API 绑定`是必不可少的软件库,它使您能够使用熟悉的编程语言(例如 P`ython`、`Rust`、`C` 和 `C++`)为自定义 `dora` 节点和操作符编写核心逻辑。它们提供了标准化的函数工具包( `init` 、 `send_output` 、事件循环/回调),这些函数抽象了进程间通信、共享内存和事件处理的复杂性,使您可以专注于特定的应用程序任务,同时无缝集成到 `dora` 数据流中。
  123. 现在您已经了解了如何使用 `API 绑定`为各个组件编写代码,让我们缩小范围并查看用于管理和运行整个数据流的主要工具: `Dora CLI` 。

Dora中文社区官网

Contributors (1)