You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

179 lines
7.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
"""
CSV 数据导入 Neo4j
- 读取 CSV 文件,解析预算,批量 MERGE 到 Neo4j
"""
import os
import csv
import re
import math
from database import driver, run_unwind_batch
# Absolute path of the directory that contains this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# CSV input files are read from the "data" sub-directory next to this module.
CSV_DIR = os.path.join(BASE_DIR, "data")
def _clean_column_name(name):
    """Strip Neo4j bulk-import decorations from a single CSV header name."""
    key = name.strip()
    key = re.sub(r":ID\([^)]*\)$", "", key)            # projectId:ID(Project) -> projectId
    key = re.sub(r"^:?(START_ID|END_ID)\([^)]*\)$",    # :START_ID(Project) -> START_ID
                 r"\1", key)
    key = re.sub(r":int$", "", key)                    # startYear:int -> startYear
    key = re.sub(r"^:", "", key)                       # :LABEL -> LABEL
    return key.strip()


def read_csv(filename):
    """Read a CSV file from CSV_DIR and return its rows as cleaned dicts.

    The file is opened as ``utf-8-sig`` so a leading BOM is dropped.  Header
    names are normalised with ``_clean_column_name`` and every cell value is
    stripped; a missing cell (row shorter than the header) becomes ``""``.

    Args:
        filename: Name of the CSV file, joined onto ``CSV_DIR``.

    Returns:
        A list with one ``{column: value}`` dict of strings per data row.

    Raises:
        OSError: If the file cannot be opened.
    """
    path = os.path.join(CSV_DIR, filename)
    rows = []
    with open(path, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        # Headers are identical for every row, so clean them once up front.
        columns = {name: _clean_column_name(name)
                   for name in (reader.fieldnames or [])}
        for row in reader:
            clean = {}
            for k, v in row.items():
                if k is None:
                    # DictReader collects cells beyond the header under the
                    # key None (as a list); they have no column name, so skip
                    # them instead of crashing on None.strip().
                    continue
                clean[columns[k]] = (v or "").strip()
            rows.append(clean)
    return rows
# Characters removed before parsing: ASCII and full-width comma, ASCII and
# full-width dollar sign, both yen/yuan signs, ASCII and ideographic space.
# Written as escapes so look-alike characters cannot be lost or confused.
_BUDGET_NOISE = str.maketrans("", "", ",\uff0c$\uff04\u00a5\uffe5 \u3000")

# One well-formed decimal number: "12", "12.", "12.5" or ".5".
_BUDGET_NUMBER = r"(\d+(?:\.\d*)?|\.\d+)"

# Tried in order; the factor converts the matched number to units of 万.
_BUDGET_PATTERNS = (
    (re.compile(_BUDGET_NUMBER + "亿"), 10000.0),
    (re.compile(_BUDGET_NUMBER + "万"), 1.0),
    (re.compile(_BUDGET_NUMBER), 1.0),
)


def parse_budget(raw):
    """Parse a budget string into a float expressed in units of 万 (10,000).

    Examples:
        '$1,500万' -> 1500.0
        '$2亿'     -> 20000.0
        '$1.5亿'   -> 15000.0

    A number followed by 亿 is multiplied by 10000; a number followed by 万,
    or a bare number, is returned as-is.  Thousands separators, currency
    signs and spaces (ASCII or full-width) are ignored.

    Args:
        raw: The budget text; any falsy value is treated as "no budget".

    Returns:
        The parsed amount, or 0.0 when ``raw`` is empty or has no number.
    """
    if not raw:
        return 0.0
    s = str(raw).translate(_BUDGET_NOISE)
    for pattern, factor in _BUDGET_PATTERNS:
        m = pattern.search(s)
        if m:
            # The pattern only matches well-formed decimals, so float() cannot
            # fail on stray punctuation such as "." or "1.2.3".
            return float(m.group(1)) * factor
    return 0.0
def _has_existing_data():
    """Return True when Neo4j already holds at least one Project node."""
    count_query = "MATCH (p:Project) RETURN count(p) AS cnt"
    with driver.session() as session:
        record = session.run(count_query).single()
        project_total = record["cnt"]
        return project_total > 0
_PROJECT_COUNT_QUERY = "MATCH (p:Project) RETURN count(p) AS cnt"


def _count_projects():
    """Return the number of Project nodes currently stored in Neo4j."""
    with driver.session() as s:
        return s.run(_PROJECT_COUNT_QUERY).single()["cnt"]


def _year_or_zero(value):
    """Convert a CSV year cell to int; blank or non-numeric text gives 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        # Only conversion failures are tolerated; anything else (for example
        # KeyboardInterrupt) must propagate.
        return 0


def _node_rows(filename, id_key):
    """Build ``{id_key, name}`` dicts for CSV rows whose id cell is non-empty."""
    return [
        {id_key: r.get(id_key, "").strip(), "name": r.get("name", "").strip()}
        for r in read_csv(filename)
        if r.get(id_key, "").strip()
    ]


def _linked_rows(filename):
    """Yield ``(row, start_id, end_id)`` for rows with both endpoints set."""
    for r in read_csv(filename):
        start = r.get("START_ID", "").strip()
        end = r.get("END_ID", "").strip()
        if start and end:
            yield r, start, end


def import_all(force=False):
    """Load every CSV file under CSV_DIR into Neo4j using MERGE statements.

    Steps, in order: create indexes; merge Office, Technology, Institution,
    Manager and Project nodes; then merge the USES_TECHNOLOGY,
    PARTICIPATED_BY, MANAGED_BY and BELONGS_TO relationships.  Each row list
    is handed to ``run_unwind_batch`` together with a Cypher statement that
    refers to the current item as ``row`` (batching details live in the
    ``database`` module — not visible here).  Progress is printed to stdout.

    Args:
        force: When False (default) the import is skipped if any Project
            node already exists.  When True the import always runs.

    Returns:
        None.
    """
    print("=" * 60)
    print("[导入] 开始将 CSV 数据写入 Neo4j …")
    print(f"[导入] CSV 目录: {CSV_DIR}")

    # Skip the import when data is already present, unless forced.
    if not force and _has_existing_data():
        proj_count = _count_projects()
        print(f"[导入] 数据库中已有 {proj_count} 个 Project 节点,跳过导入。")
        print("[导入] 如需强制重导,请使用: python app.py --force-import")
        print("=" * 60)
        return

    if force:
        proj_count = _count_projects()
        print(f"[导入] 强制重导模式,将更新已有 {proj_count} 个项目…")

    # Indexes on the id property each MERGE below matches on.
    with driver.session() as s:
        for idx in [
            "CREATE INDEX IF NOT EXISTS FOR (p:Project) ON (p.projectId)",
            "CREATE INDEX IF NOT EXISTS FOR (t:Technology) ON (t.techId)",
            "CREATE INDEX IF NOT EXISTS FOR (i:Institution) ON (i.instId)",
            "CREATE INDEX IF NOT EXISTS FOR (m:Manager) ON (m.managerId)",
            "CREATE INDEX IF NOT EXISTS FOR (o:Office) ON (o.officeId)",
        ]:
            s.run(idx)
    print("[导入] 索引创建完毕")

    # Simple id + name nodes: (printed label, CSV file, id column, query).
    simple_nodes = [
        ("Office", "nodes_office.csv", "officeId",
         "MERGE (o:Office {officeId:row.officeId}) SET o.name=row.name"),
        ("Technology", "nodes_technology.csv", "techId",
         "MERGE (t:Technology {techId:row.techId}) SET t.name=row.name"),
        ("Institution", "nodes_institution.csv", "instId",
         "MERGE (i:Institution {instId:row.instId}) SET i.name=row.name"),
        ("Manager", "nodes_manager.csv", "managerId",
         "MERGE (m:Manager {managerId:row.managerId}) SET m.name=row.name"),
    ]
    for label, filename, id_key, query in simple_nodes:
        rows = _node_rows(filename, id_key)
        run_unwind_batch(query, rows)
        print(f"[导入] {label}: {len(rows)}")

    # Project nodes carry extra properties, including the parsed budget.
    proj_params = []
    for r in read_csv("nodes_project.csv"):
        pid = r.get("projectId", "").strip()
        if not pid:
            continue
        budget_text = r.get("budget", "")
        proj_params.append({
            "projectId": pid,
            "nameEn": r.get("nameEn", "").strip(),
            "nameCn": r.get("nameCn", "").strip(),
            "status": r.get("status", "").strip(),
            "startYear": _year_or_zero(r.get("startYear", "0")),
            "endYear": _year_or_zero(r.get("endYear", "0")),
            "budget": budget_text.strip(),
            "budgetVal": parse_budget(budget_text),
            # "transferDomain" wins when that column exists; otherwise "domain".
            "domain": r.get("transferDomain", r.get("domain", "")).strip(),
        })
    run_unwind_batch("""MERGE (p:Project {projectId:row.projectId})
SET p.nameEn=row.nameEn, p.nameCn=row.nameCn, p.status=row.status,
p.startYear=row.startYear, p.endYear=row.endYear,
p.budget=row.budget, p.budgetVal=row.budgetVal, p.domain=row.domain""",
                     proj_params)
    print(f"[导入] Project: {len(proj_params)}")

    # USES_TECHNOLOGY
    rows = [{"pid": start, "tid": end}
            for _, start, end in _linked_rows("rels_project_technology.csv")]
    run_unwind_batch("MATCH (p:Project {projectId:row.pid}),(t:Technology {techId:row.tid}) MERGE (p)-[:USES_TECHNOLOGY]->(t)", rows)
    print(f"[导入] USES_TECHNOLOGY: {len(rows)}")

    # PARTICIPATED_BY (relationType falls back to "合作方" when blank or absent)
    rows = [{"pid": start, "iid": end,
             "rtype": r.get("relationType", "合作方").strip() or "合作方"}
            for r, start, end in _linked_rows("rels_project_institution.csv")]
    run_unwind_batch("MATCH (p:Project {projectId:row.pid}),(i:Institution {instId:row.iid}) MERGE (p)-[rel:PARTICIPATED_BY {relationType:row.rtype}]->(i)", rows)
    print(f"[导入] PARTICIPATED_BY: {len(rows)}")

    # MANAGED_BY
    rows = [{"pid": start, "mid": end}
            for _, start, end in _linked_rows("rels_project_manager.csv")]
    run_unwind_batch("MATCH (p:Project {projectId:row.pid}),(m:Manager {managerId:row.mid}) MERGE (p)-[:MANAGED_BY]->(m)", rows)
    print(f"[导入] MANAGED_BY: {len(rows)}")

    # BELONGS_TO
    rows = [{"pid": start, "oid": end}
            for _, start, end in _linked_rows("rels_project_office.csv")]
    run_unwind_batch("MATCH (p:Project {projectId:row.pid}),(o:Office {officeId:row.oid}) MERGE (p)-[:BELONGS_TO]->(o)", rows)
    print(f"[导入] BELONGS_TO: {len(rows)}")

    print("[导入] 全部完成!")
    print("=" * 60)