z

zhaodongsheng

V1

2022/05/04阅读:17主题:极客黑

NiFi基于ConvertJsonToSQL实现oracle merge

前言

通过NiFi自定义Processor文章,我们掌握了NiFi自定义开发Processor的流程。

通过基于PutDatabaseRecord的方式实现Oracle mergeg功能。该方案在实现过程中需要较多注意点:

  • 打包时,需要移除依赖nifi-standard-processors依赖,否则会引入编译版本号的新的标准Processor。
  • 需要adapter,则引入nifi-standard-processors依赖,为了规避上面提到的问题,只能重写DatabaseAdapter。

NiFi提供了ConvertJsonToSQL,可以Json转换SQL,通过PutSQL将数据写入数据库。本文尝试演示通过改写ConvertJsonToSQL源码实现Oracle Merge的功能。

如果想直接运行代码,请clone: https://github.com/dawsongzhao1104/nifi/tree/main/nifi-processor

开发环境准备

参考NiFi自定义Processor完成。

工程目录为:convertjsontosqlwithmerge

NiFi开发工程目录如下:

  • nifi-cvte-nar:负责打包工程文件的代码为NiFi支持的nar插件格式。
  • nifi-cvte-processors:开发processor源码及测试代码存放该工程。其中③标记的文件为SPI加载需要的配置文件。

修改工程pom中nifi-nar-bundles版本号为1.15.3

改写ConvertJsonToSQL

复制ConvertJsonToSQL.java到项目工程

下载1.15.3分支NiFi源码,复制ConvertJsonToSQL.java到2节创建的工程中,删除测试NiFi maven开发环境的MyProcessor类。

复制过来后需要修改:

  • 删除MyProcessor类
  • 引入依赖包,处理ConvertJSONToSQL类报错,修改nifi-cvte-processors的pom
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com</groupId>
        <artifactId>cvte</artifactId>
        <version>1.0.0</version>
    </parent>

    <artifactId>nifi-cvte-processors</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
<!--            增加以下版本号及编译时限制,基础代码不打包到nar包中-->
            <version>1.15.3</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
            <!--            增加以下版本号及编译时限制,基础代码不打包到nar包中-->
            <version>1.15.3</version>
            <scope>compile</scope>
        </dependency>


<!--        import com.github.benmanes.caffeine.cache.Cache;-->
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
            <version>2.8.1</version>
            <scope>compile</scope>
        </dependency>

<!--        import org.codehaus.jackson.JsonNode;-->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.9.13</version>
            <scope>compile</scope>
        </dependency>
<!--        import org.codehaus.jackson.map.ObjectMapper;-->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
            <scope>compile</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-services-api-nar</artifactId>
            <version>1.15.3</version>
            <type>nar</type>
            <scope>compile</scope>
        </dependency>

        <!--        import org.apache.nifi.dbcp.DBCPService;-->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-dbcp-service-api</artifactId>
            <version>1.15.3</version>
            <scope>compile</scope>
        </dependency>




        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

修改MyProcessorTest为ConvertJSONToSQLTest

修改ConvertJSONToSQL

  • 注释掉ConvertJSONToSQL中的
//@SeeAlso(PutSQL.class)
  • ConvertJSONToSQL类名为ConvertJSONToSQLWithMerge
  • 修改org.apache.nifi.processor.Processor内容为:sql.ConvertJSONToSQLWithMerge 完成修改后,如下图所示:
  • 增加MERGE类型
  • 增加Merge函数
private String generateMerge(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
                                  final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix)
 
