f

fishgotfamous

V1

2022/12/08阅读:13主题:默认主题

基于 YMatrix 的海量数据轨迹关联计算实现

基于 YMatrix 的海量数据轨迹关联计算实现

1 前言

统计与分析轨迹和地理区域的关联问题是 GIS 领域主要的应用类别之一。最早的 GIS 应用以桌面应用程序为主,随着需处理应用越来越复杂,GIS 的开发框架也在逐年演进。就像复杂的数据操作和应用催生了以数据独立性为主旨的数据库一样,Spatial RDBMS (空间数据库) 也就是 GIS in Database 逐渐形成。我们知道,在数据库内,数据的操作被逐渐标准化,让更多的应用开发有了更低的门槛。 image
随着万物互联时代的到来,车联网、物联网以及人们的出行产生了大量时空数据。疫情的爆发也催生了海量时空数据交联的分析需求。如何在每天新增数十亿条的时空数据中发现时空关联关系成为重要挑战。将数据放到一个节点的数据库中分析已无法满足需求,我们需要在分布式的数据基座中来完成数据的关联。 超融合数据库 YMatrix 基于PostgreSQL 与 Greenplum 深度优化而来。具有前者优秀的 GIS 能力与后者的分布式、库内计算能力的同时,在存储引擎层的深度优化让其在性能上更上一个新的台阶。不仅支持海量时空数据的接入,存储与库内计算,而且可以完善支持标准SQL(从SQL92到SQL2016),支持其他多模态数据类型,包括关系数据、JSON数据、文本数据、图片等;支持多模式操作,包括高并发低延迟增删改查、点查、明细查询、聚合查询、窗口查询、关联查询、多维查询、库内机器学习等,是一款轻量级、高性能数据基座。 image
YMatrix 为 GIS 数据应用插上了分布式的翅膀,解放了资源的可扩展性,同时提高了单位资源的计算性能,本文将通过一个例子来介绍如何基于 YMatrix 来进行库内的时空数据分析。

2 案例描述

GIS 应用中常见的需求是轨迹与区域之间关联关系的查询实现。比如查询某段时间通过某一区域的轨迹有哪些,为方便实现上的理解,举一个防疫中时空关联的具体的例子来说明。 注意:为方便理解,将采用相对坐标说明。实际数据是以GPS转换后的GEOMETRY类型计算,精度会更高,但是实现原理一致。 image
通过上图,我们构建一个简单的地图坐标系,最左下角是(0 0)点。我们定义两个风险区域 Area1 与Area2 ,分别位于门头沟、昌平和海淀的交界与昌平顺义与怀柔的交界。每个风险区域有自己的开始结束时间。再定义3条轨迹,Line1、Line2与Line3 来模拟人的行动。实际上,轨迹和区域都会有很多,而这正是分布式计算的优势所在。

3 实现描述

考虑到实际情况中我们是对千万人和上百个区域的关联分析,并且人员的轨迹数据会持续的加载进数据库,业务方的查询并发也非常高。因此要采用一定的数据分层思路,通过定期对现有数据进行筛查并且将结果存入结果表,再以相对简单的逻辑提供给业务方查询,可以避免大量并发带来的重复计算。 以下为具体的操作步骤,在有测试环境的前提下,可以一步一步操作直到得出最终结果。(YMatrix 的安装参考见文末)

3.1 地理信息算法扩展

针对地理信息应用,YMatrix 支持地理信息数据插件 PostGIS,提供丰富的空间信息服务功能,如空间对象、空间索引、空间操作函数和空间操作符等。同时,地理信息数据插件遵循 OpenGIS 的规范,支持十几种标准空间数据类型,内置 200 多种相关的数据处理分析函数。基于此,我们可以建立空间索引来更完成高效的查询,也可以通过其内置的函数分析空间交叠、轨迹交叉等问题。其安装方法如下: 在每个节点上安装rpm包(postgis依赖geos39,所以先安装依赖):

