集群注册_容器ssh连接问题修复

pull/48/head
echo 4 months ago
parent f0fccd5c60
commit 22e4772346

@ -1,3 +1,5 @@
import os
import socket
import paramiko
from typing import Optional, TextIO, Dict, Tuple
from .config import SSH_PORT, SSH_TIMEOUT
@ -27,13 +29,17 @@ class SSHClient:
self.client = paramiko.SSHClient()
# Automatically add host keys
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Connect to the server
sock = None
socks5 = os.getenv("TS_SOCKS5_SERVER") or os.getenv("TAILSCALE_SOCKS5_SERVER")
if socks5:
sock = _socks5_connect(socks5, self.hostname, self.port, SSH_TIMEOUT)
self.client.connect(
hostname=self.hostname,
username=self.username,
password=self.password,
port=self.port,
timeout=SSH_TIMEOUT
timeout=SSH_TIMEOUT,
sock=sock,
)
def execute_command(self, command: str) -> tuple:
@ -119,4 +125,72 @@ class SSHConnectionManager:
self.close_all()
# Create a global SSH connection manager instance
ssh_manager = SSHConnectionManager()
ssh_manager = SSHConnectionManager()
def _parse_hostport(value: str, default_port: int) -> tuple[str, int]:
s = (value or "").strip()
if not s:
return ("127.0.0.1", default_port)
if s.startswith("http://"):
s = s[7:]
if s.startswith("socks5://"):
s = s[9:]
if "/" in s:
s = s.split("/", 1)[0]
if ":" in s:
host, port_s = s.rsplit(":", 1)
try:
return (host.strip() or "127.0.0.1", int(port_s.strip()))
except Exception:
return (host.strip() or "127.0.0.1", default_port)
return (s, default_port)
def _socks5_connect(proxy: str, dest_host: str, dest_port: int, timeout: int) -> socket.socket:
proxy_host, proxy_port = _parse_hostport(proxy, 1080)
s = socket.create_connection((proxy_host, proxy_port), timeout=timeout)
s.settimeout(timeout)
s.sendall(b"\x05\x01\x00")
resp = s.recv(2)
if len(resp) != 2 or resp[0] != 0x05 or resp[1] != 0x00:
s.close()
raise RuntimeError("socks5_auth_failed")
atyp = 0x03
addr = dest_host.encode("utf-8")
try:
packed = socket.inet_pton(socket.AF_INET, dest_host)
atyp = 0x01
addr_field = packed
except Exception:
try:
packed6 = socket.inet_pton(socket.AF_INET6, dest_host)
atyp = 0x04
addr_field = packed6
except Exception:
if len(addr) > 255:
s.close()
raise RuntimeError("socks5_domain_too_long")
addr_field = bytes([len(addr)]) + addr
port_field = int(dest_port).to_bytes(2, "big", signed=False)
req = b"\x05\x01\x00" + bytes([atyp]) + addr_field + port_field
s.sendall(req)
head = s.recv(4)
if len(head) != 4 or head[0] != 0x05:
s.close()
raise RuntimeError("socks5_bad_reply")
rep = head[1]
if rep != 0x00:
s.close()
raise RuntimeError(f"socks5_connect_failed:{rep}")
bnd_atyp = head[3]
if bnd_atyp == 0x01:
s.recv(4)
elif bnd_atyp == 0x04:
s.recv(16)
elif bnd_atyp == 0x03:
ln = s.recv(1)
if ln:
s.recv(ln[0])
s.recv(2)
return s

