@@ -1,9 +1,13 @@
 from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from ..db import get_db
 from ..deps.auth import get_current_user
 from ..log_reader import log_reader
 from ..log_collector import log_collector
 from ..ssh_utils import ssh_manager
-from ..config import HADOOP_NODES
+from ..models.nodes import Node
+from ..models.clusters import Cluster
 from ..schemas import (
     LogRequest,
     LogResponse,
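
For context, the newly imported `get_db` from `..db` is assumed to yield one `AsyncSession` per request. A minimal sketch of such a module, with a placeholder connection URL (nothing in `..db` is shown in this patch):

    # Hypothetical sketch of ..db; not part of this patch.
    from collections.abc import AsyncGenerator
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/appdb")  # placeholder URL
    async_session = async_sessionmaker(engine, expire_on_commit=False)

    async def get_db() -> AsyncGenerator[AsyncSession, None]:
        # One session per request; FastAPI closes it after the response is sent.
        async with async_session() as session:
            yield session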
@@ -14,17 +18,30 @@ from ..schemas import (
 router = APIRouter()
 
 
+async def get_node_ip(db: AsyncSession, node_name: str) -> str:
+    result = await db.execute(select(Node.ip_address).where(Node.hostname == node_name))
+    ip = result.scalar_one_or_none()
+    if not ip:
+        raise HTTPException(status_code=404, detail=f"Node {node_name} not found")
+    return str(ip)
+
 @router.get("/hadoop/nodes/")
-async def get_hadoop_nodes(user=Depends(get_current_user)):
+async def get_hadoop_nodes(user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get list of all Hadoop nodes"""
-    return NodeListResponse(nodes=list(HADOOP_NODES.keys()))
+    # Assuming all nodes in DB are relevant, or filter by Cluster type if needed
+    stmt = select(Node.hostname).join(Cluster)
+    # Optional: .where(Cluster.type.ilike('%hadoop%'))
+    result = await db.execute(stmt)
+    nodes = result.scalars().all()
+    return NodeListResponse(nodes=nodes)
 
 @router.get("/hadoop/logs/{node_name}/{log_type}/", response_model=LogResponse)
-async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current_user)):
+async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get log from a specific Hadoop node"""
+    ip = await get_node_ip(db, node_name)
     try:
         # Read log content
-        log_content = log_reader.read_log(node_name, log_type)
+        log_content = log_reader.read_log(node_name, log_type, ip=ip)
         return LogResponse(
             node_name=node_name,
             log_type=log_type,
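
A quick smoke test for the reworked node listing might look like the sketch below. The `app.main` import, the anyio marker, and a seeded test database are assumptions outside this patch; the auth dependency is overridden so the request does not 401:

    # Hypothetical test sketch; fixture wiring is assumed, not shown in this patch.
    import pytest
    from httpx import ASGITransport, AsyncClient

    from app.main import app  # assumed application entry point
    from app.deps.auth import get_current_user  # path assumed to match ..deps.auth

    app.dependency_overrides[get_current_user] = lambda: {"username": "test"}

    @pytest.mark.anyio
    async def test_get_hadoop_nodes_lists_db_hostnames():
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            resp = await client.get("/hadoop/nodes/")
        assert resp.status_code == 200
        assert "nodes" in resp.json()  # NodeListResponse payload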
@@ -34,21 +51,28 @@ async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current
         raise HTTPException(status_code=500, detail=str(e))
 
 @router.get("/hadoop/logs/all/{log_type}/", response_model=MultiLogResponse)
-async def get_all_hadoop_nodes_log(log_type: str, user=Depends(get_current_user)):
+async def get_all_hadoop_nodes_log(log_type: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get logs from all Hadoop nodes"""
+    stmt = select(Node.hostname, Node.ip_address).join(Cluster)
+    result = await db.execute(stmt)
+    nodes_data = result.all()
+
+    nodes_list = [{"name": n[0], "ip": str(n[1])} for n in nodes_data]
+
     try:
         # Read logs from all nodes
-        logs = log_reader.read_all_nodes_log(log_type)
+        logs = log_reader.read_all_nodes_log(nodes_list, log_type)
         return MultiLogResponse(logs=logs)
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(e))
 
 @router.get("/hadoop/logs/files/{node_name}/", response_model=LogFilesResponse)
-async def get_hadoop_log_files(node_name: str, user=Depends(get_current_user)):
+async def get_hadoop_log_files(node_name: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get list of log files on a specific Hadoop node"""
+    ip = await get_node_ip(db, node_name)
     try:
         # Get log files list
-        log_files = log_reader.get_log_files_list(node_name)
+        log_files = log_reader.get_log_files_list(node_name, ip=ip)
         return LogFilesResponse(
             node_name=node_name,
             log_files=log_files
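
Since the call site now passes the node list explicitly, `read_all_nodes_log` presumably iterates it and reaches each node via the IP it was handed, mirroring the single-node path above. A sketch of the assumed method (the reader internals are not in this patch):

    # Hypothetical signature sketch for ..log_reader; assumed, not shown in this patch.
    def read_all_nodes_log(self, nodes_list: list[dict], log_type: str) -> dict[str, str]:
        """Read one log type from every {"name": ..., "ip": ...} entry."""
        logs: dict[str, str] = {}
        for node in nodes_list:
            # read_log takes the resolved IP, matching the single-node endpoint
            logs[node["name"]] = self.read_log(node["name"], log_type, ip=node["ip"])
        return logs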
@@ -67,10 +91,11 @@ async def get_hadoop_collectors_status(user=Depends(get_current_user)):
     }
 
 @router.post("/hadoop/collectors/start/{node_name}/{log_type}/")
-async def start_hadoop_collector(node_name: str, log_type: str, interval: int = 5, user=Depends(get_current_user)):
+async def start_hadoop_collector(node_name: str, log_type: str, interval: int = 5, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Start log collection for a specific Hadoop node and log type"""
+    ip = await get_node_ip(db, node_name)
     try:
-        log_collector.start_collection(node_name, log_type, interval)
+        log_collector.start_collection(node_name, log_type, ip=ip, interval=interval)
         return {
             "message": f"Started log collection for {node_name}_{log_type}",
             "interval": interval
@@ -81,6 +106,7 @@ async def start_hadoop_collector(node_name: str, log_type: str, interval: int =
 @router.post("/hadoop/collectors/stop/{node_name}/{log_type}/")
 async def stop_hadoop_collector(node_name: str, log_type: str, user=Depends(get_current_user)):
     """Stop log collection for a specific Hadoop node and log type"""
+    # stop doesn't need IP as it just stops the thread by ID
     try:
         log_collector.stop_collection(node_name, log_type)
         return {
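
For reference, the queries in this patch imply roughly the following ORM shape for `Node` and `Cluster`. This is a reconstruction from the `select(...).join(Cluster)` and `Node.hostname` / `Node.ip_address` usage, not the actual models:

    # Hypothetical reconstruction of ..models; inferred from the queries, not shown in this patch.
    from sqlalchemy import ForeignKey, String
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    class Base(DeclarativeBase):
        pass

    class Cluster(Base):
        __tablename__ = "clusters"
        id: Mapped[int] = mapped_column(primary_key=True)
        type: Mapped[str] = mapped_column(String(64))  # targeted by the optional ilike('%hadoop%') filter

    class Node(Base):
        __tablename__ = "nodes"
        id: Mapped[int] = mapped_column(primary_key=True)
        hostname: Mapped[str] = mapped_column(String(255), unique=True)
        ip_address: Mapped[str] = mapped_column(String(45))  # str(ip) in get_node_ip hints at an INET-like type
        cluster_id: Mapped[int] = mapped_column(ForeignKey("clusters.id"))  # enables the join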