sudo yum install https://ftp.postgresql.org/pub/repos/yum/common/redhat/rhel-7-x86_64/geos39-3.9.1-1.rhel7.x86_64.rpm
sudo yum install mxdb-postgis-2.5-1.el7.x86_64.rpm

连接数据库并创建扩展: CREATE EXTENSION postgis;

3.2 建表与数据录入

1. 结合坐标系,Area 按 PostGIS 的规则表示如下:

Area1: 'POLYGON((18 18,18.5 19,19 19,19 17.5,19.5 17,19.5 16.5,19 16.5,
                 18.5 16,19 15.5,19 14.5,18.5 14,18 14.5,17.5 14.5,
                 17.5 16,17 16,17 17,18 17,18 18))'
       '2022-11-23 10:00:00','2022-11-30 10:00:00'
Area2: 'POLYGON((26.5 21,27.5 21.5,28.5 21,28 20,26.5 19.5,26.5 21))'
       '2022-11-15 12:00:00','2022-11-24 12:00:00'

创建 area_of_interest 表并将该信息录入:

CREATE TABLE area_of_risk (
    area_id         bigint                        -- 区域ID
  , area_name       varchar(30)                   -- 区域名称
  , area_detail     geometry                      -- 描述区域的数据类型
  , start_time      timestamp without time zone   -- 风险开始时间
  , end_time        timestamp without time zone   -- 风险结束时间
DISTRIBUTED REPLICATED;
INSERT INTO area_of_risk VALUES
(1,
 'area1',
 ST_GeometryFromText('POLYGON((18 18,18.5 19,19 19,19 17.5,19.5 17,19.5 16.5,19 16.5,18.5 16,19 15.5,19 14.5,18.5 14,18 14.5,17.5 14.5,17.5 16,17 16,17 17,18 17,18 18))',4269),
 '2022-11-23 10:00:00',
 '2022-11-30 10:00:00'),
(2,
'area2',
ST_GeometryFromText('POLYGON((26.5 21,27.5 21.5,28.5 21,28 20,26.5 19.5,26.5 21))',4269),
 '2022-11-30 10:00:00',
 '2022-12-07 10:00:00');

数据插入后查看结果:

SELECT area_id,area_name,ST_AsText(area_detail) FROM area_of_risk;

image
2. 轨迹时序表: 如图所示,另外定义三个轨迹,Line1、Line2、Line3,分别来自于三个目标(也就是形成轨迹的人),实际工作中,我们的Line信息,随着时间产生,格式应该是时序数据,格式类似于如下:

时间:'2022-11-24 10:27:30',
目标ID:'Tar-0',
轨迹ID:'Tar-0-221124',
定位:ST_GeometryFromText('POINT(20 22)',4269))

因此首先,我们按上述格式创建travel_path_ts

CREATE EXTENSION IF NOT EXISTS matrixts;
ALTER EXTENSION matrixts UPDATE;

CREATE TABLE travel_path_ts (
    ts            timestamp without time zone     -- 时间
  , target_id     varchar(50)                     -- 目标ID
  , trajectory_id varchar(30)                     -- 轨迹编号
  , position      geography(POINT,4269)           -- 定位
)
USING mars2
DISTRIBUTED BY (target_id)
PARTITION BY RANGE(ts)
(
  START ('2022-08-01') INCLUSIVE
  END ('2022-09-30') EXCLUSIVE
  EVERY (INTERVAL '24 hour'),
  DEFAULT PARTITION default_p
);
CREATE INDEX travel_path_ts_index ON travel_path_ts USING mars2_btree(target_id,ts);

再将示例数据按时序格式录入至travel_path_ts 表:

