pull/48/head
echo 1 month ago
parent 90eaf5395b
commit b49b2f72de

@ -1,4 +1,5 @@
import os
import json
from dotenv import load_dotenv
from typing import Dict, Tuple
@ -23,37 +24,12 @@ DATABASE_URL = _db_url
JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret")
JWT_EXPIRE_MINUTES = int(os.getenv("JWT_EXPIRE_MINUTES", "60"))
# Hadoop Cluster Configuration
HADOOP_HOME = os.getenv("HADOOP_HOME", "/opt/module/hadoop-3.1.3")
LOG_DIR = os.getenv("LOG_DIR", "/opt/module/hadoop-3.1.3/logs")
# SSH Configuration
SSH_PORT = int(os.getenv("SSH_PORT", "22"))
SSH_TIMEOUT = int(os.getenv("SSH_TIMEOUT", "10"))
# Hadoop Nodes Configuration
# Parse hadoop nodes from environment variables at module level
HADOOP_NODES = {}
for key, value in os.environ.items():
    if key.startswith("NODE_"):
        node_name = key.replace("NODE_", "").lower()
        if "," in value:
            ip, username, password = value.split(",")
            HADOOP_NODES[node_name] = (ip, username, password)
# Static node configuration as fallback
if not HADOOP_NODES:
    HADOOP_NODES = {
        "hadoop102": ("192.168.10.102", "hadoop", "limouren..."),
        "hadoop103": ("192.168.10.103", "hadoop", "limouren..."),
        "hadoop104": ("192.168.10.104", "hadoop", "limouren..."),
        "hadoop105": ("192.168.10.105", "hadoop", "limouren..."),
        "hadoop100": ("192.168.10.100", "hadoop", "limouren...")
    }
# Aliases for backward compatibility with backend_2 code
hadoop_home = HADOOP_HOME
log_dir = LOG_DIR
ssh_port = SSH_PORT
ssh_timeout = SSH_TIMEOUT
hadoop_nodes = HADOOP_NODES
LOG_DIR = os.getenv("HADOOP_LOG_DIR", "/usr/local/hadoop/logs")
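
Note: the removed block above read per-node credentials from NODE_* environment variables, while the new configuration keeps only the log directory (its variable is renamed from LOG_DIR to HADOOP_LOG_DIR, with the default moving to /usr/local/hadoop/logs). For anyone migrating an existing .env, a minimal sketch of the old variable format, using made-up placeholder credentials:

# Hypothetical examples of the removed NODE_* format (placeholder values, not real credentials)
env = {
    "NODE_HADOOP102": "192.168.10.102,hadoop,placeholder-password",
    "NODE_HADOOP103": "192.168.10.103,hadoop,placeholder-password",
}
nodes = {
    key.replace("NODE_", "").lower(): tuple(value.split(","))
    for key, value in env.items()
    if key.startswith("NODE_") and "," in value
}
print(nodes["hadoop102"])  # ('192.168.10.102', 'hadoop', 'placeholder-password')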

@ -122,7 +122,7 @@ class LogCollector:
        except Exception as e:
            print(f"Error saving log to database: {e}")
    def _collect_logs(self, node_name: str, log_type: str):
    def _collect_logs(self, node_name: str, log_type: str, ip: str):
        """Internal method to collect logs continuously"""
        print(f"Starting log collection for {node_name}_{log_type}")
@ -136,13 +136,13 @@ class LogCollector:
            time.sleep(self.collection_interval)
            # Check if log file still exists
            if not log_reader.check_log_file_exists(node_name, log_type):
            if not log_reader.check_log_file_exists(node_name, log_type, ip=ip):
                print(f"Log file {node_name}_{log_type} no longer exists, stopping collection")
                self.stop_collection(node_name, log_type)
                break
            # Read current log content
            current_log_content = log_reader.read_log(node_name, log_type)
            current_log_content = log_reader.read_log(node_name, log_type, ip=ip)
            current_file_size = len(current_log_content)
            # Check if log file has new content
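
Note: the collection loop now needs the node's IP, so whoever starts the thread must pass it through. start_collection itself is not shown in this diff; a hedged sketch consistent with the router call below, log_collector.start_collection(node_name, log_type, ip=ip, interval=interval), and with _collect_logs(self, node_name, log_type, ip):

import threading

class LogCollectorSketch:
    """Illustrative only; attribute names other than collection_interval are assumptions."""

    def __init__(self):
        self.threads = {}                 # running collector threads, keyed by "<node>_<log type>" (assumed)
        self.collection_interval = 5      # read by _collect_logs via time.sleep(self.collection_interval)

    def _collect_logs(self, node_name: str, log_type: str, ip: str):
        ...  # the real loop is the one patched above

    def start_collection(self, node_name: str, log_type: str, ip: str, interval: int = 5):
        key = f"{node_name}_{log_type}"
        self.collection_interval = interval
        thread = threading.Thread(
            target=self._collect_logs,
            args=(node_name, log_type, ip),   # ip is the new argument threaded through
            daemon=True,
        )
        self.threads[key] = thread
        thread.start()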

@ -1,5 +1,5 @@
from typing import List, Dict, Optional
from .config import LOG_DIR, HADOOP_NODES
from .config import LOG_DIR
from .ssh_utils import ssh_manager
class LogReader:
@ -24,24 +24,30 @@ class LogReader:
        # Generate full log file path
        return f"{self.log_dir}/{base_name}-{node_name.replace('_', '')}.log"
    def read_log(self, node_name: str, log_type: str) -> str:
    def read_log(self, node_name: str, log_type: str, ip: str) -> str:
        """Read log from a specific node"""
        # Get log file path
        log_file_path = self.get_log_file_path(node_name, log_type)
        # Get SSH connection
        ssh_client = ssh_manager.get_connection(node_name)
        ssh_client = ssh_manager.get_connection(node_name, ip=ip)
        # Read log file content
        return ssh_client.read_file(log_file_path)
    def read_all_nodes_log(self, log_type: str) -> Dict[str, str]:
    def read_all_nodes_log(self, nodes: List[Dict[str, str]], log_type: str) -> Dict[str, str]:
        """Read log from all nodes"""
        logs = {}
        for node_name in HADOOP_NODES:
        for node in nodes:
            node_name = node['name']
            ip = node.get('ip')
            if not ip:
                logs[node_name] = "Error: IP address not found"
                continue
            try:
                logs[node_name] = self.read_log(node_name, log_type)
                logs[node_name] = self.read_log(node_name, log_type, ip)
            except Exception as e:
                logs[node_name] = f"Error reading log: {str(e)}"

@ -1,9 +1,13 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ..db import get_db
from ..deps.auth import get_current_user
from ..log_reader import log_reader
from ..log_collector import log_collector
from ..ssh_utils import ssh_manager
from ..config import HADOOP_NODES
from ..models.nodes import Node
from ..models.clusters import Cluster
from ..schemas import (
LogRequest,
LogResponse,
@ -14,17 +18,30 @@ from ..schemas import (
router = APIRouter()
async def get_node_ip(db: AsyncSession, node_name: str) -> str:
    result = await db.execute(select(Node.ip_address).where(Node.hostname == node_name))
    ip = result.scalar_one_or_none()
    if not ip:
        raise HTTPException(status_code=404, detail=f"Node {node_name} not found")
    return str(ip)
@router.get("/hadoop/nodes/")
async def get_hadoop_nodes(user=Depends(get_current_user)):
async def get_hadoop_nodes(user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
"""Get list of all Hadoop nodes"""
return NodeListResponse(nodes=list(HADOOP_NODES.keys()))
# Assuming all nodes in DB are relevant, or filter by Cluster type if needed
stmt = select(Node.hostname).join(Cluster)
# Optional: .where(Cluster.type.ilike('%hadoop%'))
result = await db.execute(stmt)
nodes = result.scalars().all()
return NodeListResponse(nodes=nodes)
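
Note: if the listing should really be restricted to Hadoop clusters, the commented-out filter above can be spelled out in full. This assumes Cluster exposes a string type column, which the diff itself does not show; select, Node, and Cluster are already imported at the top of this router module:

    stmt = (
        select(Node.hostname)
        .join(Cluster)
        .where(Cluster.type.ilike("%hadoop%"))   # case-insensitive match on the cluster type
    )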
@router.get("/hadoop/logs/{node_name}/{log_type}/", response_model=LogResponse)
async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current_user)):
async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
"""Get log from a specific Hadoop node"""
ip = await get_node_ip(db, node_name)
try:
# Read log content
log_content = log_reader.read_log(node_name, log_type)
log_content = log_reader.read_log(node_name, log_type, ip=ip)
return LogResponse(
node_name=node_name,
log_type=log_type,
@ -34,21 +51,28 @@ async def get_hadoop_log(node_name: str, log_type: str, user=Depends(get_current
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/hadoop/logs/all/{log_type}/", response_model=MultiLogResponse)
async def get_all_hadoop_nodes_log(log_type: str, user=Depends(get_current_user)):
async def get_all_hadoop_nodes_log(log_type: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """Get logs from all Hadoop nodes"""
    stmt = select(Node.hostname, Node.ip_address).join(Cluster)
    result = await db.execute(stmt)
    nodes_data = result.all()
    nodes_list = [{"name": n[0], "ip": str(n[1])} for n in nodes_data]
    try:
        # Read logs from all nodes
        logs = log_reader.read_all_nodes_log(log_type)
        logs = log_reader.read_all_nodes_log(nodes_list, log_type)
        return MultiLogResponse(logs=logs)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/hadoop/logs/files/{node_name}/", response_model=LogFilesResponse)
async def get_hadoop_log_files(node_name: str, user=Depends(get_current_user)):
async def get_hadoop_log_files(node_name: str, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
"""Get list of log files on a specific Hadoop node"""
ip = await get_node_ip(db, node_name)
try:
# Get log files list
log_files = log_reader.get_log_files_list(node_name)
log_files = log_reader.get_log_files_list(node_name, ip=ip)
return LogFilesResponse(
node_name=node_name,
log_files=log_files
@ -67,10 +91,11 @@ async def get_hadoop_collectors_status(user=Depends(get_current_user)):
}
@router.post("/hadoop/collectors/start/{node_name}/{log_type}/")
async def start_hadoop_collector(node_name: str, log_type: str, interval: int = 5, user=Depends(get_current_user)):
async def start_hadoop_collector(node_name: str, log_type: str, interval: int = 5, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
"""Start log collection for a specific Hadoop node and log type"""
ip = await get_node_ip(db, node_name)
try:
log_collector.start_collection(node_name, log_type, interval)
log_collector.start_collection(node_name, log_type, ip=ip, interval=interval)
return {
"message": f"Started log collection for {node_name}_{log_type}",
"interval": interval
@ -81,6 +106,7 @@ async def start_hadoop_collector(node_name: str, log_type: str, interval: int =
@router.post("/hadoop/collectors/stop/{node_name}/{log_type}/")
async def stop_hadoop_collector(node_name: str, log_type: str, user=Depends(get_current_user)):
"""Stop log collection for a specific Hadoop node and log type"""
# stop doesn't need IP as it just stops the thread by ID
try:
log_collector.stop_collection(node_name, log_type)
return {

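Note: for completeness, an example of driving the collector endpoints above from a client. The host, port, and lack of a router prefix are assumptions about how the app is mounted, the token placeholder stands in for whatever get_current_user expects, and "namenode" is just an example log_type:

import httpx

BASE = "http://localhost:8000"                  # assumed host/port, no extra router prefix
headers = {"Authorization": "Bearer <token>"}   # placeholder credential

httpx.post(f"{BASE}/hadoop/collectors/start/hadoop102/namenode/", params={"interval": 5}, headers=headers)
httpx.post(f"{BASE}/hadoop/collectors/stop/hadoop102/namenode/", headers=headers)
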
@ -90,16 +90,18 @@ class SSHConnectionManager:
    def __init__(self):
        self.connections = {}
    def get_connection(self, node_name: str) -> SSHClient:
    def get_connection(self, node_name: str, ip: str = None, username: str = None, password: str = None) -> SSHClient:
        """Get or create SSH connection for a node"""
        if node_name not in self.connections:
            # Get node configuration from static config instead of settings
            if node_name not in STATIC_NODE_CONFIG:
                raise ValueError(f"Node {node_name} not configured")
            if not ip:
                raise ValueError(f"IP address required for new connection to {node_name}")
            _user = username or SSH_USER
            _pass = password or SSH_PASSWORD
            client = SSHClient(ip, _user, _pass)
            self.connections[node_name] = client
            ip, username, password = STATIC_NODE_CONFIG[node_name]
            self.connections[node_name] = SSHClient(ip, username, password)
        return self.connections[node_name]
    def close_all(self) -> None:

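Note: a usage sketch for the updated manager. The first call for a node must now supply its IP (credentials fall back to SSH_USER/SSH_PASSWORD), and later calls reuse the cached client; the import path and log file path below are illustrative:

from ssh_utils import ssh_manager   # adjust the import to the actual package layout

client = ssh_manager.get_connection("hadoop102", ip="192.168.10.102")   # creates and caches the client
same_client = ssh_manager.get_connection("hadoop102")                   # cached, so no ip needed
content = client.read_file("/usr/local/hadoop/logs/example.log")        # example path only
ssh_manager.close_all()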