diff --git a/requirements.txt b/requirements.txt
index ce92150..5d0ef16 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
 ipdb>=0.13.4
-nptyping>=2.5.0
-numpy>=2.2.6
+nptyping>=1.4.4
+numpy>=1.20.0,<2.0.0
 PyJWT>=2.10.1
 setuptools>=80.7.1
-torch>=2.7.0
+torch>=2.0.0
 tqdm>=4.67.1
+scikit-learn>=1.0.0
diff --git a/stringlifier/api_improved.py b/stringlifier/api_improved.py
new file mode 100644
index 0000000..a169a91
--- /dev/null
+++ b/stringlifier/api_improved.py
@@ -0,0 +1,225 @@
+import numpy as np
+import torch
+from typing import List, Tuple, Optional, Union, Dict, Any
+from numpy.typing import NDArray
+
+class StringlifierAPI:
+    """
+    API for the Stringlifier model with improved vectorization.
+
+    This class provides methods to identify and extract random strings, UUIDs,
+    IP addresses, and other non-natural language tokens from text using
+    a trained sequence labeling model.
+    """
+
+    def __init__(self, classifier, encodings):
+        """
+        Initialize the Stringlifier API.
+
+        Args:
+            classifier: Trained classifier model
+            encodings: Encodings for the model
+        """
+        self.classifier = classifier
+        self.encodings = encodings
+
+    def process(self, string_or_list: Union[str, List[str]], cutoff: int = 5,
+                return_tokens: bool = False) -> Union[List[str], Tuple[List[str], List[List[Tuple[str, int, int, str]]]]]:
+        """
+        Process input string(s) to identify and replace random strings.
+
+        Args:
+            string_or_list: Input string or list of strings to process
+            cutoff: Minimum length of tokens to consider
+            return_tokens: Whether to return extracted tokens along with processed strings
+
+        Returns:
+            If return_tokens is False, returns list of processed strings
+            If return_tokens is True, returns tuple of (processed_strings, extracted_tokens)
+        """
+        # Handle single string input
+        if isinstance(string_or_list, str):
+            tokens = [string_or_list]
+        else:
+            tokens = string_or_list
+
+        # Handle empty input
+        max_len = max([len(s) for s in tokens]) if tokens else 0
+        if max_len == 0:
+            if return_tokens:
+                return [''], []
+            else:
+                return ['']
+
+        # Get model predictions
+        with torch.no_grad():
+            p_ts = self.classifier(tokens)
+            p_ts = torch.argmax(p_ts, dim=-1).detach().cpu().numpy()
+
+        # Process each input string
+        ext_tokens: List[List[Tuple[str, int, int, str]]] = []
+        new_strings: List[str] = []
+
+        for iBatch in range(p_ts.shape[0]):
+            new_str, toks = self._extract_tokens_vectorized(tokens[iBatch], p_ts[iBatch], cutoff=cutoff)
+            new_strings.append(new_str)
+            ext_tokens.append(toks)
+
+        if return_tokens:
+            return new_strings, ext_tokens
+        else:
+            return new_strings
+
+    def _extract_tokens_vectorized(self, string: str, pred: NDArray, cutoff: int = 5) -> Tuple[
+            str, List[Tuple[str, int, int, str]]]:
+        """
+        Extract tokens from a string using vectorized operations.
+
+        Args:
+            string: Input string to process
+            pred: Model predictions for each character
+            cutoff: Minimum length of tokens to consider
+
+        Returns:
+            Tuple of (processed_string, extracted_tokens)
+        """
+        if len(string) == 0:
+            return "", []
+
+        # Convert predictions to mask labels
+        mask_array = np.array([self.encodings._label_list[p] for p in pred])
+
+        # Special handling for numeric characters
+        numbers = set('0123456789')
+        string_array = np.array(list(string))
+        is_number = np.isin(string_array, list(numbers))
+
+        # Apply numeric rule: if character is 'C' and is a number, change to 'N'
+        mask_array[(mask_array == 'C') & is_number] = 'N'
+
+        # Find label transitions: a new run starts wherever the label changes
+        change_points = np.where(mask_array[:-1] != mask_array[1:])[0] + 1
+        start_indices = np.concatenate([[0], change_points])
+        end_indices = np.concatenate([change_points, [len(mask_array)]])
+
+        # Extract tokens (start inclusive, end exclusive)
+        tokens = []
+        for i in range(len(start_indices)):
+            start = int(start_indices[i])
+            end = int(end_indices[i])
+            label = mask_array[start]
+
+            if label != 'C' and (end - start) > cutoff:
+                token_text = string[start:end]
+                token_type = self._get_token_type(label)
+                if token_type:
+                    tokens.append((token_text, start, end, token_type))
+
+        # Compose new string with replacements
+        if not tokens:
+            return string, []
+
+        # Build the output as a list of segments and join once at the end
+        segments = []
+        last_pos = 0
+
+        for token in tokens:
+            if token[1] > last_pos:
+                segments.append(string[last_pos:token[1]])
+            segments.append(token[3])  # Append token type placeholder
+            last_pos = token[2]
+
+        # Add remaining part of string
+        if last_pos < len(string):
+            segments.append(string[last_pos:])
+
+        return ''.join(segments), tokens
+
+    def _get_token_type(self, label: str) -> Optional[str]:
+        """
+        Get token type based on label.
+
+        Args:
+            label: Label character from model prediction
+
+        Returns:
+            Token type string or None if label is 'C' (common text)
+        """
+        # Placeholder tags mirror the ones emitted by the upstream stringlifier API
+        if label == 'C':
+            return None
+        elif label == 'H':
+            return '<RANDOM_STRING>'
+        elif label == 'N':
+            return '<NUMERIC>'
+        elif label == 'I':
+            return '<IP_ADDR>'
+        elif label == 'U':
+            return '<UUID>'
+        elif label == 'J':
+            return '<JWT>'
+        return None
+
+    # Legacy methods for backward compatibility
+
+    def _extract_tokens_2class(self, string: str, pred: NDArray) -> Tuple[str, List[Tuple[str, int, int]]]:
+        """
+        Legacy method for extracting tokens with a 2-class model (vectorized version).
+
+        Args:
+            string: Input string to process
+            pred: Model predictions for each character
+
+        Returns:
+            Tuple of (processed_string, extracted_tokens)
+        """
+        CUTOFF = 5
+
+        # Convert predictions to mask
+        mask_array = np.array([self.encodings._label_list[p] for p in pred])
+
+        # Find transitions between C and non-C (pad with zeros so runs at the edges are caught)
+        is_c = mask_array == 'C'
+        transitions = np.diff(np.concatenate([[0], (~is_c).astype(int), [0]]))
+        start_indices = np.where(transitions == 1)[0]
+        end_indices = np.where(transitions == -1)[0]
+
+        # Extract tokens (start inclusive, end exclusive)
+        tokens = []
+        for i in range(len(start_indices)):
+            start = int(start_indices[i])
+            end = int(end_indices[i])
+            if end - start > CUTOFF:
+                tokens.append((string[start:end], start, end))
+
+        # Compose new string
+        if not tokens:
+            return string, []
+
+        new_str = ''
+        last_pos = 0
+
+        for token in tokens:
+            if token[1] > last_pos:
+                new_str += string[last_pos:token[1]]
+            new_str += '<RANDOM_STRING>'  # replace the extracted span with the generic placeholder
+            last_pos = token[2]
+
+        if last_pos < len(string):
+            new_str += string[last_pos:]
+
+        return new_str, tokens
+
+    def _extract_tokens(self, string: str, pred: NDArray, cutoff: int = 5) -> Tuple[
+            str, List[Tuple[str, int, int, str]]]:
+        """
+        Legacy method for extracting tokens (redirects to the vectorized version).
+
+        Args:
+            string: Input string to process
+            pred: Model predictions for each character
+            cutoff: Minimum length of tokens to consider
+
+        Returns:
+            Tuple of (processed_string, extracted_tokens)
+        """
+        return self._extract_tokens_vectorized(string, pred, cutoff)
diff --git a/stringlifier/modules/training_improved.py b/stringlifier/modules/training_improved.py
new file mode 100644
index 0000000..6229bef
--- /dev/null
+++ b/stringlifier/modules/training_improved.py
@@ -0,0 +1,435 @@
+#
+# Copyright (c) 2020 Adobe Systems Incorporated. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Training utilities for stringlifier models with improved vectorization and pipeline support.
+"""
+
+import random
+import uuid
+import datetime
+import jwt
+import string
+import numpy as np
+from typing import List, Tuple, Dict, Any, Optional, Union
+from sklearn.base import BaseEstimator, TransformerMixin
+from sklearn.pipeline import Pipeline
+
+# Load known words once at module level
+try:
+    with open('corpus/words_alpha.txt') as f:
+        KNOWN_WORDS = [line.strip() for line in f]
+    random.shuffle(KNOWN_WORDS)
+except FileNotFoundError:
+    KNOWN_WORDS = []
+
+# Global index for cycling through known words
+known_index = 0
+
+class WordGenerator:
+    """
+    Generator class for creating synthetic words and their corresponding masks.
+
+    This class provides methods to generate different types of synthetic words
+    (UUIDs, timestamps, random strings, IP addresses, JWT tokens) along with
+    their corresponding mask labels for training.
+    """
+
+    def __init__(self, known_words: List[str]):
+        """
+        Initialize the word generator with a list of known words.
+
+        Args:
+            known_words: List of known words to use for generation
+        """
+        self.known_words = known_words
+        self.known_index = 0
+        random.shuffle(self.known_words)
+
+    def generate_word(self) -> Tuple[str, str]:
+        """
+        Generate a synthetic word with its corresponding mask.
+
+        Returns:
+            Tuple containing (generated_word, mask_character)
+        """
+        generated = None
+        ii = random.randint(0, 5)
+        mask = 'H'  # Default mask for random strings
+
+        if ii == 0:
+            # UUID
+            generated = str(uuid.uuid4())
+            mask = 'U'
+        elif ii == 1:
+            # UUID hex
+            generated = str(uuid.uuid4().hex)
+            mask = 'H'
+        elif ii == 2:
+            # Numeric
+            c = random.randint(0, 3)
+            if c == 0:
+                generated = str(datetime.datetime.now().timestamp())
+            elif c == 1:
+                generated = str(random.randint(0, 100000000000))
+            elif c == 2:
+                generated = f"{random.randint(0, 999)}.{random.randint(0, 999)}"
+            else:
+                generated = f"{random.randint(0, 999)}.{random.randint(0, 9999)}.{random.randint(0, 9999)}"
+            mask = 'N'
+        elif ii == 3:
+            # Random string
+            N = random.randint(5, 20)
+            chars = string.ascii_uppercase + string.digits + string.ascii_lowercase
+            message = ''.join(random.choice(chars) for _ in range(N))
+
+            i = random.randint(0, 2)
+            if i == 0:
+                message = message.lower()
+            elif i == 1:
+                message = message.upper()
+            generated = message
+        elif ii == 4:
+            # IP address
+            toks = [str(random.randint(0, 255)) for _ in range(4)]
+            generated = '.'.join(toks)
+            mask = 'I'
+        elif ii == 5:
+            # JWT token
+            generated = self._generate_jwt_token()
+            mask = 'J'
+
+        return str(generated), mask
+
+    def _generate_jwt_token(self) -> str:
+        """
+        Generate a JWT token for training data.
+
+        Returns:
+            A string representation of a JWT token
+        """
+        payload = {
+            "id": str(random.random()),
+            "client_id": str(random.random()),
+            "user_id": str(random.random()),
+            "type": "access_token",
+            "expires_in": str(random.randint(10, 3600000)),
+            "scope": "read, write",
+            "created_at": str(random.randint(1900000, 9000000))
+        }
+        encoded_jwt = jwt.encode(payload, 'secret', algorithm='HS256')
+        # Handle both string and bytes return types from different jwt versions
+        if isinstance(encoded_jwt, bytes):
+            return encoded_jwt.decode('utf-8')
+        return encoded_jwt
+
+    def get_next_known(self) -> str:
+        """
+        Get the next known word from the list.
+
+        Returns:
+            A known word from the internal list
+        """
+        if not self.known_words:
+            return "placeholder"
+
+        s = self.known_words[self.known_index]
+        self.known_index += 1
+        if self.known_index >= len(self.known_words):
+            self.known_index = 0
+            random.shuffle(self.known_words)
+        return s
+
+    def get_next_generated(self) -> Tuple[str, str]:
+        """
+        Get the next generated word and its mask.
+
+        Returns:
+            Tuple of (word, mask)
+        """
+        return self.generate_word()
+
+
+class CommandGenerator:
+    """
+    Generator for creating synthetic commands with their corresponding masks.
+
+    This class uses the WordGenerator to create realistic-looking commands
+    with proper masking for training sequence labeling models.
+    """
+
+    def __init__(self, word_generator: WordGenerator):
+        """
+        Initialize the command generator.
+
+        Args:
+            word_generator: WordGenerator instance to use for word generation
+        """
+        self.word_generator = word_generator
+        self.delimiters = ' /.,?!~|<>-=_~:;\\+-&*%$#@!'
+        self.enclosers = '[]{}``""\'\'()'
+
+    def generate_next_command(self) -> Tuple[str, str]:
+        """
+        Generate a synthetic command with its corresponding mask.
+
+        Returns:
+            Tuple of (command, mask)
+        """
+        mask = ''
+        cmd = ''
+        num_words = random.randint(3, 15)
+        use_space = False
+
+        for _ in range(num_words):
+            use_delimiter = random.random() > 0.5
+            use_encloser = random.random() > 0.8
+            case_style = random.randint(0, 2)
+            use_gen_word = random.random() > 0.7
+            del_index = random.randint(0, len(self.delimiters) - 1)
+            enc_index = random.randint(0, len(self.enclosers) // 2 - 1) * 2
+
+            if use_space:
+                mask += 'C'
+                cmd += ' '
+
+            if use_gen_word:
+                wrd, label = self.word_generator.get_next_generated()
+                if case_style == 1:
+                    wrd = wrd[0].upper() + wrd[1:] if wrd else wrd
+                elif case_style == 2:
+                    wrd = wrd.upper()
+                msk = label * len(wrd)  # Vectorized mask creation
+            else:
+                wrd = self.word_generator.get_next_known()
+                append_number = random.random() > 0.97
+                if append_number:
+                    wrd = wrd + str(random.randint(0, 99))
+                if case_style == 1:
+                    wrd = wrd[0].upper() + wrd[1:] if wrd else wrd
+                elif case_style == 2:
+                    wrd = wrd.upper()
+                msk = 'C' * len(wrd)  # Vectorized mask creation
+
+            if use_delimiter:
+                wrd = self.delimiters[del_index] + wrd
+                msk = 'C' + msk
+
+            if use_encloser:
+                wrd = self.enclosers[enc_index] + wrd + self.enclosers[enc_index + 1]
+                msk = 'C' + msk + 'C'
+
+            cmd += wrd
+            mask += msk
+            use_space = random.random() > 0.7
+
+        return cmd, mask
+
+
+class DatasetGenerator(BaseEstimator, TransformerMixin):
+    """
+    Scikit-learn compatible transformer for generating synthetic datasets.
+
+    This class implements the scikit-learn transformer interface to enable
+    integration with scikit-learn pipelines for data generation.
+    """
+
+    def __init__(self, size: int = 1000, known_words: Optional[List[str]] = None):
+        """
+        Initialize the dataset generator.
+
+        Args:
+            size: Number of examples to generate
+            known_words: List of known words to use (if None, uses module-level KNOWN_WORDS)
+        """
+        self.size = size
+        self.known_words = known_words if known_words is not None else KNOWN_WORDS
+        self.word_generator = WordGenerator(self.known_words)
+        self.command_generator = CommandGenerator(self.word_generator)
+
+    def fit(self, X=None, y=None):
+        """
+        Fit method (does nothing, but is required for scikit-learn compatibility).
+
+        Returns:
+            self
+        """
+        return self
+
+    def transform(self, X=None) -> List[Tuple[str, str]]:
+        """
+        Generate a synthetic dataset.
+
+        Args:
+            X: Ignored, exists for scikit-learn compatibility
+
+        Returns:
+            List of (command, mask) tuples
+        """
+        return [self.command_generator.generate_next_command() for _ in range(self.size)]
+
+    def fit_transform(self, X=None, y=None) -> List[Tuple[str, str]]:
+        """
+        Fit and transform (generate dataset).
+
+        Args:
+            X: Ignored, exists for scikit-learn compatibility
+            y: Ignored, exists for scikit-learn compatibility
+
+        Returns:
+            List of (command, mask) tuples
+        """
+        self.fit(X, y)
+        return self.transform(X)
+
+
+class BatchCreator(BaseEstimator, TransformerMixin):
+    """
+    Scikit-learn compatible transformer for creating batches from datasets.
+
+    This class implements the scikit-learn transformer interface to enable
+    integration with scikit-learn pipelines for batch creation.
+    """
+
+    def __init__(self, batch_size: int = 32):
+        """
+        Initialize the batch creator.
+
+        Args:
+            batch_size: Size of batches to create
+        """
+        self.batch_size = batch_size
+
+    def fit(self, X, y=None):
+        """
+        Fit method (does nothing, but is required for scikit-learn compatibility).
+
+        Returns:
+            self
+        """
+        return self
+
+    def transform(self, X: List[Tuple[str, str]]) -> Tuple[List[List[str]], List[List[str]]]:
+        """
+        Create batches from a dataset.
+
+        Args:
+            X: List of (command, mask) tuples
+
+        Returns:
+            Tuple of (batched_commands, batched_masks)
+        """
+        # Shuffle a copy so the caller's dataset is not reordered in place
+        X = list(X)
+        random.shuffle(X)
+        commands, masks = zip(*X)
+
+        # Create batches
+        batched_commands = [list(commands[i:i + self.batch_size]) for i in range(0, len(commands), self.batch_size)]
+        batched_masks = [list(masks[i:i + self.batch_size]) for i in range(0, len(masks), self.batch_size)]
+
+        return batched_commands, batched_masks
+
+
+def create_data_pipeline(batch_size: int = 32, dataset_size: int = 1000) -> Pipeline:
+    """
+    Create a scikit-learn pipeline for data generation and batch creation.
+
+    Args:
+        batch_size: Size of batches to create
+        dataset_size: Number of examples to generate
+
+    Returns:
+        Scikit-learn pipeline for data generation and batch creation
+    """
+    return Pipeline([
+        ('generate', DatasetGenerator(size=dataset_size)),
+        ('batch', BatchCreator(batch_size=batch_size))
+    ])
+
+
+# For backward compatibility
+def generate_next_cmd() -> Tuple[str, str]:
+    """
+    Legacy function for generating a command and mask (for backward compatibility).
+
+    Returns:
+        Tuple of (command, mask)
+    """
+    global known_index, KNOWN_WORDS
+
+    word_generator = WordGenerator(KNOWN_WORDS)
+    command_generator = CommandGenerator(word_generator)
+    return command_generator.generate_next_command()
+
+
+def _get_next_known() -> str:
+    """
+    Legacy function for getting the next known word (for backward compatibility).
+
+    Returns:
+        A known word
+    """
+    global known_index, KNOWN_WORDS
+
+    if not KNOWN_WORDS:
+        return "placeholder"
+
+    s = KNOWN_WORDS[known_index]
+    known_index += 1
+    if known_index >= len(KNOWN_WORDS):
+        known_index = 0
+        random.shuffle(KNOWN_WORDS)
+    return s
+
+
+def _get_next_gen() -> Tuple[str, str]:
+    """
+    Legacy function for getting the next generated word (for backward compatibility).
+
+    Returns:
+        Tuple of (word, mask)
+    """
+    global KNOWN_WORDS
+
+    word_generator = WordGenerator(KNOWN_WORDS)
+    return word_generator.generate_word()
+
+
+def _generate_word(known_words: List[str]) -> Tuple[str, str]:
+    """
+    Legacy function for generating a word (for backward compatibility).
+
+    Args:
+        known_words: List of known words
+
+    Returns:
+        Tuple of (word, mask)
+    """
+    word_generator = WordGenerator(known_words)
+    return word_generator.generate_word()
+
+
+def _generate_JWT_token(known_words: List[str]) -> str:
+    """
+    Legacy function for generating a JWT token (for backward compatibility).
+
+    Args:
+        known_words: List of known words (unused)
+
+    Returns:
+        JWT token string
+    """
+    word_generator = WordGenerator([])
+    return word_generator._generate_jwt_token()
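
Usage sketch (not part of the patch): the snippet below shows how the two new modules are intended to fit together. It assumes a trained sequence-labeling model and its label encodings are already available; `load_model()` is a hypothetical stand-in for however this repository loads them and is not defined in this diff.

# Usage sketch only; `load_model()` is a hypothetical helper, not part of this patch.
from stringlifier.api_improved import StringlifierAPI
from stringlifier.modules.training_improved import create_data_pipeline

# Synthetic training data: 1000 (command, mask) pairs grouped into batches of 32.
pipeline = create_data_pipeline(batch_size=32, dataset_size=1000)
batched_commands, batched_masks = pipeline.fit_transform(None)

# Token extraction with the vectorized API.
classifier, encodings = load_model()  # hypothetical loader for the trained model and encodings
api = StringlifierAPI(classifier, encodings)
cleaned, tokens = api.process("GET /session/9f2c1a7be4d04b1f HTTP/1.1", return_tokens=True)
# `cleaned` holds the input with detected spans replaced by placeholder tags such as
# <RANDOM_STRING>; `tokens` lists (text, start, end, tag) tuples for each extracted span.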