INSERT INTO travel_path_ts VALUES
('2022-11-24 10:27:30','Tar-0','Tar-0-20221124',ST_GeometryFromText('POINT(09 15)',4269)),
('2022-11-24 10:57:30','Tar-0','Tar-0-20221124',ST_GeometryFromText('POINT(14 16)',4269)),
('2022-11-24 11:17:30','Tar-0','Tar-0-20221124',ST_GeometryFromText('POINT(19 19)',4269)),
('2022-11-24 12:27:30','Tar-0','Tar-0-20221124',ST_GeometryFromText('POINT(27 21)',4269)),
('2022-11-24 13:07:30','Tar-0','Tar-0-20221124',ST_GeometryFromText('POINT(35 26)',4269)),
('2022-11-30 08:39:30','Tar-1','Tar-1-20221031',ST_GeometryFromText('POINT(27 21)',4269)),
('2022-11-30 09:27:30','Tar-1','Tar-1-20221031',ST_GeometryFromText('POINT(30 13)',4269)),
('2022-11-30 10:37:30','Tar-1','Tar-1-20221031',ST_GeometryFromText('POINT(34 10)',4269)),
('2022-11-30 11:42:30','Tar-1','Tar-1-20221031',ST_GeometryFromText('POINT(35 03)',4269)),
('2022-11-15 15:21:30','Tar-2','Tar-2-20221015',ST_GeometryFromText('POINT(14 08)',4269)),
('2022-11-15 16:53:30','Tar-2','Tar-2-20221015',ST_GeometryFromText('POINT(25 06)',4269)),
('2022-11-15 18:32:30','Tar-2','Tar-2-20221015',ST_GeometryFromText('POINT(34 10)',4269));

数据插入后查看结果:

