数据流 API
本文介绍使用 TapFlow API 管理数据流的完整参考,包括定义任务来源/目标、执行数据处理等操作。
创建数据流任务
创建数据流任务的核心 API 包括 read_from、write_to 和 save,此外,您还可以根据需求添加处理节点或设置任务同步类型,请跟随下述教程了解基础和进阶用法:
- 基础用法
- 进阶用法
在快速入门部分,我们将介绍如何使用基本的 read_from、write_to 和 save API 来创建数据流任务。适用于简单数据实时同步,具体流程如下:
-
read_from:指定数据流任务的主数据源表,可通过
data_source_name.table_name的方式指定,其中data_source_name可通过show dbs获取,或新建数据源,示例如下:# 指定要读取的源表
tap> myflow = Flow("DataFlow_Test") \
.read_from("MongoDB_Demo.ecom_orders") -
write_to:指定数据流任务的目标表,可通过
data_source_name.table_name的方式定义简单目标表,其中data_source_name可通过show dbs获取,或新建数据源,示例如下:# 将源表实时写入 ecom_orders 表
tap> myflow = Flow("DataFlow_Test") \
.write_to("MongoDB_Demo.ecom_orders") -
save:保存当前任务的配置,使其成 为持久化的任务。调用
save()后,该数据流任务即可被启动或停止,示例如下:# 保存数据流任务
tap> myflow.save();
最简示例
将上述所有步骤合并成一个完整示例,用于从 MySQL 读取订单数据并写入 MongoDB,保存后可执行 start 命令来启动该任务。
# 创建数据流任务
tap> myflow = Flow("DataFlow_Advanced") \
.read_from("MySQL_Demo.ecom_orders") \
.write_to("MongoDB_Demo.Orders") \
.save();
对于更复杂的用法,您可以进一步配置多表读取、数据处理节点和同步类型等,详细内容请见进阶用法标签页。
在此部分,我们将介绍如何进一步配置和定制数据流任务,适用于多表读取、数据处理节点添加和同步类型设置等更复杂场景,具体流程如下:
-
read_from:指定数据流任务的源表,可通过
data_source_name.table_name简单定义,也可使用Source API实例化对象进行复杂配置(如多表同步、性能优化),更多介绍,见源端进阶设置,示例如下:# 使用 source API 实例化源表
tap> source = Source('MySQL_ECommerce', table=['ecom_orders', 'ecom_customers'])
# 配置源端读取行为
tap> source.initial_read_size(500) # 设置全量读取批次大小为 500 条
tap> myflow = Flow("DataFlow_Advanced") \
.read_from(source)如需使用自定义查询,可通过
query参数直接指定,例如myflow.read_from("MongoDB_Demo.ecom_orders", query="SELECT * FROM ecom_orders WHERE status='active'")。 -
添加处理节点:在将数据写入到目标端之前,您可以添加不同类型的处理节点,实现数据预处理、数据结构调整等复杂需求,更多使用方法及参数介绍,见处理节点说明。
-
write_to:指定数据流任务的目标表,可通过
data_source_name.table_name简单定义,也可以通过Sink API实例化对象进行复杂配置(如高并发写入、写入行为等)的场景,更多介绍,见目标端进阶设置,示例如下:# 使用 Sink API 实例化目标表
tap> sink = Sink("MongoDB_Demo", table="ecom_orders")
# 配置目标端写入行为
tap> sink.keep_data() # 保留目标表原有数据
tap> sink.set_write_batch(500) # 每批次写入 500 条记录
tap> myflow = Flow("DataFlow_Test") \
.write_to(sink) -
save:保存当前任务的配置,使其成为持久化的任务。调用
save()后,该数据流任务即可被启动或停止。# 保存并创建数据流任务
tap> myflow.save();
完整示例
本示例展示了如何从 MySQL 读取多个表,配置批量写入、保留原有数据,并添加过滤节点保留订单金额大于 100 的记录。最终,将处理后的数据实时同步到 MongoDB 目标表。保存任务后,可执行 start 命令来启动任务。
# 引用已有数据源,设置为同步多表的数据复制任务
source = Source('MySQL_ECommerce', table=['ecom_orders', 'ecom_customers'])
# 源端高级配置
source.initial_read_size(500) # 设置全量读取批次大小为 500 条
print("数据源高级配置完成,准备创建数据流任务...")
# 定义目标表
sink = Sink('MongoDB_Demo', table=['ecom_orders', 'ecom_customers'])
# 目标端高级配置
sink.keep_data() # 保留目标表结构和数据
sink.set_write_batch(500) # 每批次写入 500 条记录
print("目标端写入配置完成!")
# 创建数据流任务并添加处理节点
flow = (
Flow("DataFlow_Advanced")
.read_from(source)
.filter("order_amount > 100") # 添加过滤节点,保留订单金额大于 100 的数据
.write_to(sink)
.save()
)
print("数据流任务配置完成!")
源端进阶配置
在 TapFlow 中,Source API 是数据流任务的起点,用于定义数据源、表名和任务类型,并加载源表数据支持任务执行。同时,Source 提供高级功能和配置选项,满足数据同步、实时变更捕获(CDC)及性能优化等需求。
Source API 的高级配置仅适用于后续要配置的数据流任务,不会修改全局数据源的默认设置或影响其他已定义的数据流任务。
定义源表和任务类型
Source 支持灵活的表选择和任务模式配置,适用于多种数据流场景:
-
数据转换任务(单表):当任务仅处理一个特定表时,
Source将自动设置任务类型为 数据转换任务,适用于数据建模、ETL、数据清理或构建宽表等场景,且目标通常为单表。# 数据转换任务:只处理 ecom_orders 单表
source = Source('MySQL_ECommerce', table='ecom_orders') -
数据复制任务(多表):当需要处理多个表或使用正则表达式匹配表名时,任务将被设置为 数据复制任务,适用于数据库迁移、数据库上云、数据库备份或多表整库同步等场景。
# 数据复制任务:指定多个表
source = Source('MySQL_ECommerce', table=['ecom_orders', 'ecom_customers'])
# 数据复制任务:使用正则表达式匹配表名
source = Source('MySQL_ECommerce', table_re='sales_.*')提示正则匹配适用于需要动态监控新增表并自动纳入同步范围的场景。
启用 DDL 同步
在任务中启用 DDL 同步 功能(默认关闭状态),确保源库的表结构变化实时同步到目标库,支持采集源库的 DDL 事件通常包含新增字段、修改字段名、修改字段属性、删除字段。
source.enableDDL()
除开启该开关外,还需要目标库支持 DDL 应用,您可以通过支持的数据源文档,查询各类数据源对 DDL 事件采集和 DDL 应用的支持情况。更多介绍,见 DDL 变更处理最佳实践。
启用 MongoDB PreImage
在 MongoDB 数据源任务中启用 PreImage 功能(默认关闭状态),可在捕获增量更新事件时补充更新前的旧值,以便实现审计或回滚。
source.enablePreImage()
禁用更新字段补全
更新字段补全功能(默认开启)用于在捕获更新操作时,将所有字段(包括未更新的字段)都写入到目标库,以确保数据一致性。启用该功能可能会增加目标库的存储成本。通过以下方法可禁用字段补全:
source.disable_filling_modified_data()
设置增量读取批量大小
定义增量模式下每批读取的数据条数(默认值为 1)。增大该值可以提升吞吐量,但可能会增加延迟。
# 设置每批读取 10 条数据
source.increase_read_size(10)