|
| 1 | +# Apache GeaFlow 流图计算引擎 |
| 2 | + |
| 3 | +> Apache GeaFlow 是蚂蚁集团开源的性能世界一流的 OLAP 图数据库,支持万亿级图存储、图表混合处理、实时图计算、交互式图分析等核心能力。 |
| 4 | +
|
| 5 | +**项目地址**:[https://github.com/apache/geaflow](https://github.com/apache/geaflow) |
| 6 | + |
| 7 | +## 产品简介 |
| 8 | + |
| 9 | +### GeaFlow 起源 |
| 10 | + |
| 11 | +早期的大数据分析主要以离线处理为主,以 Hadoop 为代表的技术栈很好地解决了大规模数据的分析问题。然而数据处理的时效性不足,很难满足高实时需求的场景。 |
| 12 | + |
| 13 | +以 Storm 为代表的流式计算引擎的出现很好地解决了数据实时处理的问题,提高了数据处理的时效性。然而,Storm 本身不提供状态管理的能力,对于聚合等有状态的计算显得无能为力。 |
| 14 | + |
| 15 | +Flink 的出现很好地弥补了这一短板,通过引入状态管理以及 Checkpoint 机制,实现了高效的有状态流计算能力。 |
| 16 | + |
| 17 | +**面临的挑战** |
| 18 | + |
| 19 | +随着数据实时处理场景的丰富,尤其是在实时数仓场景下,实时关系运算(即 Stream Join)越来越多地成为数据实时化的难点。 |
| 20 | + |
| 21 | +Flink 虽然具备优秀的状态管理能力和出色的性能,然而在处理 Join 运算,尤其是 3 度以上 Join 时,性能瓶颈越来越明显。由于需要在 Join 两端存放各个输入的数据状态,当 Join 变多时,状态的数据量急剧扩大,性能也变得难以接受。 |
| 22 | + |
| 23 | +产生这个问题的本质原因是 Flink 等流计算系统以**表**作为数据模型,而表模型本身是一个二维结构,不包含关系的定义和关系的存储,在处理关系运算时只能通过 Join 运算方式实现,成本很高。 |
| 24 | + |
| 25 | +**GeaFlow 的解决方案** |
| 26 | + |
| 27 | +在蚂蚁的大数据应用场景中,尤其是金融风控、实时数仓等场景下,存在大量 Join 运算,如何提高 Join 的时效性和性能成为我们面临的重要挑战,为此我们引入了**图模型**。 |
| 28 | + |
| 29 | +图模型是一种以点边结构描述实体关系的数据模型,在图模型里面: |
| 30 | + |
| 31 | +- **点**代表实体 |
| 32 | +- **边**代表关系 |
| 33 | +- 数据存储层面点边存放在一起 |
| 34 | + |
| 35 | +因此,图模型天然定义了数据的关系,同时存储层面物化了点边关系。基于图模型,我们实现了新一代实时计算引擎 GeaFlow,很好地解决了复杂关系运算实时化的问题。 |
| 36 | + |
| 37 | +目前 GeaFlow 已广泛应用于**数仓加速**、**金融风控**、**知识图谱**以及**社交网络**等场景。 |
| 38 | + |
| 39 | +### 性能优势 |
| 40 | + |
| 41 | +相比传统的流式计算引擎如 Flink、Storm 这些以表为模型的实时处理系统,GeaFlow 以图为数据模型,在处理 Join 关系运算,尤其是复杂多跳的关系运算(如 3 跳以上的 Join、复杂环路查找)上具备极大的性能优势。 |
| 42 | + |
| 43 | +GeaFlow 在 [LDBC SNB-BI 基准测试](https://ldbcouncil.org/benchmarks/snb-bi/)中取得了世界一流的性能表现。 |
| 44 | + |
| 45 | +## 技术架构 |
| 46 | + |
| 47 | +GeaFlow 整体架构自下而上包括以下几层: |
| 48 | + |
| 49 | +### 1. DSL 层(语言层) |
| 50 | + |
| 51 | +GeaFlow 设计了 **SQL+GQL** 的融合分析语言,支持对表模型和图模型统一处理。 |
| 52 | + |
| 53 | +**DSL 架构特点**: |
| 54 | + |
| 55 | +- 典型的编译器技术架构:语法分析 → 语义分析 → 中间代码生成(IR) → 代码优化 → 目标代码生成 |
| 56 | +- 通过扩展 [Apache Calcite](https://calcite.apache.org/) 实现 SQL+GQL 的语法解析器 |
| 57 | +- 实现了大量优化规则(RBO),未来也会引入 CBO |
| 58 | +- 提供丰富的内置系统函数,支持自定义函数 |
| 59 | +- 支持自定义 Connector 插件,对接不同数据源 |
| 60 | + |
| 61 | +**两级 DAG 物理执行计划**: |
| 62 | + |
| 63 | +- **外层 DAG**:包含表处理算子和图处理迭代算子,是物理执行逻辑的主体 |
| 64 | +- **内层 DAG**:将图计算逻辑通过 DAG 方式展开,代表图迭代计算的具体执行方式 |
| 65 | + |
| 66 | +### 2. Framework 层(框架层) |
| 67 | + |
| 68 | +GeaFlow 设计了面向 Graph 和 Stream 的两套 API,支持流、批、图融合计算,并实现了基于 Cycle 的统一分布式调度模型。 |
| 69 | + |
| 70 | +### 3. State 层(存储层) |
| 71 | + |
| 72 | +GeaFlow 设计了面向 Graph 和 KV 的两套 API,支持表数据和图数据的混合存储: |
| 73 | + |
| 74 | +- 整体采用 Sharing Nothing 的设计 |
| 75 | +- 支持将数据持久化到远程存储 |
| 76 | +- 支持多种存储后端(如 RocksDB) |
| 77 | + |
| 78 | +### 4. Console 平台 |
| 79 | + |
| 80 | +GeaFlow 提供了一站式图研发平台,实现了: |
| 81 | + |
| 82 | +- 图数据的建模能力 |
| 83 | +- 图数据的加工能力 |
| 84 | +- 图数据的分析能力 |
| 85 | +- 图作业的运维管控支持 |
| 86 | + |
| 87 | +### 5. 执行环境 |
| 88 | + |
| 89 | +GeaFlow 可以运行在多种异构执行环境: |
| 90 | + |
| 91 | +- Kubernetes (K8S) |
| 92 | +- Ray |
| 93 | +- 本地模式 |
| 94 | + |
| 95 | +## 核心概念 |
| 96 | + |
| 97 | +### GraphView |
| 98 | + |
| 99 | +**GraphView** 是 GeaFlow 中最核心的数据抽象,表示基于图结构的虚拟视图。它是图物理存储的一种抽象,可以表示和操作分布在多个节点上的图数据。 |
| 100 | + |
| 101 | +在 GeaFlow 中,GraphView 是一等公民,用户对图的所有操作都是基于 GraphView。 |
| 102 | + |
| 103 | +**主要功能**: |
| 104 | + |
| 105 | +1. **图操作**:可以添加或删除点和边数据,也可以进行查询和基于某个时间点切片快照 |
| 106 | +2. **图介质**:可以存储到图数据库或其他存储介质(如文件系统、KV 存储、宽表存储、Native Graph 等) |
| 107 | +3. **图切分**:支持不同的图切分方法 |
| 108 | +4. **图计算**:可以进行图的迭代遍历或者计算 |
| 109 | + |
| 110 | +**示例**:定义一个 Social Network 的 GraphView,描述人际关系 |
| 111 | + |
| 112 | +```sql |
| 113 | +CREATE GRAPH social_network ( |
| 114 | + Vertex person ( |
| 115 | + id int ID, |
| 116 | + name varchar |
| 117 | + ), |
| 118 | + Edge knows ( |
| 119 | + person1 int SOURCE ID, |
| 120 | + person2 int DESTINATION ID, |
| 121 | + weight int |
| 122 | + ) |
| 123 | +) WITH ( |
| 124 | + storeType='rocksdb', |
| 125 | + shardCount = 128 |
| 126 | +); |
| 127 | +``` |
| 128 | + |
| 129 | +## 应用场景 |
| 130 | + |
| 131 | +### 1. 实时数仓加速 |
| 132 | + |
| 133 | +**场景描述**: |
| 134 | + |
| 135 | +数仓场景存在大量 Join 运算,在 DWD 层往往需要将多张表展开成一张大宽表,以加速后续查询。当 Join 的表数量变多时,传统的实时计算引擎很难保证 Join 的时效性和性能,这也成为目前实时数仓领域一个棘手的问题。 |
| 136 | + |
| 137 | +**GeaFlow 的解决方案**: |
| 138 | + |
| 139 | +- 以图作为数据模型,替代 DWD 层的宽表 |
| 140 | +- 实现数据实时构图 |
| 141 | +- 在查询阶段利用图的点边物化特性,极大加速关系运算的查询 |
| 142 | + |
| 143 | +### 2. 实时归因分析 |
| 144 | + |
| 145 | +**场景描述**: |
| 146 | + |
| 147 | +在信息化的大背景下,对用户行为进行渠道归因和路径分析是流量分析领域中的核心。通过实时计算用户的有效行为路径,构建出完整的转化路径,能够快速帮助业务看清楚产品的价值,帮助运营及时调整运营思路。 |
| 148 | + |
| 149 | +实时归因分析的核心要点: |
| 150 | + |
| 151 | +- **准确性**:在成本可控下保证用户行为路径分析的准确性 |
| 152 | +- **实效性**:计算的实时性足够高,才能快速帮助业务决策 |
| 153 | + |
| 154 | +**GeaFlow 的实现方式**: |
| 155 | + |
| 156 | +1. 通过实时构图将用户行为日志转换成用户行为拓扑图 |
| 157 | + - 用户作为图中的点 |
| 158 | + - 每个行为构建成从该用户指向埋点页面的一条边 |
| 159 | +2. 利用流图计算能力分析用户行为子图 |
| 160 | +3. 在子图上基于归因路径匹配规则进行匹配计算 |
| 161 | +4. 得出成交行为相应用户的归因路径,并输出到下游系统 |
| 162 | + |
| 163 | +### 3. 实时反套现 |
| 164 | + |
| 165 | +**场景描述**: |
| 166 | + |
| 167 | +在信贷风控场景下,如何进行信用卡反套现是一个典型的风控诉求。基于现有的套现模式分析,可以看到套现是一个环路子图,如何快速、高效地在大图中判定套现,将极大地增加风险的识别效率。 |
| 168 | + |
| 169 | +**GeaFlow 的实现方式**: |
| 170 | + |
| 171 | +1. 将实时交易流、转账流等输入数据源转换成实时交易图 |
| 172 | +2. 根据风控策略对用户交易行为做图特征分析(如环路检查等特征计算) |
| 173 | +3. 实时提供给决策和监控平台进行反套现行为判定 |
| 174 | + |
| 175 | +通过 GeaFlow 实时构图和实时图计算能力,可以快速发现套现等异常交易行为,极大降低平台风险。 |
| 176 | + |
| 177 | +## 快速上手 |
| 178 | + |
| 179 | +### 环境准备 |
| 180 | + |
| 181 | +编译 GeaFlow 依赖以下环境: |
| 182 | + |
| 183 | +- JDK 8 |
| 184 | +- Maven(推荐 3.6.3 及以上版本) |
| 185 | +- Git |
| 186 | +- Docker(可选,用于运行 Console) |
| 187 | + |
| 188 | +### 编译源码 |
| 189 | + |
| 190 | +执行以下命令来编译 GeaFlow 源码: |
| 191 | + |
| 192 | +```bash |
| 193 | +# 下载源码 |
| 194 | +git clone https://github.com/apache/geaflow.git |
| 195 | +cd geaflow/ |
| 196 | + |
| 197 | +# 构建项目 |
| 198 | +./build.sh --module=geaflow --output=package |
| 199 | +``` |
| 200 | + |
| 201 | +### 运行示例任务 |
| 202 | + |
| 203 | +**方式一:命令行提交任务** |
| 204 | + |
| 205 | +运行一个实时环路查找的图计算作业: |
| 206 | + |
| 207 | +```bash |
| 208 | +./bin/gql_submit.sh --gql geaflow/geaflow-examples/gql/loop_detection_file_demo.sql |
| 209 | +``` |
| 210 | + |
| 211 | +这是一段实时查询图中所有四度环路的 DSL 计算作业。 |
| 212 | + |
| 213 | +**方式二:使用 Console 平台** |
| 214 | + |
| 215 | +1. 构建 Console jar 和镜像(需启动 Docker): |
| 216 | + |
| 217 | +```bash |
| 218 | +./build.sh --module=geaflow-console |
| 219 | +``` |
| 220 | + |
| 221 | +2. 启动 Console: |
| 222 | + |
| 223 | +```bash |
| 224 | +docker run -d --name geaflow-console -p 8888:8888 geaflow-console:0.1 |
| 225 | +``` |
| 226 | + |
| 227 | +3. 在浏览器中访问 `http://localhost:8888`,体验白屏化的图作业提交 |
| 228 | + |
| 229 | +### 示例:环路检测 |
| 230 | + |
| 231 | +以下是一个完整的环路检测示例 SQL: |
| 232 | + |
| 233 | +```sql |
| 234 | +set geaflow.dsl.window.size = 1; |
| 235 | +set geaflow.dsl.ignore.exception = true; |
| 236 | + |
| 237 | +-- 创建图模型 |
| 238 | +CREATE GRAPH IF NOT EXISTS dy_modern ( |
| 239 | + Vertex person ( |
| 240 | + id bigint ID, |
| 241 | + name varchar |
| 242 | + ), |
| 243 | + Edge knows ( |
| 244 | + srcId bigint SOURCE ID, |
| 245 | + targetId bigint DESTINATION ID, |
| 246 | + weight double |
| 247 | + ) |
| 248 | +) WITH ( |
| 249 | + storeType='rocksdb', |
| 250 | + shardCount = 1 |
| 251 | +); |
| 252 | + |
| 253 | +-- 创建数据源表 |
| 254 | +CREATE TABLE IF NOT EXISTS tbl_source ( |
| 255 | + text varchar |
| 256 | +) WITH ( |
| 257 | + type='file', |
| 258 | + `geaflow.dsl.file.path` = 'resource:///demo/demo_job_data.txt', |
| 259 | + `geaflow.dsl.column.separator`='|' |
| 260 | +); |
| 261 | + |
| 262 | +-- 创建结果表 |
| 263 | +CREATE TABLE IF NOT EXISTS tbl_result ( |
| 264 | + a_id bigint, |
| 265 | + b_id bigint, |
| 266 | + c_id bigint, |
| 267 | + d_id bigint, |
| 268 | + a1_id bigint |
| 269 | +) WITH ( |
| 270 | + type='file', |
| 271 | + `geaflow.dsl.file.path` = '/tmp/geaflow/demo_job_result' |
| 272 | +); |
| 273 | + |
| 274 | +-- 使用图 |
| 275 | +USE GRAPH dy_modern; |
| 276 | + |
| 277 | +-- 插入点数据 |
| 278 | +INSERT INTO dy_modern.person(id, name) |
| 279 | + SELECT |
| 280 | + cast(trim(split_ex(t1, ',', 0)) as bigint), |
| 281 | + split_ex(trim(t1), ',', 1) |
| 282 | + FROM ( |
| 283 | + Select trim(substr(text, 2)) as t1 |
| 284 | + FROM tbl_source |
| 285 | + WHERE substr(text, 1, 1) = '.' |
| 286 | + ); |
| 287 | + |
| 288 | +-- 插入边数据 |
| 289 | +INSERT INTO dy_modern.knows |
| 290 | + SELECT |
| 291 | + cast(split_ex(t1, ',', 0) as bigint), |
| 292 | + cast(split_ex(t1, ',', 1) as bigint), |
| 293 | + cast(split_ex(t1, ',', 2) as double) |
| 294 | + FROM ( |
| 295 | + Select trim(substr(text, 2)) as t1 |
| 296 | + FROM tbl_source |
| 297 | + WHERE substr(text, 1, 1) = '-' |
| 298 | + ); |
| 299 | + |
| 300 | +-- 查询环路并输出结果 |
| 301 | +INSERT INTO tbl_result |
| 302 | + SELECT DISTINCT |
| 303 | + a_id, |
| 304 | + b_id, |
| 305 | + c_id, |
| 306 | + d_id, |
| 307 | + a1_id |
| 308 | + FROM ( |
| 309 | + MATCH (a:person) -[:knows]->(b:person) -[:knows]-> (c:person) |
| 310 | + -[:knows]-> (d:person) -> (a) |
| 311 | + RETURN a.id as a_id, b.id as b_id, c.id as c_id, |
| 312 | + d.id as d_id, a.id as a1_id |
| 313 | + ); |
| 314 | +``` |
| 315 | + |
| 316 | +## 开发手册 |
| 317 | + |
| 318 | +GeaFlow 支持 **DSL** 和 **API** 两套编程接口: |
| 319 | + |
| 320 | +### DSL 开发 |
| 321 | + |
| 322 | +通过 GeaFlow 提供的类 SQL 扩展语言 SQL+ISO/GQL 进行流图计算作业的开发。 |
| 323 | + |
| 324 | +**特点**: |
| 325 | + |
| 326 | +- 融合了 SQL 和 GQL(图查询语言) |
| 327 | +- 支持图表一体化分析 |
| 328 | +- 声明式编程,简单易用 |
| 329 | + |
| 330 | +**适用场景**: |
| 331 | + |
| 332 | +- 复杂的图查询和分析 |
| 333 | +- 图表混合处理 |
| 334 | +- 快速原型开发 |
| 335 | + |
| 336 | +### API 开发 |
| 337 | + |
| 338 | +通过 GeaFlow 的高阶 API 编程接口,使用 Java 语言进行应用开发。 |
| 339 | + |
| 340 | +**特点**: |
| 341 | + |
| 342 | +- 面向 Graph 和 Stream 的两套 API |
| 343 | +- 支持流、批、图融合计算 |
| 344 | +- 灵活的编程控制 |
| 345 | + |
| 346 | +**适用场景**: |
| 347 | + |
| 348 | +- 需要精细控制计算逻辑 |
| 349 | +- 复杂的自定义算法实现 |
| 350 | +- 与其他系统深度集成 |
| 351 | + |
| 352 | +## 相关学习资源 |
| 353 | + |
| 354 | +- 设计论文:[GeaFlow: A Graph Extended and Accelerated Dataflow System](https://dl.acm.org/doi/abs/10.1145/3589771) |
| 355 | +- 官方文档:https://geaflow.apache.org/ |
| 356 | +- LDBC SNB-BI 基准测试:https://ldbcouncil.org/benchmarks/snb-bi/ |
| 357 | + |
| 358 | +--- |
| 359 | + |
| 360 | +*本文档基于 Apache GeaFlow 官方文档整理,更多详细信息请访问 [GeaFlow 官方文档](https://geaflow.apache.org/)。* |
0 commit comments