Linux中DataX使用第四期

news/2025/2/22 19:31:38

 简介

  紧接着上期关于定义如何一个简单的插件,本期了解下关系型数据库的数据读取和数据写入。

环境

  •  Windows10 (linux中命令相似,为了方面调试就用windows的)
  • JDK(1.8以上,推荐1.8)
  • Python(2或3都可以)
  • Apache Maven (推荐3.x版本)
  • IntelliJ IDEA 2023.2.2 (IDE没要求,能打开maven项目就行)
  • 源码下载地址

内容

  DataX运行的时序图

  •   对于关系型数据库DataX内有一个公共的调用工具。

CommonRdbmsReader

包含两个内部类JobTask的公共类,分别用于处理作业级别和任务级别的操作。

Job负责作业的初始化、预检查、拆分、以及作业结束时的清理工作。

  • init(Configuration originalConfig): 初始化作业配置。
  • preCheck(Configuration originalConfig, DataBaseType dataBaseType): 预检查数据库连接和查询语句的有效性。
  • split(Configuration originalConfig, int adviceNumber): 将作业拆分为多个任务。
  • post(Configuration originalConfig)destroy(Configuration originalConfig): 作业结束时的清理工作。

Task负责单个任务的初始化、数据读取和任务结束时的清理工作。

  • init(Configuration readerSliceConfig): 初始化任务配置,包括数据库连接信息。
  • startRead(Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector, int fetchSize): 开始读取数据,执行查询并将结果发送到记录发送器。
  • post(Configuration originalConfig)destroy(Configuration originalConfig): 任务结束时的清理工作。
  • transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector): 处理并传输一条记录。
  • buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector): 根据结果集构建记录。
核心代码
java"> public void startRead(Configuration readerSliceConfig,
                              RecordSender recordSender,
                              TaskPluginCollector taskPluginCollector, int fetchSize) {
            // 从配置中获取查询SQL语句
            String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
            // 从配置中获取表名
            String table = readerSliceConfig.getString(Key.TABLE);

            // 将任务详情添加到性能跟踪实例中
            PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);

            // 打印日志,表示开始读取记录
            LOG.info("Begin to read record by Sql: [{}\n] {}.",
                    querySql, basicMsg);
            // 创建一个性能记录对象,记录SQL查询阶段
            PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
            queryPerfRecord.start();

            // 获取数据库连接
            Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
                    username, password);

            // session config .etc related
            DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
                    this.dataBaseType, basicMsg);

            int columnNumber = 0;
            ResultSet rs = null;
            try {
                rs = DBUtil.query(conn, querySql, fetchSize);
                queryPerfRecord.end();

                ResultSetMetaData metaData = rs.getMetaData();
                columnNumber = metaData.getColumnCount();

                //这个统计干净的result_Next时间
                PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
                allResultPerfRecord.start();

                long rsNextUsedTime = 0;
                long lastTime = System.nanoTime();
                while (rs.next()) {
                    rsNextUsedTime += (System.nanoTime() - lastTime);
                    this.transportOneRecord(recordSender, rs,
                            metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
                    lastTime = System.nanoTime();
                }

                allResultPerfRecord.end(rsNextUsedTime);
                //目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间
                LOG.info("Finished read record by Sql: [{}\n] {}.",
                        querySql, basicMsg);

            }catch (Exception e) {
                throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
            } finally {
                DBUtil.closeDBResources(null, conn);
            }
        }

这段Java代码是一个数据库读取任务的核心实现,用于从数据库中读取数据。它实现了startRead方法,该方法接收几个参数,包括配置信息、记录发送器、任务插件收集器以及每次查询的记录数。

方法参数
  • readerSliceConfig:包含数据库连接和查询配置的Configuration对象。
  • recordSender:用于发送读取到的记录的对象。
  • taskPluginCollector:用于收集任务插件的错误信息的对象。
  • fetchSize:每次查询数据库时获取的记录数。
1.变量初始化
  • 从配置中获取SQL查询语句和表名。
java">String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);
2.性能跟踪
  • 使用PerfTracePerfRecord进行性能跟踪,记录任务开始时间和详细信息。
javascript">PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);
LOG.info("Begin to read record by Sql: [{}\n] {}.", querySql, basicMsg);
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();
3.数据库连接
  • 使用DBUtil工具类获取数据库连接,并根据配置处理会话配置。
java">Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl, username, password);
DBUtil.dealWithSessionConfig(conn, readerSliceConfig, this.dataBaseType, basicMsg);
4.查询数据库
  • 执行SQL查询并获取结果集,记录查询结束时间,获取结果集的元数据(列数)。
