Skip to content

Commit 1e60ff7

Browse files
[HUDI-8670][DOCS] Blog post for non-blocking concurrency control (#12443)
* [HUDI-8670][DOCS] Blog post for non-blocking concurrency control * Fix the details of header * [DOCS] Updating blog --------- Co-authored-by: vinoth chandar <[email protected]>
1 parent 5afbfef commit 1e60ff7

File tree

4 files changed

+167
-0
lines changed

4 files changed

+167
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
---
2+
title: "Introducing Hudi's Non-blocking Concurrency Control for streaming, high-frequency writes"
3+
excerpt: "Announcing the Non-blocking Concurrency Control in Apache Hudi"
4+
author: Danny Chan
5+
category: blog
6+
image: /assets/images/blog/non-blocking-concurrency-control/lsm_archive_timeline.png
7+
tags:
8+
- design
9+
- streaming ingestion
10+
- multi-writer
11+
- concurrency-control
12+
- blog
13+
---
14+
15+
## Introduction
16+
17+
In streaming ingestion scenarios, there are plenty of use cases that require concurrent ingestion from multiple streaming sources.
18+
The user can union all the upstream source inputs into one downstream table to collect the records for unified access across federated queries.
19+
Another very common scenario is multiple stream sources joined together to supplement dimensions of the records to build a wide-dimension table where each source
20+
stream is taking records with partial table schema fields. Common and strong demand for multi-stream concurrent ingestion has always been there.
21+
The Hudi community has collected so many feedbacks from users ever since the day Hudi supported streaming ingestion and processing.
22+
23+
Starting from [Hudi 1.0.0](https://hudi.apache.org/releases/release-1.0.0), we are thrilled to announce a new general-purpose
24+
concurrency model for Apache Hudi - the Non-blocking Concurrency Control (NBCC)- aimed at the stream processing or high-contention/frequent writing scenarios.
25+
In contrast to [Optimistic Concurrency Control](/blog/2021/12/16/lakehouse-concurrency-control-are-we-too-optimistic/), where writers abort the transaction
26+
if there is a hint of contention, this innovation allows multiple streaming writes to the same Hudi table without any overhead of conflict resolution, while
27+
keeping the semantics of [event-time ordering](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/) found in streaming systems, along with
28+
asynchronous table service such as compaction, archiving and cleaning.
29+
30+
NBCC works seamlessly without any new infrastructure or operational overhead. In the subsequent sections of this blog, we will give a brief introduction to Hudi's internals
31+
about the data file layout and TrueTime semantics for time generation, a pre-requisite for discussing NBCC. Following that, we will delve into the design and workflows of NBCC,
32+
and then a simple SQL demo to show the NBCC related config options. The blog will conclude with insights into future work for NBCC.
33+
34+
## Older Design
35+
36+
It's important to understand the Hudi [storage layout](/docs/next/storage_layouts) and it evolves/manages data versions. In older release before 1.0.0,
37+
Hudi organizes the data files with units as `FileGroup`. Each file group contains multiple `FileSlice`s. Every compaction on this file group generates a new file slice.
38+
Each file slice may comprise an optional base file(columnar file format like Apache Parquet or ORC) and multiple log files(row file format in Apache Avro or Parquet).
39+
40+
<img src="/assets/images/blog/non-blocking-concurrency-control/legacy_file_layout.png" alt="Legacy file layout" width="800" align="middle"/>
41+
42+
The timestamp in the base file name is the instant time of the compaction that writes it, it is also called as "requested instant time" in Hudi's notion.
43+
The timestamp in the log file name is the same timestamp as the current file slice base instant time. Data files with the same instant time belong to one file slice.
44+
In effect, a file group represented a linear ordered sequence of base files (checkpoints) followed by logs files (deltas), followed by base files (checkpoints).
45+
46+
The instant time naming convention in log files becomes a hash limitation in concurrency mode. Each log file contains incremental changes from
47+
multiple commits. Each writer needs to query the file layout to get the base instant time and figure out the full file name before flushing the records.
48+
A more severe problem is the base instant time can be variable with the async compaction pushing forward. In order to make the base instant time deterministic for the log writers, Hudi
49+
forces the schedule sequence between a write commit and compaction scheduling: a compaction can be scheduled only if there is no ongoing ingestion into the Hudi table. Without this, a log file
50+
can be written with a wrong base instant time which could introduce data loss. This means a compaction scheduling could block all the writers in concurrency mode.
51+
52+
## NBCC Design
53+
54+
In order to resolve these pains, since 1.0.0, Hudi introduces a new storage layout based on both requested and completion times of actions, viewing them as an interval.
55+
Each commit in 1.x Hudi has two [important notions of time](/docs/next/timeline): instant time(or requested time) and completion time.
56+
All the generated timestamp are globally monotonically increasing. Instead of putting the base instant time in the log file name, Hudi now just uses the requested instant time
57+
of the write. During file slicing, Hudi queries the completion time for each log file with the instant time, and we have a new rule for file slicing:
58+
59+
*A log file belongs to the file slice with the maximum base requested time smaller than(or equals with) it's completion time.*[^1]
60+
61+
<img src="/assets/images/blog/non-blocking-concurrency-control/new_file_layout.png" alt="New file layout" width="800" align="middle"/>
62+
63+
With the flexibility of the new file layout, the overhead of querying base instant time is eliminated for log writers and a compaction can be scheduled anywhere with any instant time.
64+
See [RFC-66](https://github.com/apache/hudi/blob/master/rfc/rfc-66/rfc-66.md) for more.
65+
66+
### True Time API
67+
68+
In order to ensure the monotonicity of timestamp generation, Hudi introduces the "[TrueTime API](/docs/next/timeline#timeline-components)" since 1.x release.
69+
Basically there are two ways to make the time generation monotonically increasing, inline with TrueTime semantics:
70+
71+
- A global lock to guard the time generation with mutex, along with a wait for an estimated max allowed clock skew on distributed hosts;
72+
- Globally synchronized time generation service, e.g. Google Spanner Time Service, the service itself can ensure the monotonicity.
73+
74+
Hudi now implements the "TrueTime" semantics with the first solution, a configurable max waiting time is supported.
75+
76+
### LSM timeline
77+
78+
The new file layout requires efficient queries from instant time to get the completion time. Hudi re-implements the archived timeline since 1.x, the
79+
new archived timeline data files are organized as [an LSM tree](/docs/next/timeline#lsm-timeline-history) to support fast time range filtering queries with instant time data-skipping on it.
80+
81+
<img src="/assets/images/blog/non-blocking-concurrency-control/lsm_archive_timeline.png" alt="LSM archive timeline" align="middle"/>
82+
83+
84+
With the powerful new file layout, it is quite straight-forward to implement non-blocking concurrency control. The function is implemented with the simple bucket index on MOR table for Flink.
85+
The bucket index ensures fixed record key to file group mappings for multiple workloads. The log writer writes the records into avro logs and the compaction table service would take care of
86+
the conflict resolution. Because each log file name contains the instant time and each record contains the event time ordering field, Hudi reader can merge the records either
87+
with natural order(processing time sequence) or event time order.
88+
89+
The concurrency mode should be configured as `NON_BLOCKING_CONCURRENCY_CONTROL`, you can enable the table services on one job and disable it for the others.
90+
91+
## Flink SQL demo
92+
93+
Here is a demo to show 2 pipelines that ingest into the same downstream table, the two sink table views share the same table path.
94+
95+
```sql
96+
-- NB-CC demo
97+
98+
-- The source table
99+
CREATE TABLE sourceT (
100+
uuid varchar(20),
101+
name varchar(10),
102+
age int,
103+
ts timestamp(3),
104+
`partition` as 'par1'
105+
) WITH (
106+
'connector' = 'datagen',
107+
'rows-per-second' = '200'
108+
);
109+
110+
-- table view for writer1
111+
create table t1(
112+
uuid varchar(20),
113+
name varchar(10),
114+
age int,
115+
ts timestamp(3),
116+
`partition` varchar(20)
117+
)
118+
with (
119+
'connector' = 'hudi',
120+
'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
121+
'table.type' = 'MERGE_ON_READ',
122+
'index.type' = 'BUCKET',
123+
'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
124+
'write.tasks' = '2'
125+
);
126+
127+
insert into t1/*+options('metadata.enabled'='true')*/ select * from sourceT;
128+
129+
-- table view for writer2
130+
-- compaction and cleaning are disabled because writer1 has taken care of it.
131+
create table t1_2(
132+
uuid varchar(20),
133+
name varchar(10),
134+
age int,
135+
ts timestamp(3),
136+
`partition` varchar(20)
137+
)
138+
with (
139+
'connector' = 'hudi',
140+
'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
141+
'table.type' = 'MERGE_ON_READ',
142+
'index.type' = 'BUCKET',
143+
'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
144+
'write.tasks' = '2',
145+
'compaction.schedule.enabled' = 'false',
146+
'compaction.async.enabled' = 'false',
147+
'clean.async.enabled' = 'false'
148+
);
149+
150+
-- executes the ingestion workloads
151+
insert into t1 select * from sourceT;
152+
insert into t1_2 select * from sourceT;
153+
```
154+
155+
## Future Roadmap
156+
157+
While non-blocking concurrency control is a very powerful feature for streaming users, it is a general solution for multiple writer conflict resolution,
158+
here are some plans that improve the Hudi core features:
159+
160+
- NBCC support for metadata table
161+
- NBCC for clustering
162+
- NBCC for other index type
163+
164+
165+
---
166+
167+
[^1] [RFC-66](https://github.com/apache/hudi/blob/master/rfc/rfc-66/rfc-66.md) well-explained the completion time based file slicing with a pseudocode.
Loading
Loading
Loading

0 commit comments

Comments
 (0)