|
|
import pandas as pd
|
|
|
from pyspark.sql import SparkSession
|
|
|
from pyspark.sql import functions as F
|
|
|
import tkinter as tk
|
|
|
from tkinter import filedialog
|
|
|
|
|
|
import os
|
|
|
os.environ['JAVA_HOME'] = "D:\jdk\jdk-17.0.8" # 记得把地址改成自己的
|
|
|
|
|
|
selected_file_path = ""
|
|
|
# 打开选择文件函数
|
|
|
def select_and_open_excel_file():
|
|
|
# 创建主窗口
|
|
|
root = tk.Tk()
|
|
|
|
|
|
def on_button_click():
|
|
|
global selected_file_path
|
|
|
# 调用文件选择对话框
|
|
|
file_path = filedialog.askopenfilename(
|
|
|
title="选择Excel文件",
|
|
|
filetypes=[("Excel files", "*.xlsx;*.xls"), ("All files", "*.*")]
|
|
|
)
|
|
|
|
|
|
# 用户取消选择,file_path 为空
|
|
|
if not file_path:
|
|
|
print("用户取消选择文件")
|
|
|
return
|
|
|
|
|
|
# 这里可以使用 file_path 来获取选择的 Excel 文件路径
|
|
|
print("选择的Excel文件路径:", file_path)
|
|
|
selected_file_path = file_path
|
|
|
root.destroy()
|
|
|
# 创建按钮
|
|
|
button = tk.Button(root, text="选择文件", command=on_button_click)
|
|
|
button.pack(pady=20)
|
|
|
|
|
|
# 进入主循环
|
|
|
root.mainloop()
|
|
|
|
|
|
# 调用函数
|
|
|
select_and_open_excel_file()
|
|
|
|
|
|
print(selected_file_path)
|
|
|
|
|
|
# 读取Excel文件
|
|
|
excel_data = pd.read_excel(selected_file_path)
|
|
|
|
|
|
# 将数据写入CSV文件
|
|
|
excel_data.to_csv("excel_data.csv", index=False)
|
|
|
|
|
|
# 创建Spark会话
|
|
|
spark = SparkSession.builder.appName("ExcelDataCleaning").getOrCreate()
|
|
|
|
|
|
# 读取CSV文件为DataFrame
|
|
|
df = spark.read.csv("excel_data.csv", header=True, inferSchema=True)
|
|
|
|
|
|
# 创建一个新列作为行号
|
|
|
df = df.withColumn("row_id", F.monotonically_increasing_id())
|
|
|
|
|
|
# 过滤掉行号为1的行,即第二行
|
|
|
df_without_second_row = df.filter(df.row_id != 1).drop("row_id")
|
|
|
df = df_without_second_row.filter(df.row_id != 0).drop("row_id")
|
|
|
|
|
|
# 需要的列名
|
|
|
df = df.select('序号', '地址', '企业注册地址', '营业收入', '净利润', '其中:研发、试验检验费', '其中:技术(研究)开发费', '其中:技术收入', '其中:技术转让收入',
|
|
|
'技术承包收入', '技术咨询与服务收入', '年末资产总计', '主要业务活动或主要产品1', '主要外资来源国别或地区代码', '企业注册地是否在国家高新区内',
|
|
|
'企业主要生产经营活动是否在国家高新区内', '其中:支付科研人员的工资及福利费','营业成本')
|
|
|
|
|
|
# 输出列名
|
|
|
print(df.columns)
|
|
|
|
|
|
# 删除'地址'列为'qingxiubgs2014@sina.com'或null的行
|
|
|
df = df.filter((F.col('地址') != 'qingxiubgs2014@sina.com') & (F.col('地址').isNotNull()))
|
|
|
|
|
|
# 填充某列空值
|
|
|
df = df.fillna({'主要业务活动或主要产品1': '无'})
|
|
|
df = df.fillna({'主要外资来源国别或地区代码': '0'})
|
|
|
df = df.fillna({'其中:技术收入': '0'})
|
|
|
|
|
|
# 去除重复项
|
|
|
df = df.dropDuplicates()
|
|
|
|
|
|
# 根据序号排序
|
|
|
df = df.withColumn('序号', F.round(F.col('序号')).cast('integer'))
|
|
|
df = df.orderBy('序号')
|
|
|
|
|
|
# 将Spark DataFrame转换为Pandas DataFrame
|
|
|
pd = df.toPandas()
|
|
|
|
|
|
# 将Pandas DataFrame保存为CSV文件
|
|
|
pd.to_csv('washData.csv', index=False)
|