{

        final Set<String> updateKeyNames;
        if (updateKeys == null) {
            updateKeyNames = schema.getPrimaryKeyColumnNames();
        } else {
            updateKeyNames = new HashSet<>();
            for (final String updateKey : updateKeys.split(",")) {
                updateKeyNames.add(updateKey.trim());
            }
        }

        if (updateKeyNames.isEmpty()) {
            throw new ProcessException("目标表: '" + tableName + "' 没有设置数据级主键,也没有配置处理器的'Update Keys'属性。请设置。");
        }

        final StringBuilder sqlBuilder = new StringBuilder();
        int fieldCount = 0;
        sqlBuilder.append("MERGE INTO ");
        if (quoteTableName) {
            sqlBuilder.append(schema.getQuotedIdentifierString())
                    .append(tableName)
                    .append(schema.getQuotedIdentifierString());
        } else {
            sqlBuilder.append(tableName);
        }

        sqlBuilder.append(" t1 USING ( \n\t SELECT ");


        // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
        // for each of the Update Key fields.
        StringBuilder sqlOnCause = new StringBuilder("( ");
        final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
        final Set<String> normalizedUpdateNames = new HashSet<>();
        for (final String uk : updateKeyNames) {
            final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
            normalizedUpdateNames.add(normalizedUK);

            if (!normalizedFieldNames.contains(normalizedUK)) {
                String missingColMessage = "JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
                if (failUnmappedColumns) {
                    getLogger().error(missingColMessage);
                    throw new ProcessException(missingColMessage);
                } else if (warningUnmappedColumns) {
                    getLogger().warn(missingColMessage);
                }
            }else{
                sqlOnCause.append(" t1.").append(normalizedUK).append("=").append(" t2.").append(normalizedUK).append(" AND ");
            }
        }
        String onCause = sqlOnCause.toString().replace(" AND "," ) WHEN MATCHED THEN UPDATE SET ");

        // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
        // adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
        // columns that we are inserting into
        StringBuilder sqlUpdateSet = new StringBuilder();
        StringBuilder sqlInsertField =  new StringBuilder(" \t WHEN NOT MATCHED THEN INSERT( ");
        StringBuilder sqlInsertValues =  new StringBuilder(" VALUES(");

        Iterator<String> fieldNames = rootNode.getFieldNames();
        while (fieldNames.hasNext()) {
            final String fieldName = fieldNames.next();

            final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
            final ColumnDescription desc = schema.getColumns().get(normalizedColName);
            if (desc == null) {
                if (!ignoreUnmappedFields) {
                    throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database.["+schema.toString()+"]");
                } else {
                    continue;
                }
            }

            // Check if this column is an Update Key. If so, skip it for now. We will come
            // back to it after we finish the SET clause
//            if (normalizedUpdateNames.contains(normalizedColName)) {
//                continue;
//            }

            if (fieldCount++ > 0) {
                sqlBuilder.append(", ");
                if (!normalizedUpdateNames.contains(normalizedColName)) {
                    //如果是更新Key则不用添加到sqlUpdateSet、sqlInsertField、sqlInsertValues
                    sqlUpdateSet.append(", ");
                    sqlInsertField.append(", ");
                    sqlInsertValues.append(", ");
                }
            }

            sqlBuilder.append(" \t ? ");
            if(escapeColumnNames){
                StringBuilder sb = new StringBuilder();
                sb.append(schema.getQuotedIdentifierString())
                        .append(desc.getColumnName())
                        .append(schema.getQuotedIdentifierString());

                sqlBuilder.append(sb);

                if (!normalizedUpdateNames.contains(normalizedColName)) {
                    //如果是更新Key则不用添加到sqlUpdateSet、sqlInsertField、sqlInsertValues
                    sqlUpdateSet.append(" t1.").append(sb).append(" = t2.").append(sb);
                    sqlInsertField.append(" t1.").append(sb);
                    sqlInsertValues.append(" t2.").append(sb);
                }


            } else {
                String sb = desc.getColumnName();
                sqlBuilder.append(sb);

                if (!normalizedUpdateNames.contains(normalizedColName)) {
                    //如果是更新Key则不用添加到sqlUpdateSet、sqlInsertField、sqlInsertValues
                    sqlUpdateSet.append(" t1.").append(sb).append(" = t2.").append(sb);
                    sqlInsertField.append(" t1.").append(sb);
                    sqlInsertValues.append(" t2.").append(sb);
                }
            }


            final int sqlType = desc.getDataType();
            attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));

            final Integer colSize = desc.getColumnSize();

            final JsonNode fieldNode = rootNode.get(fieldName);
            if (!fieldNode.isNull()) {
                String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
                attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
            }
        }

        // Set the ON  clause based on the Update Key values
        sqlBuilder.append(" from dual) t2 \n on ")
                .append(onCause)
                .append(sqlUpdateSet)
                .append(sqlInsertField).append(")")
                .append(sqlInsertValues).append(")");
        return sqlBuilder.toString();
    }

