Compare commits

...

34 Commits
main ... master

Author SHA1 Message Date
jshixiong 04963ef76c fix: instance gpu names
3 hours ago
jshixiong fcce89e391 fix: instance gpu names
3 hours ago
jshixiong 4d734a9dcd fix: instance gpu names
3 hours ago
jshixiong b26cb7a28b fix: instance gpu names
3 hours ago
jshixiong 9676a1cffe fix: gpu model info
5 hours ago
jshixiong 4ea11004a9 fix: gpu model info
5 hours ago
jshixiong 2c538ed37b fix: model instance time
4 months ago
jshixiong 31a5fa93ec fix: rm log
4 months ago
jshixiong 48bb1ff2a9 fix: model instance steam
4 months ago
jshixiong 460c993bfe fix: log
4 months ago
jshixiong 871cb01763 fix: log
4 months ago
jshixiong 747229110d fix: circular import bug
4 months ago
jshixiong 4e83003b7d fix: circular import bug
4 months ago
jshixiong 95ddc5e324 fix: models instance stream
4 months ago
jshixiong 5bb36f5c6a fix: models gpu names
4 months ago
jshixiong fbe4dc6584 fix: models gpu names
4 months ago
jshixiong c612581098 fix: models gpu names
4 months ago
jshixiong 2a21844347 fix: models gpu names
4 months ago
jshixiong 1361af3abc fix: worker stream
4 months ago
jshixiong f4b69765b6 fix: worker stream
4 months ago
jshixiong 325e4d2a36 fix: worker stream
4 months ago
jshixiong 596bf65c90 fix: worker stream
4 months ago
jshixiong 03195d9115 fix: worker stream
4 months ago
jshixiong 61ecd531ba fix: dashboard gpu names
4 months ago
jshixiong 1f350ed34c fix: gpus with model instance
4 months ago
jshixiong 96cf3216f4 fix: gpus with model instance
4 months ago
jshixiong 66469c42b3 fix: gpus with model instance
4 months ago
jshixiong 0b43c44893 fix: gpus with model instance
4 months ago
jshixiong f02cfe09f0 fix: dashboard with model info
4 months ago
jshixiong 52a073b439 fix: dashboard with model info
4 months ago
jshixiong 147a3b4bdc fix: workers with model instance
4 months ago
jshixiong f6f50cc482 fix: workers with model instance
4 months ago
jshixiong be05884b47 fix: update linux install
4 months ago
jshixiong 01e0b8b5ae fix:update poetry.lock
4 months ago

