-
Notifications
You must be signed in to change notification settings - Fork 4
Update vllm.py to support the V1 engine #53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
c4cd821
9e163ff
b446b35
df8f789
2adf75b
b38b7d2
f3f920a
45f9896
608d960
a44c427
ab05c3d
40bf62e
132adc1
918d5aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,7 @@ | ||
| FORCE_V0 = True #Currently, we force thw model to use V0, to switch to V1 simply set this to False | ||
|
||
| LOGPROBS_PER_REQUEST = 256 #These are th elogprobs that are retrieved currently in V1 | ||
|
||
|
|
||
| from syslog import LOG_PERROR | ||
| import torch | ||
| import logging | ||
| import warnings | ||
|
|
@@ -6,9 +10,15 @@ | |
| from genlm.backend.cache import OutputCache | ||
|
|
||
| try: | ||
| from vllm import AsyncLLMEngine, SamplingParams, AsyncEngineArgs | ||
| from vllm import AsyncLLMEngine, SamplingParams | ||
| from vllm.utils import Counter | ||
| from vllm.inputs import TokensPrompt | ||
| from vllm import envs | ||
|
|
||
| if envs.VLLM_USE_V1: #The import is different iin V1 and V0 | ||
| from vllm.engine.arg_utils import AsyncEngineArgs | ||
| else: | ||
| from vllm import AsyncEngineArgs | ||
|
|
||
| from vllm.distributed.parallel_state import ( | ||
| destroy_model_parallel, | ||
|
|
@@ -37,7 +47,278 @@ def from_name(cls, *args, **kwargs): # pragma: no cover | |
| "to use the vLLM-based AsyncLM model." | ||
| ) | ||
|
|
||
| else: | ||
| elif envs.VLLM_USE_V1 and not FORCE_V0: #If V1 | ||
|
|
||
| logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) | ||
|
|
||
| class AsyncVirtualLM(AsyncLM): | ||
| default_params = { | ||
| "max_tokens": 1, | ||
| "n": 1, | ||
| "detokenize": False, | ||
| "stop": None, | ||
| "ignore_eos": True, | ||
| "logprobs": LOGPROBS_PER_REQUEST, # This parameter fixes the number of requested logprobs. | ||
| } | ||
|
|
||
| def __init__(self, async_llm_engine, cache_size=0, cache_opts={}): | ||
| """Initialize an `AsyncVirtualLM` instance. | ||
|
|
||
| Args: | ||
| async_llm_engine (AsyncLLMEngine): The async vLLM engine instance. | ||
| cache_size (int, optional): Maximum size of the output cache. If 0, caching is disabled. Defaults to 0. | ||
| cache_opts (dict, optional): Additional options to pass to the [`OutputCache`][genlm.backend.cache.OutputCache] constructor. Defaults to {}. | ||
|
|
||
| Note: | ||
| The cache stores the log probabilities for previously seen token sequences to avoid redundant requests. KV caching is handled internally by the vLLM engine. | ||
| """ | ||
| self.async_llm_engine = async_llm_engine | ||
| # Wrap v1 tokenizer to be compatible with base class | ||
| self.tokenizer = self._wrap_tokenizer(async_llm_engine.tokenizer) | ||
| self.request_counter = Counter() | ||
| self.cache = ( | ||
| OutputCache(maxsize=cache_size, **cache_opts) | ||
| if cache_size > 0 | ||
| else None | ||
| ) | ||
|
|
||
| async_llm_engine.log_stats = False | ||
|
|
||
| super().__init__(tokenizer=self.tokenizer) | ||
|
|
||
| def _wrap_tokenizer(self, tokenizer): | ||
| """Wrap v1 tokenizer to be compatible with base class expectations. | ||
| Note that in V1 async_llm_engine.tokenizer is a TokenizerGroup object""" | ||
| class TokenizerWrapper: | ||
| def __init__(self, tokenizer): | ||
| # Access the underlying tokenizer from TokenizerGroup | ||
| self._tokenizer = getattr(tokenizer, 'tokenizer', tokenizer) | ||
| # Add compatibility attributes | ||
| self.is_fast = True # Assume fast tokenizer for v1 | ||
| self.name_or_path = getattr(self._tokenizer, 'name_or_path', 'unknown') | ||
|
|
||
| def __getattr__(self, name): # Retrieve the tokenizer from the TokenizerGroup object | ||
| return getattr(self._tokenizer, name) | ||
|
|
||
| def __len__(self): | ||
| return len(self._tokenizer) | ||
|
|
||
| return TokenizerWrapper(tokenizer) | ||
|
|
||
| @classmethod | ||
| def from_name(cls, model_name, engine_opts=None, **kwargs): | ||
| """Create a `AsyncVirtualLM` instance from a model name. | ||
|
|
||
| Args: | ||
| model_name (str): Name of the model to load. | ||
| engine_opts (dict): Additional options to pass to the `AsyncLLMEngine`. The engine will be | ||
| configured with prefix caching enabled and async output processing disabled by default. | ||
| **kwargs: Additional arguments passed to `AsyncVirtualLM` constructor. | ||
|
|
||
| Returns: | ||
| (AsyncVirtualLM): An `AsyncVirtualLM` instance. | ||
|
|
||
| Note: for GPT-OSS, vLLM >= 0.10.2 is required | ||
| """ | ||
| if not HAS_VLLM: | ||
| raise ImportError( # pragma: no cover | ||
| "vLLM not available. Install vLLM or use AsyncTransformer instead." | ||
| ) | ||
|
|
||
|
|
||
| if engine_opts is not None and "enable_chunked_prefill" in engine_opts: | ||
| if engine_opts["enable_chunked_prefill"]: | ||
| warnings.warn( # pragma: no cover | ||
| "Setting enable_chunked_prefill to True may interfere with AsyncVirtualLM's " | ||
| "custom sampling functionality." | ||
| ) | ||
|
|
||
| engine_opts = { | ||
| "enable_prefix_caching": True, | ||
| "max_logprobs": LOGPROBS_PER_REQUEST, | ||
| # "disable_log_requests": True, | ||
| **(engine_opts or {}), | ||
| } | ||
|
|
||
| engine = AsyncLLMEngine.from_engine_args( | ||
| AsyncEngineArgs(model=model_name, tokenizer=model_name, **engine_opts) | ||
| ) | ||
|
|
||
| return cls(engine, **kwargs) | ||
|
|
||
| @property | ||
| def underlying_model(self): | ||
| raise NotImplementedError | ||
|
|
||
| @property | ||
| def logits_processors(self): | ||
| return self._logits_processors | ||
|
|
||
| async def next_token_logprobs(self, token_ids): | ||
| """Request log probabilities of next token asynchronously with output caching. | ||
|
|
||
| Args: | ||
| token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. | ||
|
|
||
| Returns: | ||
| result (torch.Tensor): Normalized log probability tensor. | ||
| """ | ||
| # Note that differently from V0, V1 takes inout string by default | ||
| if isinstance(token_ids, str): | ||
| key = token_ids | ||
| else: | ||
| key = tuple(token_ids) | ||
|
|
||
| if self.cache is not None and key in self.cache: | ||
| return self.cache[key] | ||
|
|
||
| result = await self._next_token_logprobs(key) | ||
|
|
||
| if self.cache is not None: | ||
| self.cache[key] = result | ||
|
|
||
| return result | ||
|
|
||
| async def _next_token_logprobs(self, token_ids): | ||
| """Request log probabilities of next token asynchronously. | ||
|
|
||
| Args: | ||
| token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. | ||
|
|
||
| Returns: | ||
| (torch.Tensor): Normalized log probability tensor. | ||
| """ | ||
| req_id = str(next(self.request_counter)) | ||
| # print(f"request id: {req_id}") | ||
| # For v1, use string prompt directly instead of TokensPrompt | ||
| if isinstance(token_ids, str): | ||
| prompt = token_ids | ||
| else: | ||
| # Convert token IDs to string for v1 compatibility | ||
| prompt = self.tokenizer.decode(token_ids) | ||
|
|
||
| outputs = [] | ||
| async for output in self.async_llm_engine.generate( | ||
| prompt=prompt, | ||
| sampling_params=SamplingParams(**self.default_params), | ||
| request_id=req_id, | ||
| ): | ||
| if output.finished: | ||
| outputs.append(output) | ||
|
|
||
| if not outputs: | ||
| raise RuntimeError("No outputs generated") | ||
|
|
||
| # Extract logprobs from the output | ||
| # v1 provides logprobs in the output when logprobs parameter is set | ||
| output = outputs[0].outputs[0] | ||
| logprobs = output.logprobs | ||
|
|
||
| assert logprobs | ||
| # v1 logprobs format: list of dicts with token_id -> logprob | ||
| vocab_size = len(self.tokenizer) | ||
| logprobs_tensor = torch.full((1, vocab_size), -float('inf'), dtype=torch.float32) | ||
|
|
||
| for token_id, logprob in logprobs[0].items(): | ||
| #Assign the logprobs to the top-k retrieved tokens in the vocabulary. | ||
| if hasattr(logprob, 'logprob'): | ||
| logprobs_tensor[0, token_id] = logprob.logprob | ||
| else: | ||
| logprobs_tensor[0, token_id] = float(logprob) | ||
|
|
||
| # Question: do we actually need to renormalize or can we just leave to -inf ?? | ||
| #Distribute the remaining mass across the tokens that are not in the top-k | ||
| non_inf_mask = logprobs_tensor[0] != -float('inf') # create boolean non-inf mask | ||
| if non_inf_mask.sum() > 0: | ||
| # Get the logprobs for the top-k tokens | ||
| top_logprobs = logprobs_tensor[0][non_inf_mask] | ||
| remaining_prob = max( 1.0 - torch.exp(top_logprobs).sum().item(),0.0 ) #Compute the remaining probability mass | ||
| remaining_tokens = (~non_inf_mask).sum().item() #Compute the number of remaining tokens | ||
| uniform_logprob = torch.log(torch.tensor(remaining_prob / remaining_tokens)) #Compute the uniform log probability | ||
| logprobs_tensor[0][~non_inf_mask] = uniform_logprob | ||
|
|
||
| logprobs = logprobs_tensor | ||
| return logprobs[0] # Return shape (vocab_size,) instead of (1, vocab_size) | ||
|
|
||
| def next_token_logprobs_sync(self, token_ids): #For now, this simply uses the asynchronous method. | ||
| """Request log probabilities of next token synchronously. | ||
|
|
||
| Args: | ||
| token_ids_list (list[int]): A list of token IDs, representing a prompt to the language model. | ||
|
|
||
| Returns: | ||
| (torch.Tensor): Normalized log probability tensor. | ||
| """ | ||
| import asyncio | ||
| return asyncio.run(self.next_token_logprobs(token_ids)) | ||
|
|
||
|
|
||
| def clear_cache(self): | ||
| """Clear output cache.""" | ||
| if self.cache: | ||
| self.cache.clear() | ||
|
|
||
| def __del__(self): | ||
| """Clean up resources on deletion.""" | ||
| self._cleanup_engine() | ||
|
|
||
| def _cleanup_engine(self): | ||
| """Clean up the vLLM engine and associated resources.""" | ||
| if async_engine := getattr(self, "async_llm", None): | ||
| async_engine.shutdown() | ||
| destroy_model_parallel() | ||
| destroy_distributed_environment() | ||
|
|
||
| async def sample( | ||
| self, | ||
| prompt_token_ids, | ||
| max_tokens, | ||
| eos_token_ids, | ||
| temperature=1.0, | ||
| seed=None, | ||
| ): | ||
| """Sample from the language model. | ||
|
|
||
| Args: | ||
| prompt_token_ids (list[int]): The token IDs of the prompt. | ||
| eos_token_ids (list[int]): The token IDs of the end-of-sequence tokens. | ||
| temperature (float, optional): The temperature to use to rescale the logits. Defaults to 1.0. | ||
| max_tokens (int): The maximum number of tokens to generate. | ||
| seed (int, optional): The seed for the random number generator. Defaults to None. | ||
|
|
||
| Returns: | ||
| (list[int]): The sampled token IDs. | ||
| """ | ||
|
|
||
| if isinstance(prompt_token_ids, list): | ||
| prompt_token_ids = self.tokenizer.decode(prompt_token_ids) | ||
| elif isinstance(prompt_token_ids, str): | ||
| pass | ||
| else: | ||
| raise ValueError(f"Invalid prompt_ids_Type: {type(prompt_token_ids)}") | ||
|
|
||
| async for output in self.async_llm_engine.generate( | ||
| prompt=prompt_token_ids, | ||
| sampling_params=SamplingParams( | ||
| n=1, | ||
| max_tokens=max_tokens, | ||
| temperature=temperature, | ||
| seed=seed, | ||
| stop=[self.tokenizer.decode([i]) for i in eos_token_ids], | ||
| ), | ||
| request_id=str(next(self.request_counter)), | ||
| ): | ||
| if output.finished: | ||
| assert len(output.outputs) == 1, ( | ||
| "Expected exactly one sequence group" | ||
| ) | ||
| token_ids = list(output.outputs[0].token_ids) | ||
| if token_ids[-1] in eos_token_ids: | ||
| token_ids = token_ids[:-1] | ||
| return token_ids | ||
|
|
||
| else: #Otherwise use V0 | ||
|
|
||
| logging.getLogger("vllm.engine.async_llm_engine").setLevel(logging.WARNING) | ||
|
|
||
| class PassThroughLogitsProcessor: | ||
|
|
||
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| { | ||
| "cells": [ | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": 2, | ||
| "id": "21839ddb-6090-4866-beb4-b24a7c2f4e80", | ||
| "metadata": {}, | ||
| "outputs": [], | ||
| "source": [ | ||
| "import genlm\n", | ||
| "import numpy as np" | ||
| ] | ||
| }, | ||
| { | ||
| "cell_type": "code", | ||
| "execution_count": null, | ||
| "id": "ca6090bf-5ef9-45ee-8e15-12c0c9399422", | ||
| "metadata": {}, | ||
| "outputs": [], | ||
| "source": [] | ||
| } | ||
| ], | ||
| "metadata": { | ||
| "kernelspec": { | ||
| "display_name": "Python genlm", | ||
| "language": "python", | ||
| "name": "genlm" | ||
| }, | ||
| "language_info": { | ||
| "codemirror_mode": { | ||
| "name": "ipython", | ||
| "version": 3 | ||
| }, | ||
| "file_extension": ".py", | ||
| "mimetype": "text/x-python", | ||
| "name": "python", | ||
| "nbconvert_exporter": "python", | ||
| "pygments_lexer": "ipython3", | ||
| "version": "3.11.13" | ||
| } | ||
| }, | ||
| "nbformat": 4, | ||
| "nbformat_minor": 5 | ||
| } |
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's with all this commented out code?