From 00b8bcf3250a8103bd2bf8f065653fb73fece824 Mon Sep 17 00:00:00 2001
From: pragya16067
Date: Wed, 18 Jan 2023 10:55:19 -0800
Subject: [PATCH] Fix e2e dataset construction

---
 e2e_scripts/preprocess_s2and_data.py     |  67 +++++++++++--
 e2e_scripts/train_s2and_hac.py           | 105 ++++++++++++++++++++
 s2and/data.py                            | 116 +++++++++++++++++------
 s2and/featurizer.py                      |   1 +
 wandb_configs/wandb_overfit_1_batch.json |   3 +-
 5 files changed, 257 insertions(+), 35 deletions(-)
 create mode 100644 e2e_scripts/train_s2and_hac.py

diff --git a/e2e_scripts/preprocess_s2and_data.py b/e2e_scripts/preprocess_s2and_data.py
index df946fe..1322586 100644
--- a/e2e_scripts/preprocess_s2and_data.py
+++ b/e2e_scripts/preprocess_s2and_data.py
@@ -13,6 +13,14 @@ import numpy as np

 from utils.parser import Parser

+from s2and.data import ANDData
+import logging
+from s2and.featurizer import FeaturizationInfo, featurize
+
+logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s - %(message)s', datefmt='%m/%d/%Y %H:%M:%S',
+                    level=logging.INFO)
+logger = logging.getLogger(__name__)
+

 def save_blockwise_featurized_data(dataset_name, random_seed):
     parent_dir = f"{DATA_HOME_DIR}/{dataset_name}"
@@ -23,22 +31,20 @@ def save_blockwise_featurized_data(dataset_name, random_seed):
         specter_embeddings=join(parent_dir, f"{dataset_name}_specter.pickle"),
         clusters=join(parent_dir, f"{dataset_name}_clusters.json"),
         block_type="s2",
-        train_pairs_size=100,
-        val_pairs_size=100,
-        test_pairs_size=100,
+        train_pairs_size=100000,
+        val_pairs_size=10000,
+        test_pairs_size=10000,
         name=dataset_name,
-        n_jobs=2,
+        n_jobs=16,
         random_seed=random_seed,
     )
-    # Uncomment the following line if you wish to preprocess the whole dataset
-    AND_dataset.process_whole_dataset()

     # Load the featurizer, which calculates pairwise similarity scores
     featurization_info = FeaturizationInfo()
     # the cache will make it faster to train multiple times - it stores the features on disk for you
     train_pkl, val_pkl, test_pkl = store_featurized_pickles(AND_dataset, featurization_info,
-                                                            n_jobs=2,
+                                                            n_jobs=16,
                                                             use_cache=False,
                                                             random_seed=random_seed)

@@ -53,6 +59,51 @@ def read_blockwise_features(pkl):
     print("Total num of blocks:", len(blockwise_data.keys()))
     return blockwise_data

+def find_total_num_train_pairs(blockwise_data):
+    count = 0
+    for block_id in blockwise_data.keys():
+        count += len(blockwise_data[block_id][0])
+
+    print("Total num of signature pairs", count)
+
+# def verify_diff_with_s2and(dataset_name, random_seed):
+#     parent_dir = f"{DATA_HOME_DIR}/{dataset_name}"
+#     AND_dataset = ANDData(
+#         signatures=join(parent_dir, f"{dataset_name}_signatures.json"),
+#         papers=join(parent_dir, f"{dataset_name}_papers.json"),
+#         mode="train",
+#         specter_embeddings=join(parent_dir, f"{dataset_name}_specter.pickle"),
+#         clusters=join(parent_dir, f"{dataset_name}_clusters.json"),
+#         block_type="s2",
+#         train_pairs_size=100,
+#         val_pairs_size=100,
+#         test_pairs_size=100,
+#         # train_pairs_size=100000,
+#         # val_pairs_size=10000,
+#         # test_pairs_size=10000,
+#         name=dataset_name,
+#         n_jobs=2,
+#         random_seed=random_seed,
+#     )
+#
+#     # Load the featurizer, which calculates pairwise similarity scores
+#     featurization_info = FeaturizationInfo()
+#     # the cache will make it faster to train multiple times - it stores the features on disk for you
+#     train, val, test = featurize(AND_dataset, featurization_info, n_jobs=2, use_cache=False)
+#     X_train, y_train, _ = train
+#     X_val, y_val, _ = val
+#
+#     logger.info("Done loading and featurizing")
+#
+#     #Verify the 2 sets are equal
+#     with open("s2and_data_subsample.pkl", "rb") as _pkl_file:
+#         s2and_set = pickle.load(_pkl_file)
+#
+#     with open("our_data_subsample.pkl", "rb") as _pkl_file:
+#         our_set = pickle.load(_pkl_file)
+#
+#     print("VERIFICATION STATUS: ", s2and_set==our_set)
+

 if __name__=='__main__':
     # Creates the pickles that store the preprocessed data
@@ -77,6 +128,8 @@ def read_blockwise_features(pkl):
         val_pkl = f"{PREPROCESSED_DATA_DIR}/{dataset}/seed{seed}/val_features.pkl"
         test_pkl = f"{PREPROCESSED_DATA_DIR}/{dataset}/seed{seed}/test_features.pkl"
         blockwise_features = read_blockwise_features(train_pkl)
+        find_total_num_train_pairs(blockwise_features)
+        #verify_diff_with_s2and(dataset, seed)


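Note on the preprocessed pickles: save_blockwise_featurized_data writes one pickle per split, keyed by block id, and load_training_data in the new e2e_scripts/train_s2and_hac.py below unpacks each value as a (pairwise features, labels, cluster ids) tuple. A minimal sketch of reading one of these pickles, assuming that tuple layout (the helper name and path are illustrative, not part of the patch):

    import pickle
    from typing import Dict, Tuple
    import numpy as np

    def summarize_blockwise_pickle(pkl_path: str) -> None:
        # Assumed layout: block_id -> (pairwise feature matrix, labels, cluster ids)
        with open(pkl_path, "rb") as f:
            blockwise: Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray]] = pickle.load(f)
        n_pairs = sum(len(block[0]) for block in blockwise.values())
        print(f"{len(blockwise)} blocks, {n_pairs} signature pairs")

    # summarize_blockwise_pickle("train_features.pkl")  # illustrative path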
open("s2and_data_subsample.pkl", "rb") as _pkl_file: +# s2and_set = pickle.load(_pkl_file) +# +# with open("our_data_subsample.pkl", "rb") as _pkl_file: +# our_set = pickle.load(_pkl_file) +# +# print("VERIFICATION STATUS: ", s2and_set==our_set) + if __name__=='__main__': # Creates the pickles that store the preprocessed data @@ -77,6 +128,8 @@ def read_blockwise_features(pkl): val_pkl = f"{PREPROCESSED_DATA_DIR}/{dataset}/seed{seed}/val_features.pkl" test_pkl = f"{PREPROCESSED_DATA_DIR}/{dataset}/seed{seed}/test_features.pkl" blockwise_features = read_blockwise_features(train_pkl) + find_total_num_train_pairs(blockwise_features) + #verify_diff_with_s2and(dataset, seed) diff --git a/e2e_scripts/train_s2and_hac.py b/e2e_scripts/train_s2and_hac.py new file mode 100644 index 0000000..6325b00 --- /dev/null +++ b/e2e_scripts/train_s2and_hac.py @@ -0,0 +1,105 @@ +import pickle +from os.path import join +from typing import Dict, Tuple +import numpy as np +from s2and.consts import PREPROCESSED_DATA_DIR +from s2and.data import ANDData +import logging +from s2and.model import PairwiseModeler +from s2and.featurizer import FeaturizationInfo, featurize +from s2and.eval import pairwise_eval, cluster_eval +from s2and.model import Clusterer, FastCluster +from hyperopt import hp + +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s - %(message)s', datefmt='%m/%d/%Y %H:%M:%S', + level=logging.INFO) +logger = logging.getLogger(__name__) + +def load_training_data(train_pkl, val_pkl): + blockwise_data: Dict[str, Tuple[np.ndarray, np.ndarray]] + with open(train_pkl, "rb") as _pkl_file: + blockwise_data = pickle.load(_pkl_file) + # Combine the blockwise_data to form complete train, test, val sets + remove_arr = np.zeros(39) + X_train = [remove_arr] + y_train = [] + for block_data in blockwise_data.values(): + x, y, cluster_ids = block_data + X_train = np.concatenate((X_train, x), axis=0) + y_train = np.concatenate((y_train, y), axis=0) + X_train = np.delete(X_train, 0) + + blockwise_data_val: Dict[str, Tuple[np.ndarray, np.ndarray]] + with open(val_pkl, "rb") as _pkl_file: + blockwise_data_val = pickle.load(_pkl_file) + # Combine the blockwise_data to form complete train, test, val sets + X_val = [remove_arr] + y_val = [] + for block_data in blockwise_data_val.values(): + x, y, cluster_ids = block_data + X_val = np.concatenate((X_val, x), axis=0) + y_val = np.concatenate((y_val, y), axis=0) + X_val = np.delete(X_val, 0) + + logger.info("Dataset loaded and prepared for training") + + # dataset = ANDData( + # signatures=join(parent_dir, f"{dataset_name}_signatures.json"), + # papers=join(parent_dir, f"{dataset_name}_papers.json"), + # mode="train", + # specter_embeddings=join(parent_dir, f"{dataset_name}_specter.pickle"), + # clusters=join(parent_dir, f"{dataset_name}_clusters.json"), + # block_type="s2", + # name=dataset_name, + # n_jobs=4, + # ) + # logger.info("loaded the data") + + # Training Featurizer model + featurization_info = FeaturizationInfo() + + logger.info("Done loading and featurizing") + return featurization_info, X_train, y_train, X_val, y_val + +def train_pairwise_classifier(featurization_info, X_train, y_train, X_val, y_val): + # calibration fits isotonic regression after the binary classifier is fit + # monotone constraints help the LightGBM classifier behave sensibly + pairwise_model = PairwiseModeler( + n_iter=25, monotone_constraints=featurization_info.lightgbm_monotone_constraints + ) + # this does hyperparameter selection, which is why we need to pass in the validation 
diff --git a/s2and/data.py b/s2and/data.py
index c19e4b0..7354199 100644
--- a/s2and/data.py
+++ b/s2and/data.py
@@ -494,19 +494,6 @@ def __init__(
         self.preprocess_signatures(name_counts_loaded)
         logger.info("preprocessed signatures")

-    def process_whole_dataset(self):
-        """
-        Call this function before calling the preprocessing function if you wish to preprocess ALL train/test/val pairs
-        """
-        N = len(self.signatures)
-        print("Dataset has total ", N, "signatures")
-        n = 0.8 * N
-        self.train_pairs_size = n * (n - 1) / 2
-        n = 0.1 * N
-        self.val_pairs_size = n * (n - 1) / 2
-        n = 0.1 * N
-        self.test_pairs_size = n * (n - 1) / 2
-
     def get_signature_objects(self, signature_ids: Dict[str, List[str]]) -> Dict[str, List[Signature]]:
         """
@@ -1230,12 +1217,12 @@ def split_pairs_to_store(
             and isinstance(val_signatures, dict)
             and isinstance(test_signatures, dict)
         )
-        train_blockwise_pairs, train_blockwise_clusterIds = self.pair_sampling_to_store(
+        train_signatures, train_blockwise_pairs, train_blockwise_clusterIds = self.pair_sampling_to_store(
             self.train_pairs_size,
             [],
             train_signatures,
         )
-        val_blockwise_pairs, val_blockwise_clusterIds = (
+        val_signatures, val_blockwise_pairs, val_blockwise_clusterIds = (
             self.pair_sampling_to_store(
                 self.val_pairs_size,
                 [],
@@ -1245,14 +1232,15 @@ def split_pairs_to_store(
            else []
        )

-        test_blockwise_pairs, test_blockwise_clusterIds = self.pair_sampling_to_store(
+        test_signatures, test_blockwise_pairs, test_blockwise_clusterIds = self.pair_sampling_to_store(
            self.test_pairs_size,
            [],
            test_signatures,
            self.all_test_pairs_flag
        )

-        return train_blockwise_pairs, train_blockwise_clusterIds, \
+        return train_signatures, val_signatures, test_signatures, \
+               train_blockwise_pairs, train_blockwise_clusterIds, \
               val_blockwise_pairs, val_blockwise_clusterIds, \
               test_blockwise_pairs, test_blockwise_clusterIds

@@ -1482,8 +1470,24 @@ def pair_sampling(

         sample_size = min(len(possible), sample_size)
         pairs = random_sampling(possible, sample_size, self.random_seed)
+        # if not os.path.exists("s2and_data_subsample.pkl"):
+        #     subsample_id_set = set()
+        #     for tuple in pairs:
+        #         id1, id2, _ = tuple
+        #         subsample_id_set.add(id1)
+        #         subsample_id_set.add(id2)
+        #
+        #     with open('s2and_data_subsample.pkl', 'wb') as f:
+        #         pickle.dump(subsample_id_set, f)
+
         return pairs

+    @staticmethod
+    def get_indices_by_matrix_idx(K, n):
+        first_pos = list(range(K * (n - 1) - K * (K - 1) // 2, K * (n - 1) - K * (K - 1) // 2 + (n - K - 2) + 1))
+        second_pos = [k * (n - 1) - k * (k - 1) // 2 + K - k - 1 for k in range(K)]
+        return first_pos + second_pos
+
     def pair_sampling_to_store(
         self,
         sample_size: int,
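get_indices_by_matrix_idx assumes that a block's pairs are stored in the order produced by the nested loop in pair_sampling_to_store, i.e. (0,1), (0,2), ..., (0,n-1), (1,2), ..., (n-2,n-1). For the K-th of n signatures it returns the flat positions of every pair involving K: a contiguous run for the partners that come after K (first_pos) plus one position per partner before K (second_pos). A small self-contained cross-check against brute-force enumeration (a sketch for illustration, not part of the patch):

    from itertools import combinations

    def get_indices_by_matrix_idx(K, n):
        # Same formula as the static method added above.
        first_pos = list(range(K * (n - 1) - K * (K - 1) // 2, K * (n - 1) - K * (K - 1) // 2 + (n - K - 2) + 1))
        second_pos = [k * (n - 1) - k * (k - 1) // 2 + K - k - 1 for k in range(K)]
        return first_pos + second_pos

    def pairs_involving(K, n):
        # Brute force: enumerate pairs in the same order as the nested loop and
        # keep the flat positions that touch signature K.
        flat = list(combinations(range(n), 2))
        return [pos for pos, (i, j) in enumerate(flat) if K in (i, j)]

    for n in range(2, 12):
        for K in range(n):
            assert sorted(get_indices_by_matrix_idx(K, n)) == pairs_involving(K, n)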
os.path.exists("s2and_data_subsample.pkl"): + # subsample_id_set = set() + # for tuple in pairs: + # id1, id2, _ = tuple + # subsample_id_set.add(id1) + # subsample_id_set.add(id2) + # + # with open('s2and_data_subsample.pkl', 'wb') as f: + # pickle.dump(subsample_id_set, f) + return pairs + @staticmethod + def get_indices_by_matrix_idx(K, n): + first_pos = list(range(K * (n - 1) - K * (K - 1) // 2, K * (n - 1) - K * (K - 1) // 2 + (n - K - 2) + 1)) + second_pos = [k * (n - 1) - k * (k - 1) // 2 + K - k - 1 for k in range(K)] + return first_pos + second_pos + def pair_sampling_to_store( self, sample_size: int, @@ -1536,6 +1540,7 @@ def pair_sampling_to_store( blockwise_sig_pairs: Dict[str, List[Tuple[str, str, Union[int, float]]]] = {} # Return block-wise cluster_ids blockwise_cluster_ids: Dict[str, List[str]] = {} + blockwise_sig_ids: Dict[str, List[str]] = {} if not self.pair_sampling_block: #Ignored for s2 Block featurization for i, s1 in enumerate(signature_ids): @@ -1561,7 +1566,7 @@ def pair_sampling_to_store( for i, s1 in enumerate(signatures): s1_cluster = self.signature_to_cluster_id[s1] cluster_ids.append(s1_cluster) - for s2 in signatures[i + 1 :]: + for j, s2 in enumerate(signatures[i + 1 :]): if self.signature_to_cluster_id is not None: s2_cluster = self.signature_to_cluster_id[s2] if s1_cluster == s2_cluster: @@ -1575,6 +1580,7 @@ def pair_sampling_to_store( sig_pairs.append((s1, s2, NUMPY_NAN)) blockwise_sig_pairs[block_id] = sig_pairs blockwise_cluster_ids[block_id] = cluster_ids + blockwise_sig_ids[block_id] = signatures else: for _, signatures in blocks.items(): @@ -1594,6 +1600,7 @@ def pair_sampling_to_store( same_name_different_cluster.append((s1, s2, 0)) else: different_name_different_cluster.append((s1, s2, 0)) + if all_pairs: # TODO: Need to update this for mode=inference if ( self.pair_sampling_balanced_homonym_synonym @@ -1625,15 +1632,70 @@ def pair_sampling_to_store( and not self.pair_sampling_balanced_classes and not self.pair_sampling_balanced_homonym_synonym ): - # Take samples from each block weighted by their len of total samples - # sample_size = min(len(possible), sample_size) - # blockwise_pairs: Dict[str, List[Tuple[str, str, Union[int, float]]]] = {} - # for k in blockwise_sig_pairs.keys(): - # block_sample_size = int(sample_size * len(blockwise_sig_pairs[k])/len(possible)) - # samples = random_sampling(blockwise_sig_pairs[k], block_sample_size, self.random_seed) - # if(len(samples)>0): - # blockwise_pairs[k] = samples - return blockwise_sig_pairs, blockwise_cluster_ids + # Subsampling, while maintaining transitivity + sample_size = min(len(possible), sample_size) + pairs_ids = random_sampling(possible, sample_size, self.random_seed) + # Construct hashset to make queries faster + subsample_id_set = set() + for tuple in pairs_ids: + id1, id2, _ = tuple + subsample_id_set.add(id1) + subsample_id_set.add(id2) + + # if not os.path.exists("our_data_subsample.pkl"): + # with open('our_data_subsample.pkl', 'wb') as f: + # pickle.dump(subsample_id_set, f) + + # Remove sig_pairs which are not subsampled + for block_id, signatures in blocks.items(): + sig_pairs = blockwise_sig_pairs[block_id] + cluster_ids = blockwise_cluster_ids[block_id] + n = len(signatures) + block_len = len(sig_pairs) + + all_idxs = np.arange(0, n) + sig_idxs_to_keep = [] + for i, s in enumerate(signatures): + if (s in subsample_id_set): + sig_idxs_to_keep.append(i) + + sig_idxs_to_keep = np.sort(sig_idxs_to_keep) + if(sig_idxs_to_keep.size == 0): + # delete the whole block + del 
diff --git a/s2and/featurizer.py b/s2and/featurizer.py
index a155f5e..f306980 100644
--- a/s2and/featurizer.py
+++ b/s2and/featurizer.py
@@ -904,6 +904,7 @@ def store_featurized_pickles(
     ) = dataset.split_cluster_signatures()  # this is called for getting blockwise signature pairs
     # Modify method call to store blockwise signature pairs as pickle so that dataloader can load from these idxs
     # After this call block id is lost
+    train_signatures, val_signatures, test_signatures,\
     train_blockwise_pairs, train_blockwise_clusterIds, \
     val_blockwise_pairs, val_blockwise_clusterIds, \
     test_blockwise_pairs, test_blockwise_clusterIds = dataset.split_pairs_to_store(
diff --git a/wandb_configs/wandb_overfit_1_batch.json b/wandb_configs/wandb_overfit_1_batch.json
index 2a8d872..ccab55f 100644
--- a/wandb_configs/wandb_overfit_1_batch.json
+++ b/wandb_configs/wandb_overfit_1_batch.json
@@ -1,3 +1,4 @@
 {
-    "overfit_batch_idx": 35
+    "dataset": "pubmed",
+    "dataset_random_seed": 1
 }
\ No newline at end of file