@ -17,7 +17,9 @@ from gpustack.schemas.dashboard import (
SystemLoadSummary,
SystemSummary,
TimeSeriesData,
InstanceInfo,
)
from gpustack.schemas.gpu_devices import GPUDevice
from gpustack.schemas.model_usage import ModelUsage
from gpustack.schemas.models import Model, ModelInstance
from gpustack.schemas.system_load import SystemLoad
@ -25,6 +27,7 @@ from gpustack.schemas.users import User
from gpustack.server.deps import SessionDep
from gpustack.schemas import Worker
from gpustack.server.system_load import compute_system_load
from gpustack.utils.gpu import get_gpu_names_by_distributed_servers
router = APIRouter()
@ -263,8 +266,32 @@ async def get_active_models(session: AsyncSession) -> List[ModelSummary]:
ram=0,
vram=0,
)
gpus = await GPUDevice.paginated_by_query(session=session)
instance_infos = []
if result.id in model_instances_by_id:
for model_instance in model_instances_by_id[result.id]:
ram, vram = get_instance_resource_usage(model_instance)
gpu_names = get_gpu_names_by_distributed_servers(
gpus.items,
model_instance.worker_name,
model_instance.gpu_indexes,
model_instance.distributed_servers,
)
instance_infos.append(
InstanceInfo(
id=model_instance.id,
name=model_instance.name,
worker_name=model_instance.worker_name,
worker_ip=model_instance.worker_ip,
gpu_indexes=model_instance.gpu_indexes,
gpu_addresses=model_instance.gpu_addresses,
gpu_names=gpu_names,
ram=ram,
vram=vram,
)
)
aggregate_resource_claim(resource_claim, model_instance)
model_summary.append(
@ -274,6 +301,7 @@ async def get_active_models(session: AsyncSession) -> List[ModelSummary]:
categories=result.categories,
resource_claim=resource_claim,
instance_count=result.instance_count,
instance_infos=instance_infos,
token_count=(
result.total_token_count
if result.total_token_count is not None
@ -432,3 +460,30 @@ async def usage_stats(
model_ids=model_ids,
user_ids=user_ids,
)
def get_instance_resource_usage(model_instance: ModelInstance) -> tuple[int, int]:
    """Return the total (ram, vram) in bytes claimed by a model instance.

    Sums the main worker's computed resource claim with the claims of any
    distributed subordinate workers.
    """
    total_ram, total_vram = get_claim_resource_usage(
        model_instance.computed_resource_claim
    )
    servers = model_instance.distributed_servers
    subordinate_workers = servers.subordinate_workers if servers else None
    for worker in subordinate_workers or []:
        worker_ram, worker_vram = get_claim_resource_usage(
            worker.computed_resource_claim
        )
        total_ram += worker_ram
        total_vram += worker_vram
    return total_ram, total_vram
def get_claim_resource_usage(claim) -> tuple[int, int]:
    """Return (ram, vram) in bytes for a computed resource claim.

    ``ram`` is a single value; ``vram`` is a mapping of gpu index -> bytes
    and is summed across all GPUs. A missing attribute or ``None`` counts
    as zero; a falsy/None claim yields (0, 0).
    """
    if not claim:
        return 0, 0
    ram_value = getattr(claim, 'ram', None) or 0
    vram_map = getattr(claim, 'vram', None) or {}
    return int(ram_value), int(sum(vram_map.values()))

@ -1,10 +1,15 @@
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from gpustack.schemas.models import ModelInstance
from gpustack.server.deps import ListParamsDep, SessionDep, EngineDep
from gpustack.schemas.gpu_devices import (
GPUDevice,
GPUDevicesPublic,
GPUDevicePublic,
ModelInstanceInfoInGPU,
ModelInstanceComputedResourceClaim,
)
from gpustack.api.exceptions import (
NotFoundException,
@ -28,13 +33,75 @@ async def get_gpus(
media_type="text/event-stream",
)
return await GPUDevice.paginated_by_query(
gpus = await GPUDevice.paginated_by_query(
session=session,
fuzzy_fields=fuzzy_fields,
page=params.page,
per_page=params.perPage,
)
# Get final_instances
model_instances = await ModelInstance.paginated_by_query(session)
final_instances = []
for model_instance in model_instances.items:
final_instances.append(
ModelInstanceInfoInGPU(
id=model_instance.id,
name=model_instance.name,
model_name=model_instance.model_name,
worker_ip=model_instance.worker_ip,
worker_name=model_instance.worker_name,
gpu_indexes=model_instance.gpu_indexes,
computed_resource_claim=ModelInstanceComputedResourceClaim(
ram=model_instance.computed_resource_claim.ram,
vram=model_instance.computed_resource_claim.vram,
),
)
)
if (
model_instance.distributed_servers
and model_instance.distributed_servers.subordinate_workers
):
for (
subordinate_worker
) in model_instance.distributed_servers.subordinate_workers:
final_instances.append(
ModelInstanceInfoInGPU(
id=model_instance.id,
name=model_instance.name,
model_name=model_instance.model_name,
worker_ip=subordinate_worker.worker_ip,
worker_name=subordinate_worker.worker_name,
gpu_indexes=subordinate_worker.gpu_indexes,
computed_resource_claim=ModelInstanceComputedResourceClaim(
ram=subordinate_worker.computed_resource_claim.ram,
vram=subordinate_worker.computed_resource_claim.vram,
),
)
)
updated_data = []
for gpu in gpus.items:
model_instances_in_gpu = []
for final_instance in final_instances:
if final_instance.worker_name == gpu.worker_name:
if (
final_instance.gpu_indexes
and gpu.index in final_instance.gpu_indexes
):
model_instances_in_gpu.append(final_instance)
gpu_dict = gpu.model_dump(exclude={"model_instances"})
enriched = GPUDevicePublic(**gpu_dict)
enriched.model_instances = model_instances_in_gpu
updated_data.append(enriched)
return GPUDevicesPublic(
items=updated_data,
pagination=gpus.pagination,
)
@router.get("/{id}", response_model=GPUDevicePublic)
async def get_gpu(session: SessionDep, id: str):

@ -3,8 +3,10 @@ import aiohttp
from fastapi import APIRouter, HTTPException, Request, status
from fastapi.responses import PlainTextResponse, StreamingResponse
from gpustack.api.responses import StreamingResponseWithStatusCode
from gpustack.schemas.gpu_devices import GPUDevice
from gpustack.server.services import ModelInstanceService
from gpustack.utils.gpu import get_gpu_names_by_distributed_servers
from gpustack.worker.logs import LogOptionsDep
from gpustack.api.exceptions import (
InternalServerErrorException,
@ -17,15 +19,16 @@ from gpustack.schemas.models import (
ModelInstanceCreate,
ModelInstancePublic,
ModelInstanceUpdate,
ModelInstancesPublic,
ModelInstanceStateEnum,
ModelInstancePublicWithExtra,
ModelInstancesPublicWithExtra,
)
from gpustack.schemas.model_files import ModelFileStateEnum
router = APIRouter()
@router.get("", response_model=ModelInstancesPublic)
@router.get("", response_model=ModelInstancesPublicWithExtra)
async def get_model_instances(
engine: EngineDep,
session: SessionDep,
@ -48,19 +51,46 @@ async def get_model_instances(
if state:
fields["state"] = state
gpus = await GPUDevice.paginated_by_query(session=session)
gpu_list = [
{
"worker_name": gpu.worker_name,
"worker_ip": gpu.worker_ip,
"index": gpu.index,
"name": gpu.name,
}
for gpu in gpus.items
]
if params.watch:
return StreamingResponse(
ModelInstance.streaming(engine, fields=fields),
ModelInstance.streaming(engine, fields=fields, gpu_list=gpu_list),
media_type="text/event-stream",
)
return await ModelInstance.paginated_by_query(
model_instances = await ModelInstance.paginated_by_query(
session=session,
fields=fields,
page=params.page,
per_page=params.perPage,
)
updated_data = []
for model_instance in model_instances.items:
gpu_names = get_gpu_names_by_distributed_servers(
gpus.items,
model_instance.worker_name,
model_instance.gpu_indexes,
model_instance.distributed_servers,
)
enriched = ModelInstancePublicWithExtra.model_validate(model_instance)
enriched.gpu_names = gpu_names
updated_data.append(enriched)
return ModelInstancesPublicWithExtra(
items=updated_data, pagination=model_instances.pagination
)
@router.get("/{id}", response_model=ModelInstancePublic)
async def get_model_instance(session: SessionDep, id: int):

@ -1,4 +1,5 @@
import math
import logging
from typing import List, Optional, Union
from fastapi import APIRouter, Query
from fastapi.responses import StreamingResponse
@ -16,12 +17,14 @@ from gpustack.api.exceptions import (
BadRequestException,
)
from gpustack.schemas.common import Pagination
from gpustack.schemas.gpu_devices import GPUDevice
from gpustack.schemas.models import (
ModelInstance,
ModelInstancesPublic,
get_backend,
is_audio_model,
BackendEnum,
ModelInstancePublicWithExtra,
ModelInstancesPublicWithExtra,
)
from gpustack.schemas.workers import GPUDeviceInfo, VendorEnum, Worker
from gpustack.server.deps import ListParamsDep, SessionDep, EngineDep
@ -35,9 +38,10 @@ from gpustack.schemas.models import (
from gpustack.server.services import ModelService, WorkerService
from gpustack.utils.command import find_parameter
from gpustack.utils.convert import safe_int
from gpustack.utils.gpu import parse_gpu_id
from gpustack.utils.gpu import parse_gpu_id, get_gpu_names_by_distributed_servers
router = APIRouter()
logger = logging.getLogger(__name__)
@router.get("", response_model=ModelsPublic)
@ -132,7 +136,7 @@ async def get_model(session: SessionDep, id: int):
return model
@router.get("/{id}/instances", response_model=ModelInstancesPublic)
@router.get("/{id}/instances", response_model=ModelInstancesPublicWithExtra)
async def get_model_instances(
engine: EngineDep, session: SessionDep, id: int, params: ListParamsDep
):
@ -140,13 +144,17 @@ async def get_model_instances(
if not model:
raise NotFoundException(message="Model not found")
gpus = await GPUDevice.paginated_by_query(session=session)
gpu_list = [
{"worker_ip": gpu.worker_ip, "index": gpu.index, "name": gpu.name}
for gpu in gpus.items
]
if params.watch:
fields = {"model_id": id}
return StreamingResponse(
ModelInstance.streaming(engine, fields=fields),
ModelInstance.streaming(engine, fields=fields, gpu_list=gpu_list),
media_type="text/event-stream",
)
instances = model.instances
count = len(instances)
total_page = math.ceil(count / params.perPage)
@ -157,7 +165,23 @@ async def get_model_instances(
totalPage=total_page,
)
return ModelInstancesPublic(items=instances, pagination=pagination)
updated_data = []
for model_instance in instances:
gpu_names = get_gpu_names_by_distributed_servers(
gpus.items,
model_instance.worker_name,
model_instance.gpu_indexes,
model_instance.distributed_servers,
)
enriched = ModelInstancePublicWithExtra.model_validate(model_instance)
enriched.gpu_names = gpu_names
updated_data.append(enriched)
return ModelInstancesPublicWithExtra(
items=updated_data,
pagination=pagination,
)
async def validate_model_in(

@ -1,5 +1,10 @@
from typing import List
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from sqlmodel.ext.asyncio.session import AsyncSession
from gpustack.schemas.models import ModelInstance, ModelInstancePublic
from gpustack.api.exceptions import (
AlreadyExistsException,
@ -11,15 +16,16 @@ from gpustack.schemas.workers import (
WorkerCreate,
WorkerPublic,
WorkerUpdate,
WorkersPublic,
Worker,
WorkerPublicWithInstances,
WorkersPublicWithInstances,
)
from gpustack.server.services import WorkerService
router = APIRouter()
@router.get("", response_model=WorkersPublic)
@router.get("", response_model=WorkersPublicWithInstances)
async def get_workers(
engine: EngineDep,
session: SessionDep,
@ -44,7 +50,7 @@ async def get_workers(
media_type="text/event-stream",
)
return await Worker.paginated_by_query(
workers = await Worker.paginated_by_query(
session=session,
fields=fields,
fuzzy_fields=fuzzy_fields,
@ -52,6 +58,22 @@ async def get_workers(
per_page=params.perPage,
)
updated_data = []
for worker in workers.items:
model_instances = await get_model_instances_by_worker(session, worker.id)
model_instances_public = []
for model_instance in model_instances:
model_instance_public = ModelInstancePublic.model_validate(model_instance)
model_instances_public.append(model_instance_public)
enriched = WorkerPublicWithInstances.model_validate(worker)
enriched.model_instances = model_instances_public
updated_data.append(enriched)
return WorkersPublicWithInstances(
items=updated_data,
pagination=workers.pagination,
)
@router.get("/{id}", response_model=WorkerPublic)
async def get_worker(session: SessionDep, id: int):
@ -101,3 +123,14 @@ async def delete_worker(session: SessionDep, id: int):
await WorkerService(session).delete(worker)
except Exception as e:
raise InternalServerErrorException(message=f"Failed to delete worker: {e}")
async def get_model_instances_by_worker(session: AsyncSession, worker_id: int):
    """Fetch all model instances scheduled on the given worker."""
    instances: List[ModelInstance] = await ModelInstance.all_by_fields(
        session, fields={"worker_id": worker_id}
    )
    return instances

@ -34,6 +34,8 @@ from gpustack.schemas.workers import (
GPUDeviceInfo,
CPUInfo,
MemoryInfo,
WorkersPublicWithInstances,
WorkerPublicWithInstances,
)
from gpustack.schemas.users import User, UserCreate, UserUpdate, UserPublic, UsersPublic
from gpustack.schemas.api_keys import ApiKey, ApiKeyCreate, ApiKeyPublic, ApiKeysPublic
@ -51,6 +53,8 @@ __all__ = [
"WorkerUpdate",
"WorkerPublic",
"WorkersPublic",
"WorkerPublicWithInstances",
"WorkersPublicWithInstances",
"Model",
"ModelCreate",
"ModelUpdate",

@ -48,6 +48,18 @@ class ResourceClaim(BaseModel):
vram: int # in bytes
class InstanceInfo(BaseModel):
    """Per-instance summary embedded in a dashboard ModelSummary."""

    id: int
    name: str
    worker_name: Optional[str] = None
    worker_ip: Optional[str] = None
    # NOTE(review): the three fields below are Optional but have no default,
    # so pydantic treats them as required (nullable) — confirm intended.
    gpu_indexes: Optional[List[int]]
    gpu_addresses: Optional[List[str]]
    gpu_names: Optional[List[str]]
    ram: int = 0  # bytes claimed by this instance (main + subordinate workers)
    vram: int = 0  # bytes claimed across all GPUs
class ModelSummary(BaseModel):
id: int
name: str
@ -55,6 +67,7 @@ class ModelSummary(BaseModel):
instance_count: int
token_count: int
categories: Optional[List[str]] = None
instance_infos: Optional[List[InstanceInfo]] = None
class ResourceCounts(BaseModel):

@ -1,5 +1,7 @@
from typing import List, Optional, Dict
from pydantic import BaseModel
from sqlmodel import SQLModel
from sqlmodel import SQLModel, Field
from gpustack.mixins import BaseModelMixin
from gpustack.schemas.common import PaginatedList
@ -15,13 +17,28 @@ class GPUDeviceBase(GPUDeviceInfo, BaseModel):
worker_ip: str
class ModelInstanceComputedResourceClaim(BaseModel):
    """Resource claim of a model instance, as exposed per GPU device."""

    ram: Optional[int]  # in bytes
    vram: Optional[Dict[int, int]]  # gpu index -> bytes
class ModelInstanceInfoInGPU(BaseModel):
    """Model-instance summary attached to a GPU device entry."""

    id: int
    name: str
    model_name: str
    worker_ip: Optional[str] = None
    worker_name: str
    # NOTE(review): Optional without default → required (nullable) in pydantic;
    # confirm intended.
    gpu_indexes: Optional[List[int]]
    computed_resource_claim: Optional[ModelInstanceComputedResourceClaim]
class GPUDevice(GPUDeviceBase, SQLModel, BaseModelMixin, table=True):
    """Read-only ORM mapping over the aggregated GPU devices database view."""

    __tablename__ = "gpu_devices_view"
    # The view has no declared primary key; tell SQLAlchemy to key rows by "id".
    __mapper_args__ = {'primary_key': ["id"]}
class GPUDevicePublic(GPUDeviceBase):
    """Public GPU device payload, enriched with the instances running on it."""

    pass
    # List of model instances currently occupying this GPU.
    model_instances: List[ModelInstanceInfoInGPU] = Field(default_factory=list)
GPUDevicesPublic = PaginatedList[GPUDevicePublic]

@ -1,21 +1,35 @@
import asyncio
import logging
from datetime import datetime
from enum import Enum
import hashlib
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Dict, List, Optional, Union
from typing import (
TYPE_CHECKING,
Annotated,
Any,
Dict,
List,
Optional,
Union,
Callable,
AsyncGenerator,
)
from pydantic import BaseModel, ConfigDict, model_validator, Field as PydanticField
from sqlalchemy import JSON, Column
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlmodel import Field, Relationship, SQLModel, Text
from gpustack.schemas.common import PaginatedList, UTCDateTime, pydantic_column_type
from gpustack.mixins import BaseModelMixin
from gpustack.schemas.links import ModelInstanceModelFileLink
from gpustack.server.bus import EventType, Event
from gpustack.utils.command import find_parameter
if TYPE_CHECKING:
from gpustack.schemas.model_files import ModelFile
logger = logging.getLogger(__name__)
# Models
@ -380,6 +394,82 @@ class ModelInstance(ModelInstanceBase, BaseModelMixin, table=True):
def __hash__(self):
    # Hash by primary key, consistent with Worker.__hash__ which uses
    # hash(self.id) rather than returning the raw id.
    return hash(self.id)
@classmethod
async def _enrich_instance_with_gpu_names(
    cls, event: Event, gpu_list: List[Dict]
) -> None:
    """Replace event.data with a ModelInstancePublicWithExtra carrying gpu_names.

    Resolves GPU display names for the instance's main worker and, for
    distributed deployments, for every subordinate worker. Best effort: on
    any failure the event is left untouched and a warning is logged.
    """
    instance = event.data
    worker_ip = getattr(instance, "worker_ip", None)
    worker_name = getattr(instance, "worker_name", None)
    gpu_indexes = getattr(instance, "gpu_indexes", [])
    try:
        gpu_names = get_gpu_names_by_gpus_worker_name(
            gpu_list, worker_name, gpu_indexes
        )
        # Collect GPU names from distributed subordinate workers, if any.
        subordinate_gpu_names = []
        if (
            instance.distributed_servers
            and instance.distributed_servers.subordinate_workers
        ):
            for (
                subordinate_worker
            ) in instance.distributed_servers.subordinate_workers:
                subordinate_gpu_names += get_gpu_names_by_gpus_worker_name(
                    gpu_list,
                    subordinate_worker.worker_name,
                    subordinate_worker.gpu_indexes,
                )
        # Merge main-worker and subordinate-worker GPU names.
        # Fix: the original evaluated `gpu_names + subordinate_gpu_names` and
        # discarded the result, so subordinate names were never included.
        gpu_names += subordinate_gpu_names
        public_instance = ModelInstancePublicWithExtra.model_validate(instance)
        public_instance.gpu_names = gpu_names
        event.data = public_instance
    except Exception as e:
        logger.warning(
            f"Failed to inject gpu for ModelInstance worker_ip={worker_ip}, gpu_indexes={gpu_indexes}: {e}"
        )
@classmethod
async def streaming(
    cls,
    engine: AsyncEngine,
    fields: Optional[dict] = None,
    fuzzy_fields: Optional[dict] = None,
    filter_func: Optional[Callable[[Any], bool]] = None,
    gpu_list: Optional[List[Dict]] = None,
) -> AsyncGenerator[str, None]:
    """
    Stream model-instance events, injecting gpu_names into each instance
    event when a gpu_list is provided.
    """
    try:
        # Reuse the existing subscribe mechanism so topic selection,
        # filtering and conversion behave exactly as before.
        async for event in cls.subscribe(engine):
            if event.type == EventType.HEARTBEAT:
                yield "\n\n"
                continue
            if not cls._match_fields(event, fields):
                continue
            if not cls._match_fuzzy_fields(event, fuzzy_fields):
                continue
            if filter_func and not filter_func(event.data):
                continue
            if gpu_list:
                await cls._enrich_instance_with_gpu_names(event, gpu_list)
            # Serialize the event for SSE output.
            yield cls._format_event(event)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logger.error(f"Error in streaming {cls.__name__}: {e}")
class ModelInstanceCreate(ModelInstanceBase):
    """Creation payload; no extra fields beyond the base schema."""

    pass
@ -397,6 +487,12 @@ class ModelInstancePublic(
updated_at: datetime
class ModelInstancePublicWithExtra(ModelInstancePublic):
    """Public model instance enriched with resolved GPU display names."""

    gpu_names: List[str] = Field(default_factory=list)


# Paginated wrapper for the enriched public model instance.
ModelInstancesPublicWithExtra = PaginatedList[ModelInstancePublicWithExtra]
ModelInstancesPublic = PaginatedList[ModelInstancePublic]
@ -498,3 +594,13 @@ def get_mmproj_filename(model: Union[Model, ModelSource]) -> Optional[str]:
return mmproj
return "*mmproj*.gguf"
def get_gpu_names_by_gpus_worker_name(
    gpus: List[Dict], worker_name: str, gpu_indexes: List[int]
) -> List[str]:
    """Return GPU display names matching a worker name and GPU indexes.

    Args:
        gpus: GPU records as dicts with "worker_name", "index" and "name" keys.
        worker_name: Worker whose GPUs should be matched.
        gpu_indexes: Logical GPU indexes to match, e.g. [0, 1].

    Returns:
        The "name" values of every matching GPU. Empty when ``gpu_indexes``
        is falsy — a not-yet-scheduled instance has gpu_indexes=None, which
        previously raised TypeError inside the comprehension.
    """
    if not gpu_indexes:
        return []
    return [
        gpu.get("name")
        for gpu in gpus
        if gpu.get("worker_name") == worker_name and gpu.get("index") in gpu_indexes
    ]

@ -1,6 +1,14 @@
import asyncio
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Optional
from typing import Dict, Optional, AsyncGenerator, Callable, Any
from sqlalchemy.ext.asyncio.engine import AsyncEngine
from sqlmodel.ext.asyncio.session import AsyncSession
from gpustack.schemas import ModelInstancePublic
from gpustack.schemas.models import ModelInstance
from pydantic import ConfigDict, BaseModel
from sqlmodel import Field, SQLModel, JSON, Column, Text
@ -9,7 +17,10 @@ from gpustack.schemas.common import PaginatedList, UTCDateTime, pydantic_column_
from typing import List
from sqlalchemy.orm import declarative_base
from gpustack.server.bus import EventType, Event
Base = declarative_base()
logger = logging.getLogger(__name__)
class UtilizationInfo(BaseModel):
@ -209,6 +220,79 @@ class Worker(WorkerBase, BaseModelMixin, table=True):
__tablename__ = 'workers'
id: Optional[int] = Field(default=None, primary_key=True)
@classmethod
async def _enrich_worker_with_model_instances(
    cls, event: Event, engine: AsyncEngine
) -> None:
    """
    Query the worker's model instances and inject them into the event payload.

    Replaces event.data with a WorkerPublicWithInstances. Best effort: on any
    failure the event is left untouched and a warning is logged.
    """
    worker = event.data
    worker_id = getattr(worker, "id", None)
    if not worker_id:
        # Event payloads may be plain dicts (e.g. for some event types).
        if isinstance(worker, dict):
            worker_id = worker.get("id")
        if not worker_id:
            return
    async with AsyncSession(engine) as session:
        try:
            model_instances = await ModelInstance.all_by_fields(
                session, fields={"worker_id": worker_id}
            )
            model_instances_public = []
            for model_instance in model_instances:
                model_instance_public = ModelInstancePublic.model_validate(
                    model_instance
                )
                model_instances_public.append(model_instance_public)
            public_worker = WorkerPublicWithInstances.model_validate(worker)
            public_worker.model_instances = model_instances_public
            event.data = public_worker
        except Exception as e:
            logger.warning(
                f"Failed to inject model_instances for worker {worker_id}: {e}"
            )
@classmethod  # noqa: C901
async def streaming(
    cls,
    engine: AsyncEngine,
    fields: Optional[dict] = None,
    fuzzy_fields: Optional[dict] = None,
    filter_func: Optional[Callable[[Any], bool]] = None,
) -> AsyncGenerator[str, None]:
    """
    Stream worker events, injecting model_instances into each Worker event.
    """
    try:
        # Reuse the existing subscribe mechanism so topic selection,
        # filtering and conversion behave exactly as before.
        async for event in cls.subscribe(engine):
            if event.type == EventType.HEARTBEAT:
                yield "\n\n"
                continue
            if not cls._match_fields(event, fields):
                continue
            if not cls._match_fuzzy_fields(event, fuzzy_fields):
                continue
            if filter_func and not filter_func(event.data):
                continue
            await cls._enrich_worker_with_model_instances(event, engine)
            # Serialize the event for SSE output.
            yield cls._format_event(event)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logger.error(f"Error in streaming {cls.__name__}: {e}")
def __hash__(self):
    # Hash by primary key so workers can be used in sets and as dict keys.
    return hash(self.id)
@ -234,4 +318,10 @@ class WorkerPublic(
updated_at: datetime
class WorkerPublicWithInstances(WorkerPublic):
    """Public worker payload enriched with its model instances."""

    model_instances: List[ModelInstancePublic] = Field(default_factory=list)


# Paginated wrapper for the enriched worker payload.
WorkersPublicWithInstances = PaginatedList[WorkerPublicWithInstances]
WorkersPublic = PaginatedList[WorkerPublic]

@ -94,3 +94,61 @@ def any_gpu_match(
if not worker.status or not worker.status.gpu_devices:
return False
return any(verify(gpu) for gpu in worker.status.gpu_devices)
def get_gpu_names_by_gpus(
    gpus: list, worker_ip: str, gpu_indexes: List[int]
) -> List[str]:
    """Filter GPU display names by worker IP and GPU indexes.

    Args:
        gpus: All GPU device records (objects with .worker_ip/.index/.name).
        worker_ip: Target worker IP address.
        gpu_indexes: Logical GPU indexes to match, e.g. [0, 1].

    Returns:
        Names of matching GPUs, e.g. ["NVIDIA A100", "NVIDIA A100"]. Empty
        when ``gpu_indexes`` is falsy — the original raised TypeError when
        an unscheduled instance supplied gpu_indexes=None.
    """
    if not gpu_indexes:
        return []
    return [
        gpu.name
        for gpu in gpus
        if gpu.worker_ip == worker_ip and gpu.index in gpu_indexes
    ]
def get_gpu_names_by_gpus_worker_name(
    gpus: list, worker_name: str, gpu_indexes: List[int]
) -> List[str]:
    """Filter GPU display names by worker name and GPU indexes.

    (The original docstring said ``worker_ip``; the parameter is the worker
    name.)

    Args:
        gpus: All GPU device records (objects with .worker_name/.index/.name).
        worker_name: Target worker name.
        gpu_indexes: Logical GPU indexes to match, e.g. [0, 1].

    Returns:
        Names of matching GPUs, e.g. ["NVIDIA A100", "NVIDIA A100"]. Empty
        when ``gpu_indexes`` is falsy — the original raised TypeError when
        an unscheduled instance supplied gpu_indexes=None.
    """
    if not gpu_indexes:
        return []
    return [
        gpu.name
        for gpu in gpus
        if gpu.worker_name == worker_name and gpu.index in gpu_indexes
    ]
def get_gpu_names_by_distributed_servers(
    gpus: list, main_worker_name: str, main_gpu_indexes: list, distributed_servers
) -> List[str]:
    """Collect GPU names for a (possibly distributed) model instance.

    Resolves the main worker's GPU names, then appends the names of every
    distributed subordinate worker's GPUs.
    """
    # GPU names on the main worker. Guard against None index lists so an
    # instance that has not been scheduled yet does not raise TypeError.
    gpu_names = get_gpu_names_by_gpus_worker_name(
        gpus, main_worker_name, main_gpu_indexes or []
    )
    # Append GPU names from distributed subordinate workers, if any.
    subordinate_gpu_names = []
    if distributed_servers and distributed_servers.subordinate_workers:
        for subordinate_worker in distributed_servers.subordinate_workers:
            subordinate_gpu_names += get_gpu_names_by_gpus_worker_name(
                gpus,
                subordinate_worker.worker_name,
                subordinate_worker.gpu_indexes or [],
            )
    # Merge main-node and subordinate-node GPU names.
    return gpu_names + subordinate_gpu_names

@ -25,15 +25,30 @@ function download_ui() {
local ui_path="${ROOT_DIR}/gpustack/ui"
local tmp_ui_path="${ui_path}/tmp"
local tag="latest"
local local_ui_dir="/home/gpustack-ui/dist" # 本地 UI 构建目录
if [[ "${GIT_VERSION}" != "v0.0.0" ]]; then
tag="${GIT_VERSION}"
fi
# 删除旧的 ui 目录
rm -rf "${ui_path}"
mkdir -p "${tmp_ui_path}/ui"
mkdir -p "${ui_path}"
gpustack::log::info "Checking for local UI build at ${local_ui_dir}"
# 优先检查本地构建目录是否存在且非空
if [[ -d "${local_ui_dir}" ]] && [[ -n "$(ls -A ${local_ui_dir} 2>/dev/null)" ]]; then
gpustack::log::info "Local UI found at ${local_ui_dir}, copying..."
cp -a "${local_ui_dir}/." "${ui_path}"
gpustack::log::info "Local UI copied successfully."
return 0
else
gpustack::log::info "No valid local UI found at ${local_ui_dir}, proceeding with download..."
fi
gpustack::log::info "downloading '${tag}' UI assets"
# 如果本地没有,则下载远程 UI
mkdir -p "${tmp_ui_path}/ui"
if ! curl --retry 3 --retry-connrefused --retry-delay 3 -sSfL "https://gpustack-ui-1303613262.cos.accelerate.myqcloud.com/releases/${tag}.tar.gz" 2>/dev/null |
tar -xzf - --directory "${tmp_ui_path}/ui" 2>/dev/null; then
@ -48,8 +63,11 @@ function download_ui() {
gpustack::log::fatal "failed to download '${default_tag}' ui archive"
fi
fi
# 复制解压后的内容
cp -a "${tmp_ui_path}/ui/dist/." "${ui_path}"
# 清理临时目录
rm -rf "${tmp_ui_path}"
}

2
poetry.lock generated

@ -10685,4 +10685,4 @@ vllm = ["bitsandbytes", "mistral_common", "timm", "vllm"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.13"
content-hash = "3b91cdb95388d8479acd383dfb3e585de2fce906b70709dff4a6a01e4d2819a7"
content-hash = "4462e07255a2ae6b0f3af20a305c4044dade8edffbdf8e5dd605cea8a03d5d96"

@ -86,6 +86,12 @@ twine = "^5.1.1"
mike = "^2.1.3"
mkdocs-glightbox = "^0.4.0"
[[tool.poetry.source]]
name = "tuna"
url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "explicit"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

Loading…
Cancel
Save