Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ rustflags = [

# Flags for all targets.
[target.'cfg(all())']
rustflags = ["--cfg", "tokio_unstable"]
rustflags = ["--cfg", "tokio_unstable", "-Zhigher-ranked-assumptions"]

[build]
rustdocflags = ["-Zhigher-ranked-assumptions"]

# We have large git dependencies. This can make cloning faster.
# https://doc.rust-lang.org/nightly/cargo/reference/unstable.html#git
Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/doc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ echo "--- Set openssl static link env vars"
configure_static_openssl

echo "--- Build documentation"
RUSTDOCFLAGS="-Dwarnings" cargo doc --document-private-items --no-deps
RUSTDOCFLAGS="-Dwarnings -Zhigher-ranked-assumptions" cargo doc --document-private-items --no-deps
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we by default to also set -Dwarnings in config.toml?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a good idea. -Dwarnings is used here because we want to quickly detect the warning and return with error in CI, but not a general requirement when we build the doc.


echo "--- Show sccache stats"
sccache --show-stats
sccache --zero-stats

echo "--- Run doctest"
RUSTDOCFLAGS="-Clink-arg=-fuse-ld=lld" cargo test --doc
cargo test --doc

echo "--- Show sccache stats"
sccache --show-stats
Expand Down
16 changes: 8 additions & 8 deletions src/storage/hummock_test/src/hummock_vector_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn test_flat_vector() {

let epoch1_vectors = (0..100).map(|_| next_input()).collect_vec();
for (vec, info) in &epoch1_vectors {
vector_writer.insert(vec.clone(), info.clone()).unwrap();
vector_writer.insert(vec.to_ref(), info.clone()).unwrap();
vector_writer.try_flush().await.unwrap();
}

Expand All @@ -95,7 +95,7 @@ async fn test_flat_vector() {

let epoch2_vectors = (0..100).map(|_| next_input()).collect_vec();
for (vec, info) in &epoch2_vectors {
vector_writer.insert(vec.clone(), info.clone()).unwrap();
vector_writer.insert(vec.to_ref(), info.clone()).unwrap();
vector_writer.try_flush().await.unwrap();
}

Expand Down Expand Up @@ -152,7 +152,7 @@ async fn test_flat_vector() {

let output = read_snapshot_epoch
.nearest(
query.clone(),
query.to_ref(),
VectorNearestOptions {
top_n,
measure: DistanceMeasurement::InnerProduct,
Expand Down Expand Up @@ -189,7 +189,7 @@ async fn test_flat_vector() {
vector_writer.init_for_test(epoch3).await.unwrap();
let epoch3_vectors = (0..100).map(|_| next_input()).collect_vec();
for (vec, info) in &epoch3_vectors {
vector_writer.insert(vec.clone(), info.clone()).unwrap();
vector_writer.insert(vec.to_ref(), info.clone()).unwrap();
vector_writer.try_flush().await.unwrap();
}

Expand Down Expand Up @@ -264,7 +264,7 @@ async fn test_hnsw_vector() {

let epoch1_vectors = (0..100).map(|_| next_input()).collect_vec();
for (vec, info) in &epoch1_vectors {
vector_writer.insert(vec.clone(), info.clone()).unwrap();
vector_writer.insert(vec.to_ref(), info.clone()).unwrap();
vector_writer.try_flush().await.unwrap();
}

Expand All @@ -277,7 +277,7 @@ async fn test_hnsw_vector() {

let epoch2_vectors = (0..100).map(|_| next_input()).collect_vec();
for (vec, info) in &epoch2_vectors {
vector_writer.insert(vec.clone(), info.clone()).unwrap();
vector_writer.insert(vec.to_ref(), info.clone()).unwrap();
vector_writer.try_flush().await.unwrap();
}

Expand Down Expand Up @@ -334,7 +334,7 @@ async fn test_hnsw_vector() {
let top_n = 10;
let output = read_snapshot_epoch
.nearest(
query.clone(),
query.to_ref(),
VectorNearestOptions {
top_n,
measure: DistanceMeasurement::InnerProduct,
Expand Down Expand Up @@ -374,7 +374,7 @@ async fn test_hnsw_vector() {
vector_writer.init_for_test(epoch3).await.unwrap();
let epoch3_vectors = (0..100).map(|_| next_input()).collect_vec();
for (vec, info) in &epoch3_vectors {
vector_writer.insert(vec.clone(), info.clone()).unwrap();
vector_writer.insert(vec.to_ref(), info.clone()).unwrap();
vector_writer.try_flush().await.unwrap();
}

Expand Down
23 changes: 12 additions & 11 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::array::VectorRef;
use risingwave_common::catalog::TableId;
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::util::epoch::is_max_epoch;
Expand Down Expand Up @@ -260,11 +261,11 @@ impl HummockStorageReadSnapshot {
/// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
/// the key is not found. If `Err()` is returned, the searching for the key
/// failed due to other non-EOF errors.
async fn get_inner<O>(
&self,
async fn get_inner<'a, O>(
&'a self,
key: TableKey<Bytes>,
read_options: ReadOptions,
on_key_value_fn: impl KeyValueFn<O>,
on_key_value_fn: impl KeyValueFn<'a, O>,
) -> StorageResult<Option<O>> {
let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

Expand Down Expand Up @@ -639,12 +640,12 @@ pub struct HummockStorageReadSnapshot {
}

impl StateStoreGet for HummockStorageReadSnapshot {
fn on_key_value<O: Send + 'static>(
&self,
fn on_key_value<'a, O: Send + 'a>(
&'a self,
key: TableKey<Bytes>,
read_options: ReadOptions,
on_key_value_fn: impl KeyValueFn<O>,
) -> impl StorageFuture<'_, Option<O>> {
on_key_value_fn: impl KeyValueFn<'a, O>,
) -> impl StorageFuture<'a, Option<O>> {
self.get_inner(key, read_options, on_key_value_fn)
}
}
Expand Down Expand Up @@ -687,11 +688,11 @@ impl StateStoreRead for HummockStorageReadSnapshot {
}

impl StateStoreReadVector for HummockStorageReadSnapshot {
async fn nearest<O: Send + 'static>(
&self,
vec: Vector,
async fn nearest<'a, O: Send + 'a>(
&'a self,
vec: VectorRef<'a>,
options: VectorNearestOptions,
on_nearest_item_fn: impl OnNearestItemFn<O>,
on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
) -> StorageResult<Vec<O>> {
let version = match self.epoch {
HummockReadEpoch::Committed(epoch)
Expand Down
18 changes: 9 additions & 9 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ pub struct LocalHummockStorage {
}

impl LocalHummockFlushedSnapshotReader {
async fn get_flushed<O>(
hummock_version_reader: &HummockVersionReader,
async fn get_flushed<'a, O>(
hummock_version_reader: &'a HummockVersionReader,
read_version: &HummockReadVersionRef,
user_key: UserKey<Bytes>,
read_options: ReadOptions,
on_key_value_fn: impl crate::store::KeyValueFn<O>,
on_key_value_fn: impl KeyValueFn<'a, O>,
) -> StorageResult<Option<O>> {
let table_key_range = (
Bound::Included(user_key.table_key.clone()),
Expand Down Expand Up @@ -266,11 +266,11 @@ pub struct LocalHummockFlushedSnapshotReader {
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
async fn on_key_value<O: Send + 'static>(
&self,
async fn on_key_value<'a, O: Send + 'a>(
&'a self,
key: TableKey<Bytes>,
read_options: ReadOptions,
on_key_value_fn: impl KeyValueFn<O>,
on_key_value_fn: impl KeyValueFn<'a, O>,
) -> StorageResult<Option<O>> {
let key = UserKey::new(self.table_id, key);
Self::get_flushed(
Expand Down Expand Up @@ -308,11 +308,11 @@ impl StateStoreRead for LocalHummockFlushedSnapshotReader {
}

impl StateStoreGet for LocalHummockStorage {
async fn on_key_value<O: Send + 'static>(
&self,
async fn on_key_value<'a, O: Send + 'a>(
&'a self,
key: TableKey<Bytes>,
read_options: ReadOptions,
on_key_value_fn: impl KeyValueFn<O>,
on_key_value_fn: impl KeyValueFn<'a, O>,
) -> StorageResult<Option<O>> {
let key = UserKey::new(self.table_id, key);
match self.mem_table.buffer.get(&key.table_key) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/src/hummock/store/vector_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use bytes::Bytes;
use risingwave_common::array::VectorRef;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::EpochPair;
use risingwave_hummock_sdk::HummockEpoch;
Expand Down Expand Up @@ -171,7 +172,7 @@ impl StateStoreWriteEpochControl for HummockVectorWriter {
}

impl StateStoreWriteVector for HummockVectorWriter {
fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
Ok(self
.state
.as_mut()
Expand Down
21 changes: 11 additions & 10 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
Expand Down Expand Up @@ -67,7 +68,7 @@ use crate::monitor::{
GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
OnNearestItemFn, ReadLogOptions, ReadOptions, Vector, VectorNearestOptions, gen_min_epoch,
OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
Expand Down Expand Up @@ -588,14 +589,14 @@ impl HummockVersionReader {
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
pub async fn get<O>(
&self,
pub async fn get<'a, O>(
&'a self,
table_key: TableKey<Bytes>,
epoch: u64,
table_id: TableId,
read_options: ReadOptions,
read_version_tuple: ReadVersionTuple,
on_key_value_fn: impl crate::store::KeyValueFn<O>,
on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
) -> StorageResult<Option<O>> {
let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

Expand Down Expand Up @@ -1201,13 +1202,13 @@ impl HummockVersionReader {
.await
}

pub async fn nearest<M: MeasureDistanceBuilder, O: Send>(
&self,
pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
&'a self,
version: PinnedVersion,
table_id: TableId,
target: Vector,
target: VectorRef<'a>,
options: VectorNearestOptions,
on_nearest_item_fn: impl OnNearestItemFn<O>,
on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
) -> HummockResult<Vec<O>> {
let Some(index) = version.vector_indexes.get(&table_id) else {
return Ok(vec![]);
Expand All @@ -1221,7 +1222,7 @@ impl HummockVersionReader {
}
match &index.inner {
VectorIndexImpl::Flat(flat) => {
let mut builder = NearestBuilder::<'_, O, M>::new(target.to_ref(), options.top_n);
let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
for vector_file in &flat.vector_store_info.vector_files {
let meta = self.sstable_store.get_vector_file_meta(vector_file).await?;
for (i, block_meta) in meta.block_metas.iter().enumerate() {
Expand All @@ -1246,7 +1247,7 @@ impl HummockVersionReader {
let (items, _stats) = nearest::<O, M>(
&vector_store,
&*graph,
target.to_ref(),
target,
on_nearest_item_fn,
options.hnsw_ef_search,
options.top_n,
Expand Down
6 changes: 3 additions & 3 deletions src/storage/src/hummock/vector/writer/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use bytes::{Bytes, BytesMut};
use prost::Message;
use rand::SeedableRng;
use rand::rngs::StdRng;
use risingwave_common::array::VectorRef;
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::vector::distance::DistanceMeasurement;
use risingwave_hummock_sdk::HummockObjectId;
Expand All @@ -30,7 +31,6 @@ use crate::hummock::vector::file::FileVectorStore;
use crate::hummock::vector::writer::VectorObjectIdManagerRef;
use crate::hummock::{HummockResult, SstableStoreRef};
use crate::opts::StorageOpts;
use crate::store::Vector;
use crate::vector::hnsw::{
HnswBuilderOptions, HnswGraphBuilder, VectorAccessor, insert_graph, new_node,
};
Expand Down Expand Up @@ -90,13 +90,13 @@ impl HnswFlatIndexWriter {
})
}

pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
self.vector_store
.building_vectors
.as_mut()
.expect("for write")
.file_builder
.add(vec.to_ref(), &info);
.add(vec, &info);
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions src/storage/src/hummock/vector/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::FutureExt;
use hnsw::HnswFlatIndexWriter;
use risingwave_common::array::VectorRef;
use risingwave_common::vector::distance::DistanceMeasurement;
use risingwave_hummock_sdk::vector_index::{
FlatIndex, FlatIndexAdd, VectorFileInfo, VectorIndex, VectorIndexAdd, VectorIndexImpl,
Expand All @@ -29,7 +30,6 @@ use risingwave_hummock_sdk::{HummockObjectId, HummockRawObjectId};
use crate::hummock::vector::file::VectorFileBuilder;
use crate::hummock::{HummockResult, ObjectIdManager, SstableStoreRef};
use crate::opts::StorageOpts;
use crate::vector::Vector;

#[async_trait::async_trait]
pub trait VectorObjectIdManager: Send + Sync {
Expand Down Expand Up @@ -105,7 +105,7 @@ impl VectorWriterImpl {
})
}

pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
match self {
VectorWriterImpl::Flat(writer) => writer.insert(vec, info),
VectorWriterImpl::HnswFlat(writer) => writer.insert(vec, info),
Expand Down Expand Up @@ -163,8 +163,8 @@ impl FlatIndexWriter {
}
}

pub(crate) fn insert(&mut self, vec: Vector, info: Bytes) -> HummockResult<()> {
self.vector_file_builder.add(vec.to_ref(), info.as_ref());
pub(crate) fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> HummockResult<()> {
self.vector_file_builder.add(vec, info.as_ref());
Ok(())
}

Expand Down
Loading
Loading