ErrorDetecting/backend/app/scripts/test_logs_realtime.py
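"""
Realtime log-collection smoke test.

Looks up a cluster by UUID, inspects each node's Hadoop log files to detect which
services are present, starts the log collector for every detected service, waits a
few seconds, then stops collection and reports how many rows were added to
hadoop_logs.

Configuration comes from environment variables: CLUSTER_UUID (required),
LOG_INTERVAL (collection interval in seconds, default 3) and LOG_DURATION
(how long to collect, in seconds, default 10).

Example invocation (the module path is an assumption; it presumes the script is
run from the backend directory so the `app` package is importable):

    CLUSTER_UUID=<cluster-uuid> LOG_DURATION=15 python -m app.scripts.test_logs_realtime
"""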

import os
import asyncio

from sqlalchemy import select, text

from app.db import SessionLocal, engine
from app.models.nodes import Node
from app.log_reader import log_reader
from app.log_collector import log_collector
async def run(cluster_uuid: str, interval: int = 3, duration: int = 10):
    # Resolve the cluster UUID to its primary key and record the current log count.
    async with engine.begin() as conn:
        res = await conn.execute(
            text("SELECT id FROM clusters WHERE uuid=:u LIMIT 1"), {"u": cluster_uuid}
        )
        row = res.first()
        if not row:
            print("CLUSTER_NOT_FOUND")
            return
        cid = row[0]
        before = await conn.execute(text("SELECT COUNT(*) FROM hadoop_logs"))
        print("HADOOP_LOGS_BEFORE", before.scalar() or 0)

    # Load every node registered for this cluster.
    async with SessionLocal() as session:
        nodes_res = await session.execute(
            select(Node.hostname, Node.ip_address).where(Node.cluster_id == cid)
        )
        nodes = [(r[0], str(r[1])) for r in nodes_res.all()]

    # Detect which Hadoop services run on each node from its log file names and
    # start a collector per detected service. "secondarynamenode" is checked before
    # "namenode" because the former contains the latter as a substring.
    started = []
    for hn, ip in nodes:
        try:
            log_reader.find_working_log_dir(hn, ip)
            files = log_reader.get_log_files_list(hn, ip=ip)
        except Exception:
            files = []
        services = set()
        for f in files:
            lf = f.lower()
            if "secondarynamenode" in lf:
                services.add("secondarynamenode")
            elif "namenode" in lf:
                services.add("namenode")
            elif "datanode" in lf:
                services.add("datanode")
            elif "resourcemanager" in lf:
                services.add("resourcemanager")
            elif "nodemanager" in lf:
                services.add("nodemanager")
            elif "historyserver" in lf:
                services.add("historyserver")
        for t in services:
            ok = log_collector.start_collection(hn, t, ip=ip, interval=interval)
            if ok:
                started.append(f"{hn}_{t}")
    print("STARTED", started)

    # Let the collectors run for `duration` seconds without blocking the event loop,
    # then stop them all.
    await asyncio.sleep(duration)
    log_collector.stop_all_collections()

    # Report the new row count and the most recent entries.
    async with engine.begin() as conn:
        after = await conn.execute(text("SELECT COUNT(*) FROM hadoop_logs"))
        print("HADOOP_LOGS_AFTER", after.scalar() or 0)
        last = await conn.execute(text(
            "SELECT cluster_name, node_host, title, log_time "
            "FROM hadoop_logs ORDER BY log_id DESC LIMIT 5"
        ))
        for row in last.all():
            print("LOG", row)


def main():
    cluster_uuid = os.environ.get("CLUSTER_UUID")
    interval = int(os.environ.get("LOG_INTERVAL", "3"))
    duration = int(os.environ.get("LOG_DURATION", "10"))
    asyncio.run(run(cluster_uuid, interval=interval, duration=duration))


if __name__ == "__main__":
    main()