@ -0,0 +1,119 @@
# Tailscale 启动指南
本文档用于在本项目部署/联调环境中启动与验证 Tailscale 连接。覆盖两类常见环境:
- 机器/VM 有 systemd可用 `systemctl` 管理 `tailscaled`
- 容器/受限环境无 systemd例如 PID 1 不是 systemd使用 userspace networking 启动 `tailscaled`
## 1. 前置检查
确认已安装:
```bash
which tailscale
tailscale version
```
查看当前状态:
```bash
tailscale status
```
如果提示 `failed to connect to local tailscaled`,说明 `tailscaled` 没有运行或 socket 不对。
## 2. 方式 Asystemd 环境启动(推荐)
启动并设置开机自启:
```bash
sudo systemctl start tailscaled
sudo systemctl enable tailscaled
```
登录并启用(首次需要网页登录授权):
```bash
sudo tailscale up --accept-dns=false --accept-routes=false
```
验证:
```bash
tailscale status
tailscale ip -4
```
## 3. 方式 B无 systemd / 容器环境启动userspace networking
`systemctl` 不可用(例如 PID 1 不是 systemd用 userspace networking 启动 `tailscaled`
```bash
sudo tailscaled \
--tun=userspace-networking \
--socket=/var/run/tailscale/tailscaled.sock \
--state=/var/lib/tailscale/tailscaled.state
```
为了让其在后台运行且不占用终端,可使用:
```bash
sudo nohup tailscaled \
--tun=userspace-networking \
--socket=/var/run/tailscale/tailscaled.sock \
--state=/var/lib/tailscale/tailscaled.state \
>/tmp/tailscaled.log 2>&1 &
```
首次登录(会输出一个 URL打开后完成授权
```bash
sudo tailscale up --accept-dns=false --accept-routes=false
```
验证:
```bash
tailscale status
tailscale ip -4
```
## 4. 常用参数说明
- `--accept-dns=false`:避免 Tailscale 接管系统 DNS更稳妥减少联调环境干扰
- `--accept-routes=false`:不接收其它节点宣告的子网路由(除非明确需要)
如果你看到 `Some peers are advertising routes but --accept-routes is false` 属正常提示。
## 5. 常见问题
### 5.1 `Logged out.` / `NeedsLogin`
执行:
```bash
sudo tailscale up --accept-dns=false --accept-routes=false
```
根据输出提示访问登录链接完成授权。
### 5.2 `failed to connect to local tailscaled`
说明 `tailscaled` 未运行或 socket 路径不一致:
- systemd 环境:确认 `sudo systemctl status tailscaled`
- 无 systemd 环境:确认 `tailscaled` 进程存在,以及 `--socket` 路径与 `tailscale` 命令一致
### 5.3 退出/停止
退出网络:
```bash
sudo tailscale down
```
停止守护进程:
- systemd`sudo systemctl stop tailscaled`
- 无 systemd`sudo pkill tailscaled`

@ -0,0 +1,97 @@
# 后端 Socks 代理启动指南
本文档说明如何在“无 TUN / 无 systemd”的环境中通过 Tailscale userspace networking + 本地 SOCKS5让后端在注册集群时能正常用 SSH 连接到各节点。
适用场景:
- `ls -l /dev/net/tun` 提示不存在
- `ssh hadoop@100.x.x.x` 出现 `Connection closed by UNKNOWN port 65535` 或 TCP 22 超时
- 后端集群注册返回 `注册失败SSH不可连接 (timed out)`
## 1. 启动 tailscaleduserspace + SOCKS5
以 root 权限后台启动,并在本机开一个 SOCKS5 代理端口 `127.0.0.1:1080`
```bash
sudo nohup /usr/sbin/tailscaled \
--tun=userspace-networking \
--socket=/var/run/tailscale/tailscaled.sock \
--state=/var/lib/tailscale/tailscaled.state \
--socks5-server=127.0.0.1:1080 \
>/tmp/tailscaled.log 2>&1 &
```
检查是否启动成功:
```bash
pgrep -a tailscaled
python3 -c "import socket; s=socket.create_connection(('127.0.0.1',1080),2); print('socks_up'); s.close()"
```
## 2. 登录并加入 tailnet
首次使用需要登录授权:
```bash
sudo tailscale up --accept-dns=false --accept-routes=true
```
按输出提示打开登录链接完成授权,然后验证:
```bash
tailscale status
```
## 3. 启动后端(让 SSH 走 SOCKS5
启动后端时设置环境变量 `TS_SOCKS5_SERVER`,让后端的 SSH 探测通过 SOCKS5 走 tailscale netstack
```bash
TS_SOCKS5_SERVER='127.0.0.1:1080' \
python -m uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
```
说明:
- `TS_SOCKS5_SERVER`:后端通过此 SOCKS5 建立到节点的 SSH 连接
- `--reload`:开发态自动重载(生产环境建议去掉)
## 4. 快速验证(后端视角)
在后端启动后运行集群注册测试(见“集群注册测试运行指南”):
```bash
LOGIN_PASSWORD='admin123' \
HADOOP_PASSWORD='limouren...' \
BASE_URL='http://127.0.0.1:8000' \
python3 /home/devbox/project/backend/tests/test_cluster_register.py
```
如果成功用例通过,说明 SSH 路由链路已打通。
## 5. 常见问题
### 5.1 `tailscale status` 显示 `NeedsLogin`
重新执行:
```bash
sudo tailscale up --accept-dns=false --accept-routes=true
```
### 5.2 SOCKS5 端口不可用
检查 `tailscaled` 是否已启动:
```bash
pgrep -a tailscaled
tail -n 50 /tmp/tailscaled.log
```
如需重启:
```bash
sudo pkill tailscaled
sudo nohup /usr/sbin/tailscaled --tun=userspace-networking --socket=/var/run/tailscale/tailscaled.sock --state=/var/lib/tailscale/tailscaled.state --socks5-server=127.0.0.1:1080 >/tmp/tailscaled.log 2>&1 &
```

@ -0,0 +1,73 @@
# 集群注册测试运行指南
本文档说明如何运行集群注册测试脚本 `backend/tests/test_cluster_register.py`,覆盖“成功/失败”两种注册情况。
## 1. 前置条件
- 后端服务已启动并可访问(默认 `http://127.0.0.1:8000`
- 你使用的登录账号有集群注册权限(后端限制为 `admin``ops`
- 后端能够连通集群各节点的 SSH若当前环境无 TUN请先按“后端带 SOCKS5 代理启动指南”启动)
## 2. 测试数据说明
脚本内置了你提供的“正确集群 test”节点信息
- hadoop102 100.71.90.16 NameNode
- hadoop103 100.74.47.4 ResourceManager
- hadoop104 100.99.172.96 SecondaryNameNode
- hadoop105 100.91.174.104
- hadoop100 100.73.220.46
所有节点默认使用:
- 用户名:`hadoop`
- 密码:`limouren...`
失败用例由脚本自动生成(将 `type` 改为非法值),期望后端返回 `400`
## 3. 运行方式
在项目根目录执行:
```bash
LOGIN_PASSWORD='admin123' \
HADOOP_PASSWORD='limouren...' \
BASE_URL='http://127.0.0.1:8000' \
python3 /home/devbox/project/backend/tests/test_cluster_register.py
```
可选环境变量:
- `BASE_URL`:后端地址,默认 `http://127.0.0.1:8000`
- `LOGIN_USER`:登录用户名,默认 `admin`
- `LOGIN_PASSWORD`:登录密码(必填)
- `HADOOP_USER`:集群 SSH 用户名,默认 `hadoop`
- `HADOOP_PASSWORD`:集群 SSH 密码(必填)
## 4. 预期输出
成功时输出类似:
- `成功用例通过: uuid= ...`
- `失败用例通过: status=400 detail= ...`
脚本退出码:
- `0`:两种情况都符合预期
- `1`:用例未按预期(例如成功用例返回非 200或失败用例未返回 400
- `2`:缺少必要环境变量
## 5. 常见失败与排查
### 5.1 成功用例报 SSH 不可连接 / timed out
现象通常为后端返回:
- `注册失败SSH不可连接`
- `detail: timed out`
处理方式:
- 确认 tailscale 侧能够看到节点在线:`tailscale status`
- 若当前环境没有 `/dev/net/tun`,必须使用 userspace networking + SOCKS5并让后端通过 `TS_SOCKS5_SERVER` 走代理启动

@ -0,0 +1,138 @@
import os
import time
import json
import httpx
def _env(name: str, default: str | None = None) -> str | None:
v = os.environ.get(name)
if v is None:
return default
v2 = v.strip()
return v2 if v2 else default
def _login(client: httpx.Client, base_url: str, username: str, password: str) -> str:
r = client.post(
f"{base_url}/api/v1/user/login",
json={"username": username, "password": password},
timeout=20,
)
r.raise_for_status()
data = r.json()
token = data.get("token")
if not token:
raise RuntimeError("login_no_token")
return token
def _auth_headers(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
def _list_clusters(client: httpx.Client, base_url: str, token: str) -> list[dict]:
r = client.get(f"{base_url}/api/v1/clusters", headers=_auth_headers(token), timeout=20)
r.raise_for_status()
data = r.json() or {}
return data.get("clusters") or []
def _delete_cluster(client: httpx.Client, base_url: str, token: str, uuid: str) -> None:
r = client.delete(f"{base_url}/api/v1/clusters/{uuid}", headers=_auth_headers(token), timeout=30)
r.raise_for_status()
def _create_cluster(client: httpx.Client, base_url: str, token: str, payload: dict) -> httpx.Response:
return client.post(
f"{base_url}/api/v1/clusters",
headers={**_auth_headers(token), "Content-Type": "application/json"},
content=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
timeout=120,
)
def _success_payload(cluster_name: str, ssh_user: str, ssh_password: str) -> dict:
nodes = [
{"hostname": "hadoop102", "ip_address": "100.71.90.16", "ssh_user": ssh_user, "ssh_password": ssh_password},
{"hostname": "hadoop103", "ip_address": "100.74.47.4", "ssh_user": ssh_user, "ssh_password": ssh_password},
{"hostname": "hadoop104", "ip_address": "100.99.172.96", "ssh_user": ssh_user, "ssh_password": ssh_password},
{"hostname": "hadoop105", "ip_address": "100.91.174.104", "ssh_user": ssh_user, "ssh_password": ssh_password},
{"hostname": "hadoop100", "ip_address": "100.73.220.46", "ssh_user": ssh_user, "ssh_password": ssh_password},
]
return {
"name": cluster_name,
"type": "hadoop",
"node_count": 5,
"health_status": "unknown",
"description": "test cluster register",
"namenode_ip": "100.71.90.16",
"namenode_psw": ssh_password,
"rm_ip": "100.74.47.4",
"rm_psw": ssh_password,
"nodes": nodes,
}
def _failure_payload(base: dict) -> dict:
bad = dict(base)
bad["name"] = f"{base['name']}-bad-{int(time.time())}"
bad["type"] = "bad_type"
return bad
def main() -> int:
base_url = _env("BASE_URL", "http://127.0.0.1:8000")
login_user = _env("LOGIN_USER", "admin")
login_password = _env("LOGIN_PASSWORD")
ssh_user = _env("HADOOP_USER", "hadoop")
ssh_password = _env("HADOOP_PASSWORD")
missing = [k for k, v in [("LOGIN_PASSWORD", login_password), ("HADOOP_PASSWORD", ssh_password)] if not v]
if missing:
print(f"缺少环境变量: {', '.join(missing)}")
print("示例LOGIN_PASSWORD=admin123 HADOOP_PASSWORD='limouren...' BASE_URL=http://127.0.0.1:8000 python3 backend/tests/test_cluster_register.py")
return 2
with httpx.Client() as client:
token = _login(client, base_url, login_user, login_password)
clusters = _list_clusters(client, base_url, token)
for c in clusters:
if c.get("name") == "test" and c.get("uuid"):
_delete_cluster(client, base_url, token, c["uuid"])
ok_payload = _success_payload("test", ssh_user, ssh_password)
r_ok = _create_cluster(client, base_url, token, ok_payload)
if r_ok.status_code != 200:
try:
print("成功用例失败:", r_ok.status_code, r_ok.json())
except Exception:
print("成功用例失败:", r_ok.status_code, r_ok.text[:500])
return 1
data_ok = r_ok.json() or {}
if data_ok.get("status") != "success":
print("成功用例返回异常:", data_ok)
return 1
uuid = data_ok.get("uuid")
print("成功用例通过: uuid=", uuid)
bad_payload = _failure_payload(ok_payload)
r_bad = _create_cluster(client, base_url, token, bad_payload)
if r_bad.status_code != 400:
try:
print("失败用例未按预期返回 400:", r_bad.status_code, r_bad.json())
except Exception:
print("失败用例未按预期返回 400:", r_bad.status_code, r_bad.text[:500])
return 1
try:
detail = r_bad.json()
except Exception:
detail = {"raw": r_bad.text[:500]}
print("失败用例通过: status=400 detail=", detail)
return 0
if __name__ == "__main__":
raise SystemExit(main())
Loading…
Cancel
Save