编译打包

mvn clean install

发布测试

准备测试数据库

安装/申请Oracle数据库(Oracle安装部署较麻烦,建议向企业运维团队申请)。

  • 配置Oracle Controller Service
  • 配置NiFi Oracle Merge测试data pipeline

支持SQL查询,任意选择一条数据作为测试输入数据。

注释事项:

  1. 执行语句类型:MERGE
  2. 表名保持与Oracle数据库名一致,注意大小写
  3. Catalog名不填写
  4. Schema名,保持与Oracle数据库中一致,必须注意大小写
  5. Unmatched Field Behavior: Fail.报错就失败。
  6. Update Keys:数据库中更新的主键
  • 查看生成的MERGE SQL并在数据库上执行验证
merge
into
 MD.MD_CVTE_BRAND t1
  using (
 select
  ? ID,
  ? BRAND_ID,
  ? BRAND_CODE,
  ? BRAND_CLASS,
  ? BRAND_NAME,
  ? BRAND_EN_NAME,
  ? BRAND_TYPE_ID,
  ? BRAND_TYPE,
  ? REMARK,
  ? LIFECYCLE,
  ? CRT_USER,
  ? CRT_TIME,
  ? UPD_USER,
  ? UPD_TIME,
  ? IS_DELETED,
  ? IS_ENABLED,
  ? W_INSERT_DT,
  ? W_UPDATE_DT
 from
  dual) t2 on
 ( t1.ID = t2.ID )
 when matched then
update
set
 t1.BRAND_ID = t2.BRAND_ID,
 t1.BRAND_CODE = t2.BRAND_CODE,
 t1.BRAND_CLASS = t2.BRAND_CLASS,
 t1.BRAND_NAME = t2.BRAND_NAME,
 t1.BRAND_EN_NAME = t2.BRAND_EN_NAME,
 t1.BRAND_TYPE_ID = t2.BRAND_TYPE_ID,
 t1.BRAND_TYPE = t2.BRAND_TYPE,
 t1.REMARK = t2.REMARK,
 t1.LIFECYCLE = t2.LIFECYCLE,
 t1.CRT_USER = t2.CRT_USER,
 t1.CRT_TIME = t2.CRT_TIME,
 t1.UPD_USER = t2.UPD_USER,
 t1.UPD_TIME = t2.UPD_TIME,
 t1.IS_DELETED = t2.IS_DELETED,
 t1.IS_ENABLED = t2.IS_ENABLED,
 t1.W_INSERT_DT = t2.W_INSERT_DT,
 t1.W_UPDATE_DT = t2.W_UPDATE_DT
 when not matched then
insert
 ( 
 t1.BRAND_ID,
 t1.BRAND_CODE,
 t1.BRAND_CLASS,
 t1.BRAND_NAME,
 t1.BRAND_EN_NAME,
 t1.BRAND_TYPE_ID,
 t1.BRAND_TYPE,
 t1.REMARK,
 t1.LIFECYCLE,
 t1.CRT_USER,
 t1.CRT_TIME,
 t1.UPD_USER,
 t1.UPD_TIME,
 t1.IS_DELETED,
 t1.IS_ENABLED,
 t1.W_INSERT_DT,
 t1.W_UPDATE_DT)
values(
t2.BRAND_ID,
t2.BRAND_CODE,
t2.BRAND_CLASS,
t2.BRAND_NAME,
t2.BRAND_EN_NAME,
t2.BRAND_TYPE_ID,
t2.BRAND_TYPE,
t2.REMARK,
t2.LIFECYCLE,
t2.CRT_USER,
t2.CRT_TIME,
t2.UPD_USER,
t2.UPD_TIME,
t2.IS_DELETED,
t2.IS_ENABLED,
t2.W_INSERT_DT,
t2.W_UPDATE_DT)

将占位符号?修改为具体数据,数据库执行。OK.

总结

本文讲解了改写ConvertJsonToSQL源码的方式实现Oracle merge,该方法简单直接,易于调试。

对于文中任何疑问,欢迎加微信讨论:

分类:

人工智能

标签:

人工智能

作者介绍

z
zhaodongsheng
V1