pull/48/head
echo · 1 month ago
parent 90eaf5395b
commit b49b2f72de

@@ -1,4 +1,5 @@
 import os
+import json
 from dotenv import load_dotenv
 from typing import Dict, Tuple
@@ -23,37 +24,12 @@ DATABASE_URL = _db_url
 JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret")
 JWT_EXPIRE_MINUTES = int(os.getenv("JWT_EXPIRE_MINUTES", "60"))
 
-# Hadoop Cluster Configuration
-HADOOP_HOME = os.getenv("HADOOP_HOME", "/opt/module/hadoop-3.1.3")
-LOG_DIR = os.getenv("LOG_DIR", "/opt/module/hadoop-3.1.3/logs")
-
 # SSH Configuration
 SSH_PORT = int(os.getenv("SSH_PORT", "22"))
 SSH_TIMEOUT = int(os.getenv("SSH_TIMEOUT", "10"))
 
-# Hadoop Nodes Configuration
-# Parse hadoop nodes from environment variables at module level
-HADOOP_NODES = {}
-for key, value in os.environ.items():
-    if key.startswith("NODE_"):
-        node_name = key.replace("NODE_", "").lower()
-        if "," in value:
-            ip, username, password = value.split(",")
-            HADOOP_NODES[node_name] = (ip, username, password)
-
-# Static node configuration as fallback
-if not HADOOP_NODES:
-    HADOOP_NODES = {
-        "hadoop102": ("192.168.10.102", "hadoop", "limouren..."),
-        "hadoop103": ("192.168.10.103", "hadoop", "limouren..."),
-        "hadoop104": ("192.168.10.104", "hadoop", "limouren..."),
-        "hadoop105": ("192.168.10.105", "hadoop", "limouren..."),
-        "hadoop100": ("192.168.10.100", "hadoop", "limouren...")
-    }
-
-# Aliases for backward compatibility with backend_2 code
-hadoop_home = HADOOP_HOME
-log_dir = LOG_DIR
 ssh_port = SSH_PORT
 ssh_timeout = SSH_TIMEOUT
-hadoop_nodes = HADOOP_NODES
+LOG_DIR = os.getenv("HADOOP_LOG_DIR", "/usr/local/hadoop/logs")
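
Review note: this hunk removes both the NODE_* environment parsing and the hard-coded credential fallback, so config.py no longer knows any node addresses; they now come from the database (see the router changes below). For anyone migrating an existing deployment, here is a minimal sketch of the legacy encoding this drops, reconstructed from the removed lines (the NODE_HADOOP102 value is illustrative; this version also guards against stray commas, which the removed code would have crashed on):

# Legacy format: NODE_<NAME>="ip,username,password"
#   e.g. NODE_HADOOP102="192.168.10.102,hadoop,<password>"
import os

def parse_legacy_nodes(environ=os.environ):
    """Rebuild the old HADOOP_NODES mapping from NODE_* variables."""
    nodes = {}
    for key, value in environ.items():
        # require exactly two commas so the 3-way unpack cannot fail
        if key.startswith("NODE_") and value.count(",") == 2:
            ip, username, password = value.split(",")
            nodes[key[len("NODE_"):].lower()] = (ip, username, password)
    return nodes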

@@ -122,7 +122,7 @@ class LogCollector:
         except Exception as e:
             print(f"Error saving log to database: {e}")
 
-    def _collect_logs(self, node_name: str, log_type: str):
+    def _collect_logs(self, node_name: str, log_type: str, ip: str):
         """Internal method to collect logs continuously"""
         print(f"Starting log collection for {node_name}_{log_type}")
@@ -136,13 +136,13 @@ class LogCollector:
             time.sleep(self.collection_interval)
 
             # Check if log file still exists
-            if not log_reader.check_log_file_exists(node_name, log_type):
+            if not log_reader.check_log_file_exists(node_name, log_type, ip=ip):
                 print(f"Log file {node_name}_{log_type} no longer exists, stopping collection")
                 self.stop_collection(node_name, log_type)
                 break
 
             # Read current log content
-            current_log_content = log_reader.read_log(node_name, log_type)
+            current_log_content = log_reader.read_log(node_name, log_type, ip=ip)
             current_file_size = len(current_log_content)
 
             # Check if log file has new content
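
Review note: ip is now threaded from the routers through _collect_logs and into log_reader/ssh_manager. A minimal usage sketch of the new collector API ("namenode" as a log type is an assumption; the node name and IP come from the removed static config):

# Start tailing one log; the caller resolves the IP (the routers do it via the DB).
log_collector.start_collection("hadoop102", "namenode", ip="192.168.10.102", interval=5)

# Stopping is keyed by (node_name, log_type) alone; no IP needed.
log_collector.stop_collection("hadoop102", "namenode")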

@ -1,5 +1,5 @@
from typing import List, Dict, Optional from typing import List, Dict, Optional
from .config import LOG_DIR, HADOOP_NODES from .config import LOG_DIR
from .ssh_utils import ssh_manager from .ssh_utils import ssh_manager
class LogReader: class LogReader:
@@ -24,24 +24,30 @@ class LogReader:
         # Generate full log file path
         return f"{self.log_dir}/{base_name}-{node_name.replace('_', '')}.log"
 
-    def read_log(self, node_name: str, log_type: str) -> str:
+    def read_log(self, node_name: str, log_type: str, ip: str) -> str:
         """Read log from a specific node"""
         # Get log file path
         log_file_path = self.get_log_file_path(node_name, log_type)
 
         # Get SSH connection
-        ssh_client = ssh_manager.get_connection(node_name)
+        ssh_client = ssh_manager.get_connection(node_name, ip=ip)
 
         # Read log file content
         return ssh_client.read_file(log_file_path)
 
-    def read_all_nodes_log(self, log_type: str) -> Dict[str, str]:
+    def read_all_nodes_log(self, nodes: List[Dict[str, str]], log_type: str) -> Dict[str, str]:
         """Read log from all nodes"""
         logs = {}
-        for node_name in HADOOP_NODES:
+        for node in nodes:
+            node_name = node['name']
+            ip = node.get('ip')
+            if not ip:
+                logs[node_name] = "Error: IP address not found"
+                continue
             try:
-                logs[node_name] = self.read_log(node_name, log_type)
+                logs[node_name] = self.read_log(node_name, log_type, ip)
             except Exception as e:
                 logs[node_name] = f"Error reading log: {str(e)}"

@ -1,9 +1,13 @@
from fastapi import APIRouter, Depends, HTTPException, Query from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ..db import get_db
from ..deps.auth import get_current_user from ..deps.auth import get_current_user
from ..log_reader import log_reader from ..log_reader import log_reader
from ..log_collector import log_collector from ..log_collector import log_collector
from ..ssh_utils import ssh_manager from ..ssh_utils import ssh_manager
from ..config import HADOOP_NODES from ..models.nodes import Node
from ..models.clusters import Cluster
from ..schemas import ( from ..schemas import (
LogRequest, LogRequest,
LogResponse, LogResponse,
@@ -14,17 +18,30 @@ from ..schemas import (
 
 router = APIRouter()
 
+async def get_node_ip(db: AsyncSession, node_name: str) -> str:
+    result = await db.execute(select(Node.ip_address).where(Node.hostname == node_name))
+    ip = result.scalar_one_or_none()
+    if not ip:
+        raise HTTPException(status_code=404, detail=f"Node {node_name} not found")
+    return str(ip)
+
 @router.get("/hadoop/nodes/")
-async def get_hadoop_nodes(user=Depends(get_current_user)):
+async def get_hadoop_nodes(user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get list of all Hadoop nodes"""
-    return NodeListResponse(nodes=list(HADOOP_NODES.keys()))
+    # Assuming all nodes in DB are relevant, or filter by Cluster type if needed
+    stmt = select(Node.hostname).join(Cluster)
+    # Optional: .where(Cluster.type.ilike('%hadoop%'))
+    result = await db.execute(stmt)
+    nodes = result.scalars().all()
+    return NodeListResponse(nodes=nodes)
 
 @router.get("/hadoop/logs/{node_name}/{log_type}/", response_model=LogResponse)
-async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current_user)):
+async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get log from a specific Hadoop node"""
+    ip = await get_node_ip(db, node_name)
     try:
         # Read log content
-        log_content = log_reader.read_log(node_name, log_type)
+        log_content = log_reader.read_log(node_name, log_type, ip=ip)
         return LogResponse(
             node_name=node_name,
             log_type=log_type,
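
Review note: get_node_ip turns an unknown hostname into a 404 before any SSH work starts. A hedged client-side sketch of exercising the reworked endpoints (base URL, port, and bearer-token auth are assumptions):

import httpx

base = "http://localhost:8000"                   # assumed host/port
headers = {"Authorization": "Bearer <token>"}    # assumed auth scheme

node_names = httpx.get(f"{base}/hadoop/nodes/", headers=headers).json()["nodes"]
resp = httpx.get(f"{base}/hadoop/logs/hadoop102/namenode/", headers=headers)
print(resp.json() if resp.status_code == 200 else resp.text)  # 404 if node unknown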
@@ -34,21 +51,28 @@ async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current
         raise HTTPException(status_code=500, detail=str(e))
 
 @router.get("/hadoop/logs/all/{log_type}/", response_model=MultiLogResponse)
-async def get_all_hadoop_nodes_log(log_type: str, user=Depends(get_current_user)):
+async def get_all_hadoop_nodes_log(log_type: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get logs from all Hadoop nodes"""
+    stmt = select(Node.hostname, Node.ip_address).join(Cluster)
+    result = await db.execute(stmt)
+    nodes_data = result.all()
+    nodes_list = [{"name": n[0], "ip": str(n[1])} for n in nodes_data]
     try:
         # Read logs from all nodes
-        logs = log_reader.read_all_nodes_log(log_type)
+        logs = log_reader.read_all_nodes_log(nodes_list, log_type)
         return MultiLogResponse(logs=logs)
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(e))
 
 @router.get("/hadoop/logs/files/{node_name}/", response_model=LogFilesResponse)
-async def get_hadoop_log_files(node_name: str, user=Depends(get_current_user)):
+async def get_hadoop_log_files(node_name: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Get list of log files on a specific Hadoop node"""
+    ip = await get_node_ip(db, node_name)
    try:
         # Get log files list
-        log_files = log_reader.get_log_files_list(node_name)
+        log_files = log_reader.get_log_files_list(node_name, ip=ip)
         return LogFilesResponse(
             node_name=node_name,
             log_files=log_files
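
Minor readability note: SQLAlchemy Row objects support named attribute access, so the tuple indexing in nodes_list could read as:

nodes_list = [{"name": row.hostname, "ip": str(row.ip_address)} for row in nodes_data]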
@@ -67,10 +91,11 @@ async def get_hadoop_collectors_status(user=Depends(get_current_user)):
     }
 
 @router.post("/hadoop/collectors/start/{node_name}/{log_type}/")
-async def start_hadoop_collector(node_name: str, log_type: str, interval: int = 5, user=Depends(get_current_user)):
+async def start_hadoop_collector(node_name: str, log_type: str, interval: int = 5, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
     """Start log collection for a specific Hadoop node and log type"""
+    ip = await get_node_ip(db, node_name)
     try:
-        log_collector.start_collection(node_name, log_type, interval)
+        log_collector.start_collection(node_name, log_type, ip=ip, interval=interval)
         return {
             "message": f"Started log collection for {node_name}_{log_type}",
             "interval": interval
@@ -81,6 +106,7 @@ async def start_hadoop_collector(node_name: str, log_type: str, interval: int =
 
 @router.post("/hadoop/collectors/stop/{node_name}/{log_type}/")
 async def stop_hadoop_collector(node_name: str, log_type: str, user=Depends(get_current_user)):
     """Stop log collection for a specific Hadoop node and log type"""
+    # stop doesn't need IP as it just stops the thread by ID
     try:
         log_collector.stop_collection(node_name, log_type)
         return {

@@ -90,16 +90,18 @@ class SSHConnectionManager:
     def __init__(self):
         self.connections = {}
 
-    def get_connection(self, node_name: str) -> SSHClient:
+    def get_connection(self, node_name: str, ip: str = None, username: str = None, password: str = None) -> SSHClient:
         """Get or create SSH connection for a node"""
         if node_name not in self.connections:
-            # Get node configuration from static config instead of settings
-            if node_name not in STATIC_NODE_CONFIG:
-                raise ValueError(f"Node {node_name} not configured")
-            ip, username, password = STATIC_NODE_CONFIG[node_name]
-            self.connections[node_name] = SSHClient(ip, username, password)
+            if not ip:
+                raise ValueError(f"IP address required for new connection to {node_name}")
+            _user = username or SSH_USER
+            _pass = password or SSH_PASSWORD
+            client = SSHClient(ip, _user, _pass)
+            self.connections[node_name] = client
         return self.connections[node_name]
 
     def close_all(self) -> None:
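
Review note: once a connection is cached under node_name, later calls ignore any ip/username/password passed in, so a node whose address changes in the DB keeps its stale connection until close_all() runs. A minimal sketch of the resulting behavior (SSH_USER/SSH_PASSWORD are the module-level fallbacks referenced by the new code):

manager = SSHConnectionManager()
c1 = manager.get_connection("hadoop102", ip="192.168.10.102")  # creates and caches
c2 = manager.get_connection("hadoop102")                       # cache hit; no IP needed
assert c1 is c2

# A node with no cached entry and no IP fails fast:
# manager.get_connection("hadoop103")  -> ValueError: IP address required ...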