SELECT ts,target_id,trajectory_id,ST_AsText(positionFROM travel_path_ts;

image

3.3 按照目标ID精准查询轨迹

至此,我们可以通过系统进行指定目标、指定轨迹的精准查询:

select target_id,
       time_bucket_gapfill('3 second', ts) tb,
       last_not_null_value(position, ts) as signals
from travel_path_ts
where target_id = 'Tar-0'
    and ts >= '2022-11-24 00:00:00'
    and ts < '2022-11-24 23:59:59'
group by target_id, tb
order by tb;

3.4 聚合定时任务查询,维度表和时序表关联结果表

除按目标ID的精准查询外,还有一类查询非常常用,就是前文提到的根据指定区域查经过这个区域的轨迹。当然可以直接进行关联查询,但对于海量数据的查询,为提高后续查询效率,考虑到区域的信息变动会远小于人们的行程数据接入。可以定时的将人员信息聚合,并进一步和风险区域进行聚合,发现高危人员。因此,设定人员整合表如下:

CREATE TABLE travel_path_agg (
    target_id           varchar(50)                   -- 目标ID
  , trajectory_id       varchar(30)                   -- 轨迹编号
  , trajectory_path     text                          -- 轨迹,用于存储聚合后的linestring
  , start_time          timestamp without time zone   -- 轨迹开始时间
  , end_time            timestamp without time zone   -- 轨迹结束时间
  , area_of_risk   bigint []
DISTRIBUTED BY (target_id);
-- 将一系列point数据通过makeline聚合成航迹信息,存入trajectory_path中。
-- 起止实际聚合用于后续可能的过滤条件。
-- 其他信息同航迹内相同,可直接存储

再通过未来每日执行的聚集任务来插入 trajectory_agg 表,设计类似如下语句:

INSERT INTO travel_path_agg
SELECT target_id,trajectory_id,
       ST_MakeLine(point) as trajectory_path,
       min(ts) as trajectory_start_time,
       max(ts) as trajectory_end_time
FROM travel_path_ts
GROUP BY target_id,trajectory_id;
查看初步聚合结果:
select
    target_id,
    trajectory_id,
    ST_AsText(trajectory_path),
    start_time,
    end_time
from travel_path_agg;

image
接下来,我们要更新 travel_path_agg 表中的 area_of_risk 字段,这个字段用于存储该轨迹与多少我们关注的风险区域之间存在时空关联,是我们未来查询指定区域经过了多少轨迹的依据。 我们通过如下语句实现轨迹的经过判断,并将经过的区域存储于 travel_path_agg 表中的 area_of_risk 字段:

with
aoi_ta_join as (
select * from travel_path_agg ta
left join area_of_risk aor
on ta.trajectory_path && aor.area_detail
and  (ta.start_time, ta.end_time) OVERLAPS (aor.start_time, aor.end_time)),
aoi_ta_join_group as (
select trajectory_id,array_agg(area_id)as areas_of_risk from aoi_ta_join
where area_id is not null group by trajectory_id)
update travel_path_agg
set area_of_risk = atjg.areas_of_risk
from aoi_ta_join_group atjg
where travel_path_agg.trajectory_id = atjg.trajectory_id;

至此,我们通过每日定时任务,拿到了轨迹聚合信息与经过的关键区域信息,基于示例数据的查询结果如下,其中 area_of_risk 是一个数组,保存了经过关键区域的 area_idimage
这里需要注意,定时任务的聚合计算会有更高的效率,但同时其只判断行程起止时间和风险区域的时间是否相关,可以说是进行了一次初筛。初筛不会漏掉风险信息,但是可能会造成信息的误判,要再执行精准查询来确定,初筛最大的作用是过滤掉明细无风险的条目。但由于精准查询要查询最底层的时序表,数据量非常大,如果不进行初筛会造成非常大的资源消耗,远远大于初筛和精准查询的组合。精准查询基于初筛表,执行逻辑如下:

with path_of_interest_detail as
(select ts,ti.target_id,trajectory_id,ST_AsText(position),
        area_of_risk.area_detail,area_of_risk.area_id,
        area_of_risk.start_time,area_of_risk.end_time
 from
    (select target_id,area_of_risk
     from travel_path_agg
     where array_length(area_of_risk,1) >= 1)ti
 left join travel_path_ts
 on ti.target_id = travel_path_ts.target_id
 left join area_of_risk
 on array_position(ti.area_of_risk,area_of_risk.area_id)>0
and area_of_risk.start_time <= travel_path_ts.ts
nd travel_path_ts.ts <= area_of_risk.end_time
and position && area_of_risk.area_detail),
aoi_ta_join_group as
(select trajectory_id,array_agg(area_id) as areas_of_risk
from path_of_interest_detail
where area_id is not null
group by trajectory_id),
acture_result as
(select ti.trajectory_id,areas_of_risk
 from aoi_ta_join_group
 right join
(select trajectory_id
 from travel_path_agg
 where array_length(area_of_risk,1) >= 1)ti
 on ti.trajectory_id = aoi_ta_join_group.trajectory_id)
update travel_path_agg
set area_of_risk = ar.areas_of_risk
from acture_result ar
where travel_path_agg.trajectory_id = ar.trajectory_id;

结果如下,可以看到,初筛中因时间范围不够精细而误判的Tar-1风险关联被正确的排除,但真正关联的Tar-0不会因此被忽略,从结果上看,与样例图中的轨迹经过关系一致: image
值得一提的是,上述任务可以的执行时机不仅限于行程数据的输入,在风险区域新增时亦可执行来根据新的风险区域数据更新 travel_path_agg 表,让查询的结果具备更高的实效性。

3.5 查询经过指定区域的轨迹信息

select trajectory_id from travel_path_agg
where array_position(area_of_risk,2::bigint) > 0;
-- 2::bigint 指 area_id,转换成bigint类型以适应数组元素类型。
-- 整个查询的核心是判断经过关系,也可以加入其他列的查询与其他 where 条件限制。

3.6 小结

经过上述步骤,我们初步实现了轨迹空间经过关系批量处理的数据框架。实际上还会更复杂,比如经过的判定与风险区域的时间窗口更新机制等,均可基于相同的思路来实现。

4 总结

YMatrix 作为一款轻量级的分布式超融合数据基座,与时空交联类应用复杂、性能挑战大的场景匹配。这些系统基于 YMatrix 来实现不但可以达到甚至超出性能预期,在实施和后续的运维中也将事半功倍。让使用者从繁琐的数据底层抽身,更关注业务本身,更好的完成任务。

5 附录

YMatrix 安装连接:YMatrix - 快速安装

分类:

后端

标签:

大数据

作者介绍

f
fishgotfamous
V1