@@ -5,6 +5,7 @@ import os
 import re
 from typing import Dict, List, Optional
 from gpustack.policies.base import (
+    Allocatable,
     ModelInstanceScheduleCandidate,
     ScheduleCandidatesSelector,
 )
@@ -142,6 +143,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
         self._vram_claim = 0
         self._num_attention_heads = None
         self._messages = []
+        self._workers_allocatable_resource: Dict[int, Allocatable] = {}
 
         self._selected_gpu_workers: List[str] = None
         self._selected_gpu_worker_count = 0
@@ -207,6 +209,14 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
     def get_messages(self) -> str:
         return self._messages
 
+    async def _get_worker_allocatable_resource(self, worker: Worker):
+        if worker.id in self._workers_allocatable_resource:
+            return self._workers_allocatable_resource[worker.id]
+
+        allocatable = await get_worker_allocatable_resource(self._engine, worker)
+        self._workers_allocatable_resource[worker.id] = allocatable
+        return allocatable
+
     async def select_candidates(
         self, workers: List[Worker]
     ) -> List[ModelInstanceScheduleCandidate]:
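
Note: the helper above memoizes `get_worker_allocatable_resource` per `worker.id`, so each worker is queried at most once per selector instance and repeated call sites in the same selection run hit the dict instead. It also drops the `self._model_instance` argument the replaced call sites passed, so the cached value is shared across lookups. A minimal standalone sketch of the same caching pattern, with a hypothetical `fetch` standing in for the expensive query (not the gpustack API):

    # Illustrative sketch only; `fetch` stands in for the expensive
    # get_worker_allocatable_resource call.
    from typing import Any, Awaitable, Callable, Dict

    class PerRunCache:
        def __init__(self, fetch: Callable[[int], Awaitable[Any]]):
            self._fetch = fetch
            self._cache: Dict[int, Any] = {}

        async def get(self, worker_id: int) -> Any:
            if worker_id in self._cache:
                return self._cache[worker_id]  # hit: no re-query
            value = await self._fetch(worker_id)
            self._cache[worker_id] = value  # memoize for this run
            return value

Because `self._workers_allocatable_resource` is initialized in `__init__`, the cache is scoped to the selector instance; a fresh selector starts with fresh data.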
@@ -295,9 +305,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
 
         candidates = []
 
-        allocatable = await get_worker_allocatable_resource(
-            self._engine, worker, self._model_instance
-        )
+        allocatable = await self._get_worker_allocatable_resource(worker)
 
         if worker.status.gpu_devices:
             for _, gpu in enumerate(worker.status.gpu_devices):
@@ -374,9 +382,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
         if total_gpu < 2:
             return None
 
-        allocatable = await get_worker_allocatable_resource(
-            self._engine, worker, self._model_instance
-        )
+        allocatable = await self._get_worker_allocatable_resource(worker)
         sorted_gpu_devices: GPUDevicesInfo = sorted(
             [
                 gpu
@@ -499,11 +505,14 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
         gpu_sum = 0
         vram_sum = 0
         for worker in worker_group:
+            allocatable = await self._get_worker_allocatable_resource(worker)
             if any(
                 gpu.memory is None
                 or gpu.memory.total is None
-                or (gpu.memory.allocated or 0) / gpu.memory.total
-                > self._gpu_memory_utilization
+                or (
+                    allocatable.vram.get(gpu.index, 0) / gpu.memory.total
+                    < self._gpu_memory_utilization
+                )
                 for gpu in worker.status.gpu_devices
             ):
                 # Skip the worker if any GPU does not satisfy the gpu_memory_utilization requirement.
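
Note: with the rewritten condition, a worker in the group is rejected unless every one of its GPUs still has at least `gpu_memory_utilization` of its total VRAM allocatable (the old check looked only at the already-allocated fraction). A quick numeric illustration, with assumed values that are not from the patch:

    # Assumed numbers for illustration only.
    total = 24 * 1024**3        # 24 GiB GPU
    allocatable = 20 * 1024**3  # 4 GiB already claimed elsewhere
    gpu_memory_utilization = 0.9

    # vLLM would try to reserve 0.9 * 24 GiB ~ 21.6 GiB, but only 20 GiB is
    # free: 20 / 24 ~ 0.83 < 0.9, so this worker is skipped.
    skip = allocatable / total < gpu_memory_utilization
    assert skip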
@@ -544,7 +553,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
         main_worker = workers[0]
         main_worker_name = main_worker.name
         main_gpu_indexes = self._selected_gpu_indexes_by_worker[main_worker_name]
-        main_vram_claim = await get_worker_vram_claim(
+        main_vram_claim = await self._get_worker_vram_claim(
             main_worker, main_gpu_indexes, self._gpu_memory_utilization
         )
 
@@ -557,7 +566,7 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
                 continue
 
             gpu_indexes = self._selected_gpu_indexes_by_worker[worker.name]
-            vram_claim = await get_worker_vram_claim(
+            vram_claim = await self._get_worker_vram_claim(
                 worker, gpu_indexes, self._gpu_memory_utilization
            )
             ray_actors.append(
@@ -583,30 +592,31 @@ class VLLMResourceFitSelector(ScheduleCandidatesSelector):
             )
         ]
 
-
-async def get_worker_vram_claim(
-    worker: Worker,
-    gpu_indexes: List[int],
-    gpu_memory_utilization: float = 0.9,
-) -> Dict[int, int]:
-    """
-    Given a worker and gpu indexes, get the vram claim according to gpu_memory_utilization.
-    Returns a dictionary of gpu index to vram claim in bytes.
-    """
-    vram_claim: Dict[int, int] = {}
-
-    for gpu in worker.status.gpu_devices:
-        if gpu.index in gpu_indexes:
-            gpu_vram_claim = int(gpu.memory.total * gpu_memory_utilization)
-            allocatable_vram = gpu.memory.total - safe_int(gpu.memory.allocated)
-            if gpu_vram_claim > allocatable_vram:
-                # Allocatable seems to be smaller than the expected.
-                # We claim the maximum allocatable vram and proceed.
-                gpu_vram_claim = allocatable_vram
-
-            vram_claim[gpu.index] = gpu_vram_claim
-
-    return vram_claim
+    async def _get_worker_vram_claim(
+        self,
+        worker: Worker,
+        gpu_indexes: List[int],
+        gpu_memory_utilization: float = 0.9,
+    ) -> Dict[int, int]:
+        """
+        Given a worker and gpu indexes, get the vram claim according to gpu_memory_utilization.
+        Returns a dictionary of gpu index to vram claim in bytes.
+        """
+        vram_claim: Dict[int, int] = {}
+
+        allocatable = await self._get_worker_allocatable_resource(worker)
+        for gpu in worker.status.gpu_devices:
+            if gpu.index in gpu_indexes:
+                gpu_vram_claim = int(gpu.memory.total * gpu_memory_utilization)
+                allocatable_vram = allocatable.vram.get(gpu.index, 0)
+                if gpu_vram_claim > allocatable_vram:
+                    # Allocatable seems to be smaller than the expected.
+                    # We claim the maximum allocatable vram and proceed.
+                    gpu_vram_claim = allocatable_vram
+
+                vram_claim[gpu.index] = gpu_vram_claim
+
+        return vram_claim
 
 
 def _create_candidate(
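
Note: the module-level `get_worker_vram_claim` becomes a method with the same claim rule, but it now reads free VRAM from the shared allocatable snapshot (`allocatable.vram.get(gpu.index, 0)`) rather than recomputing `total - allocated` from device status. Per GPU the rule collapses to a min(); a condensed restatement with hypothetical inputs:

    # Condensed restatement of the per-GPU claim rule; numbers are made up.
    def claim_bytes(total: int, allocatable: int, utilization: float = 0.9) -> int:
        # Desired claim is utilization * total, capped at what is actually free.
        return min(int(total * utilization), allocatable)

    # 80 GiB GPU with 75 GiB allocatable: min(72 GiB, 75 GiB) -> claim 72 GiB.
    assert claim_bytes(80 * 1024**3, 75 * 1024**3) == int(80 * 1024**3 * 0.9)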