Lanson

V1

2022/12/22阅读:49主题:丘比特忙

大数据Kudu(六):Kudu Java Api操作

Kudu Java Api操作

Kudu没有提供标准SQL操作,支持Nosql样式的API,这里使用Java 操作Kudu ,包括创建表、插入数据、修改删除数据、删除表等操作,值得注意的是,Java api直接操作Kudu在开发中不是常用的方式,常用方式是Spark操作Kudu、Kudu与Impala整合写SQL操作Kudu。这里为了后续学习,需要在Kudu中创建一些表。

一、​​​​​​​添加Maven依赖

Java操作Kudu需要在创建好的Maven项目中导入kudu-client依赖,此外我们这里使用的是CDH版本的kudu依赖包,maven默认不支持CHD相关依赖,需要在maven中导入Cloudera仓库支持CDH依赖包:

<!-- maven 不支持CDH 相关依赖,需要添加Cloudera 仓库,可以使maven下载CDH相关依赖 -->
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<!-- 添加kudu-client依赖 -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.10.0-cdh6.3.2</version>
</dependency>

二、​​​​​​​​​​​​​​创建表

创建表有如下几个步骤:

  1. 创建KuduClient对象,连接Kudu集群。
  2. 准备创建表的Schema信息。
  3. 指定创建表的分区属性。
  4. 使用KuduClient对象,创建表。
  5. 关闭KuduClient对象。

代码如下:

/**
 * 1. 创建KuduClient对象,连接Kudu集群。
 */

KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();

/**
 * 2. 准备创建表的Schema信息。
 */

ArrayList<ColumnSchema> schemaList = new ArrayList<>();
//添加列,key是指定当前列是否是主键列
schemaList.add(new ColumnSchema.ColumnSchemaBuilder("id",Type.INT32).key(true).build());
schemaList.add(new ColumnSchema.ColumnSchemaBuilder("name",Type.STRING).build());
schemaList.add(new ColumnSchema.ColumnSchemaBuilder("age",Type.INT32).build());
schemaList.add(new ColumnSchema.ColumnSchemaBuilder("score",Type.DOUBLE).build());
Schema schema = new Schema(schemaList);

/**
 * 3. 指定创建表的分区属性。
 */

CreateTableOptions options = new CreateTableOptions();
//指定按照id进行hash分区到3个分区,默认id.hashcode % 3 ,决定数据进入哪个tablet
options.addHashPartitions(Arrays.asList("id"), 3);

/**
 * 4. 使用KuduClient对象,创建表。
 */

//参数:表名,表的schema信息,表的分区属性
kuduClient.createTable("personInfo",schema , options);

/**
 * 5. 关闭KuduClient对象。
 */

kuduClient.close();

执行完成以上命令可以登录Kudu查看到对应的表。

三、插入数据

向Kudu表中插入数据经过以下步骤:

  1. 创建KuduClient对象,连接Kudu集群。
  2. 获取插入数据的表。
  3. 准备插入数据对象Insert, 准备插入数据PartialRow对象。
  4. 开启session会话,应用插入操作,插入数据。
  5. 关闭KuduClient对象。

代码如下:

/**
 * 1.创建KuduClient对象,连接Kudu集群。
 */

KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();

/**
 * 2.获取插入数据的表。
 */

KuduTable personInfo = kuduClient.openTable("personInfo");

/**
 * 3.准备插入数据对象Insert, 准备插入数据PartialRow对象。
 */

//创建插入操作
Insert insert = personInfo.newInsert();
//创建插入的数据对象PartialRow
PartialRow row = insert.getRow();
row.addInt("id"1);
row.addString("name","zhangsan" );
row.addInt("age",18 );
row.addDouble("score",100.0 );

/**
 * 4.开启session会话,应用插入操作插入数据。
 */

//创建kuduSession会话
KuduSession kuduSession = kuduClient.newSession();
//设置kudu刷新插入数据策略
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
//插入数据
kuduSession.apply(insert);

/**
 * 5.关闭KuduClient对象。
 */

kuduSession.close();
kuduClient.close();

经过以上操作,数据插入成功,但是无法在kudu webui中查看到对应的数据,需要使用API查询。

此外,JAVA API 提供了三种向 kudu 插入数据的刷新策略,分别为:

  • AUTO_FLUSH_SYNC

AUTO_FLUSH_SYNC(默认),意思是调用 KuduSession.apply()方法后,客户端会在当数据刷新到服务器后再返回,这种情况就不能批量插入数据,调用 KuduSession.flush()方法不会起任何作用,因为此时缓冲区数据已经被刷新到了服务器。

  • AUTO_FLUSH_BACKGROUND

AUTO_FLUSH_BACKGROUND,意思是调用KuduSession.apply()方法后,客户端会立即返回,但是写入将在后台发送,可能与来自同一会话的其他写入一起进行批处理。

  • MANUAL_FLUSH