java">int columnNumber = 0;
ResultSet rs = null;
try {
    rs = DBUtil.query(conn, querySql, fetchSize);
    queryPerfRecord.end();
    ResultSetMetaData metaData = rs.getMetaData();
    columnNumber = metaData.getColumnCount();
5.读取结果集
  • 使用while循环遍历结果集,记录每条记录的读取时间,并通过transportOneRecord方法发送记录。
  • transportOneRecord表示从数据库查询结果集中读取数据,并根据数据类型将其转换为相应的数据列,最终构建一个完整的记录对象
java">PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allResultPerfRecord.start();
long rsNextUsedTime = 0;
long lastTime = System.nanoTime();
while (rs.next()) {
    rsNextUsedTime += (System.nanoTime() - lastTime);
    this.transportOneRecord(recordSender, rs, metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
    lastTime = System.nanoTime();
}
allResultPerfRecord.end(rsNextUsedTime);
LOG.info("Finished read record by Sql: [{}\n] {}.", querySql, basicMsg);
6.异常处理
  • 捕获异常并抛出自定义的数据库查询异常,最后确保关闭数据库连接。
java">} catch (Exception e) {
    throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
    DBUtil.closeDBResources(null, conn);
}
总结

这段代码的主要功能是从数据库中读取数据,并通过性能跟踪记录查询和读取过程的时间。它使用了JDBC来连接和查询数据库,并通过RecordSender发送读取到的记录。代码中还包含了异常处理和资源释放的逻辑,确保在发生异常时能够正确处理并释放数据库连接。

CommonRdbmsWriter

也包含两个内部类JobTask的公共类,分别用于处理作业级别和任务级别的操作。

Job负责作业的初始化、预检查、拆分、以及作业结束时的清理工作。

  • init(Configuration originalConfig):初始化作业配置。
  • writerPreCheck(Configuration originalConfig, DataBaseType dataBaseType):进行写前检查,包括SQL语法检查和权限检查。
  • prepare(Configuration originalConfig):执行作业前的准备工作,如执行预SQL语句。
  • split(Configuration originalConfig, int mandatoryNumber):将作业配置分割成多个任务配置。
  • post(Configuration originalConfig):执行作业后的操作,如执行后SQL语句。
  • destroy(Configuration originalConfig):销毁作业资源。

Task负责单个任务的初始化、数据读取和任务结束时的清理工作。

  • init(Configuration writerSliceConfig):初始化任务配置。
  • prepare(Configuration writerSliceConfig):执行任务前的准备工作。
  • startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection):使用给定的数据库连接开始写入数据。
  • startWrite(RecordReceiver recordReceiver, Configuration writerSliceConfig, TaskPluginCollector taskPluginCollector):开始写入数据。
  • post(Configuration writerSliceConfig):执行任务后的操作。
  • destroy(Configuration writerSliceConfig):销毁任务资源。
  • doBatchInsert(Connection connection, List<Record> buffer):批量插入数据。
  • doOneInsert(Connection connection, List<Record> buffer):逐条插入数据。
  • fillPreparedStatement(PreparedStatement preparedStatement, Record record):填充PreparedStatement对象。
  • fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column):根据字段类型填充PreparedStatement对象。
  • calcWriteRecordSql():计算写入记录的SQL语句。
  • calcValueHolder(String columnType):计算值占位符。
核心代码
java"> public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
            System.err.println("CommonRdbmsWriter Task startWriteWithConnection");
            this.taskPluginCollector = taskPluginCollector;

            // 用于写入数据的时候的类型根据目的表字段类型转换
            this.resultSetMetaData = DBUtil.getColumnMetaData(connection,
                    this.table, StringUtils.join(this.columns, ","));
            // 写数据库的SQL语句
            calcWriteRecordSql();

            List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
            int bufferBytes = 0;
            try {
                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    if (record.getColumnNumber() != this.columnNumber) {
                        // 源头读取字段列数与目的表字段写入列数不相等,直接报错
                        throw DataXException
                                .asDataXException(
                                        DBUtilErrorCode.CONF_ERROR,
                                        String.format(
                                                "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                                                record.getColumnNumber(),
                                                this.columnNumber));
                    }

                    writeBuffer.add(record);
                    bufferBytes += record.getMemorySize();

                    if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
                        doBatchInsert(connection, writeBuffer);
                        writeBuffer.clear();
                        bufferBytes = 0;
                    }
                }
                if (!writeBuffer.isEmpty()) {
                    doBatchInsert(connection, writeBuffer);
                    writeBuffer.clear();
                    bufferBytes = 0;
                }
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                writeBuffer.clear();
                bufferBytes = 0;
                DBUtil.closeDBResources(null, null, connection);
            }
        }
方法参数
  • recordReceiver:用于接收从数据源读取的数据记录。
  • taskPluginCollector:用于收集任务执行过程中的信息,比如错误信息。
  • connection:数据库连接对象,用于与数据库进行交互。
1.初始化任务收集器

将传入的任务收集器赋值给当前对象的成员变量。

java">this.taskPluginCollector = taskPluginCollector;
2.获取列元数据

调用DBUtil工具类的方法,获取目标表的列元数据,包括列名、类型等。

java">this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(this.columns, ","));
3.计算写入SQL语句

调用方法计算写入数据库的SQL语句。

java">calcWriteRecordSql();
4.初始化缓冲区

初始化一个记录列表作为缓冲区,用于批量写入数据。

