You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
67 lines
2.6 KiB
67 lines
2.6 KiB
import os
|
|
import asyncio
|
|
import time
|
|
from sqlalchemy import select, text
|
|
from app.db import SessionLocal, engine
|
|
from app.models.clusters import Cluster
|
|
from app.models.nodes import Node
|
|
from app.log_reader import log_reader
|
|
from app.log_collector import log_collector
|
|
|
|
def _detect_services(files):
    """Return the set of Hadoop service names mentioned in ``files``.

    Each entry (presumably a log file name or path) is lower-cased and tested
    for the substrings below; the first match wins, so each entry maps to at
    most one service.

    :param files: iterable of strings.
    :returns: set of matched service names (each equal to its search string).
    """
    # Order matters: "secondarynamenode" contains "namenode", so it must be
    # tested first or secondary-namenode logs would be classed as "namenode".
    markers = (
        "secondarynamenode",
        "namenode",
        "datanode",
        "resourcemanager",
        "nodemanager",
        "historyserver",
    )
    services = set()
    for name in files:
        lowered = name.lower()
        for marker in markers:
            if marker in lowered:
                services.add(marker)
                break
    return services


async def run(cluster_uuid: str, interval: int = 3, duration: int = 10):
    """Collect Hadoop logs from every node of a cluster for a fixed window.

    Steps:
    1. Look up the cluster id for ``cluster_uuid``.
    2. Print the current ``hadoop_logs`` row count.
    3. For each node of the cluster, find its log files and start one
       collection (with the given ``interval``) per detected service.
    4. Wait ``duration`` seconds, then stop all collections.
    5. Print the new row count and the five rows with the highest
       ``log_id``.

    Prints ``CLUSTER_NOT_FOUND`` and returns early if no cluster matches.

    :param cluster_uuid: value matched against ``clusters.uuid``.
    :param interval: passed to ``log_collector.start_collection``
        (presumably seconds; confirm against the collector).
    :param duration: seconds to let the collectors run before stopping them.
    """
    async with engine.begin() as conn:
        res = await conn.execute(
            text("SELECT id FROM clusters WHERE uuid=:u LIMIT 1"), {"u": cluster_uuid}
        )
        row = res.first()
        if not row:
            print("CLUSTER_NOT_FOUND")
            return
        cid = row[0]
        before = await conn.execute(text("SELECT COUNT(*) FROM hadoop_logs"))
        print("HADOOP_LOGS_BEFORE", before.scalar() or 0)

    async with SessionLocal() as session:
        nodes_res = await session.execute(
            select(Node.hostname, Node.ip_address).where(Node.cluster_id == cid)
        )
        # Keep a missing IP as None instead of turning it into the string "None".
        nodes = [
            (r[0], str(r[1]) if r[1] is not None else None)
            for r in nodes_res.all()
        ]

    started = []
    try:
        for hn, ip in nodes:
            try:
                log_reader.find_working_log_dir(hn, ip)
                files = log_reader.get_log_files_list(hn, ip=ip)
            except Exception as exc:
                # Best effort: one bad node must not abort the whole run,
                # but say why it was skipped instead of failing silently.
                print("LOG_FILES_ERROR", hn, exc)
                files = []
            # sorted() gives a deterministic start order across runs.
            for t in sorted(_detect_services(files)):
                if log_collector.start_collection(hn, t, ip=ip, interval=interval):
                    started.append(f"{hn}_{t}")
        print("COLLECTIONS_STARTED", started)
        # Yield to the event loop while collectors run; time.sleep() here
        # would block the loop for the entire window.
        await asyncio.sleep(duration)
    finally:
        # Always stop collectors, even if startup or the wait is interrupted.
        log_collector.stop_all_collections()

    async with engine.begin() as conn:
        after = await conn.execute(text("SELECT COUNT(*) FROM hadoop_logs"))
        print("HADOOP_LOGS_AFTER", after.scalar() or 0)
        last = await conn.execute(
            text(
                "SELECT cluster_name, node_host, title, log_time "
                "FROM hadoop_logs ORDER BY log_id DESC LIMIT 5"
            )
        )
        for row in last.all():
            print("LOG", row)
|
|
|
|
def main():
    """Run ``run`` with settings read from environment variables.

    Environment variables:
        CLUSTER_UUID: cluster to inspect; if unset, ``None`` is passed through.
        LOG_INTERVAL: integer passed as ``interval`` (default "3").
        LOG_DURATION: integer passed as ``duration`` (default "10").

    A non-integer LOG_INTERVAL/LOG_DURATION raises ``ValueError``.
    """
    env = os.environ
    settings = {
        "interval": int(env.get("LOG_INTERVAL", "3")),
        "duration": int(env.get("LOG_DURATION", "10")),
    }
    asyncio.run(run(env.get("CLUSTER_UUID"), **settings))
|
|
|
|
if __name__ == "__main__":
    # Run only when executed as a script, not when imported.
    main()
|