diff --git a/backend/app/config.py b/backend/app/config.py
index 92b0105..55e4711 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -1,4 +1,5 @@
 import os
+import json
 from dotenv import load_dotenv
 from typing import Dict, Tuple
@@ -23,37 +24,12 @@ DATABASE_URL = _db_url
 JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret")
 JWT_EXPIRE_MINUTES = int(os.getenv("JWT_EXPIRE_MINUTES", "60"))
 
-# Hadoop Cluster Configuration
-HADOOP_HOME = os.getenv("HADOOP_HOME", "/opt/module/hadoop-3.1.3")
-LOG_DIR = os.getenv("LOG_DIR", "/opt/module/hadoop-3.1.3/logs")
 
 # SSH Configuration
 SSH_PORT = int(os.getenv("SSH_PORT", "22"))
 SSH_TIMEOUT = int(os.getenv("SSH_TIMEOUT", "10"))
 
-# Hadoop Nodes Configuration
-# Parse hadoop nodes from environment variables at module level
-HADOOP_NODES = {}
-for key, value in os.environ.items():
-    if key.startswith("NODE_"):
-        node_name = key.replace("NODE_", "").lower()
-        if "," in value:
-            ip, username, password = value.split(",")
-            HADOOP_NODES[node_name] = (ip, username, password)
-
-# Static node configuration as fallback
-if not HADOOP_NODES:
-    HADOOP_NODES = {
-        "hadoop102": ("192.168.10.102", "hadoop", "limouren..."),
-        "hadoop103": ("192.168.10.103", "hadoop", "limouren..."),
-        "hadoop104": ("192.168.10.104", "hadoop", "limouren..."),
-        "hadoop105": ("192.168.10.105", "hadoop", "limouren..."),
-        "hadoop100": ("192.168.10.100", "hadoop", "limouren...")
-    }
-
-# Aliases for backward compatibility with backend_2 code
-hadoop_home = HADOOP_HOME
-log_dir = LOG_DIR
 ssh_port = SSH_PORT
 ssh_timeout = SSH_TIMEOUT
-hadoop_nodes = HADOOP_NODES
+
+LOG_DIR = os.getenv("HADOOP_LOG_DIR", "/usr/local/hadoop/logs")
diff --git a/backend/app/log_collector.py b/backend/app/log_collector.py
index 017a038..4def679 100644
--- a/backend/app/log_collector.py
+++ b/backend/app/log_collector.py
@@ -122,7 +122,7 @@ class LogCollector:
         except Exception as e:
             print(f"Error saving log to database: {e}")
 
-    def _collect_logs(self, node_name: str, log_type: str):
+    def _collect_logs(self, node_name: str, log_type: str, ip: str):
         """Internal method to collect logs continuously"""
         print(f"Starting log collection for {node_name}_{log_type}")
 
@@ -136,13 +136,13 @@
             time.sleep(self.collection_interval)
 
             # Check if log file still exists
-            if not log_reader.check_log_file_exists(node_name, log_type):
+            if not log_reader.check_log_file_exists(node_name, log_type, ip=ip):
                 print(f"Log file {node_name}_{log_type} no longer exists, stopping collection")
                 self.stop_collection(node_name, log_type)
                 break
 
             # Read current log content
-            current_log_content = log_reader.read_log(node_name, log_type)
+            current_log_content = log_reader.read_log(node_name, log_type, ip=ip)
             current_file_size = len(current_log_content)
 
             # Check if log file has new content
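Note on the collector hunks above: `_collect_logs` now takes `ip`, but no hunk in this diff touches `start_collection`, even though the router below invokes it as `log_collector.start_collection(node_name, log_type, ip=ip, interval=interval)`. For the two call sites to line up, `start_collection` has to accept `ip` and forward it to the collection thread. A minimal sketch of that shape, assuming the collector tracks threads in a dict keyed by `"{node_name}_{log_type}"` (the ID the stop endpoint's comment mentions); the attribute names here are assumptions, not code shown in this diff:

```python
import threading

class LogCollector:
    # Sketch only: `collectors` and `collection_interval` are assumed names;
    # the real class lives in backend/app/log_collector.py.
    def __init__(self, collection_interval: int = 5):
        self.collection_interval = collection_interval
        self.collectors: dict = {}

    def start_collection(self, node_name: str, log_type: str, ip: str, interval: int = 5) -> None:
        """Spawn a background thread that polls one node/log pair."""
        collector_id = f"{node_name}_{log_type}"
        if collector_id in self.collectors:
            return  # collection already running for this node/log pair
        self.collection_interval = interval
        thread = threading.Thread(
            target=self._collect_logs,
            args=(node_name, log_type, ip),  # forward ip to the new signature
            daemon=True,
        )
        self.collectors[collector_id] = thread
        thread.start()

    def _collect_logs(self, node_name: str, log_type: str, ip: str) -> None:
        ...  # the polling loop from the diff above
```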
diff --git a/backend/app/log_reader.py b/backend/app/log_reader.py
index 06f90dc..919a01d 100644
--- a/backend/app/log_reader.py
+++ b/backend/app/log_reader.py
@@ -1,5 +1,5 @@
 from typing import List, Dict, Optional
-from .config import LOG_DIR, HADOOP_NODES
+from .config import LOG_DIR
 from .ssh_utils import ssh_manager
 
 class LogReader:
@@ -24,24 +24,30 @@
         # Generate full log file path
         return f"{self.log_dir}/{base_name}-{node_name.replace('_', '')}.log"
 
-    def read_log(self, node_name: str, log_type: str) -> str:
+    def read_log(self, node_name: str, log_type: str, ip: str) -> str:
         """Read log from a specific node"""
         # Get log file path
         log_file_path = self.get_log_file_path(node_name, log_type)
 
         # Get SSH connection
-        ssh_client = ssh_manager.get_connection(node_name)
+        ssh_client = ssh_manager.get_connection(node_name, ip=ip)
 
         # Read log file content
         return ssh_client.read_file(log_file_path)
 
-    def read_all_nodes_log(self, log_type: str) -> Dict[str, str]:
+    def read_all_nodes_log(self, nodes: List[Dict[str, str]], log_type: str) -> Dict[str, str]:
         """Read log from all nodes"""
         logs = {}
-        for node_name in HADOOP_NODES:
+        for node in nodes:
+            node_name = node['name']
+            ip = node.get('ip')
+            if not ip:
+                logs[node_name] = "Error: IP address not found"
+                continue
+
             try:
-                logs[node_name] = self.read_log(node_name, log_type)
+                logs[node_name] = self.read_log(node_name, log_type, ip)
             except Exception as e:
                 logs[node_name] = f"Error reading log: {str(e)}"
 
         return logs
diff --git a/backend/app/routers/hadoop_logs.py b/backend/app/routers/hadoop_logs.py
index b08d400..146b028 100644
--- a/backend/app/routers/hadoop_logs.py
+++ b/backend/app/routers/hadoop_logs.py
@@ -1,9 +1,13 @@
 from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from ..db import get_db
 from ..deps.auth import get_current_user
 from ..log_reader import log_reader
 from ..log_collector import log_collector
 from ..ssh_utils import ssh_manager
-from ..config import HADOOP_NODES
+from ..models.nodes import Node
+from ..models.clusters import Cluster
 from ..schemas import (
     LogRequest,
     LogResponse,
@@ -14,17 +18,30 @@
 
 router = APIRouter()
 
+async def get_node_ip(db: AsyncSession, node_name: str) -> str:
+    result = await db.execute(select(Node.ip_address).where(Node.hostname == node_name))
+    ip = result.scalar_one_or_none()
+    if not ip:
+        raise HTTPException(status_code=404, detail=f"Node {node_name} not found")
+    return str(ip)
+
 @router.get("/hadoop/nodes/")
-async def get_hadoop_nodes(user=Depends(get_current_user)):
+async def get_hadoop_nodes(user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get list of all Hadoop nodes"""
-    return NodeListResponse(nodes=list(HADOOP_NODES.keys()))
+    # Assuming all nodes in DB are relevant, or filter by Cluster type if needed
+    stmt = select(Node.hostname).join(Cluster)
+    # Optional: .where(Cluster.type.ilike('%hadoop%'))
+    result = await db.execute(stmt)
+    nodes = result.scalars().all()
+    return NodeListResponse(nodes=nodes)
 
 @router.get("/hadoop/logs/{node_name}/{log_type}/", response_model=LogResponse)
-async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current_user)):
+async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get log from a specific Hadoop node"""
+    ip = await get_node_ip(db, node_name)
     try:
         # Read log content
-        log_content = log_reader.read_log(node_name, log_type)
+        log_content = log_reader.read_log(node_name, log_type, ip=ip)
         return LogResponse(
             node_name=node_name,
             log_type=log_type,
@@ -34,21 +51,28 @@
         raise HTTPException(status_code=500, detail=str(e))
 
 @router.get("/hadoop/logs/all/{log_type}/", response_model=MultiLogResponse)
-async def get_all_hadoop_nodes_log(log_type: str, user=Depends(get_current_user)):
+async def get_all_hadoop_nodes_log(log_type: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get logs from all Hadoop nodes"""
+    stmt = select(Node.hostname, Node.ip_address).join(Cluster)
+    result = await db.execute(stmt)
+    nodes_data = result.all()
+
+    nodes_list = [{"name": n[0], "ip": str(n[1])} for n in nodes_data]
+
     try:
         # Read logs from all nodes
-        logs = log_reader.read_all_nodes_log(log_type)
+        logs = log_reader.read_all_nodes_log(nodes_list, log_type)
         return MultiLogResponse(logs=logs)
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(e))
 
 @router.get("/hadoop/logs/files/{node_name}/", response_model=LogFilesResponse)
-async def get_hadoop_log_files(node_name: str, user=Depends(get_current_user)):
+async def get_hadoop_log_files(node_name: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get list of log files on a specific Hadoop node"""
+    ip = await get_node_ip(db, node_name)
     try:
         # Get log files list
-        log_files = log_reader.get_log_files_list(node_name)
+        log_files = log_reader.get_log_files_list(node_name, ip=ip)
         return LogFilesResponse(
             node_name=node_name,
             log_files=log_files
@@ -67,10 +91,11 @@ async def get_hadoop_collectors_status(user=Depends(get_current_user)):
     }
 
 @router.post("/hadoop/collectors/start/{node_name}/{log_type}/")
-async def start_hadoop_collector(node_name: str, log_type: str, interval: int = 5, user=Depends(get_current_user)):
+async def start_hadoop_collector(node_name: str, log_type: str, interval: int = 5, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Start log collection for a specific Hadoop node and log type"""
+    ip = await get_node_ip(db, node_name)
     try:
-        log_collector.start_collection(node_name, log_type, interval)
+        log_collector.start_collection(node_name, log_type, ip=ip, interval=interval)
         return {
             "message": f"Started log collection for {node_name}_{log_type}",
             "interval": interval
@@ -81,6 +106,7 @@
 @router.post("/hadoop/collectors/stop/{node_name}/{log_type}/")
 async def stop_hadoop_collector(node_name: str, log_type: str, user=Depends(get_current_user)):
     """Stop log collection for a specific Hadoop node and log type"""
+    # stop doesn't need IP as it just stops the thread by ID
    try:
         log_collector.stop_collection(node_name, log_type)
         return {
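The router's queries assume a `Node` model exposing `hostname` and `ip_address` plus a foreign key into `Cluster`; that relationship is what lets the bare `.join(Cluster)` infer its ON clause. The models themselves are outside this diff, so the following is only a sketch of the shape they would need, in SQLAlchemy 2.0 declarative style; the column names mirror the attributes used above, everything else is assumed:

```python
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Cluster(Base):
    __tablename__ = "clusters"
    id: Mapped[int] = mapped_column(primary_key=True)
    type: Mapped[str] = mapped_column(String(50))  # e.g. "hadoop", used by the optional filter

class Node(Base):
    __tablename__ = "nodes"
    id: Mapped[int] = mapped_column(primary_key=True)
    cluster_id: Mapped[int] = mapped_column(ForeignKey("clusters.id"))  # enables the bare .join(Cluster)
    hostname: Mapped[str] = mapped_column(String(255))   # matched against node_name
    ip_address: Mapped[str] = mapped_column(String(45))  # returned by get_node_ip
```

Without that `cluster_id` foreign key, `select(Node.hostname).join(Cluster)` cannot infer the join and would need an explicit ON condition.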
diff --git a/backend/app/ssh_utils.py b/backend/app/ssh_utils.py
index 37f09bd..8ae5592 100644
--- a/backend/app/ssh_utils.py
+++ b/backend/app/ssh_utils.py
@@ -90,16 +90,18 @@ class SSHConnectionManager:
     def __init__(self):
         self.connections = {}
 
-    def get_connection(self, node_name: str) -> SSHClient:
+    def get_connection(self, node_name: str, ip: str = None, username: str = None, password: str = None) -> SSHClient:
         """Get or create SSH connection for a node"""
         if node_name not in self.connections:
-            # Get node configuration from static config instead of settings
-            if node_name not in STATIC_NODE_CONFIG:
-                raise ValueError(f"Node {node_name} not configured")
+            if not ip:
+                raise ValueError(f"IP address required for new connection to {node_name}")
+
+            _user = username or SSH_USER
+            _pass = password or SSH_PASSWORD
+
+            client = SSHClient(ip, _user, _pass)
+            self.connections[node_name] = client
 
-            ip, username, password = STATIC_NODE_CONFIG[node_name]
-            self.connections[node_name] = SSHClient(ip, username, password)
-            return self.connections[node_name]
+        return self.connections[node_name]
 
 
     def close_all(self) -> None:
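The rewritten `get_connection` caches clients by node name: the first call for a node must carry the IP resolved from the database, after which the name alone is enough. `SSH_USER` and `SSH_PASSWORD` serve as fallback credentials here but are not defined in the config.py hunk above, so they presumably arrive from elsewhere in config or the environment. A usage sketch; the module path is inferred from the file layout, and the hostname, IP, and log path are illustrative only:

```python
# Usage sketch for the new cache-by-name contract (illustrative values only).
from backend.app.ssh_utils import ssh_manager

# First call for a node must carry the IP resolved from the nodes table.
client = ssh_manager.get_connection("hadoop102", ip="192.168.10.102")
print(client.read_file("/usr/local/hadoop/logs/hadoop-hdfs-namenode-hadoop102.log"))

# Subsequent calls hit the cache, so the IP can be omitted.
assert ssh_manager.get_connection("hadoop102") is client
```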