进击云原生

V1

2022/03/21阅读:32主题:前端之巅同款

ksqlDB 访问 Apache Kafka 记录标头,JSON 函数使处理非结构化数据更加容易

ksqlDB 0.24,访问 Apache Kafka 记录标头,JSON 函数使处理非结构化数据更加容易

我们很高兴地宣布 ksqlDB 0.24!它具有一系列改进和新功能。访问 Apache Kafka ® 记录标头将支持涉及分析和处理现有 Kafka 主题的大量新用例。JSON 函数使处理非结构化数据变得更加容易。最后,该LIMIT子句允许您限制从拉查询发出的记录数,从而允许您控制处理客户端的数据量。查看更新日志以获取完整的更新列表。

访问记录头数据

Kafka 记录由键、值、元数据和可选标头组成。ksqlDB 允许用户访问记录键、值和各种元数据。从 0.24 开始,用户还可以通过声明要由记录的标题填充的列来访问标题。标头通常包含有关记录的元数据,ksqlDB 可以将其用于路由或处理。

使用HEADERS语法,用户可以使用 SQL 访问完整的标头或标头中的特定键,如下例所示。

CREATE STREAM orders (
name STRING,
price DECIMAL(4,2),
headers ARRAY<STRUCT> HEADERS
WITH (
 kafka_topic=’uploads’;
 value_format=’json’,
 partitions=6;
);

SELECT * FROM uploads EMIT CHANGES;
+--------+--------+---------------------------------------+
| NAME   | PRICE  | HEADERS                               |
+--------+--------+---------------------------------------+
| chips  | 2.30   | [{‘currency’,’VVNE’}]                 |
+--------+--------+---------------------------------------+
| coffee | 3.05   | [{‘currency’,’Q0FE’},{‘id’,’MjQ2..’}] |
+--------+--------+---------------------------------------+
| burger | 8.72   | []                                    |
+--------+--------+---------------------------------------+

访问主题标题中的特定键:

CREATE STREAM uploads (
name STRING,
photo BYTES,
image_format BYTES HEADER(‘format’)
WITH (
 kafka_topic=’uploads’;
 value_format=’json’,
 partitions=6;
);

SELECT name, photo, from_bytes(image_format, ‘hex’) as format  FROM uploads EMIT CHANGES;
+--------+---------+--------+
| NAME   | PHOTO   | FORMAT |
+--------+---------+--------+
| stars  | JKwg..  | jpeg   |
+--------+---------+--------+
| horse  | Eqwq..  | png    |
+--------+---------+--------+
| ocean  | MxKs..  | NULL   |
+--------+---------+--------+

JSON 函数

ksqlDB 0.24 带有帮助处理 JSON 格式字符串的新函数。JSON 是 Web 应用程序和许多其他场景(如物联网)的事实上的格式。结果,我们的许多客户向 Kafka 发送 JSON 消息。

通过新添加的功能,ksqlDB 用户可以处理在字符串字段中编码的 JSON 结构,并将任何 ksqlDB 数据类型序列化为 JSON 字符串。

IS_JSON_STRING检查给定字符串是否包含有效的 JSON。此功能有助于确保成功解析下游结果。考虑从外部系统导入的消息流。由于无法控制导入消息的质量,最好过滤掉无效记录以防止下游重复错误消息:

CREATE STREAM raw_messages (message STRINGWITH (kafka_topic='messages', VALUE_FORMAT='JSON');
CREATE STREAM json_messages
    AS SELECT message as json_message
    FROM raw_messages
    WHERE IS_JSON_STRING(message);

JSON_CONCAT合并两个或多个 JSON 结构。考虑一个类似的示例,其中流包含多个列,其中包含仅在应用程序上下文中一起使用的 JSON 对象。使用 ksqlDB,我们可以将它们合并在一起:

CREATE STREAM raw_messages (message1 STRING, message2 STRINGWITH (kafka_topic='messages', VALUE_FORMAT='JSON');
CREATE STREAM json_messages
    AS SELECT message1, message2, JSON_CONCAT(message1, message2)
    FROM raw_messages
    WHERE IS_JSON_STRING(message1) AND IS_JSON_STRING(message2);

JSON_RECORDSJSON_KEYS从 JSON 格式的字符串中相应地提取键值对和键:

SELECT JSON_RECORDS(json_message) as keys FROM json_messages;


+-------------------------------------------------------------------+
|KEYS                                                        |
+-------------------------------------------------------------------+
|{type="deprecated", amount=123.5, id=1}                |
|{type="standard", amount=10, id=null}          |
|{type="extra", amount=5, id=3}                                     |

SELECT JSON_KEYS(json_message) as keys FROM json_messages;

+------------------------------------------------------------------------------------+
|KEYS                                                                                |
+------------------------------------------------------------------------------------+
|[type, amount, id]                                                                  |
|[type, amount, id]                                                                  |
|[type, amount, id]

JSON_ARRAY_LENGTH计算编码为 JSON 字符串的数组的长度:

SELECT json_message as phones, JSON_ARRAY_LENGTH(json_message) as phones_length FROM json_messages;

+----------------------------------------------------------------------+-------------+
|PHONES                                                                |PHONES_LENGTH|
+----------------------------------------------------------------------+-------------+
|["(984) 459-0666", "(253) 803-6544"]                                  |2            |
|["(232) 891-6803"]                                                    |1            |
|["(342) 603-4952", "(891) 987-2476", "(792) 932-4901"]                |4            |

最后,TO_JSON_STRING将任何 ksqlDB 数据类型转换回 JSON 字符串:

INSERT INTO raw_messages (message)
       VALUES (TO_JSON_STRING(STRUCT(
          c := ARRAY[51015],
          d := MAP(
            'x' := STRUCT(e := 'v1', f := true),
            'y' := STRUCT(e := 'v2', f := false)
       ))));

INSERT INTO raw_messages (message)
       VALUES (TO_JSON_STRING(STRUCT(
          c := ARRAY[248],
          d := MAP(
            'x' := STRUCT(e := 'v3', f := true),
            'y' := STRUCT(e := 'v4', f := true)
       ))));

INSERT INTO raw_messages (message)
       VALUES (TO_JSON_STRING(STRUCT(
          c := ARRAY[369],
          d := MAP(
            'x' := STRUCT(e := 'v5', f := false),
            'y' := STRUCT(e := 'v6', f := false)
       ))));

SELECT * FROM raw_messages;

+------------------------------------------------------------------------------------+
|MESSAGE                                                                             |
+------------------------------------------------------------------------------------+
|{"C":[5,10,15],"D":{"x":{"E":"v1","F":true},"y":{"E":"v2","F":false}}}              |
|{"C":[2,4,8],"D":{"x":{"E":"v3","F":true},"y":{"E":"v4","F":true}}}                 |
|{"C":[3,6,9],"D":{"x":{"E":"v5","F":false},"y":{"E":"v6","F":false}}}               |


拉取查询LIMIT子句

拉取查询现在支持LIMIT您可能已经熟悉的子句。用户现在可以通过使用子句对 aSTREAM或 a 执行拉取查询来限制返回的行数。拉取查询的语法是:TABLE``LIMIT

SELECT select_expr [, ...]
  FROM table/stream
  [ WHERE where_condition ]
  [ AND window_bounds ]
  [ LIMIT num_records ];

在 a 上执行带有LIMIT子句的拉取查询STREAM

CREATE STREAM STUDENTS (ID STRING KEY, SCORE INT)
  WITH (kafka_topic='students_topic', value_format='JSON'partitions=4);

SELECT * FROM STUDENTS
  LIMIT 3;

+---------------+---------------+
|ID             |SCORE          |
+---------------+---------------+
|ayala          |97             |
|spock          |48             |
|janice         |87             |
Limit Reached

LIMIT同样,用户也可以使用 a 子句执行拉取查询TABLE您可以在此处
了解有关拉取查询的更多信息。

使用来自模式 ID 的模式创建表和流

从 0.24 开始,用户可以指定 a key_schema_idor value_schema_idin stream 和 table-creation 命令。ID 指定的模式将在模式注册表中查找并用于创建逻辑模式以及序列化和反序列化数据。您可以在文档KLIP中了解更多信息。

例子:

在架构注册表中创建一个使用显式键架构和值架构的流。ksqlDB 将根据各自的 Avro 模式推断键和值列。

CREATE STREAM pageviews WITH (
    KAFKA_TOPIC='pageviews-input',
    FORMAT='AVRO',

    KEY_SCHEMA_ID=1,
    VALUE_SCHEMA_ID=2
  );

创建一个写入表并应用显式值模式的持久查询。记录值将根据此模式写入输出主题。

CREATE TABLE pageviews_count
  WITH (
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=3,
    KAFKA_TOPIC='pageviews-count-output'
  ) AS
  SELECT
    pageId,
    count(*)
  FROM pageviews
  GROUP BY pageID

开始使用 ksqlDB

我们很高兴能够推出这些功能并改进产品。有关更改的更多详细信息,请参阅更改日志立即通过独立发行版Confluent开始使用 ksqlDB ,并加入社区提出问题并寻找新资源。

翻译

Announcing ksqlDB 0.24.0

关注

本文首发于微信公众号【进击云原生】,扫左侧码关注,了解更多咨询,更有免费资源供您学习 扫码关注,加群学习

分类:

后端

标签:

后端

作者介绍

进击云原生
V1

公众号:进击云原生