MANUAL_FLUSH,意思是调用KuduSession.apply()方法后,会返回的非常快,但是写操作不会发送,直到用户使用flush()函数,如果缓冲区超过了配置的空间限制,KuduSession.apply()函数会返回一个错误。

四、查询数据

查询数据需要以下几个步骤:

  1. 创建KuduClient对象,连接Kudu集群。
  2. 获取查询数据的表。
  3. 开启scan扫描器,获取查询数据。
  4. 关闭KuduClient对象。

代码如下:

/**
 * 1.创建KuduClient对象,连接Kudu集群。
 */

KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();

/**
 * 2.获取查询数据的表。
 */

KuduTable personInfo = kuduClient.openTable("personInfo");

/**
 * 3.开启scan扫描器,获取查询数据。
 */

//设置查询的表
KuduScanner scanner = kuduClient.newScannerBuilder(personInfo)
        //设置查询的列
        .setProjectedColumnNames(Arrays.asList("id""name""age""score"))
        .build();
//scanner 中是多个tablet
while(scanner.hasMoreRows()){
    //获取一个tablet 数据
    RowResultIterator rowResults = scanner.nextRows();
    while(rowResults.hasNext()){
        RowResult next = rowResults.next();
        int id = next.getInt("id");
        String name = next.getString("name");
        int age = next.getInt("age");
        double score = next.getDouble("score");
        System.out.println("id = "+id+",name = "+name+",age = "+age+",score = "+score);
    }
}
/**
 * 4.关闭KuduClient对象。
 */

kuduClient.close();

经过以上查询,可以查询出插入的数据。

五、​​​​​​​​​​​​​​修改数据

Kudu中修改数据与Kudu中插入数据很类似,需要更新一张表的某些数据,需要经过以下步骤:

  1. 创建KuduClient对象,连接Kudu集群。
  2. 获取更新数据的表。
  3. 准备更新数据对象Update, 准备更新数据PartialRow对象。
  4. 开启session会话,应用更新操作,更新数据。
  5. 关闭KuduClient对象。

代码如下:

/**
 * 1.创建KuduClient对象,连接Kudu集群。
 */

KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();

/**
 * 2.获取更新数据的表。
 */

KuduTable personInfo = kuduClient.openTable("personInfo");

/**
 * 3.准备更新数据对象Update, 准备更新数据PartialRow对象。
 */

Update update = personInfo.newUpdate();
PartialRow row = update.getRow();
row.addInt("id"1);//主键列
//需要更新什么列就加上什么列,不能修改主键列
row.addString("name","lisi" );

/**
 * 4.开启session会话,应用更新操作,更新数据。
 */

KuduSession kuduSession = kuduClient.newSession();
kuduSession.apply(update);

/**
 * 5.关闭KuduClient对象。
 */

kuduSession.close();
kuduClient.close();

经过以上代码执行,可以查询表中数据,对应的列已经修改。

此外,更新数据除了有newUpdate()方法,还有newUpsert()方法,两者都可以对表中存在的主键进行更新操作,两者区别为如果更新的主键在表中不存在,newUpdate()操作没有任何变化,也不会报错,newUpsert()方法会将此条数据插入到表中,但是插入的这条数据所有列都应设置值,否则也将不会有任何操作。

六、​​​​​​​删除数据

Kudu中删除数据与Kudu中插入数据很类似,需要删除一张表的某些数据,需要经过以下步骤:

  1. 创建KuduClient对象,连接Kudu集群。
  2. 获取删除数据的表。
  3. 准备删除数据对象Delete, 准备删除数据PartialRow对象。
  4. 开启session会话,应用删除操作,删除数据。
  5. 关闭KuduClient对象。

代码如下:

/**
 * 1.创建KuduClient对象,连接Kudu集群。
 */

KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();

/**
 * 2.获取删除数据的表。
 */

KuduTable personInfo = kuduClient.openTable("personInfo");

/**
 * 3.准备删除数据对象Delete, 准备删除数据PartialRow对象。
 */

Delete delete = personInfo.newDelete();
PartialRow row = delete.getRow();
//删除数据,只需要指定主键即可
row.addInt("id"1);

/**
 * 4.开启session会话,应用删除操作,删除数据。
 */

KuduSession kuduSession = kuduClient.newSession();
kuduSession.apply(delete);

/**
 * 5.关闭KuduClient对象。
 */

kuduSession.close();
kuduClient.close();

七、删除表

删除表需要经过如下步骤:

  1. 创建KuduClient对象,连接Kudu集群。
  2. 删除表。
  3. 关闭KuduClient对象。

代码如下:

/**
 * 1.创建KuduClient对象,连接Kudu集群。
 */

KuduClient kuduClient = new KuduClient.KuduClientBuilder("cm1:7051,cm2:7051").build();

/**
 * 2.删除表。
 */

if(kuduClient.tableExists("personInfo")){
    kuduClient.deleteTable("personInfo");
}

/**
 * 3.关闭KuduClient对象。
 */

kuduClient.close();

分类:

后端

标签:

大数据

作者介绍

Lanson
V1

CSDN大数据领域博客专家