You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
203 lines
8.4 KiB
203 lines
8.4 KiB
from typing import List, Dict, Optional
|
|
from .config import LOG_DIR
|
|
from .ssh_utils import ssh_manager
|
|
|
|
class LogReader:
    """Log reader for Hadoop cluster nodes.

    Every remote operation goes through ``ssh_manager.get_connection(...)``
    and the returned client's ``execute_command``, which this class unpacks
    as an ``(out, err)`` pair.  The log directory that worked for a node is
    remembered in ``_node_log_dir`` so later calls try it first.
    """

    # Suffixes that mark a directory entry as a log file.
    _LOG_SUFFIXES = (".log", ".out", ".out.1")

    # Log type -> file-name prefix used by get_log_file_path().
    _LOG_FILE_PREFIXES = {
        "namenode": "hadoop-hadoop-namenode",
        "datanode": "hadoop-hadoop-datanode",
        "resourcemanager": "hadoop-hadoop-resourcemanager",
        "nodemanager": "hadoop-hadoop-nodemanager",
        "historyserver": "hadoop-hadoop-historyserver",
    }

    # Services recognised in log file names, in match order.
    # "secondarynamenode" must come before "namenode": the latter is a
    # substring of the former and would otherwise always win.
    _KNOWN_SERVICES = (
        "secondarynamenode",
        "namenode",
        "datanode",
        "resourcemanager",
        "nodemanager",
    )

    def __init__(self):
        """Set the default log directory, the per-node cache and fallbacks."""
        self.log_dir = LOG_DIR
        # node name -> log directory last found to be usable on that node
        self._node_log_dir: Dict[str, str] = {}
        # Fallback directories probed when the current one is empty/missing.
        self._candidates = [
            "/usr/local/hadoop/logs",
            "/opt/hadoop/logs",
            "/usr/local/hadoop-3.3.6/logs",
            "/usr/local/hadoop-3.3.5/logs",
            "/usr/local/hadoop-3.1.3/logs",
            "/opt/module/hadoop-3.1.3/logs",
            "/var/log/hadoop",
        ]

    def get_log_file_path(self, node_name: str, log_type: str) -> str:
        """Build the ``.log`` path for a node under the default ``log_dir``.

        ``log_type`` is lower-cased and mapped to its file-name prefix; an
        unknown type is used verbatim (lower-cased).  Underscores are removed
        from ``node_name``.  Unlike ``get_log_file_paths`` this ignores the
        per-node directory cache.
        """
        key = log_type.lower()
        base_name = self._LOG_FILE_PREFIXES.get(key, key)
        return f"{self.log_dir}/{base_name}-{node_name.replace('_', '')}.log"

    def _list_matching_logs(self, ssh_client, base_dir: str,
                            node_name: str, log_type: str) -> List[str]:
        """Return entries of ``base_dir`` that look like the requested log.

        An entry matches when its lower-cased name contains both the
        lower-cased ``log_type`` and ``node_name`` and ends with one of
        ``_LOG_SUFFIXES``.  Names are returned in their original case, in
        listing order.
        """
        # NOTE(review): base_dir is interpolated into a shell command
        # unquoted (as are paths elsewhere in this class); confirm callers
        # never pass untrusted node names / log types.
        out, err = ssh_client.execute_command(f"ls -la {base_dir} 2>/dev/null")
        if err or not out.strip():
            return []

        wanted_type = log_type.lower()
        wanted_node = node_name.lower()
        matches = []
        for line in out.splitlines():
            parts = line.split()
            if not parts:
                continue
            # Last whitespace-separated field of an `ls -la` row is the name.
            fn = parts[-1]
            lf = fn.lower()
            if (wanted_type in lf and wanted_node in lf
                    and lf.endswith(self._LOG_SUFFIXES)):
                matches.append(fn)
        return matches

    def read_log(self, node_name: str, log_type: str, ip: str) -> str:
        """Read a log of ``log_type`` from one node and return its content.

        The conventional file names from ``get_log_file_paths`` are tried
        first; if none can be read, the node's log directory is listed and
        the first readable entry matching type and node name is returned.

        Raises:
            FileNotFoundError: if no candidate file could be read.
        """
        # Ensure working log dir
        self.find_working_log_dir(node_name, ip)
        paths = self.get_log_file_paths(node_name, log_type)

        ssh_client = ssh_manager.get_connection(node_name, ip=ip)

        # Try the conventional file names directly.
        for path in paths:
            out, err = ssh_client.execute_command(f"ls -la {path} 2>/dev/null")
            if not err and out.strip():
                out, err = ssh_client.execute_command(f"cat {path} 2>/dev/null")
                if not err:
                    return out

        # Fall back to scanning the directory listing.
        base_dir = self._node_log_dir.get(node_name, self.log_dir)
        for fn in self._list_matching_logs(ssh_client, base_dir,
                                           node_name, log_type):
            out, err = ssh_client.execute_command(
                f"cat {base_dir}/{fn} 2>/dev/null")
            if not err:
                return out

        raise FileNotFoundError("No such file")

    def read_all_nodes_log(self, nodes: List[Dict[str, str]], log_type: str) -> Dict[str, str]:
        """Read ``log_type`` from every node.

        Each node dict must have a ``'name'`` key and should have ``'ip'``.
        Returns a mapping of node name to log content; a failure for one node
        is recorded as an ``"Error ..."`` string instead of aborting the run.
        """
        logs = {}
        for node in nodes:
            node_name = node['name']
            ip = node.get('ip')
            if not ip:
                logs[node_name] = "Error: IP address not found"
                continue
            try:
                logs[node_name] = self.read_log(node_name, log_type, ip)
            except Exception as e:
                # Deliberate best-effort boundary: report and keep going.
                logs[node_name] = f"Error reading log: {e}"
        return logs

    @staticmethod
    def _leading_date(line: str) -> Optional[str]:
        """Return the ``YYYY-MM-DD`` prefix of a log line, or ``None``.

        Bracketed lines (``[YYYY-MM-DD ...``) yield the ten characters after
        the bracket without further validation.  Unbracketed lines are only
        accepted when they really start with a date-shaped prefix.
        """
        if line.startswith('['):
            return line[1:11]
        prefix = line[:10]
        if (len(prefix) == 10
                and prefix[4] == '-' and prefix[7] == '-'
                and prefix[:4].isdigit()
                and prefix[5:7].isdigit()
                and prefix[8:10].isdigit()):
            return prefix
        return None

    def filter_log_by_date(self, log_content: str, start_date: str, end_date: str) -> str:
        """Keep only log lines whose date lies in ``[start_date, end_date]``.

        Dates are compared as ``YYYY-MM-DD`` strings, inclusive on both ends.
        Lines may start with ``[YYYY-MM-DD ...]`` or with a bare
        ``YYYY-MM-DD ...`` timestamp (the layout Hadoop's stock log4j pattern
        produces).  Lines without a leading date, such as stack-trace
        continuation lines, are dropped.
        """
        kept = []
        for line in log_content.splitlines():
            date_str = self._leading_date(line)
            if date_str is not None and start_date <= date_str <= end_date:
                kept.append(line)
        return '\n'.join(kept)

    def get_log_files_list(self, node_name: str, ip: Optional[str] = None) -> List[str]:
        """List log file names found on a node.

        Probes the node's current log directory and then the fallback
        candidates, stopping at the first one with a non-empty listing; that
        directory is cached for the node.  Returns an empty list when no
        directory yields any output.
        """
        # Ensure working log dir
        if ip:
            self.find_working_log_dir(node_name, ip)
        ssh_client = ssh_manager.get_connection(node_name, ip=ip)

        dirs = [self._node_log_dir.get(node_name, self.log_dir)] + self._candidates
        listing = ""
        for d in dirs:
            out, err = ssh_client.execute_command(f"ls -1 {d} 2>/dev/null")
            if not err and out.strip():
                listing = out
                self._node_log_dir[node_name] = d
                break

        names = (line.strip() for line in listing.splitlines())
        return [name for name in names if name.endswith(self._LOG_SUFFIXES)]

    def check_log_file_exists(self, node_name: str, log_type: str, ip: Optional[str] = None) -> bool:
        """Return True if a log of ``log_type`` can be found on the node.

        Uses the same lookup order as ``read_log`` (conventional names, then
        directory scan).  Errors raised while probing are printed and
        reported as ``False``.
        """
        # Ensure working log dir
        if ip:
            self.find_working_log_dir(node_name, ip)
        paths = self.get_log_file_paths(node_name, log_type)

        ssh_client = ssh_manager.get_connection(node_name, ip=ip)

        try:
            for path in paths:
                stdout, stderr = ssh_client.execute_command(
                    f"ls -la {path} 2>/dev/null")
                if not stderr and stdout.strip():
                    return True
            base_dir = self._node_log_dir.get(node_name, self.log_dir)
            return bool(self._list_matching_logs(ssh_client, base_dir,
                                                 node_name, log_type))
        except Exception as e:
            print(f"Error checking log file existence: {e}")
            return False

    def get_node_services(self, node_name: str, ip: Optional[str] = None) -> List[str]:
        """Infer the services present on a node from its log file names.

        Each log file contributes at most one service (the first entry of
        ``_KNOWN_SERVICES`` found in its name).  The result is de-duplicated
        and sorted.  ``ip`` is optional and simply forwarded to
        ``get_log_files_list``.
        """
        log_files = self.get_log_files_list(node_name, ip=ip)

        services = set()
        for log_file in log_files:
            for service in self._KNOWN_SERVICES:
                if service in log_file:
                    services.add(service)
                    break
        return sorted(services)

    def find_working_log_dir(self, node_name: str, ip: str) -> str:
        """Detect a usable log directory on the node, cache it and return it.

        The node's current directory is probed first, then each fallback
        candidate; the first with a non-empty listing wins.  If none does,
        the default ``log_dir`` is stored and returned.
        """
        ssh_client = ssh_manager.get_connection(node_name, ip=ip)
        current = self._node_log_dir.get(node_name, self.log_dir)

        probed = set()
        for d in [current] + self._candidates:
            if d in probed:
                # Don't spend another SSH round-trip on the same directory.
                continue
            probed.add(d)
            stdout, stderr = ssh_client.execute_command(f"ls -la {d} 2>/dev/null")
            if not stderr and stdout.strip():
                self._node_log_dir[node_name] = d
                return d

        self._node_log_dir[node_name] = self.log_dir
        return self._node_log_dir[node_name]

    def get_log_file_paths(self, node_name: str, log_type: str) -> List[str]:
        """Return the conventional ``.log``, ``.out`` and ``.out.1`` paths.

        Paths are built as ``<dir>/hadoop-hadoop-<log_type>-<node_name>``
        under the node's cached log directory (or ``log_dir`` if none is
        cached), with ``log_type`` and ``node_name`` used verbatim.
        """
        base_dir = self._node_log_dir.get(node_name, self.log_dir)
        base = f"{base_dir}/hadoop-hadoop-{log_type}-{node_name}"
        return [f"{base}{suffix}" for suffix in self._LOG_SUFFIXES]
|
|
|
|
# Create a global LogReader instance
|
|
# Shared module-level instance; constructing it only sets attributes
# (default log dir, empty per-node cache, fallback dirs) and opens no SSH connection.
log_reader = LogReader()
|