diff --git a/gpustack/policies/candidate_selectors/vllm_resource_fit_selector.py b/gpustack/policies/candidate_selectors/vllm_resource_fit_selector.py
index 7560864..b115ced 100644
--- a/gpustack/policies/candidate_selectors/vllm_resource_fit_selector.py
+++ b/gpustack/policies/candidate_selectors/vllm_resource_fit_selector.py
@@ -5,6 +5,7 @@ import os
 import re
 from typing import Dict, List, Optional
 from gpustack.policies.base import (
+    Allocatable,
     ModelInstanceScheduleCandidate,
     ScheduleCandidatesSelector,
 )
@@ -142,6 +143,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
         self._vram_claim = 0
         self._num_attention_heads = None
         self._messages = []
+        self._workers_allocatable_resource: Dict[int, Allocatable] = {}
 
         self._selected_gpu_workers: List[str] = None
         self._selected_gpu_worker_count = 0
@@ -207,6 +209,14 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
     def get_messages(self) -> str:
         return self._messages
 
+    async def _get_worker_allocatable_resource(self, worker: Worker):
+        if worker.id in self._workers_allocatable_resource:
+            return self._workers_allocatable_resource[worker.id]
+
+        allocatable = await get_worker_allocatable_resource(self._engine, worker)
+        self._workers_allocatable_resource[worker.id] = allocatable
+        return allocatable
+
     async def select_candidates(
         self, workers: List[Worker]
     ) -> List[ModelInstanceScheduleCandidate]:
@@ -295,9 +305,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
 
         candidates = []
 
-        allocatable = await get_worker_allocatable_resource(
-            self._engine, worker, self._model_instance
-        )
+        allocatable = await self._get_worker_allocatable_resource(worker)
 
         if worker.status.gpu_devices:
             for _, gpu in enumerate(worker.status.gpu_devices):
@@ -374,9 +382,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
         if total_gpu < 2:
             return None
 
-        allocatable = await get_worker_allocatable_resource(
-            self._engine, worker, self._model_instance
-        )
+        allocatable = await self._get_worker_allocatable_resource(worker)
         sorted_gpu_devices: GPUDevicesInfo = sorted(
             [
                 gpu
@@ -499,11 +505,14 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
             gpu_sum = 0
             vram_sum = 0
             for worker in worker_group:
+                allocatable = await self._get_worker_allocatable_resource(worker)
                 if any(
                     gpu.memory is None
                     or gpu.memory.total is None
-                    or (gpu.memory.allocated or 0) / gpu.memory.total
-                    > self._gpu_memory_utilization
+                    or (
+                        allocatable.vram.get(gpu.index, 0) / gpu.memory.total
+                        < self._gpu_memory_utilization
+                    )
                     for gpu in worker.status.gpu_devices
                 ):
                     # Skip the worker if any GPU does not satisfy the gpu_memory_utilization requirement.
@@ -544,7 +553,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
         main_worker = workers[0]
         main_worker_name = main_worker.name
         main_gpu_indexes = self._selected_gpu_indexes_by_worker[main_worker_name]
-        main_vram_claim = await get_worker_vram_claim(
+        main_vram_claim = await self._get_worker_vram_claim(
             main_worker, main_gpu_indexes, self._gpu_memory_utilization
         )
 
@@ -557,7 +566,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
                 continue
 
             gpu_indexes = self._selected_gpu_indexes_by_worker[worker.name]
-            vram_claim = await get_worker_vram_claim(
+            vram_claim = await self._get_worker_vram_claim(
                 worker, gpu_indexes, self._gpu_memory_utilization
             )
             ray_actors.append(
@@ -583,30 +592,31 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
             )
         ]
 
+    async def _get_worker_vram_claim(
+        self,
+        worker: Worker,
+        gpu_indexes: List[int],
+        gpu_memory_utilization: float = 0.9,
+    ) -> Dict[int, int]:
+        """
+        Given a worker and gpu indexes, get the vram claim according to gpu_memory_utilization.
+        Returns a dictionary of gpu index to vram claim in bytes.
+        """
+        vram_claim: Dict[int, int] = {}
 
-async def get_worker_vram_claim(
-    worker: Worker,
-    gpu_indexes: List[int],
-    gpu_memory_utilization: float = 0.9,
-) -> Dict[int, int]:
-    """
-    Given a worker and gpu indexes, get the vram claim according to gpu_memory_utilization.
-    Returns a dictionary of gpu index to vram claim in bytes.
-    """
-    vram_claim: Dict[int, int] = {}
-
-    for gpu in worker.status.gpu_devices:
-        if gpu.index in gpu_indexes:
-            gpu_vram_claim = int(gpu.memory.total * gpu_memory_utilization)
-            allocatable_vram = gpu.memory.total - safe_int(gpu.memory.allocated)
-            if gpu_vram_claim > allocatable_vram:
-                # Allocatable seems to be smaller than the expected.
-                # We claim the maximum allocatable vram and proceed.
-                gpu_vram_claim = allocatable_vram
+        allocatable = await self._get_worker_allocatable_resource(worker)
+        for gpu in worker.status.gpu_devices:
+            if gpu.index in gpu_indexes:
+                gpu_vram_claim = int(gpu.memory.total * gpu_memory_utilization)
+                allocatable_vram = allocatable.vram.get(gpu.index, 0)
+                if gpu_vram_claim > allocatable_vram:
+                    # Allocatable seems to be smaller than the expected.
+                    # We claim the maximum allocatable vram and proceed.
+                    gpu_vram_claim = allocatable_vram
 
-            vram_claim[gpu.index] = gpu_vram_claim
+                vram_claim[gpu.index] = gpu_vram_claim
 
-    return vram_claim
+        return vram_claim
 
 
 def _create_candidate(
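
For context, a minimal standalone sketch of the pattern the patch introduces: per-worker allocatable resources are fetched once, cached by worker.id, and reused when computing each GPU's VRAM claim. Worker, Allocatable, and fetch_allocatable below are simplified stand-ins (not the gpustack types); the real selector calls get_worker_allocatable_resource(self._engine, worker) and caps the claim at the cached allocatable VRAM in the same way.

import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Allocatable:
    # gpu index -> allocatable VRAM in bytes (simplified stand-in)
    vram: Dict[int, int] = field(default_factory=dict)


@dataclass
class Worker:
    id: int
    # gpu index -> (total, allocated) VRAM in bytes (simplified stand-in)
    gpus: Dict[int, Tuple[int, int]] = field(default_factory=dict)


async def fetch_allocatable(worker: Worker) -> Allocatable:
    # Stand-in for the per-worker query the selector used to repeat;
    # here it just derives allocatable = total - allocated.
    await asyncio.sleep(0)  # pretend this hits the engine/DB
    return Allocatable(vram={i: t - a for i, (t, a) in worker.gpus.items()})


class Selector:
    def __init__(self):
        # worker.id -> Allocatable, computed at most once per scheduling pass
        self._cache: Dict[int, Allocatable] = {}

    async def get_allocatable(self, worker: Worker) -> Allocatable:
        if worker.id in self._cache:
            return self._cache[worker.id]
        allocatable = await fetch_allocatable(worker)
        self._cache[worker.id] = allocatable
        return allocatable

    async def vram_claim(
        self,
        worker: Worker,
        gpu_indexes: List[int],
        gpu_memory_utilization: float = 0.9,
    ) -> Dict[int, int]:
        # Claim total * utilization per GPU, capped at what is still allocatable.
        allocatable = await self.get_allocatable(worker)
        claim: Dict[int, int] = {}
        for index in gpu_indexes:
            total, _ = worker.gpus[index]
            claim[index] = min(
                int(total * gpu_memory_utilization),
                allocatable.vram.get(index, 0),
            )
        return claim


async def main():
    worker = Worker(id=1, gpus={0: (24_000_000_000, 2_000_000_000)})
    selector = Selector()
    print(await selector.vram_claim(worker, [0]))  # {0: 21600000000}


if __name__ == "__main__":
    asyncio.run(main())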