java">List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
int bufferBytes = 0;
5.读取并写入数据

  • recordReceiver中读取记录,直到没有更多记录。
  • 检查读取的记录列数是否与目标表列数一致,不一致则抛出异常。
  • 将记录添加到缓冲区,并计算缓冲区的大小。
  • 当缓冲区达到指定大小(行数或字节数)时,调用doBatchInsert方法批量写入数据,并清空缓冲区
java">Record record;
while ((record = recordReceiver.getFromReader()) != null) {
    if (record.getColumnNumber() != this.columnNumber) {
        throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.", record.getColumnNumber(), this.columnNumber));
    }

    writeBuffer.add(record);
    bufferBytes += record.getMemorySize();

    if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
        doBatchInsert(connection, writeBuffer);
        writeBuffer.clear();
        bufferBytes = 0;
    }
}
6.处理剩余记录

如果在读取完所有记录后,缓冲区中还有剩余的记录,则进行最后一次批量写入。

java">if (!writeBuffer.isEmpty()) {
    doBatchInsert(connection, writeBuffer);
    writeBuffer.clear();
    bufferBytes = 0;
}
7.异常处理

捕获并抛出自定义的DataX异常。

java">} catch (Exception e) {
    throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
8.资源清理

清空缓冲区,释放数据库连接资源。

java">finally {
    writeBuffer.clear();
    bufferBytes = 0;
    DBUtil.closeDBResources(null, null, connection);
}
总结 

  这段代码的主要目的是从recordReceiver中读取记录读取数据,然后按照配置的批量大小和字节数,将数据写入目标数据库。

结语

  对于关系型数据库DataX基本都实现了,如果有定制化需求可以通过对上面的代码模块进行复制后改造来实现。同时DataX还支持非结构化存储(图片、文件、视频等)数据同步,可参考plugin-unstructured-storage-util模块。


http://www.niftyadmin.cn/n/5862689.html

相关文章

天 锐 蓝盾终端安全管理系统:办公U盘拷贝使用管控限制

天 锐 蓝盾终端安全管理系统以终端安全为基石&#xff0c;深度融合安全、管理与维护三大要素&#xff0c;通过对桌面终端系统的精准把控&#xff0c;助力企业用户构筑起更为安全、稳固且可靠的网络运行环境。它实现了管理的标准化&#xff0c;有效破解终端安全管理难题&#xf…

经典Embedding方法:Word2Vec与Skip-Gram算法)

引言 在自然语言处理&#xff08;NLP&#xff09;中&#xff0c;词嵌入&#xff08;Word Embedding&#xff09;是表示词汇语义的关键技术。2013年&#xff0c;Google提出的Word2Vec模型彻底改变了这一领域&#xff0c;其核心思想是通过无监督学习将单词映射到低维稠密向量空间…

UE5中按钮圆角,设置边框

FSlateBrush NormalBrush; NormalBrush.DrawAs ESlateBrushDrawType::RoundedBox; NormalBrush.TintColor FLinearColor(245 / 255.f, 245 / 255.f, 1.0f); // NormalBrush.OutlineSettings.CornerRadii FVector4(1.0f, 1.0f, 1.0f, 1.0f); // 圆角半径&#xff0c;分别对应…

普通人使用生成式语言模型的几个阶段

随着生成式语言模型&#xff08;如 ChatGPT、Grok 等&#xff09;逐渐走进大众生活&#xff0c;普通人从最初的懵懂尝试&#xff0c;到熟练运用&#xff0c;再到理性判断其输出结果是否可靠&#xff0c;经历了一个逐步进阶的过程。以下&#xff0c;我将详细描述普通人使用生成式…

R语言安装生物信息数据库包

R语言安装生物信息数据库包 在生物信息学领域&#xff0c;R语言是重要的数据分析工具。今天&#xff0c;我们就来聊聊在R语言环境下&#xff0c;安装生物信息数据库包&#xff08;org.*.*.db&#xff09;的步骤。 为什么要安装org.*.*.db系列包 生物信息学分析中&#xff0c…

网络安全高级软件编程技术

安全软件开发入门 软件安全问题 有趣的《黑客帝国》终极解释&#xff1a; 《黑客帝国》故事里面的人物关系&#xff0c;就像电脑里面的各种程序的关系一样&#xff1a; 电脑里面的系统程序&#xff1a;Matrix&#xff1b; 病毒程序&#xff1a;以Neo为首的人类&#xff1b; …

Java+SpringBoot+Vue+数据可视化的美食餐饮连锁店管理系统

感兴趣的可以先收藏起来&#xff0c;还有大家在毕设选题&#xff0c;项目以及论文编写等相关问题都可以给我留言咨询&#xff0c;我会一一回复&#xff0c;希望帮助更多的人。 系统介绍 在当下这个瞬息万变的时代&#xff0c;餐饮行业正以惊人的速度蓬勃发展。随着人们生活水…

AI边缘计算盒子价格各异,如何精准选型成企业难题

在如今的数字化转型浪潮下&#xff0c;AI 边缘计算盒子正成为企业智能化升级的核心设备&#xff01;但产品价格因品牌、型号、性能而异&#xff0c;如何精准选型成企业难题&#xff01;下面万物这就带大家了解下AI边缘计算盒子价格&#xff1a; 企业产品线丰富&#xff0c;满足…