# ErrorDetecting/backend/app/log_collector.py

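"""Real-time Hadoop log collector.

Each (node_name, log_type) pair gets its own daemon thread that polls the
remote log file over SSH and batch-inserts new lines into the database.
"""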
import asyncio
import datetime
import threading
import time
import uuid
from typing import Dict, List, Optional

from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from .config import APP_TIMEZONE, BJ_TZ, DATABASE_URL
from .db import SessionLocal
from .log_reader import log_reader
from .models.hadoop_logs import HadoopLog
from .ssh_utils import ssh_manager


class LogCollector:
    """Real-time log collector for Hadoop cluster."""

    def __init__(self):
        # One entry per "<node_name>_<log_type>" collector id
        self.collectors: Dict[str, threading.Thread] = {}
        self.is_running: bool = False
        self.collection_interval: int = 5  # default collection interval, in seconds
        # Per-collector event loops, engines, and session factories, so each
        # collector thread does its own async DB work
        self._loops: Dict[str, asyncio.AbstractEventLoop] = {}
        self._engines: Dict[str, AsyncEngine] = {}
        self._session_locals: Dict[str, async_sessionmaker[AsyncSession]] = {}
        self._intervals: Dict[str, int] = {}
        self._cluster_name_cache: Dict[str, str] = {}
        self._targets: Dict[str, str] = {}
        self._line_counts: Dict[str, int] = {}
        self.max_bytes_per_pull: int = 256 * 1024  # cap on bytes fetched per poll

    def start_collection(self, node_name: str, log_type: str, ip: Optional[str] = None, interval: Optional[int] = None) -> bool:
        """Start real-time log collection for a specific node and log type."""
        collector_id = f"{node_name}_{log_type}"
        if interval is not None:
            self._intervals[collector_id] = max(1, int(interval))
        if collector_id in self.collectors and self.collectors[collector_id].is_alive():
            print(f"Collector {collector_id} is already running")
            return False
        # Start even if the log file does not exist yet; the collector
        # re-checks for the target file on every loop iteration.
        collector_thread = threading.Thread(
            target=self._collect_logs,
            args=(node_name, log_type, ip),
            name=collector_id,
            daemon=True,
        )
        self.collectors[collector_id] = collector_thread
        collector_thread.start()
        print(f"Started collector {collector_id}")
        return True

    def stop_collection(self, node_name: str, log_type: str):
        """Stop log collection for a specific node and log type."""
        collector_id = f"{node_name}_{log_type}"
        if collector_id in self.collectors:
            # Threads are daemons, so they exit with the main process.
            # Removing the id from self.collectors also makes the collection
            # loop's `while collector_id in self.collectors` check fail,
            # which terminates the thread cleanly.
            del self.collectors[collector_id]
            self._intervals.pop(collector_id, None)
            print(f"Stopped collector {collector_id}")
        else:
            print(f"Collector {collector_id} is not running")

    def stop_all_collections(self):
        """Stop all log collections."""
        for collector_id in list(self.collectors.keys()):
            # Split only on the first underscore; node names are assumed not
            # to contain underscores, while log types may.
            node_name, log_type = collector_id.split("_", 1)
            self.stop_collection(node_name, log_type)

    def _parse_log_line(self, line: str, node_name: str, log_type: str) -> Dict:
        """Parse a single log line and return a dictionary of log fields."""
        # Expected timestamp prefix: [2023-12-17 10:00:00,123]
        timestamp = None
        log_level = "INFO"  # default log level
        message = line
        # Simple log parsing logic
        if line.startswith('['):
            # Extract the timestamp between the leading brackets
            timestamp_end = line.find(']', 1)
            if timestamp_end > 0:
                timestamp_str = line[1:timestamp_end]
                try:
                    timestamp = datetime.datetime.strptime(
                        timestamp_str, "%Y-%m-%d %H:%M:%S,%f"
                    ).replace(tzinfo=BJ_TZ)
                except ValueError:
                    # If parsing fails, fall back to the current time
                    timestamp = datetime.datetime.now(BJ_TZ)
        # Extract the log level (checked most-severe first; first match wins)
        log_levels = ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"]
        for level in log_levels:
            if f" {level} " in line:
                log_level = level
                break
        return {
            "timestamp": timestamp or datetime.datetime.now(BJ_TZ),
            "log_level": log_level,
            "message": message,
            "host": node_name,
            "service": log_type,
            "raw_log": line,
        }
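    # Example: "[2023-12-17 10:00:00,123] INFO Starting DataNode" parses to a
    # timestamp of 2023-12-17 10:00:00.123 (BJ_TZ) with log_level "INFO";
    # lines without the bracketed prefix fall back to the current time and
    # the default INFO level.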

    async def _save_log_to_db(self, log_data: Dict, collector_id: str | None = None):
        """Save a single log record to the database."""
        try:
            session_local = self._session_locals.get(collector_id) if collector_id else None
            async with (session_local() if session_local else SessionLocal()) as session:
                # Resolve the cluster name for this host (cached after first lookup)
                host = log_data["host"]
                cluster_name = self._cluster_name_cache.get(host)
                if not cluster_name:
                    cluster_res = await session.execute(text("""
                        SELECT c.name
                        FROM clusters c
                        JOIN nodes n ON c.id = n.cluster_id
                        WHERE n.hostname = :hn LIMIT 1
                    """), {"hn": host})
                    cluster_row = cluster_res.first()
                    cluster_name = cluster_row[0] if cluster_row else "default_cluster"
                    self._cluster_name_cache[host] = cluster_name
                # Create a HadoopLog row and persist it
                hadoop_log = HadoopLog(
                    log_time=log_data["timestamp"],
                    node_host=log_data["host"],
                    title=log_data["service"],
                    info=log_data["message"],
                    cluster_name=cluster_name,
                )
                session.add(hadoop_log)
                await session.commit()
        except Exception as e:
            print(f"Error saving log to database: {e}")

    async def _save_logs_to_db_batch(self, logs: List[Dict], collector_id: str | None = None):
        """Save a batch of logs to the database in one transaction."""
        try:
            session_local = self._session_locals.get(collector_id) if collector_id else None
            async with (session_local() if session_local else SessionLocal()) as session:
                # All logs in a batch come from the same host, so resolve the
                # cluster name once from the first entry
                host = logs[0]["host"] if logs else None
                cluster_name = self._cluster_name_cache.get(host) if host else None
                if host and not cluster_name:
                    cluster_res = await session.execute(text("""
                        SELECT c.name
                        FROM clusters c
                        JOIN nodes n ON c.id = n.cluster_id
                        WHERE n.hostname = :hn LIMIT 1
                    """), {"hn": host})
                    cluster_row = cluster_res.first()
                    cluster_name = cluster_row[0] if cluster_row else "default_cluster"
                    self._cluster_name_cache[host] = cluster_name
                objs: list[HadoopLog] = []
                for log_data in logs:
                    objs.append(HadoopLog(
                        log_time=log_data["timestamp"],
                        node_host=log_data["host"],
                        title=log_data["service"],
                        info=log_data["message"],
                        cluster_name=cluster_name or "default_cluster",
                    ))
                session.add_all(objs)
                await session.commit()
        except Exception as e:
            print(f"Error batch saving logs: {e}")

    def _collect_logs(self, node_name: str, log_type: str, ip: Optional[str] = None):
        """Internal method to collect logs continuously."""
        print(f"Starting log collection for {node_name}_{log_type}")
        collector_id = f"{node_name}_{log_type}"
        # Each collector thread owns its own event loop and a single-connection
        # async engine, so DB writes never cross thread boundaries
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._loops[collector_id] = loop
        engine = create_async_engine(
            DATABASE_URL,
            echo=False,
            pool_pre_ping=True,
            connect_args={"server_settings": {"timezone": APP_TIMEZONE}},
            pool_size=1,
            max_overflow=0,
        )
        self._engines[collector_id] = engine
        self._session_locals[collector_id] = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
        last_remote_size = 0
        retry_count = 0
        max_retries = 3
        while collector_id in self.collectors:
            try:
                # Wait for the next collection interval
                interval = self._intervals.get(collector_id, self.collection_interval)
                time.sleep(interval)
                # Resolve the target log file once and reuse it
                target = self._targets.get(collector_id)
                if not target:
                    try:
                        ssh_client = ssh_manager.get_connection(node_name, ip=ip)
                        dirs = [
                            "/opt/module/hadoop-3.1.3/logs",
                            "/usr/local/hadoop/logs",
                            "/usr/local/hadoop-3.3.6/logs",
                            "/usr/local/hadoop-3.3.5/logs",
                            "/usr/local/hadoop-3.1.3/logs",
                            "/opt/hadoop/logs",
                            "/var/log/hadoop",
                        ]
                        for d in dirs:
                            out, err = ssh_client.execute_command(f"ls -1 {d} 2>/dev/null")
                            if not err and out.strip():
                                for fn in out.splitlines():
                                    f = fn.lower()
                                    if log_type in f and node_name in f:
                                        target = f"{d}/{fn}"
                                        break
                            if target:
                                break
                        if target:
                            self._targets[collector_id] = target
                    except Exception:
                        target = None
                if not target:
                    print(f"Log file {node_name}_{log_type} not found, will retry")
                    retry_count += 1
                    continue
                ssh_client = ssh_manager.get_connection(node_name, ip=ip)
                size_out, size_err = ssh_client.execute_command(f"stat -c %s {target} 2>/dev/null")
                if size_err:
                    retry_count += 1
                    continue
                try:
                    remote_size = int((size_out or "").strip())
                except Exception:
                    retry_count += 1
                    continue
                # File shrank (rotated or truncated): start over from the beginning
                if remote_size < last_remote_size:
                    last_remote_size = 0
                if remote_size > last_remote_size:
                    delta = remote_size - last_remote_size
                    if delta > self.max_bytes_per_pull:
                        # Too far behind: skip ahead and pull only the newest
                        # max_bytes_per_pull bytes (tail -c +N is 1-indexed)
                        start_pos = remote_size - self.max_bytes_per_pull + 1
                        last_remote_size = remote_size - self.max_bytes_per_pull
                    else:
                        start_pos = last_remote_size + 1
                    out2, err2 = ssh_client.execute_command(f"tail -c +{start_pos} {target} 2>/dev/null")
                    if err2:
                        # Fall back to dd if tail fails on this host
                        out2, err2 = ssh_client.execute_command(f"dd if={target} bs=1 skip={max(0, start_pos - 1)} 2>/dev/null")
                    if not err2 and out2 and out2.strip():
                        self._save_log_chunk(node_name, log_type, out2)
                        print(f"Collected new logs from {node_name}_{log_type} bytes={len(out2)}")
                    last_remote_size = remote_size
                # Reset the retry count on a successful pass
                retry_count = 0
            except Exception as e:
                print(f"Error collecting logs from {node_name}_{log_type}: {e}")
                retry_count += 1
                if retry_count > max_retries:
                    print(f"Max retries reached for {node_name}_{log_type}, stopping collection")
                    self.stop_collection(node_name, log_type)
                    break
                print(f"Retrying on the next cycle... ({retry_count}/{max_retries})")
        # Tear down this collector's event loop and engine on exit
        try:
            loop = self._loops.pop(collector_id, None)
            engine = self._engines.pop(collector_id, None)
            self._session_locals.pop(collector_id, None)
            if engine and loop:
                loop.run_until_complete(engine.dispose())
            if loop and loop.is_running():
                loop.stop()
            if loop:
                loop.close()
        except Exception:
            pass
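    # Offset bookkeeping example: with last_remote_size=1000 and a remote file
    # of 1500 bytes, delta=500 <= max_bytes_per_pull, so start_pos=1001 and
    # "tail -c +1001" returns bytes 1001..1500; last_remote_size then becomes
    # 1500. If the file instead grew by 2 MB, only the newest 256 KiB would be
    # pulled and the gap deliberately skipped.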

    def _save_log_chunk(self, node_name: str, log_type: str, content: str):
        """Save a chunk of log content to the database."""
        # Split the chunk into lines and parse each non-empty one
        lines = content.splitlines()
        log_batch: List[Dict] = []
        for line in lines:
            if line.strip():
                log_data = self._parse_log_line(line, node_name, log_type)
                log_batch.append(log_data)
        if not log_batch:
            return
        # Reuse this collector's event loop if it exists; otherwise run the
        # batch save on a one-off loop (e.g. when called outside a collector thread)
        collector_id = f"{node_name}_{log_type}"
        loop = self._loops.get(collector_id)
        if loop:
            loop.run_until_complete(self._save_logs_to_db_batch(log_batch, collector_id=collector_id))
        else:
            asyncio.run(self._save_logs_to_db_batch(log_batch))

    def get_collectors_status(self) -> Dict[str, bool]:
        """Get the status of all collectors."""
        status = {}
        for collector_id, thread in self.collectors.items():
            status[collector_id] = thread.is_alive()
        return status

    def set_collection_interval(self, interval: int):
        """Set the collection interval, overriding any per-collector intervals."""
        self.collection_interval = max(1, interval)  # ensure at least 1 second
        for k in list(self._intervals.keys()):
            self._intervals[k] = self.collection_interval
        print(f"Set collection interval to {self.collection_interval} seconds")

    def set_log_dir(self, log_dir: str):
        """Set the log directory (deprecated; logs are now stored in the database)."""
        print(f"Warning: set_log_dir is deprecated. Logs are now stored in the database, not in local directory: {log_dir}")


# Create a global log collector instance
log_collector = LogCollector()
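
# A minimal usage sketch (hypothetical node name, log type, and IP address;
# real values come from the cluster inventory used elsewhere in the app):
#
#     log_collector.set_collection_interval(10)
#     log_collector.start_collection("hadoop102", "namenode", ip="192.168.10.102")
#     print(log_collector.get_collectors_status())
#     log_collector.stop_collection("hadoop102", "namenode")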