|
|
# -*- coding: utf-8 -*-
|
|
|
"""
|
|
|
CSV 数据导入 Neo4j
|
|
|
- 读取 CSV 文件,解析预算,批量 MERGE 到 Neo4j
|
|
|
"""
|
|
|
import os
|
|
|
import csv
|
|
|
import re
|
|
|
import math
|
|
|
from database import driver, run_unwind_batch
|
|
|
|
|
|
# Absolute directory containing this module; the CSV inputs live in its
# "data" sub-folder.
BASE_DIR = os.path.split(os.path.abspath(__file__))[0]
CSV_DIR = os.path.join(BASE_DIR, "data")
|
|
|
|
|
|
|
|
|
# neo4j-admin import property-type suffixes (optionally array-typed, e.g.
# ``tags:string[]``) that should be dropped from a column name.
_NEO4J_TYPE_SUFFIX = (
    r":(?:int|long|float|double|boolean|byte|short|char|string"
    r"|date|time|localtime|datetime|localdatetime|duration|point)(?:\[\])?$"
)


def _clean_header(name):
    """Reduce one neo4j-admin style CSV column name to a plain key."""
    key = name.strip()
    key = re.sub(r":ID(?:\([^)]*\))?$", "", key)                     # projectId:ID(Project) -> projectId
    key = re.sub(r"^:?(START_ID|END_ID)(?:\([^)]*\))?$", r"\1", key)  # :START_ID(Project) -> START_ID
    key = re.sub(_NEO4J_TYPE_SUFFIX, "", key)                        # startYear:int -> startYear
    key = re.sub(r"^:", "", key)                                     # :LABEL -> LABEL
    return key.strip()


