From c4cd8215b2466eb8677d66ad406d321719c7dfea Mon Sep 17 00:00:00 2001 From: Clemente Date: Fri, 10 Oct 2025 10:08:38 +0000 Subject: [PATCH 01/12] quantization update --- genlm/backend/llm/hf.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/genlm/backend/llm/hf.py b/genlm/backend/llm/hf.py index e736774..576c945 100644 --- a/genlm/backend/llm/hf.py +++ b/genlm/backend/llm/hf.py @@ -104,8 +104,11 @@ def from_name(cls, model_id, bitsandbytes_opts=None, hf_opts=None, **kwargs): _hf_opts.update(hf_opts) tok = AutoTokenizer.from_pretrained(model_id) + model_kwargs = _hf_opts + if bnb_config: + model_kwargs["quantization_config"] = bnb_config # pass the bnb configuration as an hf parameter mod = AutoModelForCausalLM.from_pretrained( - model_id, quantization_config=bnb_config, **_hf_opts + model_id, **model_kwargs ) return cls(mod, tok, **kwargs) From 9e163ffbe626f11ed378b8f8991506103de2a3a4 Mon Sep 17 00:00:00 2001 From: Clemente Date: Fri, 17 Oct 2025 11:01:06 +0000 Subject: [PATCH 02/12] V1 backend --- genlm/backend/llm/__init__.py | 2 +- genlm/backend/llm/hf.py | 47 ++- genlm/backend/llm/{vllm.py => vllmV0.py} | 0 genlm/backend/llm/vllmV1.py | 363 +++++++++++++++++++++++ notes/Untitled.ipynb | 44 +++ notes/playground.ipynb | 0 6 files changed, 451 insertions(+), 5 deletions(-) rename genlm/backend/llm/{vllm.py => vllmV0.py} (100%) create mode 100644 genlm/backend/llm/vllmV1.py create mode 100644 notes/Untitled.ipynb create mode 100644 notes/playground.ipynb diff --git a/genlm/backend/llm/__init__.py b/genlm/backend/llm/__init__.py index 13373c8..0afb112 100644 --- a/genlm/backend/llm/__init__.py +++ b/genlm/backend/llm/__init__.py @@ -1,4 +1,4 @@ -from genlm.backend.llm.vllm import AsyncVirtualLM +from genlm.backend.llm.vllmV1 import AsyncVirtualLM from genlm.backend.llm.hf import AsyncTransformer from genlm.backend.llm.base import AsyncLM, MockAsyncLM diff --git a/genlm/backend/llm/hf.py b/genlm/backend/llm/hf.py index 576c945..45a1bf5 100644 --- a/genlm/backend/llm/hf.py +++ b/genlm/backend/llm/hf.py @@ -104,15 +104,54 @@ def from_name(cls, model_id, bitsandbytes_opts=None, hf_opts=None, **kwargs): _hf_opts.update(hf_opts) tok = AutoTokenizer.from_pretrained(model_id) - model_kwargs = _hf_opts - if bnb_config: - model_kwargs["quantization_config"] = bnb_config # pass the bnb configuration as an hf parameter mod = AutoModelForCausalLM.from_pretrained( - model_id, **model_kwargs + model_id, quantization_config=bnb_config, **_hf_opts ) return cls(mod, tok, **kwargs) + # @classmethod + # def from_name(cls, model_id, bitsandbytes_opts=None, hf_opts=None, **kwargs): + # """Create an AsyncTransformer instance from a pretrained HuggingFace model. + + # Args: + # model_id (str): Model identifier in HuggingFace's model hub. + # bitsandbytes_opts (dict, optional): Additional configuration options for bitsandbytes quantization. + # Defaults to None. + # hf_opts (dict, optional): Additional configuration options for loading the HuggingFace model. + # Defaults to None. + # **kwargs: Additional arguments passed to the `AsyncTransformer` constructor + + # Returns: + # (AsyncTransformer): An initialized `AsyncTransformer` instance. 
+ # """ + # if bitsandbytes_opts: + # bnb_config = BitsAndBytesConfig(**bitsandbytes_opts) + # else: + # bnb_config = None + + # _hf_opts = { + # "device_map": "auto", + # "torch_dtype": "auto", + # } + # if hf_opts: + # _hf_opts.update(hf_opts) + + # tok = AutoTokenizer.from_pretrained(model_id) + # # model_kwargs = _hf_opts + # # if bnb_config: + # # model_kwargs["quantization_config"] = bnb_config # pass the bnb configuration as an hf parameter + # # mod = AutoModelForCausalLM.from_pretrained( + # # model_id, **model_kwargs + # # ) + # mod = AutoModelForCausalLM.from_pretrained( + # model_id, quantization_config=bnb_config, **_hf_opts + # ) + + + + # return cls(mod, tok, **kwargs) + @torch.no_grad() def __init__(self, hf_model, hf_tokenizer, batch_size=20, timeout=0.02): """Initialize an AsyncTransformer instance. diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllmV0.py similarity index 100% rename from genlm/backend/llm/vllm.py rename to genlm/backend/llm/vllmV0.py diff --git a/genlm/backend/llm/vllmV1.py b/genlm/backend/llm/vllmV1.py new file mode 100644 index 0000000..24f03e4 --- /dev/null +++ b/genlm/backend/llm/vllmV1.py @@ -0,0 +1,363 @@ +import torch +import logging +import warnings + +from genlm.backend.llm.base import AsyncLM +from genlm.backend.cache import OutputCache + +try: + # Import vLLM - need to handle potential shadowing by local vllm.py file + import sys + import importlib.util + + # Find the vLLM package (not the local vllm.py file) + vllm_spec = importlib.util.find_spec("vllm") + if vllm_spec and vllm_spec.origin and not vllm_spec.origin.endswith("vllm.py"): + # This is the real vLLM package + import vllm + AsyncLLMEngine = vllm.AsyncLLMEngine + SamplingParams = vllm.SamplingParams + + from vllm.engine.arg_utils import AsyncEngineArgs + from vllm.utils import Counter + from vllm.inputs import TokensPrompt + from vllm.distributed.parallel_state import ( + destroy_model_parallel, + destroy_distributed_environment, + ) + HAS_VLLM = True + else: + # vLLM package not found or shadowed + raise ImportError("vLLM package not found or shadowed by local vllm.py") + +except (ImportError, AttributeError) as e: # pragma: no cover + HAS_VLLM = False # pragma: no cover + import sys + print(f"WARNING: vLLM import failed: {e}", file=sys.stderr) + +if not HAS_VLLM: + + class AsyncVirtualLM: # pragma: no cover + """Placeholder class when vLLM is not installed.""" + + def __init__(self, *args, **kwargs): # pragma: no cover + raise ImportError( + "vLLM is not installed. Please install it with 'pip install vllm' " + "to use the vLLM-based AsyncLM model." + ) + + @classmethod + def from_name(cls, *args, **kwargs): # pragma: no cover + raise ImportError( + "vLLM is not installed. Please install it with 'pip install vllm' " + "to use the vLLM-based AsyncLM model." + ) + +else: + logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) + + + class AsyncVirtualLM(AsyncLM): + default_params = { + "max_tokens": 1, + "n": 1, + "detokenize": False, + "stop": None, + "ignore_eos": True, + "logprobs": 256, # Request logprobs for top 1000 tokens + } + + def __init__(self, async_llm_engine, cache_size=0, cache_opts={}, logits_processors=None): + """Initialize an `AsyncVirtualLM` instance. + + Args: + async_llm_engine (AsyncLLMEngine): The async vLLM engine instance. + cache_size (int, optional): Maximum size of the output cache. If 0, caching is disabled. Defaults to 0. 
+ cache_opts (dict, optional): Additional options to pass to the [`OutputCache`][genlm.backend.cache.OutputCache] constructor. Defaults to {}. + + Note: + The cache stores the log probabilities for previously seen token sequences to avoid redundant requests. KV caching is handled internally by the vLLM engine. + """ + self.async_llm_engine = async_llm_engine + # Wrap v1 tokenizer to be compatible with base class + self.tokenizer = self._wrap_tokenizer(async_llm_engine.tokenizer) + self.request_counter = Counter() + # Store logits processors for compatibility (not used in v1) + self._logits_processors = list(logits_processors or []) + self.cache = ( + OutputCache(maxsize=cache_size, **cache_opts) + if cache_size > 0 + else None + ) + + async_llm_engine.log_stats = False + + super().__init__(tokenizer=self.tokenizer) + + def _wrap_tokenizer(self, tokenizer): + """Wrap v1 tokenizer to be compatible with base class expectations.""" + class TokenizerWrapper: + def __init__(self, tokenizer): + # Access the underlying tokenizer from TokenizerGroup + self._tokenizer = getattr(tokenizer, 'tokenizer', tokenizer) + # Add compatibility attributes + self.is_fast = True # Assume fast tokenizer for v1 + self.name_or_path = getattr(self._tokenizer, 'name_or_path', 'unknown') + + def __getattr__(self, name): + return getattr(self._tokenizer, name) + + def __len__(self): + return len(self._tokenizer) + + return TokenizerWrapper(tokenizer) + + @classmethod + def from_name(cls, model_name, engine_opts=None, **kwargs): + """Create a `AsyncVirtualLM` instance from a model name. + + Args: + model_name (str): Name of the model to load. + engine_opts (dict): Additional options to pass to the `AsyncLLMEngine`. The engine will be + configured with prefix caching enabled and async output processing disabled by default. + **kwargs: Additional arguments passed to `AsyncVirtualLM` constructor. + + Returns: + (AsyncVirtualLM): An `AsyncVirtualLM` instance. + """ + if not HAS_VLLM: + raise ImportError( # pragma: no cover + "vLLM not available. Install vLLM or use AsyncTransformer instead." + ) + + if engine_opts is not None and "enable_chunked_prefill" in engine_opts: + if engine_opts["enable_chunked_prefill"]: + warnings.warn( # pragma: no cover + "Setting enable_chunked_prefill to True may interfere with AsyncVirtualLM's " + "custom sampling functionality." + ) + + engine_opts = { + "enable_prefix_caching": True, + "disable_log_requests": True, + "gpu_memory_utilization": 0.3, # Reduce GPU memory usage + "max_model_len": 512, # Reduce max sequence length + "max_logprobs": 1000, # Allow up to 1000 logprobs per token + # "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. + **(engine_opts or {}), + } + + engine = AsyncLLMEngine.from_engine_args( + AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) + ) + + return cls(engine, **kwargs) + + @property + def underlying_model(self): + raise NotImplementedError + + @property + def logits_processors(self): + return self._logits_processors + + async def next_token_logprobs(self, token_ids): + """Request log probabilities of next token asynchronously with output caching. + + Args: + token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + + Returns: + result (torch.Tensor): Normalized log probability tensor. 
+ """ + # Handle string input properly + if isinstance(token_ids, str): + key = token_ids + else: + key = tuple(token_ids) + + if self.cache is not None and key in self.cache: + return self.cache[key] + + result = await self._next_token_logprobs(key) + + if self.cache is not None: + self.cache[key] = result + + return result + + async def _next_token_logprobs(self, token_ids): + """Request log probabilities of next token asynchronously. + + Args: + token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + + Returns: + (torch.Tensor): Normalized log probability tensor. + """ + req_id = str(next(self.request_counter)) + print(f"request id: {req_id}") + # For v1, use string prompt directly instead of TokensPrompt + if isinstance(token_ids, str): + prompt = token_ids + else: + # Convert token IDs back to string for v1 compatibility + prompt = self.tokenizer.decode(token_ids) + + outputs = [] + async for output in self.async_llm_engine.generate( + prompt=prompt, + sampling_params=SamplingParams(**self.default_params), + request_id=req_id, + ): + if output.finished: + outputs.append(output) + + if not outputs: + raise RuntimeError("No outputs generated") + + # Extract logprobs from the output + # v1 provides logprobs in the output when logprobs parameter is set + output = outputs[0].outputs[0] + logprobs = output.logprobs + + assert logprobs + print(f"shape of logprobs before: {len(logprobs[0]),type(logprobs)}") + # v1 logprobs format: list of dicts with token_id -> logprob + # With max_logprobs=1000, we get many more logprobs + vocab_size = len(self.tokenizer) + logprobs_tensor = torch.full((1, vocab_size), -float('inf'), dtype=torch.float32) + + if isinstance(logprobs, list) and len(logprobs) > 0: + # Get the first (and only) token's logprobs + token_logprobs = logprobs[0] + if isinstance(token_logprobs, dict): + for token_id, logprob in token_logprobs.items(): + print(f"token_id: {token_id}, logprob: {logprob}") + if isinstance(token_id, int) and 0 <= token_id < vocab_size: + # Extract the actual logprob value from the Logprob object + if hasattr(logprob, 'logprob'): + logprobs_tensor[0, token_id] = logprob.logprob + else: + logprobs_tensor[0, token_id] = float(logprob) + + #Distribute the remaining mass across the tokens that are not in the top-k + non_inf_mask = logprobs_tensor[0] != -float('inf') + if non_inf_mask.sum() > 0: + # Get the logprobs for the top-k tokens + top_logprobs = logprobs_tensor[0][non_inf_mask] + + # Calculate the remaining probability mass + remaining_prob = 1.0 - torch.exp(top_logprobs).sum().item() + if remaining_prob > 0: + # Distribute the remaining probability uniformly among remaining tokens + remaining_tokens = (~non_inf_mask).sum().item() + if remaining_tokens > 0: + uniform_logprob = torch.log(torch.tensor(remaining_prob / remaining_tokens)) + logprobs_tensor[0][~non_inf_mask] = uniform_logprob + + logprobs = logprobs_tensor + + return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) + + def next_token_logprobs_sync(self, token_ids): + """Request log probabilities of next token synchronously. + + Args: + token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + + Returns: + (torch.Tensor): Normalized log probability tensor. + """ + import asyncio + return asyncio.run(self.next_token_logprobs(token_ids)) + + async def batch_next_token_logprobs(self, token_ids_list): + """ + Request log probabilities of next tokens in a batch asynchronously. 
+ + Args: + token_ids_list (list[list[int]]): A list of token ID lists, each representing a prompt to the language model. + + Returns: + (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. + """ + # Handle empty batch + if not token_ids_list: + return torch.empty((0, len(self.tokenizer)), dtype=torch.float32) + + # Use the base class implementation + return await super().batch_next_token_logprobs(token_ids_list) + + def batch_next_token_logprobs_sync(self, token_ids_list): + """ + Request log probabilities of next tokens in a batch synchronously. + + Args: + token_ids_list (list[list[int]]): A list of token ID lists, each representing a prompt to the language model. + + Returns: + (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. + """ + # Handle empty batch + if not token_ids_list: + return torch.empty((0, len(self.tokenizer)), dtype=torch.float32) + + # Use the base class implementation + return super().batch_next_token_logprobs_sync(token_ids_list) + + def clear_cache(self): + """Clear output cache.""" + if self.cache: + self.cache.clear() + + def __del__(self): + """Clean up resources on deletion.""" + self._cleanup_engine() + + def _cleanup_engine(self): + """Clean up the vLLM engine and associated resources.""" + if async_engine := getattr(self, "async_llm", None): + async_engine.shutdown() + destroy_model_parallel() + destroy_distributed_environment() + + async def sample( + self, + prompt_token_ids, + max_tokens, + eos_token_ids, + temperature=1.0, + seed=None, + ): + """Sample from the language model. + + Args: + prompt_token_ids (list[int]): The token IDs of the prompt. + eos_token_ids (list[int]): The token IDs of the end-of-sequence tokens. + temperature (float, optional): The temperature to use to rescale the logits. Defaults to 1.0. + max_tokens (int): The maximum number of tokens to generate. + seed (int, optional): The seed for the random number generator. Defaults to None. + + Returns: + (list[int]): The sampled token IDs. 
+ """ + async for output in self.async_llm_engine.generate( + prompt=TokensPrompt(prompt_token_ids=prompt_token_ids), + sampling_params=SamplingParams( + n=1, + max_tokens=max_tokens, + temperature=temperature, + seed=seed, + stop=[self.byte_vocab[i].decode() for i in eos_token_ids], + ), + request_id=str(next(self.request_counter)), + ): + if output.finished: + assert len(output.outputs) == 1, ( + "Expected exactly one sequence group" + ) + token_ids = list(output.outputs[0].token_ids) + if token_ids[-1] in eos_token_ids: + token_ids = token_ids[:-1] + return token_ids diff --git a/notes/Untitled.ipynb b/notes/Untitled.ipynb new file mode 100644 index 0000000..940d413 --- /dev/null +++ b/notes/Untitled.ipynb @@ -0,0 +1,44 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "id": "21839ddb-6090-4866-beb4-b24a7c2f4e80", + "metadata": {}, + "outputs": [], + "source": [ + "import genlm\n", + "import numpy as np" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca6090bf-5ef9-45ee-8e15-12c0c9399422", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python genlm", + "language": "python", + "name": "genlm" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notes/playground.ipynb b/notes/playground.ipynb new file mode 100644 index 0000000..e69de29 From b446b35b7775cda882e079673ec569d2f6e43f69 Mon Sep 17 00:00:00 2001 From: Clemente Date: Mon, 20 Oct 2025 16:06:49 +0000 Subject: [PATCH 03/12] Polishing V1 --- genlm/backend/llm/__init__.py | 2 +- genlm/backend/llm/{vllmV1.py => vllm.py} | 336 +++++++++++++++++++---- genlm/backend/llm/vllmV0.py | 287 ------------------- 3 files changed, 288 insertions(+), 337 deletions(-) rename genlm/backend/llm/{vllmV1.py => vllm.py} (52%) delete mode 100644 genlm/backend/llm/vllmV0.py diff --git a/genlm/backend/llm/__init__.py b/genlm/backend/llm/__init__.py index 0afb112..13373c8 100644 --- a/genlm/backend/llm/__init__.py +++ b/genlm/backend/llm/__init__.py @@ -1,4 +1,4 @@ -from genlm.backend.llm.vllmV1 import AsyncVirtualLM +from genlm.backend.llm.vllm import AsyncVirtualLM from genlm.backend.llm.hf import AsyncTransformer from genlm.backend.llm.base import AsyncLM, MockAsyncLM diff --git a/genlm/backend/llm/vllmV1.py b/genlm/backend/llm/vllm.py similarity index 52% rename from genlm/backend/llm/vllmV1.py rename to genlm/backend/llm/vllm.py index 24f03e4..a9d70ae 100644 --- a/genlm/backend/llm/vllmV1.py +++ b/genlm/backend/llm/vllm.py @@ -6,34 +6,24 @@ from genlm.backend.cache import OutputCache try: - # Import vLLM - need to handle potential shadowing by local vllm.py file - import sys - import importlib.util - - # Find the vLLM package (not the local vllm.py file) - vllm_spec = importlib.util.find_spec("vllm") - if vllm_spec and vllm_spec.origin and not vllm_spec.origin.endswith("vllm.py"): - # This is the real vLLM package - import vllm - AsyncLLMEngine = vllm.AsyncLLMEngine - SamplingParams = vllm.SamplingParams - + from vllm import AsyncLLMEngine, SamplingParams + from vllm.utils import Counter + from vllm.inputs import TokensPrompt + from vllm import envs + + if envs.VLLM_USE_V1: #The import is different iin V1 and V0 from vllm.engine.arg_utils import AsyncEngineArgs - from vllm.utils 
import Counter - from vllm.inputs import TokensPrompt - from vllm.distributed.parallel_state import ( - destroy_model_parallel, - destroy_distributed_environment, - ) - HAS_VLLM = True else: - # vLLM package not found or shadowed - raise ImportError("vLLM package not found or shadowed by local vllm.py") + from vllm import AsyncEngineArgs + + from vllm.distributed.parallel_state import ( + destroy_model_parallel, + destroy_distributed_environment, + ) -except (ImportError, AttributeError) as e: # pragma: no cover + HAS_VLLM = True +except ImportError: # pragma: no cover HAS_VLLM = False # pragma: no cover - import sys - print(f"WARNING: vLLM import failed: {e}", file=sys.stderr) if not HAS_VLLM: @@ -53,10 +43,10 @@ def from_name(cls, *args, **kwargs): # pragma: no cover "to use the vLLM-based AsyncLM model." ) -else: +elif envs.VLLM_USE_V1: #If V1 + logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) - class AsyncVirtualLM(AsyncLM): default_params = { "max_tokens": 1, @@ -228,36 +218,26 @@ async def _next_token_logprobs(self, token_ids): vocab_size = len(self.tokenizer) logprobs_tensor = torch.full((1, vocab_size), -float('inf'), dtype=torch.float32) - if isinstance(logprobs, list) and len(logprobs) > 0: - # Get the first (and only) token's logprobs - token_logprobs = logprobs[0] - if isinstance(token_logprobs, dict): - for token_id, logprob in token_logprobs.items(): - print(f"token_id: {token_id}, logprob: {logprob}") - if isinstance(token_id, int) and 0 <= token_id < vocab_size: - # Extract the actual logprob value from the Logprob object - if hasattr(logprob, 'logprob'): - logprobs_tensor[0, token_id] = logprob.logprob - else: - logprobs_tensor[0, token_id] = float(logprob) + for token_id, logprob in logprobs[0].items(): + print(f"token_id: {token_id}, logprob: {logprob}") + # if isinstance(token_id, int) and 0 <= token_id < vocab_size: + # Extract the actual logprob value from the Logprob object + if hasattr(logprob, 'logprob'): + logprobs_tensor[0, token_id] = logprob.logprob + else: + logprobs_tensor[0, token_id] = float(logprob) #Distribute the remaining mass across the tokens that are not in the top-k - non_inf_mask = logprobs_tensor[0] != -float('inf') + non_inf_mask = logprobs_tensor[0] != -float('inf') # create boolean non-inf mask if non_inf_mask.sum() > 0: # Get the logprobs for the top-k tokens top_logprobs = logprobs_tensor[0][non_inf_mask] - - # Calculate the remaining probability mass remaining_prob = 1.0 - torch.exp(top_logprobs).sum().item() - if remaining_prob > 0: - # Distribute the remaining probability uniformly among remaining tokens - remaining_tokens = (~non_inf_mask).sum().item() - if remaining_tokens > 0: - uniform_logprob = torch.log(torch.tensor(remaining_prob / remaining_tokens)) - logprobs_tensor[0][~non_inf_mask] = uniform_logprob - + remaining_tokens = (~non_inf_mask).sum().item() + uniform_logprob = torch.log(torch.tensor(remaining_prob / remaining_tokens)) + logprobs_tensor[0][~non_inf_mask] = uniform_logprob + logprobs = logprobs_tensor - return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) def next_token_logprobs_sync(self, token_ids): @@ -322,6 +302,264 @@ def _cleanup_engine(self): destroy_model_parallel() destroy_distributed_environment() + async def sample( + self, + prompt_token_ids, + max_tokens, + eos_token_ids, + temperature=1.0, + seed=None, + ): + """Sample from the language model. + + Args: + prompt_token_ids (list[int]): The token IDs of the prompt. 
+ eos_token_ids (list[int]): The token IDs of the end-of-sequence tokens. + temperature (float, optional): The temperature to use to rescale the logits. Defaults to 1.0. + max_tokens (int): The maximum number of tokens to generate. + seed (int, optional): The seed for the random number generator. Defaults to None. + + Returns: + (list[int]): The sampled token IDs. + """ + + if isinstance(prompt_token_ids, list): + prompt_token_ids = self.tokenizer.decode(prompt_token_ids) + elif isinstance(prompt_token_ids, str): + pass + else: + raise f"Invalid prompt_ids_Type: {type(prompt_token_ids)}" + + async for output in self.async_llm_engine.generate( + prompt=prompt_token_ids, + sampling_params=SamplingParams( + n=1, + max_tokens=max_tokens, + temperature=temperature, + seed=seed, + stop=[self.byte_vocab[i].decode() for i in eos_token_ids], + ), + request_id=str(next(self.request_counter)), + ): + if output.finished: + assert len(output.outputs) == 1, ( + "Expected exactly one sequence group" + ) + token_ids = list(output.outputs[0].token_ids) + if token_ids[-1] in eos_token_ids: + token_ids = token_ids[:-1] + return token_ids + +else: #Otherwise use V0 + + logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) + + class PassThroughLogitsProcessor: + """A logits processor that stores the logprobs and passes the logits through.""" + + def __init__(self): + self.log_probs = None + + def __call__(self, past_token_ids, logits): + assert self.log_probs is None, ( + "Log probs already set. This should never happen." + ) + self.log_probs = torch.log_softmax(logits, dim=-1, dtype=logits.dtype) + return logits + + class AsyncVirtualLM(AsyncLM): + default_params = { + "max_tokens": 1, + "n": 1, + "detokenize": False, + "stop": None, + "ignore_eos": True, + } + + def __init__(self, async_llm_engine, cache_size=0, cache_opts={}): + """Initialize an `AsyncVirtualLM` instance. + + Args: + async_llm_engine (AsyncLLMEngine): The async vLLM engine instance. + cache_size (int, optional): Maximum size of the output cache. If 0, caching is disabled. Defaults to 0. + cache_opts (dict, optional): Additional options to pass to the [`OutputCache`][genlm.backend.cache.OutputCache] constructor. Defaults to {}. + + Note: + The cache stores the log probabilities for previously seen token sequences to avoid redundant requests. KV caching is handled internally by the vLLM engine. + """ + self.async_llm_engine = async_llm_engine + self.tokenizer = async_llm_engine.engine.get_tokenizer() + self.request_counter = Counter() + self.cache = ( + OutputCache(maxsize=cache_size, **cache_opts) + if cache_size > 0 + else None + ) + + async_llm_engine.engine.log_stats = False + + super().__init__(tokenizer=self.tokenizer) + + @classmethod + def from_name(cls, model_name, engine_opts=None, **kwargs): + """Create a `AsyncVirtualLM` instance from a model name. + + Args: + model_name (str): Name of the model to load. + engine_opts (dict): Additional options to pass to the `AsyncLLMEngine`. The engine will be + configured with prefix caching enabled and async output processing disabled by default. + **kwargs: Additional arguments passed to `AsyncVirtualLM` constructor. + + Returns: + (AsyncVirtualLM): An `AsyncVirtualLM` instance. + """ + if not HAS_VLLM: + raise ImportError( # pragma: no cover + "vLLM not available. Install vLLM or use AsyncTransformer instead." 
+ ) + + if engine_opts is not None and "enable_chunked_prefill" in engine_opts: + if engine_opts["enable_chunked_prefill"]: + warnings.warn( # pragma: no cover + "Setting enable_chunked_prefill to True may interfere with AsyncVirtualLM's " + "custom sampling functionality." + ) + + engine_opts = { + "enable_prefix_caching": True, + "disable_log_requests": True, + "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. + **(engine_opts or {}), + } + + engine = AsyncLLMEngine.from_engine_args( + AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) + ) + + return cls(engine, **kwargs) + + @property + def underlying_model(self): + return self.async_llm_engine.engine.model_executor.driver_worker.model_runner.model + + async def next_token_logprobs(self, token_ids): + """Request log probabilities of next token asynchronously with output caching. + + Args: + token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + + Returns: + result (torch.Tensor): Normalized log probability tensor. + + Warning: + Do not use `asyncio.run(next_token_logprobs())` as it may interfere with vLLM's background loop. + For synchronous usage, use the `next_token_logprobs_sync()` method instead. + """ + key = tuple(token_ids) + + if self.cache is not None and key in self.cache: + return self.cache[key] + + result = await self._next_token_logprobs(key) + + if self.cache is not None: + self.cache[key] = result + + return result + + async def _next_token_logprobs(self, token_ids): + """Request log probabilities of next token asynchronously. + + Args: + token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + + Returns: + (torch.Tensor): Normalized log probability tensor. + """ + req_id = str(next(self.request_counter)) + prompt = TokensPrompt(prompt_token_ids=token_ids) + + outputs = [] + processor = PassThroughLogitsProcessor() + async for output in self.async_llm_engine.generate( + prompt=prompt, + sampling_params=SamplingParams( + **self.default_params, logits_processors=[processor] + ), + request_id=req_id, + ): + if output.finished: + outputs.append(output) + + assert processor.log_probs is not None, ( + "Log probs should be set by the logits processor." + ) + return processor.log_probs + + def next_token_logprobs_sync(self, token_ids): + """Request log probabilities of next token synchronously. + + Args: + token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + + Returns: + (torch.Tensor): Normalized log probability tensor. + """ + return self.batch_next_token_logprobs_sync([token_ids])[0] + + def batch_next_token_logprobs_sync(self, token_ids_list): + """ + Request log probabilities of next tokens in a batch synchronously. + + Args: + token_ids_list (list[list[int]]): A list of token ID lists, each representing a prompt to the language model. + + Returns: + (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. 
+ """ + req_ids = [] + req_id2processors = {} + for token_ids in token_ids_list: + req_id = str(next(self.request_counter)) + req_ids.append(req_id) + processor = PassThroughLogitsProcessor() + req_id2processors[req_id] = processor + self.async_llm_engine.engine.add_request( + prompt=TokensPrompt(prompt_token_ids=token_ids), + params=SamplingParams( + **self.default_params, logits_processors=[processor] + ), + request_id=req_id, + ) + + while self.async_llm_engine.engine.has_unfinished_requests(): + output = self.async_llm_engine.engine.step() + for out in output: + if out.finished: + assert out.request_id in req_id2processors, ( + f"{out.request_id} not in requested IDs" + ) + + return torch.stack( + [req_id2processors[req_id].log_probs for req_id in req_ids] + ) + + def clear_cache(self): + """Clear output cache.""" + if self.cache: + self.cache.clear() + + def __del__(self): + """Clean up resources on deletion.""" + self._cleanup_engine() + + def _cleanup_engine(self): + """Clean up the vLLM engine and associated resources.""" + if async_engine := getattr(self, "async_llm_engine", None): + async_engine.shutdown_background_loop() + destroy_model_parallel() + destroy_distributed_environment() + async def sample( self, prompt_token_ids, diff --git a/genlm/backend/llm/vllmV0.py b/genlm/backend/llm/vllmV0.py deleted file mode 100644 index ba424e8..0000000 --- a/genlm/backend/llm/vllmV0.py +++ /dev/null @@ -1,287 +0,0 @@ -import torch -import logging -import warnings - -from genlm.backend.llm.base import AsyncLM -from genlm.backend.cache import OutputCache - -try: - from vllm import AsyncLLMEngine, SamplingParams, AsyncEngineArgs - from vllm.utils import Counter - from vllm.inputs import TokensPrompt - - from vllm.distributed.parallel_state import ( - destroy_model_parallel, - destroy_distributed_environment, - ) - - HAS_VLLM = True -except ImportError: # pragma: no cover - HAS_VLLM = False # pragma: no cover - -if not HAS_VLLM: - - class AsyncVirtualLM: # pragma: no cover - """Placeholder class when vLLM is not installed.""" - - def __init__(self, *args, **kwargs): # pragma: no cover - raise ImportError( - "vLLM is not installed. Please install it with 'pip install vllm' " - "to use the vLLM-based AsyncLM model." - ) - - @classmethod - def from_name(cls, *args, **kwargs): # pragma: no cover - raise ImportError( - "vLLM is not installed. Please install it with 'pip install vllm' " - "to use the vLLM-based AsyncLM model." - ) - -else: - logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) - - class PassThroughLogitsProcessor: - """A logits processor that stores the logprobs and passes the logits through.""" - - def __init__(self): - self.log_probs = None - - def __call__(self, past_token_ids, logits): - assert self.log_probs is None, ( - "Log probs already set. This should never happen." - ) - self.log_probs = torch.log_softmax(logits, dim=-1, dtype=logits.dtype) - return logits - - class AsyncVirtualLM(AsyncLM): - default_params = { - "max_tokens": 1, - "n": 1, - "detokenize": False, - "stop": None, - "ignore_eos": True, - } - - def __init__(self, async_llm_engine, cache_size=0, cache_opts={}): - """Initialize an `AsyncVirtualLM` instance. - - Args: - async_llm_engine (AsyncLLMEngine): The async vLLM engine instance. - cache_size (int, optional): Maximum size of the output cache. If 0, caching is disabled. Defaults to 0. - cache_opts (dict, optional): Additional options to pass to the [`OutputCache`][genlm.backend.cache.OutputCache] constructor. Defaults to {}. 
- - Note: - The cache stores the log probabilities for previously seen token sequences to avoid redundant requests. KV caching is handled internally by the vLLM engine. - """ - self.async_llm_engine = async_llm_engine - self.tokenizer = async_llm_engine.engine.get_tokenizer() - self.request_counter = Counter() - self.cache = ( - OutputCache(maxsize=cache_size, **cache_opts) - if cache_size > 0 - else None - ) - - async_llm_engine.engine.log_stats = False - - super().__init__(tokenizer=self.tokenizer) - - @classmethod - def from_name(cls, model_name, engine_opts=None, **kwargs): - """Create a `AsyncVirtualLM` instance from a model name. - - Args: - model_name (str): Name of the model to load. - engine_opts (dict): Additional options to pass to the `AsyncLLMEngine`. The engine will be - configured with prefix caching enabled and async output processing disabled by default. - **kwargs: Additional arguments passed to `AsyncVirtualLM` constructor. - - Returns: - (AsyncVirtualLM): An `AsyncVirtualLM` instance. - """ - if not HAS_VLLM: - raise ImportError( # pragma: no cover - "vLLM not available. Install vLLM or use AsyncTransformer instead." - ) - - if engine_opts is not None and "enable_chunked_prefill" in engine_opts: - if engine_opts["enable_chunked_prefill"]: - warnings.warn( # pragma: no cover - "Setting enable_chunked_prefill to True may interfere with AsyncVirtualLM's " - "custom sampling functionality." - ) - - engine_opts = { - "enable_prefix_caching": True, - "disable_log_requests": True, - "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. - **(engine_opts or {}), - } - - engine = AsyncLLMEngine.from_engine_args( - AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) - ) - - return cls(engine, **kwargs) - - @property - def underlying_model(self): - return self.async_llm_engine.engine.model_executor.driver_worker.model_runner.model - - async def next_token_logprobs(self, token_ids): - """Request log probabilities of next token asynchronously with output caching. - - Args: - token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. - - Returns: - result (torch.Tensor): Normalized log probability tensor. - - Warning: - Do not use `asyncio.run(next_token_logprobs())` as it may interfere with vLLM's background loop. - For synchronous usage, use the `next_token_logprobs_sync()` method instead. - """ - key = tuple(token_ids) - - if self.cache is not None and key in self.cache: - return self.cache[key] - - result = await self._next_token_logprobs(key) - - if self.cache is not None: - self.cache[key] = result - - return result - - async def _next_token_logprobs(self, token_ids): - """Request log probabilities of next token asynchronously. - - Args: - token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. - - Returns: - (torch.Tensor): Normalized log probability tensor. - """ - req_id = str(next(self.request_counter)) - prompt = TokensPrompt(prompt_token_ids=token_ids) - - outputs = [] - processor = PassThroughLogitsProcessor() - async for output in self.async_llm_engine.generate( - prompt=prompt, - sampling_params=SamplingParams( - **self.default_params, logits_processors=[processor] - ), - request_id=req_id, - ): - if output.finished: - outputs.append(output) - - assert processor.log_probs is not None, ( - "Log probs should be set by the logits processor." 
- ) - return processor.log_probs - - def next_token_logprobs_sync(self, token_ids): - """Request log probabilities of next token synchronously. - - Args: - token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. - - Returns: - (torch.Tensor): Normalized log probability tensor. - """ - return self.batch_next_token_logprobs_sync([token_ids])[0] - - def batch_next_token_logprobs_sync(self, token_ids_list): - """ - Request log probabilities of next tokens in a batch synchronously. - - Args: - token_ids_list (list[list[int]]): A list of token ID lists, each representing a prompt to the language model. - - Returns: - (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. - """ - req_ids = [] - req_id2processors = {} - for token_ids in token_ids_list: - req_id = str(next(self.request_counter)) - req_ids.append(req_id) - processor = PassThroughLogitsProcessor() - req_id2processors[req_id] = processor - self.async_llm_engine.engine.add_request( - prompt=TokensPrompt(prompt_token_ids=token_ids), - params=SamplingParams( - **self.default_params, logits_processors=[processor] - ), - request_id=req_id, - ) - - while self.async_llm_engine.engine.has_unfinished_requests(): - output = self.async_llm_engine.engine.step() - for out in output: - if out.finished: - assert out.request_id in req_id2processors, ( - f"{out.request_id} not in requested IDs" - ) - - return torch.stack( - [req_id2processors[req_id].log_probs for req_id in req_ids] - ) - - def clear_cache(self): - """Clear output cache.""" - if self.cache: - self.cache.clear() - - def __del__(self): - """Clean up resources on deletion.""" - self._cleanup_engine() - - def _cleanup_engine(self): - """Clean up the vLLM engine and associated resources.""" - if async_engine := getattr(self, "async_llm_engine", None): - async_engine.shutdown_background_loop() - destroy_model_parallel() - destroy_distributed_environment() - - async def sample( - self, - prompt_token_ids, - max_tokens, - eos_token_ids, - temperature=1.0, - seed=None, - ): - """Sample from the language model. - - Args: - prompt_token_ids (list[int]): The token IDs of the prompt. - eos_token_ids (list[int]): The token IDs of the end-of-sequence tokens. - temperature (float, optional): The temperature to use to rescale the logits. Defaults to 1.0. - max_tokens (int): The maximum number of tokens to generate. - seed (int, optional): The seed for the random number generator. Defaults to None. - - Returns: - (list[int]): The sampled token IDs. - """ - async for output in self.async_llm_engine.generate( - prompt=TokensPrompt(prompt_token_ids=prompt_token_ids), - sampling_params=SamplingParams( - n=1, - max_tokens=max_tokens, - temperature=temperature, - seed=seed, - stop=[self.byte_vocab[i].decode() for i in eos_token_ids], - ), - request_id=str(next(self.request_counter)), - ): - if output.finished: - assert len(output.outputs) == 1, ( - "Expected exactly one sequence group" - ) - token_ids = list(output.outputs[0].token_ids) - if token_ids[-1] in eos_token_ids: - token_ids = token_ids[:-1] - return token_ids From df8f78996a4460043b5a8d16472bac043d961468 Mon Sep 17 00:00:00 2001 From: Clemente Date: Mon, 27 Oct 2025 17:41:32 +0000 Subject: [PATCH 04/12] Updates to support V1. updated pyproject.toml to support vllm 10.2, whch is necessary for gpt-oss. 
--- genlm/backend/llm/vllm.py | 91 +++++++++++++-------------------------- pyproject.toml | 2 +- 2 files changed, 30 insertions(+), 63 deletions(-) diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllm.py index a9d70ae..f06f889 100644 --- a/genlm/backend/llm/vllm.py +++ b/genlm/backend/llm/vllm.py @@ -1,3 +1,7 @@ +FORCE_V0 = True #Currently, we force thw model to use V0, to switch to V1 simply set this to False +LOGPROBS_PER_REQUEST = 256 #These are th elogprobs that are retrieved currently in V1 + +from syslog import LOG_PERROR import torch import logging import warnings @@ -43,7 +47,7 @@ def from_name(cls, *args, **kwargs): # pragma: no cover "to use the vLLM-based AsyncLM model." ) -elif envs.VLLM_USE_V1: #If V1 +elif envs.VLLM_USE_V1 and not FORCE_V0: #If V1 logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) @@ -54,12 +58,12 @@ class AsyncVirtualLM(AsyncLM): "detokenize": False, "stop": None, "ignore_eos": True, - "logprobs": 256, # Request logprobs for top 1000 tokens + "logprobs": LOGPROBS_PER_REQUEST, # This parameter fixes the number of requested logprobs. } - def __init__(self, async_llm_engine, cache_size=0, cache_opts={}, logits_processors=None): + def __init__(self, async_llm_engine, cache_size=0, cache_opts={}): """Initialize an `AsyncVirtualLM` instance. - + Args: async_llm_engine (AsyncLLMEngine): The async vLLM engine instance. cache_size (int, optional): Maximum size of the output cache. If 0, caching is disabled. Defaults to 0. @@ -72,8 +76,6 @@ def __init__(self, async_llm_engine, cache_size=0, cache_opts={}, logits_process # Wrap v1 tokenizer to be compatible with base class self.tokenizer = self._wrap_tokenizer(async_llm_engine.tokenizer) self.request_counter = Counter() - # Store logits processors for compatibility (not used in v1) - self._logits_processors = list(logits_processors or []) self.cache = ( OutputCache(maxsize=cache_size, **cache_opts) if cache_size > 0 @@ -85,7 +87,8 @@ def __init__(self, async_llm_engine, cache_size=0, cache_opts={}, logits_process super().__init__(tokenizer=self.tokenizer) def _wrap_tokenizer(self, tokenizer): - """Wrap v1 tokenizer to be compatible with base class expectations.""" + """Wrap v1 tokenizer to be compatible with base class expectations. + Note that in V1 async_llm_engine.tokenizer is a TokenizerGroup object""" class TokenizerWrapper: def __init__(self, tokenizer): # Access the underlying tokenizer from TokenizerGroup @@ -94,7 +97,7 @@ def __init__(self, tokenizer): self.is_fast = True # Assume fast tokenizer for v1 self.name_or_path = getattr(self._tokenizer, 'name_or_path', 'unknown') - def __getattr__(self, name): + def __getattr__(self, name): # Retrieve the tokenizer from the TokenizerGroup object return getattr(self._tokenizer, name) def __len__(self): @@ -114,11 +117,14 @@ def from_name(cls, model_name, engine_opts=None, **kwargs): Returns: (AsyncVirtualLM): An `AsyncVirtualLM` instance. + + Note: for GPT-OSS, vLLM >= 0.10.2 is required """ if not HAS_VLLM: raise ImportError( # pragma: no cover "vLLM not available. Install vLLM or use AsyncTransformer instead." 
) + if engine_opts is not None and "enable_chunked_prefill" in engine_opts: if engine_opts["enable_chunked_prefill"]: @@ -129,11 +135,8 @@ def from_name(cls, model_name, engine_opts=None, **kwargs): engine_opts = { "enable_prefix_caching": True, - "disable_log_requests": True, - "gpu_memory_utilization": 0.3, # Reduce GPU memory usage - "max_model_len": 512, # Reduce max sequence length - "max_logprobs": 1000, # Allow up to 1000 logprobs per token - # "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. + "max_logprobs": LOGPROBS_PER_REQUEST, + # "disable_log_requests": True, **(engine_opts or {}), } @@ -160,8 +163,8 @@ async def next_token_logprobs(self, token_ids): Returns: result (torch.Tensor): Normalized log probability tensor. """ - # Handle string input properly - if isinstance(token_ids, str): + # Note that differently from V0, V1 takes inout string by default + if isinstance(token_ids, str): key = token_ids else: key = tuple(token_ids) @@ -186,12 +189,12 @@ async def _next_token_logprobs(self, token_ids): (torch.Tensor): Normalized log probability tensor. """ req_id = str(next(self.request_counter)) - print(f"request id: {req_id}") + # print(f"request id: {req_id}") # For v1, use string prompt directly instead of TokensPrompt if isinstance(token_ids, str): prompt = token_ids else: - # Convert token IDs back to string for v1 compatibility + # Convert token IDs to string for v1 compatibility prompt = self.tokenizer.decode(token_ids) outputs = [] @@ -212,35 +215,32 @@ async def _next_token_logprobs(self, token_ids): logprobs = output.logprobs assert logprobs - print(f"shape of logprobs before: {len(logprobs[0]),type(logprobs)}") # v1 logprobs format: list of dicts with token_id -> logprob - # With max_logprobs=1000, we get many more logprobs vocab_size = len(self.tokenizer) logprobs_tensor = torch.full((1, vocab_size), -float('inf'), dtype=torch.float32) for token_id, logprob in logprobs[0].items(): - print(f"token_id: {token_id}, logprob: {logprob}") - # if isinstance(token_id, int) and 0 <= token_id < vocab_size: - # Extract the actual logprob value from the Logprob object + #Assign the logprobs to the top-k retrieved tokens in the vocabulary. if hasattr(logprob, 'logprob'): logprobs_tensor[0, token_id] = logprob.logprob else: logprobs_tensor[0, token_id] = float(logprob) + # Question: do we actually need to renormalize or can we just leave to -inf ?? 
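# Illustrative sketch, not part of the patch: the question above has two plausible
# answers. Leaving the non-top-k entries at -inf keeps a truncated, unnormalized
# distribution; alternatively the leftover mass can be spread uniformly over them,
# which is what the lines that follow implement. A standalone helper for the second
# option (the function name is hypothetical) could look like this:
import torch

def spread_remaining_mass(logprobs_row: torch.Tensor) -> torch.Tensor:
    """Return a copy where tokens outside the returned top-k share the leftover mass uniformly."""
    out = logprobs_row.clone()
    topk_mask = out != -float("inf")  # entries vLLM actually returned
    leftover = max(1.0 - out[topk_mask].exp().sum().item(), 0.0)
    n_missing = (~topk_mask).sum().item()
    if n_missing > 0 and leftover > 0:
        out[~topk_mask] = torch.log(torch.tensor(leftover / n_missing))
    return out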
#Distribute the remaining mass across the tokens that are not in the top-k non_inf_mask = logprobs_tensor[0] != -float('inf') # create boolean non-inf mask - if non_inf_mask.sum() > 0: + if non_inf_mask.sum() > 0: # Get the logprobs for the top-k tokens top_logprobs = logprobs_tensor[0][non_inf_mask] - remaining_prob = 1.0 - torch.exp(top_logprobs).sum().item() - remaining_tokens = (~non_inf_mask).sum().item() - uniform_logprob = torch.log(torch.tensor(remaining_prob / remaining_tokens)) + remaining_prob = max( 1.0 - torch.exp(top_logprobs).sum().item(),0.0 ) #Compute the remaining probability mass + remaining_tokens = (~non_inf_mask).sum().item() #Compute the number of remaining tokens + uniform_logprob = torch.log(torch.tensor(remaining_prob / remaining_tokens)) #Compute the uniform log probability logprobs_tensor[0][~non_inf_mask] = uniform_logprob logprobs = logprobs_tensor return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) - def next_token_logprobs_sync(self, token_ids): + def next_token_logprobs_sync(self, token_ids): #For now, this simply uses the asynchronous method. """Request log probabilities of next token synchronously. Args: @@ -252,39 +252,6 @@ def next_token_logprobs_sync(self, token_ids): import asyncio return asyncio.run(self.next_token_logprobs(token_ids)) - async def batch_next_token_logprobs(self, token_ids_list): - """ - Request log probabilities of next tokens in a batch asynchronously. - - Args: - token_ids_list (list[list[int]]): A list of token ID lists, each representing a prompt to the language model. - - Returns: - (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. - """ - # Handle empty batch - if not token_ids_list: - return torch.empty((0, len(self.tokenizer)), dtype=torch.float32) - - # Use the base class implementation - return await super().batch_next_token_logprobs(token_ids_list) - - def batch_next_token_logprobs_sync(self, token_ids_list): - """ - Request log probabilities of next tokens in a batch synchronously. - - Args: - token_ids_list (list[list[int]]): A list of token ID lists, each representing a prompt to the language model. - - Returns: - (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. 
- """ - # Handle empty batch - if not token_ids_list: - return torch.empty((0, len(self.tokenizer)), dtype=torch.float32) - - # Use the base class implementation - return super().batch_next_token_logprobs_sync(token_ids_list) def clear_cache(self): """Clear output cache.""" @@ -328,7 +295,7 @@ async def sample( elif isinstance(prompt_token_ids, str): pass else: - raise f"Invalid prompt_ids_Type: {type(prompt_token_ids)}" + raise ValueError(f"Invalid prompt_ids_Type: {type(prompt_token_ids)}") async for output in self.async_llm_engine.generate( prompt=prompt_token_ids, @@ -337,7 +304,7 @@ async def sample( max_tokens=max_tokens, temperature=temperature, seed=seed, - stop=[self.byte_vocab[i].decode() for i in eos_token_ids], + stop=[self.tokenizer.decode([i]) for i in eos_token_ids], ), request_id=str(next(self.request_counter)), ): diff --git a/pyproject.toml b/pyproject.toml index f75650a..e39904c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "accelerate", "bitsandbytes; sys_platform == 'linux'", "numba", - "vllm>=0.6.6,<=0.10.0; sys_platform == 'linux'", + "vllm>=0.6.6,<=0.10.3; sys_platform == 'linux'", "triton>=3.2.0; sys_platform == 'linux'" ] From 2adf75b4386fca1c4a2d34e310edc100a3b92e92 Mon Sep 17 00:00:00 2001 From: Clemente Date: Sun, 2 Nov 2025 15:38:47 +0000 Subject: [PATCH 05/12] added test cases for V1, and modified the vllm class so that we can select either V1 or V0 by passing a flag to the constructor. --- .gitignore | 3 + genlm/backend/llm/vllm.py | 360 +++++++++++--------------------------- notes/Untitled.ipynb | 44 ----- notes/playground.ipynb | 0 tests/conftest.py | 1 + tests/test_llm.py | 31 ++++ 6 files changed, 138 insertions(+), 301 deletions(-) delete mode 100644 notes/Untitled.ipynb delete mode 100644 notes/playground.ipynb diff --git a/.gitignore b/.gitignore index 82f9275..b753619 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# Notes +notes/*.ipynb diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllm.py index f06f889..de6af14 100644 --- a/genlm/backend/llm/vllm.py +++ b/genlm/backend/llm/vllm.py @@ -1,10 +1,7 @@ -FORCE_V0 = True #Currently, we force thw model to use V0, to switch to V1 simply set this to False -LOGPROBS_PER_REQUEST = 256 #These are th elogprobs that are retrieved currently in V1 - -from syslog import LOG_PERROR import torch import logging import warnings +import os from genlm.backend.llm.base import AsyncLM from genlm.backend.cache import OutputCache @@ -13,12 +10,6 @@ from vllm import AsyncLLMEngine, SamplingParams from vllm.utils import Counter from vllm.inputs import TokensPrompt - from vllm import envs - - if envs.VLLM_USE_V1: #The import is different iin V1 and V0 - from vllm.engine.arg_utils import AsyncEngineArgs - else: - from vllm import AsyncEngineArgs from vllm.distributed.parallel_state import ( destroy_model_parallel, @@ -47,34 +38,55 @@ def from_name(cls, *args, **kwargs): # pragma: no cover "to use the vLLM-based AsyncLM model." 
) -elif envs.VLLM_USE_V1 and not FORCE_V0: #If V1 +else: logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) + class PassThroughLogitsProcessor: + """A logits processor that stores the logprobs and passes the logits through.""" + + def __init__(self): + self.log_probs = None + + def __call__(self, past_token_ids, logits): + assert self.log_probs is None, ( + "Log probs already set. This should never happen." + ) + self.log_probs = torch.log_softmax(logits, dim=-1, dtype=logits.dtype) + return logits + class AsyncVirtualLM(AsyncLM): - default_params = { - "max_tokens": 1, - "n": 1, - "detokenize": False, - "stop": None, - "ignore_eos": True, - "logprobs": LOGPROBS_PER_REQUEST, # This parameter fixes the number of requested logprobs. - } - def __init__(self, async_llm_engine, cache_size=0, cache_opts={}): + def __init__(self, async_llm_engine, cache_size=0, cache_opts={}, logprobs_per_request=256, v1=False): """Initialize an `AsyncVirtualLM` instance. Args: async_llm_engine (AsyncLLMEngine): The async vLLM engine instance. cache_size (int, optional): Maximum size of the output cache. If 0, caching is disabled. Defaults to 0. cache_opts (dict, optional): Additional options to pass to the [`OutputCache`][genlm.backend.cache.OutputCache] constructor. Defaults to {}. + v1: if true sets the engine to V1, otherwise to V0 + logprobs_per_request: used only in V1, selects the number of retrieved logprobs. Note: The cache stores the log probabilities for previously seen token sequences to avoid redundant requests. KV caching is handled internally by the vLLM engine. """ + self.v1 = v1 self.async_llm_engine = async_llm_engine - # Wrap v1 tokenizer to be compatible with base class - self.tokenizer = self._wrap_tokenizer(async_llm_engine.tokenizer) + self.default_params = { + "max_tokens": 1, + "n": 1, + "detokenize": False, + "stop": None, + "ignore_eos": True, + } + # Version specific modifications + if self.v1: + self.default_params["logprobs"] = logprobs_per_request # set the retrieved logprobs + self.tokenizer = self._wrap_tokenizer(async_llm_engine.tokenizer) # wrap tokenizer for V1 + async_llm_engine.log_stats = False + else: + self.tokenizer = async_llm_engine.engine.get_tokenizer() + async_llm_engine.engine.log_stats = False self.request_counter = Counter() self.cache = ( OutputCache(maxsize=cache_size, **cache_opts) @@ -82,8 +94,6 @@ def __init__(self, async_llm_engine, cache_size=0, cache_opts={}): else None ) - async_llm_engine.log_stats = False - super().__init__(tokenizer=self.tokenizer) def _wrap_tokenizer(self, tokenizer): @@ -106,7 +116,7 @@ def __len__(self): return TokenizerWrapper(tokenizer) @classmethod - def from_name(cls, model_name, engine_opts=None, **kwargs): + def from_name(cls, model_name, v1=False, logprobs_per_request=256 ,engine_opts=None, **kwargs): """Create a `AsyncVirtualLM` instance from a model name. Args: @@ -133,18 +143,41 @@ def from_name(cls, model_name, engine_opts=None, **kwargs): "custom sampling functionality." ) - engine_opts = { + if v1: + original_v1_env = os.environ.get("VLLM_USE_V1") # The engine type may be set as an environmrntal varible + os.environ["VLLM_USE_V1"] = "1" + from vllm.engine.arg_utils import AsyncEngineArgs # the AsyncEngineArgs import is different in V1 and V0. 
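# Illustrative sketch, not part of the patch: the save/set/restore handling of
# VLLM_USE_V1 performed in this method could equivalently be wrapped in a small
# context manager, which also guarantees the variable is restored if engine
# construction raises. The helper name is hypothetical.
import os
from contextlib import contextmanager

@contextmanager
def vllm_engine_version(use_v1: bool):
    previous = os.environ.get("VLLM_USE_V1")
    os.environ["VLLM_USE_V1"] = "1" if use_v1 else "0"
    try:
        yield
    finally:
        if previous is not None:
            os.environ["VLLM_USE_V1"] = previous
        else:
            os.environ.pop("VLLM_USE_V1", None)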
+ engine_opts = { + "enable_prefix_caching": True, + "max_logprobs": logprobs_per_request, + **(engine_opts or {}), + } + else: + original_v1_env = os.environ.get("VLLM_USE_V1") + os.environ["VLLM_USE_V1"] = "0" + from vllm import AsyncEngineArgs # the AsyncEngineArgs import is different in V1 and V0 + engine_opts = { "enable_prefix_caching": True, - "max_logprobs": LOGPROBS_PER_REQUEST, - # "disable_log_requests": True, + "disable_log_requests": True, + "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. **(engine_opts or {}), } - engine = AsyncLLMEngine.from_engine_args( - AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) + engine = AsyncLLMEngine.from_engine_args( # Set up the engine + AsyncEngineArgs(model=model_name, + tokenizer=model_name, + **engine_opts) ) - return cls(engine, **kwargs) + try: # reset the environmental variable, so that it does not interfere with other instances + if original_v1_env is not None: + os.environ["VLLM_USE_V1"] = original_v1_env + else: + os.environ.pop("VLLM_USE_V1", None) + except Exception: + pass # Ignore cleanup errors + + return cls(engine, v1=v1, logprobs_per_request=logprobs_per_request, **kwargs) @property def underlying_model(self): @@ -163,23 +196,23 @@ async def next_token_logprobs(self, token_ids): Returns: result (torch.Tensor): Normalized log probability tensor. """ - # Note that differently from V0, V1 takes inout string by default - if isinstance(token_ids, str): - key = token_ids - else: - key = tuple(token_ids) + + key = tuple(token_ids) if self.cache is not None and key in self.cache: return self.cache[key] - result = await self._next_token_logprobs(key) + if self.v1: + result = await self._next_token_logprobs_v1(key) + else: + result = await self._next_token_logprobs_v0(key) if self.cache is not None: self.cache[key] = result return result - async def _next_token_logprobs(self, token_ids): + async def _next_token_logprobs_v1(self, token_ids): """Request log probabilities of next token asynchronously. Args: @@ -189,7 +222,7 @@ async def _next_token_logprobs(self, token_ids): (torch.Tensor): Normalized log probability tensor. """ req_id = str(next(self.request_counter)) - # print(f"request id: {req_id}") + # For v1, use string prompt directly instead of TokensPrompt if isinstance(token_ids, str): prompt = token_ids @@ -206,235 +239,29 @@ async def _next_token_logprobs(self, token_ids): if output.finished: outputs.append(output) - if not outputs: - raise RuntimeError("No outputs generated") - # Extract logprobs from the output # v1 provides logprobs in the output when logprobs parameter is set output = outputs[0].outputs[0] logprobs = output.logprobs - assert logprobs + assert logprobs, "Log probs should have been retrieved at this point" # v1 logprobs format: list of dicts with token_id -> logprob vocab_size = len(self.tokenizer) logprobs_tensor = torch.full((1, vocab_size), -float('inf'), dtype=torch.float32) for token_id, logprob in logprobs[0].items(): - #Assign the logprobs to the top-k retrieved tokens in the vocabulary. - if hasattr(logprob, 'logprob'): - logprobs_tensor[0, token_id] = logprob.logprob - else: - logprobs_tensor[0, token_id] = float(logprob) - - # Question: do we actually need to renormalize or can we just leave to -inf ?? 
- #Distribute the remaining mass across the tokens that are not in the top-k - non_inf_mask = logprobs_tensor[0] != -float('inf') # create boolean non-inf mask - if non_inf_mask.sum() > 0: - # Get the logprobs for the top-k tokens - top_logprobs = logprobs_tensor[0][non_inf_mask] - remaining_prob = max( 1.0 - torch.exp(top_logprobs).sum().item(),0.0 ) #Compute the remaining probability mass - remaining_tokens = (~non_inf_mask).sum().item() #Compute the number of remaining tokens - uniform_logprob = torch.log(torch.tensor(remaining_prob / remaining_tokens)) #Compute the uniform log probability - logprobs_tensor[0][~non_inf_mask] = uniform_logprob + #Assign the logprobs to the top-k retrieved tokens in the vocabulary. + assert hasattr(logprob, 'logprob'), "Logprob field is required" + logprobs_tensor[0, token_id] = logprob.logprob + + #Right now we don't re-normalize! We might want to change this, + #the remaining mass can either be redistributed among the remaining tokens + # or among the selected ones. logprobs = logprobs_tensor return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) - def next_token_logprobs_sync(self, token_ids): #For now, this simply uses the asynchronous method. - """Request log probabilities of next token synchronously. - - Args: - token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. - - Returns: - (torch.Tensor): Normalized log probability tensor. - """ - import asyncio - return asyncio.run(self.next_token_logprobs(token_ids)) - - - def clear_cache(self): - """Clear output cache.""" - if self.cache: - self.cache.clear() - - def __del__(self): - """Clean up resources on deletion.""" - self._cleanup_engine() - - def _cleanup_engine(self): - """Clean up the vLLM engine and associated resources.""" - if async_engine := getattr(self, "async_llm", None): - async_engine.shutdown() - destroy_model_parallel() - destroy_distributed_environment() - - async def sample( - self, - prompt_token_ids, - max_tokens, - eos_token_ids, - temperature=1.0, - seed=None, - ): - """Sample from the language model. - - Args: - prompt_token_ids (list[int]): The token IDs of the prompt. - eos_token_ids (list[int]): The token IDs of the end-of-sequence tokens. - temperature (float, optional): The temperature to use to rescale the logits. Defaults to 1.0. - max_tokens (int): The maximum number of tokens to generate. - seed (int, optional): The seed for the random number generator. Defaults to None. - - Returns: - (list[int]): The sampled token IDs. 
- """ - - if isinstance(prompt_token_ids, list): - prompt_token_ids = self.tokenizer.decode(prompt_token_ids) - elif isinstance(prompt_token_ids, str): - pass - else: - raise ValueError(f"Invalid prompt_ids_Type: {type(prompt_token_ids)}") - - async for output in self.async_llm_engine.generate( - prompt=prompt_token_ids, - sampling_params=SamplingParams( - n=1, - max_tokens=max_tokens, - temperature=temperature, - seed=seed, - stop=[self.tokenizer.decode([i]) for i in eos_token_ids], - ), - request_id=str(next(self.request_counter)), - ): - if output.finished: - assert len(output.outputs) == 1, ( - "Expected exactly one sequence group" - ) - token_ids = list(output.outputs[0].token_ids) - if token_ids[-1] in eos_token_ids: - token_ids = token_ids[:-1] - return token_ids - -else: #Otherwise use V0 - - logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) - - class PassThroughLogitsProcessor: - """A logits processor that stores the logprobs and passes the logits through.""" - - def __init__(self): - self.log_probs = None - - def __call__(self, past_token_ids, logits): - assert self.log_probs is None, ( - "Log probs already set. This should never happen." - ) - self.log_probs = torch.log_softmax(logits, dim=-1, dtype=logits.dtype) - return logits - - class AsyncVirtualLM(AsyncLM): - default_params = { - "max_tokens": 1, - "n": 1, - "detokenize": False, - "stop": None, - "ignore_eos": True, - } - - def __init__(self, async_llm_engine, cache_size=0, cache_opts={}): - """Initialize an `AsyncVirtualLM` instance. - - Args: - async_llm_engine (AsyncLLMEngine): The async vLLM engine instance. - cache_size (int, optional): Maximum size of the output cache. If 0, caching is disabled. Defaults to 0. - cache_opts (dict, optional): Additional options to pass to the [`OutputCache`][genlm.backend.cache.OutputCache] constructor. Defaults to {}. - - Note: - The cache stores the log probabilities for previously seen token sequences to avoid redundant requests. KV caching is handled internally by the vLLM engine. - """ - self.async_llm_engine = async_llm_engine - self.tokenizer = async_llm_engine.engine.get_tokenizer() - self.request_counter = Counter() - self.cache = ( - OutputCache(maxsize=cache_size, **cache_opts) - if cache_size > 0 - else None - ) - - async_llm_engine.engine.log_stats = False - - super().__init__(tokenizer=self.tokenizer) - - @classmethod - def from_name(cls, model_name, engine_opts=None, **kwargs): - """Create a `AsyncVirtualLM` instance from a model name. - - Args: - model_name (str): Name of the model to load. - engine_opts (dict): Additional options to pass to the `AsyncLLMEngine`. The engine will be - configured with prefix caching enabled and async output processing disabled by default. - **kwargs: Additional arguments passed to `AsyncVirtualLM` constructor. - - Returns: - (AsyncVirtualLM): An `AsyncVirtualLM` instance. - """ - if not HAS_VLLM: - raise ImportError( # pragma: no cover - "vLLM not available. Install vLLM or use AsyncTransformer instead." - ) - - if engine_opts is not None and "enable_chunked_prefill" in engine_opts: - if engine_opts["enable_chunked_prefill"]: - warnings.warn( # pragma: no cover - "Setting enable_chunked_prefill to True may interfere with AsyncVirtualLM's " - "custom sampling functionality." - ) - - engine_opts = { - "enable_prefix_caching": True, - "disable_log_requests": True, - "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. 
- **(engine_opts or {}), - } - - engine = AsyncLLMEngine.from_engine_args( - AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) - ) - - return cls(engine, **kwargs) - - @property - def underlying_model(self): - return self.async_llm_engine.engine.model_executor.driver_worker.model_runner.model - - async def next_token_logprobs(self, token_ids): - """Request log probabilities of next token asynchronously with output caching. - - Args: - token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. - - Returns: - result (torch.Tensor): Normalized log probability tensor. - - Warning: - Do not use `asyncio.run(next_token_logprobs())` as it may interfere with vLLM's background loop. - For synchronous usage, use the `next_token_logprobs_sync()` method instead. - """ - key = tuple(token_ids) - - if self.cache is not None and key in self.cache: - return self.cache[key] - - result = await self._next_token_logprobs(key) - - if self.cache is not None: - self.cache[key] = result - - return result - - async def _next_token_logprobs(self, token_ids): + async def _next_token_logprobs_v0(self, token_ids): """Request log probabilities of next token asynchronously. Args: @@ -472,6 +299,7 @@ def next_token_logprobs_sync(self, token_ids): Returns: (torch.Tensor): Normalized log probability tensor. """ + assert not self.v1 #Currently implemented only for V0 return self.batch_next_token_logprobs_sync([token_ids])[0] def batch_next_token_logprobs_sync(self, token_ids_list): @@ -484,6 +312,7 @@ def batch_next_token_logprobs_sync(self, token_ids_list): Returns: (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. """ + assert not self.v1 #Currently implemented only for V0 req_ids = [] req_id2processors = {} for token_ids in token_ids_list: @@ -511,6 +340,7 @@ def batch_next_token_logprobs_sync(self, token_ids_list): [req_id2processors[req_id].log_probs for req_id in req_ids] ) + def clear_cache(self): """Clear output cache.""" if self.cache: @@ -523,7 +353,10 @@ def __del__(self): def _cleanup_engine(self): """Clean up the vLLM engine and associated resources.""" if async_engine := getattr(self, "async_llm_engine", None): - async_engine.shutdown_background_loop() + if self.v1: + async_engine.shutdown() + else: + async_engine.shutdown_background_loop() destroy_model_parallel() destroy_distributed_environment() @@ -547,14 +380,27 @@ async def sample( Returns: (list[int]): The sampled token IDs. """ + if self.v1: + if isinstance(prompt_token_ids, list): + prompt_token_ids = self.tokenizer.decode(prompt_token_ids) + elif isinstance(prompt_token_ids, str): + pass + else: + raise ValueError(f"Invalid prompt_ids_Type: {type(prompt_token_ids)}") + else: + prompt_token_ids = TokensPrompt(prompt_token_ids=prompt_token_ids) + + # Question to check: Why do we need to use "byte_vocab"? 
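Editor's note and sketch (not part of the patch): the lambda introduced just below builds vLLM stop strings from EOS token ids, and the later def rewrite of it in PATCH 06 drops the return in the V0 branch, so that branch evaluates the list and discards it. A corrected standalone sketch, with the tokenizer and byte-level vocabulary passed explicitly and assumed to behave like the attributes used in the surrounding code:

def decode_eos(tokenizer, byte_vocab, eos_token_ids, v1):
    """Build vLLM stop strings from EOS token ids: V1 decodes via the
    tokenizer, V0 via the byte-level vocabulary. Both branches must return."""
    if v1:
        return [tokenizer.decode([i]) for i in eos_token_ids]
    else:
        return [byte_vocab[i].decode() for i in eos_token_ids]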
+ decode_eos = lambda eos_token_ids : [self.tokenizer.decode([i]) for i in eos_token_ids] if self.v1 else [self.byte_vocab[i].decode() for i in eos_token_ids] + async for output in self.async_llm_engine.generate( - prompt=TokensPrompt(prompt_token_ids=prompt_token_ids), + prompt=prompt_token_ids, sampling_params=SamplingParams( n=1, max_tokens=max_tokens, temperature=temperature, seed=seed, - stop=[self.byte_vocab[i].decode() for i in eos_token_ids], + stop= decode_eos(eos_token_ids), ), request_id=str(next(self.request_counter)), ): diff --git a/notes/Untitled.ipynb b/notes/Untitled.ipynb deleted file mode 100644 index 940d413..0000000 --- a/notes/Untitled.ipynb +++ /dev/null @@ -1,44 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 2, - "id": "21839ddb-6090-4866-beb4-b24a7c2f4e80", - "metadata": {}, - "outputs": [], - "source": [ - "import genlm\n", - "import numpy as np" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ca6090bf-5ef9-45ee-8e15-12c0c9399422", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python genlm", - "language": "python", - "name": "genlm" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.13" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/notes/playground.ipynb b/notes/playground.ipynb deleted file mode 100644 index e69de29..0000000 diff --git a/tests/conftest.py b/tests/conftest.py index f8a4566..9b8ebfa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -153,6 +153,7 @@ def from_name(cls, model_name, llm_opts=None): "enable_prefix_caching": True, "disable_log_stats": True, "dtype": "float16", + "disable_async_output_proc": True, # Force the use of V0 **(llm_opts or {}), } llm = LLM(model=model_name, tokenizer=model_name, **llm_opts) diff --git a/tests/test_llm.py b/tests/test_llm.py index 81aefc7..c481cd3 100644 --- a/tests/test_llm.py +++ b/tests/test_llm.py @@ -4,6 +4,7 @@ from conftest import cuda_only, ReferenceVirtualLM from arsenal.maths import compare from genlm.backend.llm import load_model_by_name, MockAsyncLM, AsyncVirtualLM +import os # from hypothesis import given, strategies as st, settings @@ -13,6 +14,7 @@ def model_name(): return "gpt2" + @pytest.fixture(scope="module") def reference_llm(model_name): return ReferenceVirtualLM.from_name( @@ -28,6 +30,14 @@ def async_llm(model_name): llm_opts={"engine_opts": {"gpu_memory_utilization": 0.2, "dtype": "float16"}}, ) +@pytest.fixture(scope="module") +def async_llm_v1(model_name): + return load_model_by_name( + model_name, + backend="vllm", + llm_opts={"v1": True, "engine_opts": {"gpu_memory_utilization": 0.2, "dtype": "float16"}}, + ) + @pytest.fixture(scope="module") def transformer_llm(model_name): @@ -148,6 +158,27 @@ def test_batch_next_token_logprobs_agreement( token_ids_list[i], ] +@cuda_only +@pytest.mark.asyncio # Need to run V1 with asyncio. For some reason gets messed up with multiple event loops +async def test_v1_next_token_logprobs(async_llm_v1, reference_llm, token_ids_list): + """Test V1 logprobs against reference (on top-256 tokens only).""" + for token_ids in token_ids_list: + logprobs_v1 = await async_llm_v1.next_token_logprobs(token_ids) + logprobs_ref = await reference_llm.next_token_logprobs(token_ids) + + # Filter non-inf tokens. 
Note that V1 retrieves only th etpo-k tokens and sets the other to -inf + valid_mask = logprobs_v1 != -float('inf') + if valid_mask.sum() <= 128: + pytest.skip("Less than 128 tokens to compare!") + + comparison = compare( + logprobs_v1[valid_mask].cpu().numpy(), + logprobs_ref[valid_mask] + ) + + assert comparison.max_rel_err < 0.1, token_ids # Had to increase a bit the tollerance for V1. Note that the V1 engine might slightly differ in how the weights are handled. + assert comparison.pearson > 0.95, token_ids + @pytest.mark.asyncio async def test_mock_async_llm(): From f3f920a8b93990034896336059bb5c760b7d2a89 Mon Sep 17 00:00:00 2001 From: Clemente Date: Mon, 3 Nov 2025 18:37:42 +0000 Subject: [PATCH 06/12] ruff tests and set back the dependencies to <=0.10.0 to avoid conflicts --- genlm/backend/llm/hf.py | 2 - genlm/backend/llm/vllm.py | 136 ++++++++++++++++++++++++-------------- pyproject.toml | 2 +- tests/conftest.py | 2 +- tests/test_hf_llm.py | 2 +- tests/test_llm.py | 26 +++++--- 6 files changed, 103 insertions(+), 67 deletions(-) diff --git a/genlm/backend/llm/hf.py b/genlm/backend/llm/hf.py index 45a1bf5..746ce0b 100644 --- a/genlm/backend/llm/hf.py +++ b/genlm/backend/llm/hf.py @@ -148,8 +148,6 @@ def from_name(cls, model_id, bitsandbytes_opts=None, hf_opts=None, **kwargs): # model_id, quantization_config=bnb_config, **_hf_opts # ) - - # return cls(mod, tok, **kwargs) @torch.no_grad() diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllm.py index de6af14..74ec28b 100644 --- a/genlm/backend/llm/vllm.py +++ b/genlm/backend/llm/vllm.py @@ -38,8 +38,7 @@ def from_name(cls, *args, **kwargs): # pragma: no cover "to use the vLLM-based AsyncLM model." ) -else: - +else: logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) class PassThroughLogitsProcessor: @@ -56,10 +55,16 @@ def __call__(self, past_token_ids, logits): return logits class AsyncVirtualLM(AsyncLM): - - def __init__(self, async_llm_engine, cache_size=0, cache_opts={}, logprobs_per_request=256, v1=False): + def __init__( + self, + async_llm_engine, + cache_size=0, + cache_opts={}, + logprobs_per_request=256, + v1=False, + ): """Initialize an `AsyncVirtualLM` instance. - + Args: async_llm_engine (AsyncLLMEngine): The async vLLM engine instance. cache_size (int, optional): Maximum size of the output cache. If 0, caching is disabled. Defaults to 0. @@ -73,16 +78,20 @@ def __init__(self, async_llm_engine, cache_size=0, cache_opts={}, logprobs_per_r self.v1 = v1 self.async_llm_engine = async_llm_engine self.default_params = { - "max_tokens": 1, - "n": 1, - "detokenize": False, - "stop": None, - "ignore_eos": True, + "max_tokens": 1, + "n": 1, + "detokenize": False, + "stop": None, + "ignore_eos": True, } # Version specific modifications if self.v1: - self.default_params["logprobs"] = logprobs_per_request # set the retrieved logprobs - self.tokenizer = self._wrap_tokenizer(async_llm_engine.tokenizer) # wrap tokenizer for V1 + self.default_params["logprobs"] = ( + logprobs_per_request # set the retrieved logprobs + ) + self.tokenizer = self._wrap_tokenizer( + async_llm_engine.tokenizer + ) # wrap tokenizer for V1 async_llm_engine.log_stats = False else: self.tokenizer = async_llm_engine.engine.get_tokenizer() @@ -99,24 +108,36 @@ def __init__(self, async_llm_engine, cache_size=0, cache_opts={}, logprobs_per_r def _wrap_tokenizer(self, tokenizer): """Wrap v1 tokenizer to be compatible with base class expectations. 
Note that in V1 async_llm_engine.tokenizer is a TokenizerGroup object""" + class TokenizerWrapper: def __init__(self, tokenizer): # Access the underlying tokenizer from TokenizerGroup - self._tokenizer = getattr(tokenizer, 'tokenizer', tokenizer) + self._tokenizer = getattr(tokenizer, "tokenizer", tokenizer) # Add compatibility attributes self.is_fast = True # Assume fast tokenizer for v1 - self.name_or_path = getattr(self._tokenizer, 'name_or_path', 'unknown') + self.name_or_path = getattr( + self._tokenizer, "name_or_path", "unknown" + ) - def __getattr__(self, name): # Retrieve the tokenizer from the TokenizerGroup object + def __getattr__( + self, name + ): # Retrieve the tokenizer from the TokenizerGroup object return getattr(self._tokenizer, name) - + def __len__(self): return len(self._tokenizer) - + return TokenizerWrapper(tokenizer) @classmethod - def from_name(cls, model_name, v1=False, logprobs_per_request=256 ,engine_opts=None, **kwargs): + def from_name( + cls, + model_name, + v1=False, + logprobs_per_request=256, + engine_opts=None, + **kwargs, + ): """Create a `AsyncVirtualLM` instance from a model name. Args: @@ -128,13 +149,12 @@ def from_name(cls, model_name, v1=False, logprobs_per_request=256 ,engine_opts=N Returns: (AsyncVirtualLM): An `AsyncVirtualLM` instance. - Note: for GPT-OSS, vLLM >= 0.10.2 is required + Note: for GPT-OSS, vLLM >= 0.10.2 is required """ if not HAS_VLLM: raise ImportError( # pragma: no cover "vLLM not available. Install vLLM or use AsyncTransformer instead." ) - if engine_opts is not None and "enable_chunked_prefill" in engine_opts: if engine_opts["enable_chunked_prefill"]: @@ -144,9 +164,14 @@ def from_name(cls, model_name, v1=False, logprobs_per_request=256 ,engine_opts=N ) if v1: - original_v1_env = os.environ.get("VLLM_USE_V1") # The engine type may be set as an environmrntal varible + original_v1_env = os.environ.get( + "VLLM_USE_V1" + ) # The engine type may be set as an environmrntal varible os.environ["VLLM_USE_V1"] = "1" - from vllm.engine.arg_utils import AsyncEngineArgs # the AsyncEngineArgs import is different in V1 and V0. + from vllm.engine.arg_utils import ( + AsyncEngineArgs, + ) # the AsyncEngineArgs import is different in V1 and V0. + engine_opts = { "enable_prefix_caching": True, "max_logprobs": logprobs_per_request, @@ -155,21 +180,22 @@ def from_name(cls, model_name, v1=False, logprobs_per_request=256 ,engine_opts=N else: original_v1_env = os.environ.get("VLLM_USE_V1") os.environ["VLLM_USE_V1"] = "0" - from vllm import AsyncEngineArgs # the AsyncEngineArgs import is different in V1 and V0 + from vllm import ( + AsyncEngineArgs, + ) # the AsyncEngineArgs import is different in V1 and V0 + engine_opts = { - "enable_prefix_caching": True, - "disable_log_requests": True, - "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. - **(engine_opts or {}), - } + "enable_prefix_caching": True, + "disable_log_requests": True, # is it possible to remove this parameter? it is cauing problems with vllm >= v 0.10.0 + "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. 
+ **(engine_opts or {}), + } engine = AsyncLLMEngine.from_engine_args( # Set up the engine - AsyncEngineArgs(model=model_name, - tokenizer=model_name, - **engine_opts) + AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) ) - try: # reset the environmental variable, so that it does not interfere with other instances + try: # reset the environmental variable, so that it does not interfere with other instances if original_v1_env is not None: os.environ["VLLM_USE_V1"] = original_v1_env else: @@ -177,7 +203,9 @@ def from_name(cls, model_name, v1=False, logprobs_per_request=256 ,engine_opts=N except Exception: pass # Ignore cleanup errors - return cls(engine, v1=v1, logprobs_per_request=logprobs_per_request, **kwargs) + return cls( + engine, v1=v1, logprobs_per_request=logprobs_per_request, **kwargs + ) @property def underlying_model(self): @@ -196,7 +224,7 @@ async def next_token_logprobs(self, token_ids): Returns: result (torch.Tensor): Normalized log probability tensor. """ - + key = tuple(token_ids) if self.cache is not None and key in self.cache: @@ -243,21 +271,22 @@ async def _next_token_logprobs_v1(self, token_ids): # v1 provides logprobs in the output when logprobs parameter is set output = outputs[0].outputs[0] logprobs = output.logprobs - + assert logprobs, "Log probs should have been retrieved at this point" # v1 logprobs format: list of dicts with token_id -> logprob vocab_size = len(self.tokenizer) - logprobs_tensor = torch.full((1, vocab_size), -float('inf'), dtype=torch.float32) - + logprobs_tensor = torch.full( + (1, vocab_size), -float("inf"), dtype=torch.float32 + ) + for token_id, logprob in logprobs[0].items(): - #Assign the logprobs to the top-k retrieved tokens in the vocabulary. - assert hasattr(logprob, 'logprob'), "Logprob field is required" + # Assign the logprobs to the top-k retrieved tokens in the vocabulary. + assert hasattr(logprob, "logprob"), "Logprob field is required" logprobs_tensor[0, token_id] = logprob.logprob - - - #Right now we don't re-normalize! We might want to change this, - #the remaining mass can either be redistributed among the remaining tokens - # or among the selected ones. + + # Right now we don't re-normalize! We might want to change this, + # the remaining mass can either be redistributed among the remaining tokens + # or among the selected ones. logprobs = logprobs_tensor return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) @@ -299,7 +328,7 @@ def next_token_logprobs_sync(self, token_ids): Returns: (torch.Tensor): Normalized log probability tensor. """ - assert not self.v1 #Currently implemented only for V0 + assert not self.v1 # Currently implemented only for V0 return self.batch_next_token_logprobs_sync([token_ids])[0] def batch_next_token_logprobs_sync(self, token_ids_list): @@ -312,7 +341,7 @@ def batch_next_token_logprobs_sync(self, token_ids_list): Returns: (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. 
""" - assert not self.v1 #Currently implemented only for V0 + assert not self.v1 # Currently implemented only for V0 req_ids = [] req_id2processors = {} for token_ids in token_ids_list: @@ -340,7 +369,6 @@ def batch_next_token_logprobs_sync(self, token_ids_list): [req_id2processors[req_id].log_probs for req_id in req_ids] ) - def clear_cache(self): """Clear output cache.""" if self.cache: @@ -386,12 +414,18 @@ async def sample( elif isinstance(prompt_token_ids, str): pass else: - raise ValueError(f"Invalid prompt_ids_Type: {type(prompt_token_ids)}") + raise ValueError( + f"Invalid prompt_ids_Type: {type(prompt_token_ids)}" + ) else: prompt_token_ids = TokensPrompt(prompt_token_ids=prompt_token_ids) - # Question to check: Why do we need to use "byte_vocab"? - decode_eos = lambda eos_token_ids : [self.tokenizer.decode([i]) for i in eos_token_ids] if self.v1 else [self.byte_vocab[i].decode() for i in eos_token_ids] + # Question to check: Why do we need to use "byte_vocab"? + def decode_eos(eos_token_ids): + if self.v1: + return [self.tokenizer.decode([i]) for i in eos_token_ids] + else: # What is the adavntage of using "byte_vocab" instead of the tokenizer. Can we do this also with V1 ? + [self.byte_vocab[i].decode() for i in eos_token_ids] async for output in self.async_llm_engine.generate( prompt=prompt_token_ids, @@ -400,7 +434,7 @@ async def sample( max_tokens=max_tokens, temperature=temperature, seed=seed, - stop= decode_eos(eos_token_ids), + stop=decode_eos(eos_token_ids), ), request_id=str(next(self.request_counter)), ): diff --git a/pyproject.toml b/pyproject.toml index e39904c..f75650a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "accelerate", "bitsandbytes; sys_platform == 'linux'", "numba", - "vllm>=0.6.6,<=0.10.3; sys_platform == 'linux'", + "vllm>=0.6.6,<=0.10.0; sys_platform == 'linux'", "triton>=3.2.0; sys_platform == 'linux'" ] diff --git a/tests/conftest.py b/tests/conftest.py index 9b8ebfa..7d83fbd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -153,7 +153,7 @@ def from_name(cls, model_name, llm_opts=None): "enable_prefix_caching": True, "disable_log_stats": True, "dtype": "float16", - "disable_async_output_proc": True, # Force the use of V0 + "disable_async_output_proc": True, # Force the use of V0 **(llm_opts or {}), } llm = LLM(model=model_name, tokenizer=model_name, **llm_opts) diff --git a/tests/test_hf_llm.py b/tests/test_hf_llm.py index 99de5f1..c71265a 100644 --- a/tests/test_hf_llm.py +++ b/tests/test_hf_llm.py @@ -259,7 +259,7 @@ def test_load_model_by_name_no_backend(): def test_sample_seeded(async_llm): prompt_token_ids = async_llm.tokenizer.encode("An apple a day keeps the") - + first_token_ids = asyncio.run( async_llm.sample( prompt_token_ids=prompt_token_ids, diff --git a/tests/test_llm.py b/tests/test_llm.py index c481cd3..09cead9 100644 --- a/tests/test_llm.py +++ b/tests/test_llm.py @@ -4,7 +4,6 @@ from conftest import cuda_only, ReferenceVirtualLM from arsenal.maths import compare from genlm.backend.llm import load_model_by_name, MockAsyncLM, AsyncVirtualLM -import os # from hypothesis import given, strategies as st, settings @@ -14,7 +13,6 @@ def model_name(): return "gpt2" - @pytest.fixture(scope="module") def reference_llm(model_name): return ReferenceVirtualLM.from_name( @@ -30,12 +28,16 @@ def async_llm(model_name): llm_opts={"engine_opts": {"gpu_memory_utilization": 0.2, "dtype": "float16"}}, ) + @pytest.fixture(scope="module") def async_llm_v1(model_name): return load_model_by_name( model_name, 
backend="vllm", - llm_opts={"v1": True, "engine_opts": {"gpu_memory_utilization": 0.2, "dtype": "float16"}}, + llm_opts={ + "v1": True, + "engine_opts": {"gpu_memory_utilization": 0.2, "dtype": "float16"}, + }, ) @@ -158,25 +160,27 @@ def test_batch_next_token_logprobs_agreement( token_ids_list[i], ] + @cuda_only -@pytest.mark.asyncio # Need to run V1 with asyncio. For some reason gets messed up with multiple event loops +@pytest.mark.asyncio # Need to run V1 with asyncio. For some reason gets messed up with multiple event loops async def test_v1_next_token_logprobs(async_llm_v1, reference_llm, token_ids_list): """Test V1 logprobs against reference (on top-256 tokens only).""" for token_ids in token_ids_list: logprobs_v1 = await async_llm_v1.next_token_logprobs(token_ids) logprobs_ref = await reference_llm.next_token_logprobs(token_ids) - + # Filter non-inf tokens. Note that V1 retrieves only th etpo-k tokens and sets the other to -inf - valid_mask = logprobs_v1 != -float('inf') + valid_mask = logprobs_v1 != -float("inf") if valid_mask.sum() <= 128: pytest.skip("Less than 128 tokens to compare!") - + comparison = compare( - logprobs_v1[valid_mask].cpu().numpy(), - logprobs_ref[valid_mask] + logprobs_v1[valid_mask].cpu().numpy(), logprobs_ref[valid_mask] ) - - assert comparison.max_rel_err < 0.1, token_ids # Had to increase a bit the tollerance for V1. Note that the V1 engine might slightly differ in how the weights are handled. + + assert comparison.max_rel_err < 0.1, ( + token_ids + ) # Had to increase a bit the tollerance for V1. Note that the V1 engine might slightly differ in how the weights are handled. assert comparison.pearson > 0.95, token_ids From 608d960725e6baada8fd7df243d68266203c4cd5 Mon Sep 17 00:00:00 2001 From: Clemente Date: Tue, 4 Nov 2025 10:12:05 +0000 Subject: [PATCH 07/12] addeds skipo condition in the case that V1 is not supported --- genlm/backend/llm/vllm.py | 14 ++++++-------- tests/test_llm.py | 7 +++++++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllm.py index 74ec28b..9922c35 100644 --- a/genlm/backend/llm/vllm.py +++ b/genlm/backend/llm/vllm.py @@ -166,7 +166,7 @@ def from_name( if v1: original_v1_env = os.environ.get( "VLLM_USE_V1" - ) # The engine type may be set as an environmrntal varible + ) # The Engine Type could be set as an environmental variable so we set it to either V1 or V0 (after copying it in order to reset it later) os.environ["VLLM_USE_V1"] = "1" from vllm.engine.arg_utils import ( AsyncEngineArgs, @@ -195,13 +195,11 @@ def from_name( AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) ) - try: # reset the environmental variable, so that it does not interfere with other instances - if original_v1_env is not None: - os.environ["VLLM_USE_V1"] = original_v1_env - else: - os.environ.pop("VLLM_USE_V1", None) - except Exception: - pass # Ignore cleanup errors + # reset the environmental variable, so that it does not interfere with other instances + if original_v1_env is not None: + os.environ["VLLM_USE_V1"] = original_v1_env + else: + os.environ.pop("VLLM_USE_V1", None) return cls( engine, v1=v1, logprobs_per_request=logprobs_per_request, **kwargs diff --git a/tests/test_llm.py b/tests/test_llm.py index 09cead9..d317efc 100644 --- a/tests/test_llm.py +++ b/tests/test_llm.py @@ -31,6 +31,13 @@ def async_llm(model_name): @pytest.fixture(scope="module") def async_llm_v1(model_name): + try: + capability = torch.cuda.get_device_capability(0) + if capability[0] < 8: + 
pytest.skip("vLLM V1 requires GPU with Compute Capability >= 8.0") + except Exception: + pytest.skip("CUDA unavailable or cannot access CUDA capability") + return load_model_by_name( model_name, backend="vllm", From a44c4273b49dbcd3e8127c1d96f93cc34bd7d51f Mon Sep 17 00:00:00 2001 From: Clemente Date: Mon, 10 Nov 2025 11:49:08 +0000 Subject: [PATCH 08/12] added V1 patches --- genlm/backend/llm/vllm.py | 92 +++++++++++++++++++-------------------- pyproject.toml | 5 +++ 2 files changed, 51 insertions(+), 46 deletions(-) diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllm.py index 9922c35..fa4b271 100644 --- a/genlm/backend/llm/vllm.py +++ b/genlm/backend/llm/vllm.py @@ -85,14 +85,14 @@ def __init__( "ignore_eos": True, } # Version specific modifications - if self.v1: - self.default_params["logprobs"] = ( + if self.v1: # pragma: no cover + self.default_params["logprobs"] = ( # pragma: no cover logprobs_per_request # set the retrieved logprobs ) - self.tokenizer = self._wrap_tokenizer( + self.tokenizer = self._wrap_tokenizer( # pragma: no cover async_llm_engine.tokenizer - ) # wrap tokenizer for V1 - async_llm_engine.log_stats = False + ) # wrap tokenizer for V1 # pragma: no cover + async_llm_engine.log_stats = False # pragma: no cover else: self.tokenizer = async_llm_engine.engine.get_tokenizer() async_llm_engine.engine.log_stats = False @@ -105,26 +105,26 @@ def __init__( super().__init__(tokenizer=self.tokenizer) - def _wrap_tokenizer(self, tokenizer): + def _wrap_tokenizer(self, tokenizer): # pragma: no cover """Wrap v1 tokenizer to be compatible with base class expectations. Note that in V1 async_llm_engine.tokenizer is a TokenizerGroup object""" - class TokenizerWrapper: - def __init__(self, tokenizer): + class TokenizerWrapper: # pragma: no cover + def __init__(self, tokenizer): # pragma: no cover # Access the underlying tokenizer from TokenizerGroup - self._tokenizer = getattr(tokenizer, "tokenizer", tokenizer) + self._tokenizer = getattr(tokenizer, "tokenizer", tokenizer) # pragma: no cover # Add compatibility attributes - self.is_fast = True # Assume fast tokenizer for v1 + self.is_fast = True # Assume fast tokenizer for v1 # pragma: no cover self.name_or_path = getattr( - self._tokenizer, "name_or_path", "unknown" - ) + self._tokenizer, "name_or_path", "unknown" # pragma: no cover + ) # pragma: no cover - def __getattr__( + def __getattr__( # pragma: no cover self, name ): # Retrieve the tokenizer from the TokenizerGroup object return getattr(self._tokenizer, name) - def __len__(self): + def __len__(self): # pragma: no cover return len(self._tokenizer) return TokenizerWrapper(tokenizer) @@ -163,20 +163,20 @@ def from_name( "custom sampling functionality." ) - if v1: + if v1: # pragma: no cover original_v1_env = os.environ.get( - "VLLM_USE_V1" + "VLLM_USE_V1" # pragma: no cover ) # The Engine Type could be set as an environmental variable so we set it to either V1 or V0 (after copying it in order to reset it later) - os.environ["VLLM_USE_V1"] = "1" + os.environ["VLLM_USE_V1"] = "1" # pragma: no cover from vllm.engine.arg_utils import ( AsyncEngineArgs, - ) # the AsyncEngineArgs import is different in V1 and V0. + ) # the AsyncEngineArgs import is different in V1 and V0. 
# pragma: no cover engine_opts = { "enable_prefix_caching": True, "max_logprobs": logprobs_per_request, **(engine_opts or {}), - } + } # pragma: no cover else: original_v1_env = os.environ.get("VLLM_USE_V1") os.environ["VLLM_USE_V1"] = "0" @@ -228,8 +228,8 @@ async def next_token_logprobs(self, token_ids): if self.cache is not None and key in self.cache: return self.cache[key] - if self.v1: - result = await self._next_token_logprobs_v1(key) + if self.v1: # pragma: no cover + result = await self._next_token_logprobs_v1(key) # pragma: no cover else: result = await self._next_token_logprobs_v0(key) @@ -238,7 +238,7 @@ async def next_token_logprobs(self, token_ids): return result - async def _next_token_logprobs_v1(self, token_ids): + async def _next_token_logprobs_v1(self, token_ids): # pragma: no cover """Request log probabilities of next token asynchronously. Args: @@ -250,43 +250,43 @@ async def _next_token_logprobs_v1(self, token_ids): req_id = str(next(self.request_counter)) # For v1, use string prompt directly instead of TokensPrompt - if isinstance(token_ids, str): + if isinstance(token_ids, str): # pragma: no cover prompt = token_ids - else: + else: # pragma: no cover # Convert token IDs to string for v1 compatibility - prompt = self.tokenizer.decode(token_ids) + prompt = self.tokenizer.decode(token_ids) # pragma: no cover outputs = [] async for output in self.async_llm_engine.generate( prompt=prompt, sampling_params=SamplingParams(**self.default_params), request_id=req_id, - ): + ): # pragma: no cover if output.finished: outputs.append(output) # Extract logprobs from the output # v1 provides logprobs in the output when logprobs parameter is set - output = outputs[0].outputs[0] + output = outputs[0].outputs[0] # pragma: no cover logprobs = output.logprobs - assert logprobs, "Log probs should have been retrieved at this point" + assert logprobs, "Log probs should have been retrieved at this point" # pragma: no cover # v1 logprobs format: list of dicts with token_id -> logprob - vocab_size = len(self.tokenizer) + vocab_size = len(self.tokenizer) # pragma: no cover logprobs_tensor = torch.full( - (1, vocab_size), -float("inf"), dtype=torch.float32 + (1, vocab_size), -float("inf"), dtype=torch.float32 # pragma: no cover ) - for token_id, logprob in logprobs[0].items(): + for token_id, logprob in logprobs[0].items(): # pragma: no cover # Assign the logprobs to the top-k retrieved tokens in the vocabulary. - assert hasattr(logprob, "logprob"), "Logprob field is required" + assert hasattr(logprob, "logprob"), "Logprob field is required" # pragma: no cover logprobs_tensor[0, token_id] = logprob.logprob # Right now we don't re-normalize! We might want to change this, # the remaining mass can either be redistributed among the remaining tokens # or among the selected ones. - logprobs = logprobs_tensor - return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) + logprobs = logprobs_tensor # pragma: no cover + return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) # pragma: no cover async def _next_token_logprobs_v0(self, token_ids): """Request log probabilities of next token asynchronously. 
@@ -379,8 +379,8 @@ def __del__(self): def _cleanup_engine(self): """Clean up the vLLM engine and associated resources.""" if async_engine := getattr(self, "async_llm_engine", None): - if self.v1: - async_engine.shutdown() + if self.v1: # pragma: no cover + async_engine.shutdown() # pragma: no cover else: async_engine.shutdown_background_loop() destroy_model_parallel() @@ -406,22 +406,22 @@ async def sample( Returns: (list[int]): The sampled token IDs. """ - if self.v1: - if isinstance(prompt_token_ids, list): - prompt_token_ids = self.tokenizer.decode(prompt_token_ids) - elif isinstance(prompt_token_ids, str): + if self.v1: # pragma: no cover + if isinstance(prompt_token_ids, list): # pragma: no cover + prompt_token_ids = self.tokenizer.decode(prompt_token_ids) # pragma: no cover + elif isinstance(prompt_token_ids, str): # pragma: no cover pass - else: + else: # pragma: no cover raise ValueError( f"Invalid prompt_ids_Type: {type(prompt_token_ids)}" - ) - else: - prompt_token_ids = TokensPrompt(prompt_token_ids=prompt_token_ids) + ) # pragma: no cover + else: + prompt_token_ids = TokensPrompt(prompt_token_ids=prompt_token_ids) # Question to check: Why do we need to use "byte_vocab"? def decode_eos(eos_token_ids): - if self.v1: - return [self.tokenizer.decode([i]) for i in eos_token_ids] + if self.v1: # pragma: no cover + return [self.tokenizer.decode([i]) for i in eos_token_ids] # pragma: no cover else: # What is the adavntage of using "byte_vocab" instead of the tokenizer. Can we do this also with V1 ? [self.byte_vocab[i].decode() for i in eos_token_ids] diff --git a/pyproject.toml b/pyproject.toml index c1aa642..459b01b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,3 +42,8 @@ requires = ["setuptools>=64.0", "setuptools-scm>=8"] build-backend = "setuptools.build_meta" [tool.setuptools_scm] + +[tool.coverage.report] # rn, V1 must be excluded from coverage as it requires compute capability >= 8, which is not available on the CI. +exclude_lines = [ + "if self.v1:", + "if v1:"] From ab05c3da47bd189b83ebf614495f3ebb23c37909 Mon Sep 17 00:00:00 2001 From: Clemente Date: Mon, 10 Nov 2025 11:50:25 +0000 Subject: [PATCH 09/12] add V1 patches and fixing the formatting --- genlm/backend/llm/vllm.py | 104 ++++++++++++++++++++++---------------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllm.py index fa4b271..399347b 100644 --- a/genlm/backend/llm/vllm.py +++ b/genlm/backend/llm/vllm.py @@ -85,14 +85,14 @@ def __init__( "ignore_eos": True, } # Version specific modifications - if self.v1: # pragma: no cover - self.default_params["logprobs"] = ( # pragma: no cover + if self.v1: # pragma: no cover + self.default_params["logprobs"] = ( # pragma: no cover logprobs_per_request # set the retrieved logprobs ) - self.tokenizer = self._wrap_tokenizer( # pragma: no cover + self.tokenizer = self._wrap_tokenizer( # pragma: no cover async_llm_engine.tokenizer ) # wrap tokenizer for V1 # pragma: no cover - async_llm_engine.log_stats = False # pragma: no cover + async_llm_engine.log_stats = False # pragma: no cover else: self.tokenizer = async_llm_engine.engine.get_tokenizer() async_llm_engine.engine.log_stats = False @@ -105,26 +105,32 @@ def __init__( super().__init__(tokenizer=self.tokenizer) - def _wrap_tokenizer(self, tokenizer): # pragma: no cover + def _wrap_tokenizer(self, tokenizer): # pragma: no cover """Wrap v1 tokenizer to be compatible with base class expectations. 
Note that in V1 async_llm_engine.tokenizer is a TokenizerGroup object""" - class TokenizerWrapper: # pragma: no cover - def __init__(self, tokenizer): # pragma: no cover + class TokenizerWrapper: # pragma: no cover + def __init__(self, tokenizer): # pragma: no cover # Access the underlying tokenizer from TokenizerGroup - self._tokenizer = getattr(tokenizer, "tokenizer", tokenizer) # pragma: no cover + self._tokenizer = getattr( + tokenizer, "tokenizer", tokenizer + ) # pragma: no cover # Add compatibility attributes - self.is_fast = True # Assume fast tokenizer for v1 # pragma: no cover + self.is_fast = ( + True # Assume fast tokenizer for v1 # pragma: no cover + ) self.name_or_path = getattr( - self._tokenizer, "name_or_path", "unknown" # pragma: no cover - ) # pragma: no cover + self._tokenizer, + "name_or_path", + "unknown", # pragma: no cover + ) # pragma: no cover - def __getattr__( # pragma: no cover + def __getattr__( # pragma: no cover self, name ): # Retrieve the tokenizer from the TokenizerGroup object return getattr(self._tokenizer, name) - def __len__(self): # pragma: no cover + def __len__(self): # pragma: no cover return len(self._tokenizer) return TokenizerWrapper(tokenizer) @@ -163,11 +169,11 @@ def from_name( "custom sampling functionality." ) - if v1: # pragma: no cover + if v1: # pragma: no cover original_v1_env = os.environ.get( - "VLLM_USE_V1" # pragma: no cover + "VLLM_USE_V1" # pragma: no cover ) # The Engine Type could be set as an environmental variable so we set it to either V1 or V0 (after copying it in order to reset it later) - os.environ["VLLM_USE_V1"] = "1" # pragma: no cover + os.environ["VLLM_USE_V1"] = "1" # pragma: no cover from vllm.engine.arg_utils import ( AsyncEngineArgs, ) # the AsyncEngineArgs import is different in V1 and V0. 
# pragma: no cover @@ -176,7 +182,7 @@ def from_name( "enable_prefix_caching": True, "max_logprobs": logprobs_per_request, **(engine_opts or {}), - } # pragma: no cover + } # pragma: no cover else: original_v1_env = os.environ.get("VLLM_USE_V1") os.environ["VLLM_USE_V1"] = "0" @@ -228,8 +234,8 @@ async def next_token_logprobs(self, token_ids): if self.cache is not None and key in self.cache: return self.cache[key] - if self.v1: # pragma: no cover - result = await self._next_token_logprobs_v1(key) # pragma: no cover + if self.v1: # pragma: no cover + result = await self._next_token_logprobs_v1(key) # pragma: no cover else: result = await self._next_token_logprobs_v0(key) @@ -250,43 +256,51 @@ async def _next_token_logprobs_v1(self, token_ids): # pragma: no cover req_id = str(next(self.request_counter)) # For v1, use string prompt directly instead of TokensPrompt - if isinstance(token_ids, str): # pragma: no cover + if isinstance(token_ids, str): # pragma: no cover prompt = token_ids - else: # pragma: no cover + else: # pragma: no cover # Convert token IDs to string for v1 compatibility - prompt = self.tokenizer.decode(token_ids) # pragma: no cover + prompt = self.tokenizer.decode(token_ids) # pragma: no cover outputs = [] async for output in self.async_llm_engine.generate( prompt=prompt, sampling_params=SamplingParams(**self.default_params), request_id=req_id, - ): # pragma: no cover + ): # pragma: no cover if output.finished: outputs.append(output) # Extract logprobs from the output # v1 provides logprobs in the output when logprobs parameter is set - output = outputs[0].outputs[0] # pragma: no cover + output = outputs[0].outputs[0] # pragma: no cover logprobs = output.logprobs - assert logprobs, "Log probs should have been retrieved at this point" # pragma: no cover + assert logprobs, ( + "Log probs should have been retrieved at this point" + ) # pragma: no cover # v1 logprobs format: list of dicts with token_id -> logprob - vocab_size = len(self.tokenizer) # pragma: no cover + vocab_size = len(self.tokenizer) # pragma: no cover logprobs_tensor = torch.full( - (1, vocab_size), -float("inf"), dtype=torch.float32 # pragma: no cover + (1, vocab_size), + -float("inf"), + dtype=torch.float32, # pragma: no cover ) - for token_id, logprob in logprobs[0].items(): # pragma: no cover + for token_id, logprob in logprobs[0].items(): # pragma: no cover # Assign the logprobs to the top-k retrieved tokens in the vocabulary. - assert hasattr(logprob, "logprob"), "Logprob field is required" # pragma: no cover + assert hasattr(logprob, "logprob"), ( + "Logprob field is required" + ) # pragma: no cover logprobs_tensor[0, token_id] = logprob.logprob # Right now we don't re-normalize! We might want to change this, # the remaining mass can either be redistributed among the remaining tokens # or among the selected ones. - logprobs = logprobs_tensor # pragma: no cover - return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) # pragma: no cover + logprobs = logprobs_tensor # pragma: no cover + return logprobs[ + 0 + ] # Return shape (vocab_size,) instead of (1, vocab_size) # pragma: no cover async def _next_token_logprobs_v0(self, token_ids): """Request log probabilities of next token asynchronously. 
@@ -379,8 +393,8 @@ def __del__(self): def _cleanup_engine(self): """Clean up the vLLM engine and associated resources.""" if async_engine := getattr(self, "async_llm_engine", None): - if self.v1: # pragma: no cover - async_engine.shutdown() # pragma: no cover + if self.v1: # pragma: no cover + async_engine.shutdown() # pragma: no cover else: async_engine.shutdown_background_loop() destroy_model_parallel() @@ -406,22 +420,26 @@ async def sample( Returns: (list[int]): The sampled token IDs. """ - if self.v1: # pragma: no cover - if isinstance(prompt_token_ids, list): # pragma: no cover - prompt_token_ids = self.tokenizer.decode(prompt_token_ids) # pragma: no cover - elif isinstance(prompt_token_ids, str): # pragma: no cover + if self.v1: # pragma: no cover + if isinstance(prompt_token_ids, list): # pragma: no cover + prompt_token_ids = self.tokenizer.decode( + prompt_token_ids + ) # pragma: no cover + elif isinstance(prompt_token_ids, str): # pragma: no cover pass - else: # pragma: no cover + else: # pragma: no cover raise ValueError( f"Invalid prompt_ids_Type: {type(prompt_token_ids)}" - ) # pragma: no cover - else: - prompt_token_ids = TokensPrompt(prompt_token_ids=prompt_token_ids) + ) # pragma: no cover + else: + prompt_token_ids = TokensPrompt(prompt_token_ids=prompt_token_ids) # Question to check: Why do we need to use "byte_vocab"? def decode_eos(eos_token_ids): - if self.v1: # pragma: no cover - return [self.tokenizer.decode([i]) for i in eos_token_ids] # pragma: no cover + if self.v1: # pragma: no cover + return [ + self.tokenizer.decode([i]) for i in eos_token_ids + ] # pragma: no cover else: # What is the adavntage of using "byte_vocab" instead of the tokenizer. Can we do this also with V1 ? [self.byte_vocab[i].decode() for i in eos_token_ids] From 40bf62e1e22558d6c4727ebb924e665969738af7 Mon Sep 17 00:00:00 2001 From: Clemente Date: Mon, 10 Nov 2025 15:32:26 +0000 Subject: [PATCH 10/12] fix coverage patch --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 459b01b..b95d5b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,5 +45,6 @@ build-backend = "setuptools.build_meta" [tool.coverage.report] # rn, V1 must be excluded from coverage as it requires compute capability >= 8, which is not available on the CI. exclude_lines = [ + "pragma: no cover", "if self.v1:", "if v1:"] From 132adc12660279096edea09503989c7a5e04b9f3 Mon Sep 17 00:00:00 2001 From: Clemente Date: Mon, 10 Nov 2025 16:49:16 +0000 Subject: [PATCH 11/12] fix code patches --- genlm/backend/llm/vllm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllm.py index 399347b..047d8c4 100644 --- a/genlm/backend/llm/vllm.py +++ b/genlm/backend/llm/vllm.py @@ -163,7 +163,7 @@ def from_name( ) if engine_opts is not None and "enable_chunked_prefill" in engine_opts: - if engine_opts["enable_chunked_prefill"]: + if engine_opts["enable_chunked_prefill"]: # pragma: no cover warnings.warn( # pragma: no cover "Setting enable_chunked_prefill to True may interfere with AsyncVirtualLM's " "custom sampling functionality." 
@@ -213,11 +213,11 @@ def from_name( @property def underlying_model(self): - raise NotImplementedError + raise NotImplementedError # pragma: no cover @property def logits_processors(self): - return self._logits_processors + return self._logits_processors # pragma: no cover async def next_token_logprobs(self, token_ids): """Request log probabilities of next token asynchronously with output caching. From 918d5aa90ae86820b26744e187c53f5e7e0233a4 Mon Sep 17 00:00:00 2001 From: Clemente Date: Tue, 11 Nov 2025 15:19:40 +0000 Subject: [PATCH 12/12] adjusted comments format, removed commented out code, simplified the input to next_token_logprobs in vllm.py (there is no need to consider the case where we pass a string as input) --- genlm/backend/llm/hf.py | 40 ------------------------------- genlm/backend/llm/vllm.py | 50 ++++++++++++++++++++------------------- tests/test_llm.py | 4 ++-- 3 files changed, 28 insertions(+), 66 deletions(-) diff --git a/genlm/backend/llm/hf.py b/genlm/backend/llm/hf.py index 746ce0b..e736774 100644 --- a/genlm/backend/llm/hf.py +++ b/genlm/backend/llm/hf.py @@ -110,46 +110,6 @@ def from_name(cls, model_id, bitsandbytes_opts=None, hf_opts=None, **kwargs): return cls(mod, tok, **kwargs) - # @classmethod - # def from_name(cls, model_id, bitsandbytes_opts=None, hf_opts=None, **kwargs): - # """Create an AsyncTransformer instance from a pretrained HuggingFace model. - - # Args: - # model_id (str): Model identifier in HuggingFace's model hub. - # bitsandbytes_opts (dict, optional): Additional configuration options for bitsandbytes quantization. - # Defaults to None. - # hf_opts (dict, optional): Additional configuration options for loading the HuggingFace model. - # Defaults to None. - # **kwargs: Additional arguments passed to the `AsyncTransformer` constructor - - # Returns: - # (AsyncTransformer): An initialized `AsyncTransformer` instance. - # """ - # if bitsandbytes_opts: - # bnb_config = BitsAndBytesConfig(**bitsandbytes_opts) - # else: - # bnb_config = None - - # _hf_opts = { - # "device_map": "auto", - # "torch_dtype": "auto", - # } - # if hf_opts: - # _hf_opts.update(hf_opts) - - # tok = AutoTokenizer.from_pretrained(model_id) - # # model_kwargs = _hf_opts - # # if bnb_config: - # # model_kwargs["quantization_config"] = bnb_config # pass the bnb configuration as an hf parameter - # # mod = AutoModelForCausalLM.from_pretrained( - # # model_id, **model_kwargs - # # ) - # mod = AutoModelForCausalLM.from_pretrained( - # model_id, quantization_config=bnb_config, **_hf_opts - # ) - - # return cls(mod, tok, **kwargs) - @torch.no_grad() def __init__(self, hf_model, hf_tokenizer, batch_size=20, timeout=0.02): """Initialize an AsyncTransformer instance. diff --git a/genlm/backend/llm/vllm.py b/genlm/backend/llm/vllm.py index 047d8c4..2e08e27 100644 --- a/genlm/backend/llm/vllm.py +++ b/genlm/backend/llm/vllm.py @@ -87,11 +87,11 @@ def __init__( # Version specific modifications if self.v1: # pragma: no cover self.default_params["logprobs"] = ( # pragma: no cover - logprobs_per_request # set the retrieved logprobs + logprobs_per_request # Set the retrieved logprobs. ) self.tokenizer = self._wrap_tokenizer( # pragma: no cover async_llm_engine.tokenizer - ) # wrap tokenizer for V1 # pragma: no cover + ) # Wrap tokenizer for V1. 
# pragma: no cover async_llm_engine.log_stats = False # pragma: no cover else: self.tokenizer = async_llm_engine.engine.get_tokenizer() @@ -111,11 +111,11 @@ def _wrap_tokenizer(self, tokenizer): # pragma: no cover class TokenizerWrapper: # pragma: no cover def __init__(self, tokenizer): # pragma: no cover - # Access the underlying tokenizer from TokenizerGroup + # Access the underlying tokenizer from TokenizerGroup. self._tokenizer = getattr( tokenizer, "tokenizer", tokenizer ) # pragma: no cover - # Add compatibility attributes + # Add compatibility attributes. self.is_fast = ( True # Assume fast tokenizer for v1 # pragma: no cover ) @@ -127,7 +127,7 @@ def __init__(self, tokenizer): # pragma: no cover def __getattr__( # pragma: no cover self, name - ): # Retrieve the tokenizer from the TokenizerGroup object + ): # Retrieve the tokenizer from the TokenizerGroup object. return getattr(self._tokenizer, name) def __len__(self): # pragma: no cover @@ -155,7 +155,7 @@ def from_name( Returns: (AsyncVirtualLM): An `AsyncVirtualLM` instance. - Note: for GPT-OSS, vLLM >= 0.10.2 is required + Note: for GPT-OSS, vLLM >= 0.10.2 is required """ if not HAS_VLLM: raise ImportError( # pragma: no cover @@ -172,11 +172,11 @@ def from_name( if v1: # pragma: no cover original_v1_env = os.environ.get( "VLLM_USE_V1" # pragma: no cover - ) # The Engine Type could be set as an environmental variable so we set it to either V1 or V0 (after copying it in order to reset it later) + ) # The Engine Type could have already been set as an environmental variable so we set it to either V1 or V0 (after copying it in order to reset it later). os.environ["VLLM_USE_V1"] = "1" # pragma: no cover from vllm.engine.arg_utils import ( AsyncEngineArgs, - ) # the AsyncEngineArgs import is different in V1 and V0. # pragma: no cover + ) # The AsyncEngineArgs import is different in V1 and V0. # pragma: no cover engine_opts = { "enable_prefix_caching": True, @@ -188,20 +188,20 @@ def from_name( os.environ["VLLM_USE_V1"] = "0" from vllm import ( AsyncEngineArgs, - ) # the AsyncEngineArgs import is different in V1 and V0 + ) # The AsyncEngineArgs import is different in V1 and V0. engine_opts = { "enable_prefix_caching": True, - "disable_log_requests": True, # is it possible to remove this parameter? it is cauing problems with vllm >= v 0.10.0 + "disable_log_requests": True, # Is it possible to remove this parameter? it is causing problems with vllm >= v 0.10.0. "disable_async_output_proc": True, # This parameter forces vLLM to use v0, which is currently what we want to do. **(engine_opts or {}), } - engine = AsyncLLMEngine.from_engine_args( # Set up the engine + engine = AsyncLLMEngine.from_engine_args( # Set up the engine. AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) ) - # reset the environmental variable, so that it does not interfere with other instances + # Reset the environmental variable, so that it does not interfere with other instances. if original_v1_env is not None: os.environ["VLLM_USE_V1"] = original_v1_env else: @@ -223,12 +223,16 @@ async def next_token_logprobs(self, token_ids): """Request log probabilities of next token asynchronously with output caching. Args: - token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + token_ids (list[int]): A list of token IDs, representing a prompt to the language model. Returns: result (torch.Tensor): Normalized log probability tensor. 
""" + assert isinstance(token_ids, list) and all( + isinstance(i, int) for i in token_ids + ), "token_ids must be a list of token IDs." + key = tuple(token_ids) if self.cache is not None and key in self.cache: @@ -248,19 +252,15 @@ async def _next_token_logprobs_v1(self, token_ids): # pragma: no cover """Request log probabilities of next token asynchronously. Args: - token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + token_ids (list[int]): A list of token IDs, representing a prompt to the language model. Returns: (torch.Tensor): Normalized log probability tensor. """ req_id = str(next(self.request_counter)) - # For v1, use string prompt directly instead of TokensPrompt - if isinstance(token_ids, str): # pragma: no cover - prompt = token_ids - else: # pragma: no cover - # Convert token IDs to string for v1 compatibility - prompt = self.tokenizer.decode(token_ids) # pragma: no cover + # Convert token IDs to string for v1 compatibility. # pragma: no cover + prompt = self.tokenizer.decode(token_ids) # pragma: no cover outputs = [] async for output in self.async_llm_engine.generate( @@ -306,7 +306,7 @@ async def _next_token_logprobs_v0(self, token_ids): """Request log probabilities of next token asynchronously. Args: - token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. + token_ids (list[int]): A list of token IDs, representing a prompt to the language model. Returns: (torch.Tensor): Normalized log probability tensor. @@ -340,7 +340,7 @@ def next_token_logprobs_sync(self, token_ids): Returns: (torch.Tensor): Normalized log probability tensor. """ - assert not self.v1 # Currently implemented only for V0 + assert not self.v1 # Currently implemented only for V0. return self.batch_next_token_logprobs_sync([token_ids])[0] def batch_next_token_logprobs_sync(self, token_ids_list): @@ -353,7 +353,7 @@ def batch_next_token_logprobs_sync(self, token_ids_list): Returns: (torch.Tensor): A tensor of normalized log probability tensors, one for each prompt in the input list. """ - assert not self.v1 # Currently implemented only for V0 + assert not self.v1 # Currently implemented only for V0. req_ids = [] req_id2processors = {} for token_ids in token_ids_list: @@ -393,7 +393,9 @@ def __del__(self): def _cleanup_engine(self): """Clean up the vLLM engine and associated resources.""" if async_engine := getattr(self, "async_llm_engine", None): - if self.v1: # pragma: no cover + if ( + self.v1 + ): # The shutdown method is different in V1 and V0. # pragma: no cover async_engine.shutdown() # pragma: no cover else: async_engine.shutdown_background_loop() diff --git a/tests/test_llm.py b/tests/test_llm.py index d317efc..a72e8f9 100644 --- a/tests/test_llm.py +++ b/tests/test_llm.py @@ -169,14 +169,14 @@ def test_batch_next_token_logprobs_agreement( @cuda_only -@pytest.mark.asyncio # Need to run V1 with asyncio. For some reason gets messed up with multiple event loops +@pytest.mark.asyncio # Need to run V1 with asyncio. For some reason gets messed up with multiple event loops. async def test_v1_next_token_logprobs(async_llm_v1, reference_llm, token_ids_list): """Test V1 logprobs against reference (on top-256 tokens only).""" for token_ids in token_ids_list: logprobs_v1 = await async_llm_v1.next_token_logprobs(token_ids) logprobs_ref = await reference_llm.next_token_logprobs(token_ids) - # Filter non-inf tokens. Note that V1 retrieves only th etpo-k tokens and sets the other to -inf + # Filter non-inf tokens. 
Note that V1 retrieves only the top-k tokens and sets the other to -inf. valid_mask = logprobs_v1 != -float("inf") if valid_mask.sum() <= 128: pytest.skip("Less than 128 tokens to compare!")
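Editor's sketch (not part of the patch): an end-to-end usage example of the V1 path exercised by the test above. The names mirror those already used in the tests (load_model_by_name, backend="vllm", the v1 and engine_opts options); the model name and memory settings are illustrative, and it assumes an environment where the V1 engine can start (CUDA compute capability >= 8.0).

import asyncio
from genlm.backend.llm import load_model_by_name

async def main():
    llm = load_model_by_name(
        "gpt2",
        backend="vllm",
        llm_opts={
            "v1": True,
            "engine_opts": {"gpu_memory_utilization": 0.2, "dtype": "float16"},
        },
    )
    token_ids = llm.tokenizer.encode("An apple a day keeps the")
    logprobs = await llm.next_token_logprobs(token_ids)
    # With v1=True only the top logprobs_per_request entries are finite;
    # everything else is left at -inf (no re-normalization is applied).
    print(int(logprobs.isfinite().sum().item()), "finite entries")

if __name__ == "__main__":
    # The test suite drives this coroutine via pytest-asyncio; from a plain
    # script, asyncio.run is the simplest entry point.
    asyncio.run(main())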