diff --git a/bin/docker-test.sh b/bin/docker-test.sh
index cc54b2f..694aba0 100755
--- a/bin/docker-test.sh
+++ b/bin/docker-test.sh
@@ -64,6 +64,8 @@ function get_tag_name() {
 TAG_NAME=$(get_tag_name)
 INCLUDE_HNSWLIB=${INCLUDE_HNSWLIB:-true}
 
+DATA_DIR=${DATA_DIR:-$(pwd)/data}
+
 echo "Building docker image with tag name: $TAG_NAME"
@@ -83,11 +85,13 @@ if [ -z "$1" ]
 then
   # This will build the image and run the container with the default make target
   # (i.e., print help message)
-  docker run -it --volume $(pwd)/data:/root/data --rm flatnav:$TAG_NAME make help
+  docker run -it --volume ${DATA_DIR}:/root/data --rm flatnav:$TAG_NAME make help
   exit 0
fi

 # Run the container and mount the data/ directory as volume to /root/data
 # Pass the make target as argument to the container.
-docker run -it --volume $(pwd)/data:/root/data --rm flatnav:$TAG_NAME make $1
\ No newline at end of file
+ARG1=$1
+docker run -it --volume ${DATA_DIR}:/root/data flatnav:$TAG_NAME /bin/bash \
+    -c "make ${ARG1}; tail -f /dev/null"
diff --git a/experiments/Makefile b/experiments/Makefile
index 4d703f6..2029066 100644
--- a/experiments/Makefile
+++ b/experiments/Makefile
@@ -23,12 +23,30 @@ sift-bench:
 
 hubness-test:
 	poetry run python hubness.py \
-		--datasets mnist-784-euclidean cauchy-10-euclidean cauchy-256-euclidean cauchy-1024-euclidean sift-128-euclidean \
+		--datasets normal-1-angular normal-2-angular normal-4-angular normal-8-angular normal-16-angular normal-32-angular normal-64-angular normal-128-angular normal-256-angular normal-1024-angular normal-1536-angular normal-1-euclidean normal-2-euclidean normal-4-euclidean normal-8-euclidean normal-16-euclidean normal-32-euclidean normal-64-euclidean normal-128-euclidean normal-256-euclidean normal-1024-euclidean normal-1536-euclidean \
 		--k 100 \
-		--metric l2 \
 		--ef-construction 100 \
 		--ef-search 200 \
-		--num-node-links 32
+		--num-node-links 32 \
+		--hubness-scores hubness-scores.json
+
+
+generate-datasets:
+	poetry run python generate-datasets.py \
+		--base-path /home/ubuntu/flatnav-experimental/data \
+		--dataset-size 1010000 \
+		--num-queries 10000 \
+		--dimensions 1 2 4 8 16 32 64 128 1536 \
+		--k 100
+
+# Generate hubness scores for each desired dataset
+generate-hubness-scores:
+	poetry run python generate-hubness-scores.py \
+		--dataset-names normal-1-angular normal-2-angular normal-4-angular normal-8-angular normal-16-angular normal-32-angular normal-64-angular normal-128-angular normal-256-angular normal-1024-angular normal-1536-angular normal-2048-angular normal-1-euclidean normal-2-euclidean normal-4-euclidean normal-8-euclidean normal-16-euclidean normal-32-euclidean normal-64-euclidean normal-128-euclidean normal-256-euclidean normal-1024-euclidean normal-1536-euclidean normal-2048-euclidean \
+		--base-path /media/scratch/ \
+		--k 100 \
+		--save-file hubness-scores-full.json
 
 setup: install-flatnav install-hnswlib
diff --git a/experiments/generate-datasets.py b/experiments/generate-datasets.py
new file mode 100644
index 0000000..bfddbd1
--- /dev/null
+++ b/experiments/generate-datasets.py
@@ -0,0 +1,186 @@
+import os
+import numpy as np
+from sklearn.neighbors import NearestNeighbors
+import argparse
+import faiss
+
+
+def generate_iid_normal_dataset(
+    num_samples: int,
+    num_dimensions: int,
+    num_queries: int,
+    k: int,
+    directory_path: str,
+    dataset_name: str,
+    metric: str = "cosine",
+):
+    """
+    Generates a dataset with the specified number of samples and dimensions using
+    the standard normal distribution.
+    Separates a subset for queries and computes their true k nearest neighbors.
+
+    :param num_samples: Number of samples in the dataset.
+    :param num_dimensions: Number of dimensions for each sample.
+    :param num_queries: Number of queries to be separated from the dataset.
+    :param k: The number of nearest neighbors to find.
+    :param directory_path: Base path to save the dataset, queries, and ground truth labels.
+    :param dataset_name: Name of the dataset (should be something like normal-10-angular).
+    :param metric: Metric to use for computing nearest neighbors.
+    """
+
+    def normalize_rows_inplace(matrix):
+        for row in matrix:
+            norm = np.linalg.norm(row)
+            norm = norm if norm > 0 else 1e-30
+            row /= norm
+
+    def add_data_in_batches(index, data, batch_size=10000):
+        for i in range(0, data.shape[0], batch_size):
+            index.add(data[i : i + batch_size])
+
+    # faiss flat indexes require float32 vectors, so cast once up front.
+    dataset = np.random.normal(size=(num_samples, num_dimensions)).astype(np.float32)
+    np.random.shuffle(dataset)
+    query_set = dataset[:num_queries]
+    dataset_without_queries = dataset[num_queries:]
+
+    if metric in ["cosine", "angular", "ip"]:
+        normalize_rows_inplace(dataset_without_queries)
+        normalize_rows_inplace(query_set)
+
+        print("Finished normalizing")
+        index = faiss.IndexFlatIP(dataset.shape[1])
+    else:
+        index = faiss.IndexFlatL2(dataset.shape[1])
+
+    add_data_in_batches(index, dataset_without_queries)
+
+    print("kNN search")
+    _, ground_truth_labels = index.search(query_set, k=k)
+
+    if np.any(ground_truth_labels < 0):
+        raise ValueError("Indices cannot be negative")
+
+    # Create directory if it doesn't exist
+    if not os.path.exists(directory_path):
+        os.makedirs(directory_path)
+
+    ground_truth_labels = ground_truth_labels.astype(np.int32, copy=False)
+
+    print("Saving dataset")
+    # Save the dataset, the queries, and the ground truth labels
+    np.save(
+        f"{directory_path}/{dataset_name}.train.npy",
+        dataset_without_queries,
+    )
+    np.save(f"{directory_path}/{dataset_name}.test.npy", query_set)
+    np.save(
+        f"{directory_path}/{dataset_name}.gtruth.npy",
+        ground_truth_labels,
+    )
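+
+# Example usage (hypothetical sizes), producing normal-16-angular.{train,test,gtruth}.npy
+# under ./data/normal-16-angular:
+#
+#   generate_iid_normal_dataset(
+#       num_samples=11_000,
+#       num_dimensions=16,
+#       num_queries=1_000,
+#       k=100,
+#       directory_path="./data/normal-16-angular",
+#       dataset_name="normal-16-angular",
+#       metric="angular",
+#   )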
+
+
+def generate_cauchy_dataset(
+    num_samples: int,
+    num_dimensions: int,
+    num_queries: int,
+    k: int,
+    base_path: str,
+    metric: str = "minkowski",
+    p: int = 2,
+):
+    """
+    Generates a dataset with the specified number of samples and dimensions using
+    the Cauchy distribution.
+    Separates a subset for queries and computes their true k nearest neighbors.
+
+    :param num_samples: Number of samples in the dataset.
+    :param num_dimensions: Number of dimensions for each sample.
+    :param num_queries: Number of queries to be separated from the dataset.
+    :param k: The number of nearest neighbors to find.
+    :param base_path: Base path to save the dataset, queries, and ground truth labels.
+    :param metric: Metric to use for computing nearest neighbors.
+    :param p: Parameter for the metric.
+
+    NOTE: metric="minkowski" and p=2 is equivalent to Euclidean distance.
+    See: https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.NearestNeighbors.html
+    """
+    # Generate the dataset
+    dataset = np.random.standard_cauchy(size=(num_samples, num_dimensions))
+
+    # Separate out a subset for queries
+    np.random.shuffle(dataset)
+    query_set = dataset[:num_queries]
+    dataset_without_queries = dataset[num_queries:]
+
+    # Compute the true k nearest neighbors for the query set
+    nbrs = NearestNeighbors(n_neighbors=k, algorithm="brute", p=p, metric=metric).fit(
+        dataset_without_queries
+    )
+    ground_truth_labels = nbrs.kneighbors(query_set, return_distance=False)
+
+    # Save the dataset without queries, the queries, and the ground truth labels
+    np.save(f"{base_path}/train.npy", dataset_without_queries.astype(np.float32))
+    np.save(f"{base_path}/test.npy", query_set.astype(np.float32))
+    np.save(f"{base_path}/ground_truth.npy", ground_truth_labels.astype(np.int32))
+
+
+def check_datasets_exists(base_path: str, dataset_name: str) -> bool:
+    train_path = os.path.join(base_path, f"{dataset_name}.train.npy")
+    queries = os.path.join(base_path, f"{dataset_name}.test.npy")
+    ground_truth = os.path.join(base_path, f"{dataset_name}.gtruth.npy")
+
+    all_exists = all(
+        [
+            os.path.exists(train_path),
+            os.path.exists(queries),
+            os.path.exists(ground_truth),
+        ]
+    )
+    return all_exists
+
+
+def parse_arguments() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--base-path", type=str, required=True)
+    parser.add_argument("--dataset-size", type=int, required=True)
+    parser.add_argument("--num-queries", type=int, required=True)
+    parser.add_argument("--dimensions", type=int, nargs="+", required=True)
+    parser.add_argument("--k", type=int, default=100)
+
+    return parser.parse_args()
+
+
+if __name__ == "__main__":
+    args = parse_arguments()
+    base_path = args.base_path
+    dimensions = args.dimensions
+
+    DATASET_NAMES = [f"normal-{d}-angular" for d in dimensions]
+    DATASET_NAMES += [f"normal-{d}-euclidean" for d in dimensions]
+
+    # Create the datasets. First create the directory if it doesn't exist
+    for dataset_name in DATASET_NAMES:
+        directory_path = os.path.join(base_path, dataset_name)
+
+        if check_datasets_exists(directory_path, dataset_name):
+            print(f"Dataset {dataset_name} already exists. Skipping...")
+            continue
+        if not os.path.exists(directory_path):
+            os.makedirs(directory_path)
+
+        print(f"Generating dataset: {dataset_name}")
+
+        _, dimension, metric = dataset_name.split("-")
+        metric = metric if metric == "euclidean" else "cosine"
+        # Generate the datasets
+        generate_iid_normal_dataset(
+            num_samples=args.dataset_size,
+            num_dimensions=int(dimension),
+            num_queries=args.num_queries,
+            k=args.k,
+            directory_path=directory_path,
+            dataset_name=dataset_name,
+            metric=metric,
+        )
diff --git a/experiments/generate-hubness-scores.py b/experiments/generate-hubness-scores.py
new file mode 100644
index 0000000..1ffb89a
--- /dev/null
+++ b/experiments/generate-hubness-scores.py
@@ -0,0 +1,100 @@
+import numpy as np
+import json
+import os
+import argparse
+import faiss
+
+
+def compute_k_occurence_distrubution(top_k_indices: np.ndarray) -> np.ndarray:
+    """
+    Computes the distribution of k-occurrences for each node in the given array.
+
+    :param top_k_indices: array of shape (dataset_size, k) containing the indices of
+        the k nearest neighbors for each node.
+    :return: array of shape (dataset_size,) containing the k-occurrence distribution
+        for each node (N_k).
+    """
+
+    # Validate indices. If any value is negative, throw an error.
+    if np.any(top_k_indices < 0):
+        raise ValueError("Indices cannot be negative")
+
+    dataset_size = top_k_indices.shape[0]
+    k_occurence_distribution = np.zeros(dataset_size, dtype=int)
+
+    flattened_indices = top_k_indices.flatten()
+    unique_indices, counts = np.unique(flattened_indices, return_counts=True)
+    k_occurence_distribution[unique_indices] = counts
+
+    return k_occurence_distribution
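+
+# Tiny worked example for intuition: with top_k_indices = [[1, 2], [0, 2], [0, 1]]
+# (k = 2), every node appears in exactly two neighbor lists, so N_k = [2, 2, 2].
+# A dataset with hubs would instead concentrate these counts on a few nodes.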
+
+
+def compute_skewness(dataset: np.ndarray, k: int, metric: str) -> float:
+    # For cosine distance, we will assume that the data was normalized
+    if metric in ["cosine", "angular", "ip"]:
+        index = faiss.IndexFlatIP(dataset.shape[1])
+    else:
+        index = faiss.IndexFlatL2(dataset.shape[1])
+
+    index.add(dataset)
+    _, top_k_indices = index.search(dataset, k=k)
+
+    k_occurence_distribution = compute_k_occurence_distrubution(
+        top_k_indices=top_k_indices
+    )
+    mean = np.mean(k_occurence_distribution)
+    std_dev = np.std(k_occurence_distribution)
+    denominator = len(k_occurence_distribution) * (std_dev**3)
+    skewness = (np.sum((k_occurence_distribution - mean) ** 3)) / denominator
+
+    return skewness
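+
+# The statistic above is the (biased) sample skewness of N_k, i.e. the third
+# standardized moment: skew = (1/n) * sum((N_k - mean)^3) / std^3. Large positive
+# skew means a few "hub" points appear in many k-NN lists. Note that each point
+# also retrieves itself as a neighbor here, which shifts every N_k by one; since
+# skewness is shift-invariant, this does not affect the score.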
"sift", - "cauchy-10-euclidean": "cauchy10", - "cauchy-256-euclidean": "cauchy256", - "cauchy-1024-euclidean": "cauchy1024", -} - -def generate_cauchy_dataset( - num_samples: int, - num_dimensions: int, - num_queries: int, - k: int, - base_path: str, - metric: str = "minkowski", - p: int = 2, -): - """ - Generates a dataset with the specified number of samples and dimensions using - the Cauchy distribution. - Separates a subset for queries and computes their true k nearest neighbors. - - :param num_samples: Number of samples in the dataset. - :param num_dimensions: Number of dimensions for each sample. - :param num_queries: Number of queries to be separated from the dataset. - :param k: The number of nearest neighbors to find. - :param base_path: Base path to save the dataset, queries, and ground truth labels. - :param metric: Metric to use for computing nearest neighbors. - :param p: Parameter for the metric. - - NOTE: metric="minkowski" and p=2 is equivalent to Euclidean distance. - See: https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.NearestNeighbors.html - """ - # Generate the dataset - dataset = np.random.standard_cauchy(size=(num_samples, num_dimensions)) - - # Separate out a subset for queries - np.random.shuffle(dataset) - query_set = dataset[:num_queries] - dataset_without_queries = dataset[num_queries:] - - # Compute the true k nearest neighbors for the query set - nbrs = NearestNeighbors(n_neighbors=k, algorithm="brute", p=p, metric=metric).fit( - dataset_without_queries - ) - ground_truth_labels = nbrs.kneighbors(query_set, return_distance=False) - - # Save the dataset without queries, the queries, and the ground truth labels - np.save(f"{base_path}/train.npy", dataset_without_queries.astype(np.float32)) - np.save(f"{base_path}/test.npy", query_set.astype(np.float32)) - np.save(f"{base_path}/ground_truth.npy", ground_truth_labels.astype(np.int32)) - - -def compute_k_occurence_distrubution(top_k_indices: np.ndarray) -> np.ndarray: - """ - Computes the distribution of k-occurences for each node in the given array. - :param top_k_indices: array of shape (dataset_size, k) containing the indices of - the k nearest neighbors for each node. - - :return: array of shape (dataset_size,) containing the k-occurence distribution for each node (N_k) - """ - - # validate indices. 
 
-    skewness_flatnav = compute_skewness(
-        dataset=train_dataset, index=flatnav_index, ef_search=ef_search, k=k
-    )
-    skewness_hnsw = compute_skewness(
-        dataset=train_dataset, index=hnsw_index, ef_search=ef_search, k=k
-    )
+    # Now delete the HNSW base layer graph since we don't need it anymore
+    os.remove(hnsw_base_layer_filename)
+
+    flatnav_index.set_num_threads(num_threads=1)
+    hnsw_index.set_num_threads(num_threads=1)
 
-    logging.info(f"Skewness: Flatnav: {skewness_flatnav}, HNSW: {skewness_hnsw}")
+    requested_metrics = [f"recall@{k}", "latency", "qps"]
 
-    recall_flatnav, _ = compute_metrics(
+    logging.info(f"Computing FlatNav metrics...")
+    flatnav_metrics: dict = compute_metrics(
+        requested_metrics=requested_metrics,
         index=flatnav_index,
         queries=queries,
         ground_truth=ground_truth,
         ef_search=ef_search,
         k=k,
+        batch_size=search_batch_size,
     )
-    recall_hnsw, _ = compute_metrics(
+
+    logging.info(f"Computing HNSW metrics...")
+    hnsw_metrics: dict = compute_metrics(
+        requested_metrics=requested_metrics,
         index=hnsw_index,
         queries=queries,
         ground_truth=ground_truth,
         ef_search=ef_search,
         k=k,
+        batch_size=search_batch_size,
     )
 
-    logging.info(f"Recall@{k}: Flatnav: {recall_flatnav}, HNSW: {recall_hnsw}")
-    return recall_flatnav, recall_hnsw, skewness_flatnav, skewness_hnsw
+    logging.info(f"Metrics: Flatnav: {flatnav_metrics}, \n HNSW: {hnsw_metrics}")
+
+    return flatnav_metrics, hnsw_metrics
 
 
 def parse_args() -> argparse.Namespace:
@@ -205,6 +130,13 @@ def parse_args() -> argparse.Namespace:
         help="dataset names. All will be expected to be at the same path.",
     )
 
+    parser.add_argument(
+        "--hubness-scores",
+        type=str,
+        required=True,
+        help="JSON file containing hubness scores for each dataset",
+    )
+
     parser.add_argument(
         "--k",
         type=int,
         required=True,
         default=100,
         help="Number of nearest neighbors to consider",
     )
-    parser.add_argument(
-        "--metric", type=str, required=True, help="Metric to use (l2 or angular)"
-    )
+
     parser.add_argument(
         "--ef-construction", type=int, required=True, help="ef-construction parameter."
     )
@@ -230,17 +160,24 @@ def parse_args() -> argparse.Namespace:
         help="max-edges-per-node parameter.",
     )
 
+    parser.add_argument(
+        "--search-batch-size",
+        type=int,
+        default=None,
+        required=False,
+        help="The number of queries to search in a batch.",
+    )
+
     return parser.parse_args()
 
 
 def load_dataset(base_path: str, dataset_name: str) -> Tuple[np.ndarray]:
     if not os.path.exists(base_path):
         raise FileNotFoundError(f"Dataset path not found at {base_path}")
 
     return (
-        np.load(f"{base_path}/{dataset_name}.train.npy").astype(np.float32),
-        np.load(f"{base_path}/{dataset_name}.test.npy").astype(np.float32),
-        np.load(f"{base_path}/{dataset_name}.gtruth.npy").astype(np.uint32),
+        np.load(f"{base_path}/{dataset_name}.train.npy").astype(np.float32, copy=False),
+        np.load(f"{base_path}/{dataset_name}.test.npy").astype(np.float32, copy=False),
+        np.load(f"{base_path}/{dataset_name}.gtruth.npy").astype(np.uint32, copy=False),
     )
+ """ + dataset_size, dim = train_dataset.shape - flatnav_index = flatnav.index.index_factory( - distance_type=distance_type, - dim=dim, - dataset_size=dataset_size, - max_edges_per_node=max_edges_per_node, - verbose=False, + hnsw_index = hnswlib.Index( + space=distance_type if distance_type == "l2" else "ip", dim=dim ) - hnsw_index = hnswlib.Index(space=distance_type, dim=dim) hnsw_index.init_index( max_elements=dataset_size, ef_construction=ef_construction, M=max_edges_per_node // 2, ) - flatnav_index.set_num_threads(os.cpu_count()) hnsw_index.set_num_threads(os.cpu_count()) - logging.debug(f"Building index...") + logging.info(f"Building index...") + hnsw_base_layer_filename = "hnsw_base_layer.mtx" hnsw_index.add_items(data=train_dataset, ids=np.arange(dataset_size)) - flatnav_index.add( - data=train_dataset, - ef_construction=ef_construction, - num_initializations=num_initializations, + hnsw_index.save_base_layer_graph(filename=hnsw_base_layer_filename) + + # Build FlatNav index + logging.info(f"Building FlatNav index...") + flatnav_index = flatnav.index.index_factory( + distance_type=distance_type, + dim=dim, + mtx_filename=hnsw_base_layer_filename, ) - # We currently have a bufferoverlow issue during FlatNav's multithreaded search, so - # I'm setting the number of threads to 1 for now until it's fixed. - flatnav_index.set_num_threads(num_threads=1) + logging.info(f"Allocating nodes and building graph links...") + # Here we will first allocate memory for the index and then build edge connectivity + # using the HNSW base layer graph. We do not use the ef-construction parameter since + # it's assumed to have been used when building the HNSW base layer. + flatnav_index.allocate_nodes(data=train_dataset).build_graph_links() - skewness_flatnav = compute_skewness( - dataset=train_dataset, index=flatnav_index, ef_search=ef_search, k=k - ) - skewness_hnsw = compute_skewness( - dataset=train_dataset, index=hnsw_index, ef_search=ef_search, k=k - ) + # Now delete the HNSW base layer graph since we don't need it anymore + os.remove(hnsw_base_layer_filename) + + flatnav_index.set_num_threads(num_threads=1) + hnsw_index.set_num_threads(num_threads=1) - logging.info(f"Skewness: Flatnav: {skewness_flatnav}, HNSW: {skewness_hnsw}") + requested_metrics = [f"recall@{k}", "latency", "qps"] - recall_flatnav, _ = compute_metrics( + logging.info(f"Computing FlatNav metrics...") + flatnav_metrics: dict = compute_metrics( + requested_metrics=requested_metrics, index=flatnav_index, queries=queries, ground_truth=ground_truth, ef_search=ef_search, k=k, + batch_size=search_batch_size, ) - recall_hnsw, _ = compute_metrics( + + logging.info(f"Computing HNSW metrics...") + hnsw_metrics: dict = compute_metrics( + requested_metrics=requested_metrics, index=hnsw_index, queries=queries, ground_truth=ground_truth, ef_search=ef_search, k=k, + batch_size=search_batch_size, ) - logging.info(f"Recall@{k}: Flatnav: {recall_flatnav}, HNSW: {recall_hnsw}") - return recall_flatnav, recall_hnsw, skewness_flatnav, skewness_hnsw + logging.info(f"Metrics: Flatnav: {flatnav_metrics}, \n HNSW: {hnsw_metrics}") + + return flatnav_metrics, hnsw_metrics def parse_args() -> argparse.Namespace: @@ -205,6 +130,13 @@ def parse_args() -> argparse.Namespace: help="dataset names. 
 
 
 if __name__ == "__main__":
@@ -346,45 +283,73 @@ if __name__ == "__main__":
     args = parse_args()
 
-    # Initialize a metrics dictionary to contain recall values for FlatNav and HNSW and the skewness values for FlatNav and HNSW
+    # Initialize a metrics dictionary to contain latency values for FlatNav and
+    # HNSW and the corresponding dataset names; skewness values are added below.
     metrics = {
-        "recall_flatnav": [],
-        "recall_hnsw": [],
-        "skewness_flatnav": [],
-        "skewness_hnsw": [],
+        "latency_flatnav": [],
+        "latency_hnsw": [],
         "dataset_names": [],
     }
 
+    # Replace every instance of "euclidean" with "l2" and "angular" with "cosine" in the
+    # dataset names list and form a dictionary of display names
+    DATASET_NAMES = {}
+    for dataset_name in args.datasets:
+        name, dimension, metric = dataset_name.split("-")
+        if metric == "euclidean":
+            metric = "l2"
+        elif metric == "angular":
+            metric = "cosine"
+        else:
+            raise ValueError(f"Invalid metric: {metric}")
+        DATASET_NAMES[dataset_name] = f"{name}{dimension}-{metric}"
+
+    hubness_scores_path = os.path.join(os.getcwd(), args.hubness_scores)
+    if not os.path.exists(hubness_scores_path):
+        raise FileNotFoundError(f"Hubness scores file not found at {hubness_scores_path}")
+
     dataset_names = args.datasets
-    for dataset_name in dataset_names:
+
+    for index, dataset_name in enumerate(dataset_names):
+        print(f"Processing dataset {dataset_name} ({index + 1}/{len(dataset_names)})")
+        _, dimension, metric = dataset_name.split("-")
+        # FlatNav expects "l2" / "angular"; hnswlib maps anything non-"l2" to "ip".
+        metric = "l2" if metric == "euclidean" else "angular"
+
         base_path = os.path.join(ROOT_DATASET_PATH, dataset_name)
+
+        if not os.path.exists(base_path):
+            raise FileNotFoundError(f"Dataset path not found at {base_path}")
+
+        print(f"Loading dataset {dataset_name}...")
         train_dataset, queries, ground_truth = load_dataset(
             base_path=base_path, dataset_name=dataset_name
         )
-        (
-            recall_flatnav,
-            recall_hnsw,
-            skewness_flatnav,
-            skewness_hnsw,
-        ) = get_recall_and_dataset_skewness(
+
+        flatnav_metrics, hnsw_metrics = aggregate_metrics(
             train_dataset=train_dataset,
             queries=queries,
             ground_truth=ground_truth,
-            distance_type=args.metric,
+            distance_type=metric,
             max_edges_per_node=args.num_node_links,
             ef_construction=args.ef_construction,
             ef_search=args.ef_search,
             k=args.k,
+            search_batch_size=args.search_batch_size,
        )
 
-        # Aggregate metrics
-        metrics["recall_flatnav"].append(recall_flatnav)
-        metrics["recall_hnsw"].append(recall_hnsw)
-        metrics["skewness_flatnav"].append(skewness_flatnav)
-        metrics["skewness_hnsw"].append(skewness_hnsw)
+        metrics["latency_flatnav"].append(flatnav_metrics["latency"])
+        metrics["latency_hnsw"].append(hnsw_metrics["latency"])
         metrics["dataset_names"].append(DATASET_NAMES[dataset_name])
 
-    # Serialize metrics as JSON
-    with open("hubness.json", "w") as f:
+    # Add the precomputed hubness scores to the metrics dictionary
+    with open(hubness_scores_path, "r") as f:
+        hubness_scores = json.load(f)
+    metrics["skewness_flatnav"] = [
+        hubness_scores[dataset_name] for dataset_name in dataset_names
+    ]
+    metrics["skewness_hnsw"] = [
+        hubness_scores[dataset_name] for dataset_name in dataset_names
+    ]
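+    # Note: the two skewness lists are identical by construction. Hubness is a
+    # property of the dataset (skewness of its exact k-NN N_k distribution),
+    # not of the index being benchmarked.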
score", ) fig.update_layout( legend_title_text="Algorithm", xaxis_title="Skewness", - yaxis_title="Recall", - legend=dict(orientation="h", yanchor="bottom", y=0.01, xanchor="left", x=0.01), + yaxis_title="Latency", + legend=dict(orientation="h", yanchor="top", y=0.01, xanchor="left", x=0.01), ) fig.show() - fig.write_html("hubness.html") - fig.write_image("hubness.png") + fig.write_html("hubness__.html") + # fig.write_image("hubness.png") if __name__ == "__main__": @@ -346,45 +283,73 @@ def plot_metrics_plotly(metrics: dict, k: int): # Initialize a metrics dictionary to contain recall values for FlatNav and HNSW and the skewness values for FlatNav and HNSW metrics = { - "recall_flatnav": [], - "recall_hnsw": [], - "skewness_flatnav": [], - "skewness_hnsw": [], + "latency_flatnav": [], + "latency_hnsw": [], "dataset_names": [], } + # replace every instance of "euclidean" with "l2" and "angular" with "cosine" in the + # dataset names list and form a dictionary of dataset names + DATASET_NAMES = {} + for dataset_name in args.datasets: + name, dimension, metric = dataset_name.split("-") + if metric == "euclidean": + metric = "l2" + elif metric == "angular": + metric = "cosine" + else: + raise ValueError(f"Invalid metric: {metric}") + DATASET_NAMES[dataset_name] = f"{name}{dimension}-{metric}" + + hubness_scores = os.getcwd() + "/" + args.hubness_scores + if not os.path.exists(hubness_scores): + raise FileNotFoundError(f"Hubness scores file not found at {hubness_scores}") + dataset_names = args.datasets - for dataset_name in dataset_names: + + for index, dataset_name in enumerate(dataset_names): + print(f"Processing dataset {dataset_name} ({index + 1}/{len(dataset_names)})") + _, dimension, metric = dataset_name.split("-") base_path = os.path.join(ROOT_DATASET_PATH, dataset_name) + + if not os.path.exists(base_path): + raise FileNotFoundError(f"Dataset path not found at {base_path}") + + print(f"Loading dataset {dataset_name}...") train_dataset, queries, ground_truth = load_dataset( base_path=base_path, dataset_name=dataset_name ) - ( - recall_flatnav, - recall_hnsw, - skewness_flatnav, - skewness_hnsw, - ) = get_recall_and_dataset_skewness( + + flatnav_metrics, hnsw_metrics = aggregate_metrics( train_dataset=train_dataset, queries=queries, ground_truth=ground_truth, - distance_type=args.metric, + distance_type=metric, max_edges_per_node=args.num_node_links, ef_construction=args.ef_construction, ef_search=args.ef_search, k=args.k, + search_batch_size=args.search_batch_size, ) - # Aggregate metrics - metrics["recall_flatnav"].append(recall_flatnav) - metrics["recall_hnsw"].append(recall_hnsw) - metrics["skewness_flatnav"].append(skewness_flatnav) - metrics["skewness_hnsw"].append(skewness_hnsw) + metrics["latency_flatnav"].append(flatnav_metrics["latency"]) + metrics["latency_hnsw"].append(hnsw_metrics["latency"]) metrics["dataset_names"].append(DATASET_NAMES[dataset_name]) - # Serialize metrics as JSON - with open("hubness.json", "w") as f: + # Add hubness scores to the metrics dictionary + with open(hubness_scores, "r") as f: + hubness_scores = json.load(f) + metrics["skewness_flatnav"] = [ + hubness_scores[dataset_name] for dataset_name in dataset_names + ] + metrics["skewness_hnsw"] = [ + hubness_scores[dataset_name] for dataset_name in dataset_names + ] + + # Save metrics to a JSON file called metrics.json + with open("metrics.json", "w") as f: json.dump(metrics, f) # Plot the metrics using seaborn - plot_metrics_plotly(metrics=metrics, k=100) + plot_metrics_plotly(metrics=metrics, 
+
+
 def compute_metrics(
+    requested_metrics: List[str],
     index: Union[flatnav.index.L2Index, flatnav.index.IPIndex, hnswlib.Index],
     queries: np.ndarray,
     ground_truth: np.ndarray,
     ef_search: int,
     k=100,
-) -> Tuple[float, float]:
+    batch_size: Optional[int] = None,
+) -> dict:
     """
-    Compute recall and QPS for given queries, ground truth for the given index(FlatNav or HNSW).
+    Compute the requested metrics for the given queries and ground truth on the
+    given index (FlatNav or HNSW).
 
     Args:
+        - requested_metrics: A list of metrics to compute. These include recall, qps and latency.
         - index: A FlatNav index to search.
         - queries: The query vectors.
         - ground_truth: The ground truth indices for each query.
         - k: Number of neighbors to search.
+        - ef_search: The number of neighbors to visit during search.
+        - batch_size: The number of queries to search in a batch. If None, search all queries at once.
 
     Returns:
-        Mean recall over all queries.
-        QPS over all queries
+        - A dictionary containing the requested metrics.
     """
-    if type(index) in (flatnav.index.L2Index, flatnav.index.IPIndex):
-        print(f"[FlatNav] searching with num-threads = {index.num_threads}")
+
+    index_is_flatnav = type(index) in (flatnav.index.L2Index, flatnav.index.IPIndex)
+
+    # Use the batch size to search in batches and then concatenate the results
+    if batch_size is not None:
         start = time.time()
-        _, top_k_indices = index.search(
-            queries=queries, ef_search=ef_search, K=k, num_initializations=300
-        )
+        top_k_indices = []
+        for batch_result in search_in_batches(
+            index=index,
+            queries=queries,
+            batch_size=batch_size,
+            ef_search=ef_search,
+            k=k,
+        ):
+            top_k_indices.append(batch_result)
+
+        top_k_indices = np.concatenate(top_k_indices)
         end = time.time()
+        querying_time = end - start
+
     else:
-        print(f"[HNSW] search")
-        index.set_ef(ef_search)
-        start = time.time()
-        # Search for HNSW return (ids, distances) instead of (distances, ids)
-        top_k_indices, _ = index.knn_query(data=queries, k=k)
-        end = time.time()
+        if index_is_flatnav:
+            start = time.time()
+            top_k_indices, _ = index.search(
+                queries=queries, ef_search=ef_search, K=k, num_initializations=300
+            )
+            end = time.time()
+            querying_time = end - start
+        else:
+            index.set_ef(ef_search)
+            start = time.time()
+            top_k_indices, _ = index.knn_query(data=queries, k=k)
+            end = time.time()
+            querying_time = end - start
+
+    metrics = {}
 
-    querying_time = end - start
-    qps = len(queries) / querying_time
+    if "qps" in requested_metrics:
+        qps = len(queries) / querying_time
+        metrics["qps"] = qps
+
+    if "latency" in requested_metrics:
+        # Get latency in milliseconds
+        latency = querying_time * 1000 / len(queries)
+        metrics["latency"] = latency
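+        # Since QPS and mean latency describe the same measurement, the two are
+        # related by latency_ms = 1000 / QPS whenever both are requested.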
 
     # Convert each ground truth list to a set for faster lookup
     ground_truth_sets = [set(gt) for gt in ground_truth]
@@ -57,4 +138,6 @@
 
     recall = mean_recall / len(queries)
 
-    return recall, qps
+    metrics["recall"] = recall
+
+    return metrics
diff --git a/flatnav_python/python_bindings.cpp b/flatnav_python/python_bindings.cpp
index 3fa294d..3656de7 100644
--- a/flatnav_python/python_bindings.cpp
+++ b/flatnav_python/python_bindings.cpp
@@ -36,6 +36,9 @@ class PyIndex : public std::enable_shared_from_this<PyIndex<dist_t, label_t>> {
   typedef std::pair<py::array_t<float>, py::array_t<label_t>>
       DistancesLabelsPair;
 
+  // Same payload as DistancesLabelsPair, but with labels first so that Python
+  // callers can unpack `indices, distances = index.search(...)`.
+  typedef std::pair<py::array_t<label_t>, py::array_t<float>>
+      LabelsDistancesPair;
+
   explicit PyIndex(std::unique_ptr<Index<dist_t, label_t>> index)
       : _dim(index->dataDimension()), _label_id(0), _verbose(false),
         _index(index.get()) {
@@ -154,7 +157,7 @@ class PyIndex : public std::enable_shared_from_this<PyIndex<dist_t, label_t>> {
     }
   }
 
-  DistancesLabelsPair
+  LabelsDistancesPair
   search(const py::array_t<float, py::array::c_style | py::array::forcecast> queries,
          int K, int ef_search, int num_initializations = 100) {
@@ -218,7 +221,7 @@ class PyIndex : public std::enable_shared_from_this<PyIndex<dist_t, label_t>> {
         {num_queries, (size_t)K}, {K * sizeof(float), sizeof(float)}, distances,
         free_distances_when_done);
-    return {dists, labels};
+    return {labels, dists};
   }
 };