DataIntegration
Contents
💠
💠 2024-11-20 19:35:19
Data Integration
数据集成
大数据ETL工具对比——SeaTunnel、DataX、Sqoop、Flume、Flink CDC、Dlinky、TIS、Chunjun等对比_seatunnel和datax-CSDN博客
Datax
注意 这是一次性的开源项目,bug基本需要自己处理,从代码行数提交情况和issue,PR的活跃情况可以看出
离线数据同步框架, 扩展读/写 Plugin 以支持多种数据源。
核心框架负责处理流控,缓存,并发,自定义的数据转换Transformer等。
- 特性
- 轻量: 一份JSON配置启动一个Java进程
- 支持线程级并发同步,按指定分批字段拆分数据 限制分批字段整型或字符串
- 运行时定期展示流量,行数等进度信息
- 支持脏数据探测,failfast
- 支持流控策略配置 字节数,行数
- 限制
- 不支持实时增量,离线增量需要手动调整JSON配置实现
- 单进程模式,无法集群式同步,资源利用不够高(单任务在做好读端和写端的优化话是可以打满网卡的)
使用
Tips
- 配置的json文件要
严格按照案例JSON来配置
,因为他不是按对象解析是按无结构json来顺序遍历字段去解析的- 踩过一个坑就是writer部分在reader部分前面,然后驱动加载出问题了,查看对应源码和jvm的加载类发现是有的,很隐蔽的报错,完全想不到是json配置顺序问题。
为什么不建议使用DataX读写GreenPlum 不建议用 postgresqlwriter,可以用 HashData DataX 的 gpdbwriter 插件替代
打包指定模块 mvn clean package -DskipTests assembly:assembly -pl plugin-rdbms-util -am
设计
- Job 管理子任务: JobContainer 和 TaskGroupContainer
- Reader将全部数据拆分成若干任务,在TaskGroupContainer中消费完(包含错误重试,并发限制,超时提示)
- Task 执行读写: TaskGroupContainer.TaskExecutor
- readerThread writerThread 实现多线程的生产者消费者模型 通过 Communication 通信
- 读写线程的逻辑实现为 ReaderRunner 和 WriterRunner
组件
Reader
- table模式: 只配置源表的 column,不灵活(需要源表对目标表字段名和类型一致)但支持并发。
- querySQL模式:配置源表查询SQL,可以join,别名,函数计算。更灵活但是不支持并发,同步性能差
并行同步: 通过splitPk:拆分字段
只支持Long,字符串
和 speed.channel: 并发数 去拆分上游数据
-
SingleTableSplitUtil#genPKSql
- 查询出 分片字段在上游表的最小和最大值,确认拆分的边界
-
SingleTableSplitUtil#splitSingleTable
- 参数 expectSliceNumber 的来源于Datax.json的直接指定和 限速channel,限速速率等取较小值。
- 由于拆分是按ascii实现(先将字符串按ascii转为超大整数BigInteger,做完分段拆分后将若干段的边界值(超大整数)转回ascii字符),这个方式是有风险的 问题如下。
-
TaskGroupContainer#start
拆分出的若干SQL,包装为 TaskExecutor 绑定读写线程后启动- 依据前文拆分的若干分片SQL,while true 顺序遍历依据限制的并发值启动线程执行, 并对失败任务重试
- 注意绑定的读写线程都通过设置线程对象的 setContextClassLoader 来实现对各种插件的加载,且和Datax主JVM进程的类加载器隔离
- 每条SQL均是游标查询方式 CommonRdbmsReader.Task#startRead
ResultSet query(Connection conn, String sql, int fetchSize)
-
问题
- 超大整数转ascii字符时,转出了单引号但是未转义,然后直接拼到SQL里,导致SQL语法错误。
- 分段后数据范围有交叉导致同步的数据量大于上游数据总量,是概率性出现问题,因为这个字符转int的做法导致了字符的边界互相影响了,范围SQL产生了交集:
- 从ascii码来计算的完整切分分段,在GP执行时会有问题,因为GP的字符串比较方式并不是严格按照字符ascii的值
- Comparison Functions and Operators
1 2 3 4 5 6 7 8 9
int channel = 5; List<String> result = List<String> result = RdbmsRangeSplitWrap.splitAndWrap("P010", "P024", channel * 5, "prodcode", "'", DataBaseType.PostgreSQL); // 按Datax逻辑来说分段有26段,其中有4段的条件都命中了 P010 的数据, 导致了P010这个部分的数据重复了。 // 例如这两段SQL: 在GP里面 select 'P010' > 'P01<'; 返回为True // 'P01<' <= prodcode AND prodcode < 'P01B' // 'P010' <= prodcode AND prodcode < 'P016' List<String> result = RdbmsRangeSplitWrap.splitAndWrap("2016-02-06", "2024-05-06", 4, "period", "'", DataBaseType.PostgreSQL); // 结果的数组中有元素的字面值包含了控制字符 \r. 将生成的SQL去查数据库没有问题,拆分的四段只有13段能查出数据 24段数据为空
-
扩展
- 特定优化思路:将拆分列查出全部的去重值 构造出分批in的SQL。 优点:将以该列的数据分布情况并发同步,贴合数据的业务特点。缺点:如果该列的去重值非常多,SQL会超长。
Writer
- CommonRdbmsWriter.Task#startWriteWithConnection 模板类 消费Reader的数据 批量写入目标库
- 错误重试机制 相关配置项为
core.container.task.failOver
- 当一个批次遇到任意SQLException时,都会将这个批次的数据做事务回滚,逐条事务写入(估计是为了规避事务死锁)
- 但是这个机制没有考虑下游数据源不支持事务的情况,此时这个机制就引发下游出现重复数据了。
- 错误重试机制 相关配置项为
两个参数,任一条件满足就执行一次insert
- batchSize 默认2048
- batchByteSize 默认32mib
- 该参数值需要谨慎设置,此大小是每个Task都需要的缓存区大小,如果设置过大,会发生OOM
- 例如设置堆内存1G 5并发 该值200Mib时,刚开始同步就会触发OOM,因为堆内存不够,没有留空间给datax自身
SeaTunnel
使用 Spark、Flink 作为底层数据同步引擎使其具备分布式执行能力,开放并完善的插件体系和API集成。
核心流程为 Source -> Transform -> Sink 。 Source 和 Sink 统称为Connector 负责读写数据库, Transform负责数据转换:别名映射,函数处理过滤。
这个架构设计将读和转换分离了,就没有Datax的两个模式所面临的问题,既支持读数据时做别名,也支持并发。
并行读取 支持 数值,字符串,日期 类型字段
- 生成拆分列逻辑 ChunkSplitter#generateSplits 字符串类型字段采用的是hash后取模方式
JdbcDialect#hashModForField
pg,oracle,mssql。 - 执行数据拆分 FixedChunkSplitter#createSplitStatement
FlinkX ChunJun
Flink CDC
Tapdata
支持集群架构,但是不支持单个任务的分布式(单个任务的效率无法通过扩充节点来横向提升)
Kettle
Kettle大量数据快速导出的解决方案
奇怪的是SpringBoot项目同样Fetch方式加流式Excel导出,整体导出效率低很多
- 关联源码在 org.pentaho.di.trans.steps 下的 tableinput 和 excelwriter 包
Author Kuangcp
LastMod 2024-04-10