def read_csv(filename, csv_dir=None):
    """Read a CSV file and normalise neo4j-admin style header names.

    The file is decoded as UTF-8 with an optional BOM (``utf-8-sig``). Header
    names are reduced to plain keys, for example:

    * ``projectId:ID(Project)`` / ``projectId:ID`` -> ``projectId``
    * ``:START_ID(Project)`` -> ``START_ID`` (likewise ``END_ID``)
    * ``startYear:int`` -> ``startYear`` (also ``:long``, ``:float``, ...)
    * ``:LABEL`` / ``:TYPE`` -> ``LABEL`` / ``TYPE``

    Args:
        filename: CSV file name, relative to *csv_dir*.
        csv_dir: directory to read from; defaults to the module's ``CSV_DIR``.

    Returns:
        A list of dicts, one per data row, mapping cleaned header -> stripped
        cell text. Missing trailing cells become ``""``. Surplus cells beyond
        the header are dropped.
    """
    directory = CSV_DIR if csv_dir is None else csv_dir
    path = os.path.join(directory, filename)
    with open(path, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        # Clean each header once per file rather than once per row.
        # fieldnames is None for an empty file.
        header_map = {raw: _clean_header(raw) for raw in (reader.fieldnames or [])}
        return [
            {
                header_map[raw]: (value or "").strip()
                # DictReader stores surplus cells under the key None (its
                # default restkey) as a list; skip them instead of crashing.
                for raw, value in row.items()
                if raw is not None
            }
            for row in reader
        ]
|
|
|
|
|
|
|
|
|
def parse_budget(raw):
    """Parse a budget string into a float amount in units of 万 (10,000).

    Examples::

        '$1,500万'   -> 1500.0
        '$2亿'       -> 20000.0
        '$1.5亿'     -> 15000.0
        '1亿5000万'  -> 15000.0   (亿 part plus an immediately following 万 part)
        '$1.5千万'   -> 1500.0
        '800'        -> 800.0     (a bare number is taken as already in 万)

    The function ignores ASCII ``,``, full-width ``，``, ``$`` and spaces.
    Units are tried in this order: 亿, then 千万 / 百万 / 万, then a bare number.

    Args:
        raw: budget value (any object; it is converted with ``str``).

    Returns:
        The amount in 万. Returns 0.0 for falsy input or text with no number.
    """
    if not raw:
        return 0.0
    s = str(raw)
    for junk in (",", "，", "$", " "):
        s = s.replace(junk, "")

    # A well-formed decimal: digits with at most one fractional part. The
    # former pattern [\d.]+ also matched "." or "1.2.3", making float() raise.
    num = r"(\d+(?:\.\d+)?)"

    # "X亿", optionally directly followed by "Y万" / "Y千万" / "Y百万".
    m = re.search(num + r"亿(?:" + num + r"(千|百)?万)?", s)
    if m:
        yi, wan, wan_scale = m.groups()
        total = float(yi) * 10000.0
        if wan:
            total += float(wan) * {"千": 1000.0, "百": 100.0}.get(wan_scale, 1.0)
        return total

    # Longer units first so "千万" is not mistaken for a bare "万".
    for unit, factor in (("千万", 1000.0), ("百万", 100.0), ("万", 1.0)):
        m = re.search(num + unit, s)
        if m:
            return float(m.group(1)) * factor

    m = re.search(num, s)
    return float(m.group(1)) if m else 0.0
|
|
|
|
|
|
|
|
|
def _has_existing_data():
    """Return True if Neo4j already holds at least one :Project node."""
    query = "MATCH (p:Project) RETURN count(p) AS cnt"
    with driver.session() as session:
        record = session.run(query).single()
        return record["cnt"] > 0
|
|
|
|
|
|
|
|
|
def _count_projects():
    """Return how many :Project nodes currently exist in Neo4j."""
    with driver.session() as s:
        return s.run("MATCH (p:Project) RETURN count(p) AS cnt").single()["cnt"]


def _to_int(value):
    """Convert a CSV cell to int; blank or non-numeric text becomes 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def _import_named_nodes(filename, label, id_field):
    """MERGE ``{id_field, name}`` nodes labelled *label* from *filename*.

    Rows with a blank id are skipped. Prints and returns the number of rows
    passed to ``run_unwind_batch``.
    """
    rows = []
    for r in read_csv(filename):
        node_id = r.get(id_field, "").strip()
        if node_id:
            rows.append({id_field: node_id, "name": r.get("name", "").strip()})
    # label / id_field are fixed identifiers supplied by import_all, never CSV
    # data, so interpolating them into the Cypher text is safe.
    run_unwind_batch(
        f"MERGE (n:{label} {{{id_field}:row.{id_field}}}) SET n.name=row.name",
        rows,
    )
    print(f"[导入] {label}: {len(rows)}")
    return len(rows)


def _import_relationships(filename, rel_type, cypher, extra=None):
    """Create relationships from a START_ID / END_ID CSV file.

    *cypher* is passed to ``run_unwind_batch`` together with the rows. It may
    reference ``row.start`` / ``row.end``, plus any keys returned by
    ``extra(raw_row)``. The queries use ``row.*``; presumably the helper
    wraps them in ``UNWIND ... AS row`` (confirm in database.py).

    Rows missing either endpoint id are skipped. Prints and returns the number
    of rows sent. A row whose endpoints are absent from the graph matches
    nothing, so this count may exceed the number of relationships created.
    """
    rows = []
    for r in read_csv(filename):
        start = r.get("START_ID", "").strip()
        end = r.get("END_ID", "").strip()
        if not (start and end):
            continue
        row = {"start": start, "end": end}
        if extra is not None:
            row.update(extra(r))
        rows.append(row)
    run_unwind_batch(cypher, rows)
    print(f"[导入] {rel_type}: {len(rows)}")
    return len(rows)


def import_all(force=False):
    """Load the project CSV files into Neo4j.

    Steps:

    1. Create indexes on the id properties.
    2. MERGE Office / Technology / Institution / Manager / Project nodes.
    3. MERGE the project relationships.

    Every write is a MERGE on an id property, so a re-run updates existing
    nodes instead of duplicating them.

    Args:
        force: if False (the default) and at least one :Project node already
            exists, print a notice and return without importing. If True,
            import anyway, updating existing nodes in place.
    """
    print("=" * 60)
    print("[导入] 开始将 CSV 数据写入 Neo4j …")
    print(f"[导入] CSV 目录: {CSV_DIR}")

    # Query the project count once; it drives both the skip and force notices.
    existing = _count_projects()
    if existing > 0 and not force:
        print(f"[导入] 数据库中已有 {existing} 个 Project 节点,跳过导入。")
        print("[导入] 如需强制重导,请使用: python app.py --force-import")
        print("=" * 60)
        return
    if force:
        print(f"[导入] 强制重导模式,将更新已有 {existing} 个项目…")

    # Indexes on the MERGE/MATCH keys used below.
    with driver.session() as s:
        for idx in (
            "CREATE INDEX IF NOT EXISTS FOR (p:Project) ON (p.projectId)",
            "CREATE INDEX IF NOT EXISTS FOR (t:Technology) ON (t.techId)",
            "CREATE INDEX IF NOT EXISTS FOR (i:Institution) ON (i.instId)",
            "CREATE INDEX IF NOT EXISTS FOR (m:Manager) ON (m.managerId)",
            "CREATE INDEX IF NOT EXISTS FOR (o:Office) ON (o.officeId)",
        ):
            s.run(idx)
    print("[导入] 索引创建完毕")

    # Simple {id, name} nodes.
    for filename, label, id_field in (
        ("nodes_office.csv", "Office", "officeId"),
        ("nodes_technology.csv", "Technology", "techId"),
        ("nodes_institution.csv", "Institution", "instId"),
        ("nodes_manager.csv", "Manager", "managerId"),
    ):
        _import_named_nodes(filename, label, id_field)

    # Project
    proj_params = []
    for r in read_csv("nodes_project.csv"):
        pid = r.get("projectId", "").strip()
        if not pid:
            continue
        raw_budget = r.get("budget", "")
        proj_params.append({
            "projectId": pid,
            "nameEn": r.get("nameEn", "").strip(),
            "nameCn": r.get("nameCn", "").strip(),
            "status": r.get("status", "").strip(),
            "startYear": _to_int(r.get("startYear", "0")),
            "endYear": _to_int(r.get("endYear", "0")),
            "budget": raw_budget.strip(),
            # Numeric budget in 万, for sorting/filtering.
            "budgetVal": parse_budget(raw_budget),
            # Prefer transferDomain; fall back to domain when absent or blank.
            "domain": (r.get("transferDomain") or r.get("domain") or "").strip(),
        })
    run_unwind_batch("""MERGE (p:Project {projectId:row.projectId})
        SET p.nameEn=row.nameEn, p.nameCn=row.nameCn, p.status=row.status,
            p.startYear=row.startYear, p.endYear=row.endYear,
            p.budget=row.budget, p.budgetVal=row.budgetVal, p.domain=row.domain""",
                     proj_params)
    print(f"[导入] Project: {len(proj_params)}")

    # Relationships (all start at a Project).
    _import_relationships(
        "rels_project_technology.csv", "USES_TECHNOLOGY",
        "MATCH (p:Project {projectId:row.start}),(t:Technology {techId:row.end}) "
        "MERGE (p)-[:USES_TECHNOLOGY]->(t)",
    )
    _import_relationships(
        "rels_project_institution.csv", "PARTICIPATED_BY",
        "MATCH (p:Project {projectId:row.start}),(i:Institution {instId:row.end}) "
        "MERGE (p)-[:PARTICIPATED_BY {relationType:row.rtype}]->(i)",
        extra=lambda r: {"rtype": r.get("relationType", "合作方").strip() or "合作方"},
    )
    _import_relationships(
        "rels_project_manager.csv", "MANAGED_BY",
        "MATCH (p:Project {projectId:row.start}),(m:Manager {managerId:row.end}) "
        "MERGE (p)-[:MANAGED_BY]->(m)",
    )
    _import_relationships(
        "rels_project_office.csv", "BELONGS_TO",
        "MATCH (p:Project {projectId:row.start}),(o:Office {officeId:row.end}) "
        "MERGE (p)-[:BELONGS_TO]->(o)",
    )

    print("[导入] 全部完成!")
    print("=" * 60)
|