You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

53 lines
2.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import pandas
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# 读取Excel文件
excel_data = pandas.read_excel("dataExcel.xlsx")
# 将数据写入CSV文件
excel_data.to_csv("./Output/dataExcel.csv", index=False)
# 创建Spark会话
spark = SparkSession.builder.appName("ExcelDataCleaning").master('local').getOrCreate()
# 读取CSV文件为Spark DataFrame
df = spark.read.csv("./Output/dataExcel.csv", header=True, inferSchema=True)
# 创建一个新列作为行号
df = df.withColumn("row_id", F.monotonically_increasing_id())
# 过滤掉行号为1的行即第二行
df_without_second_row = df.filter(df.row_id != 1).drop("row_id")
df = df_without_second_row.filter(df.row_id != 0).drop("row_id")
# 需要的列名
df = df.select('序号', '地址', '企业注册地址', '注册资金', '注册时间', '营业收入', '营业成本', '净利润', '其中:研发、试验检验费', '其中:技术(研究)开发费', '其中:支付科研人员的工资及福利费', '其中:出口总额',
'主要业务活动或主要产品1', '主要外资来源国别或地区代码', '主要外资出资比例', '本公司是否为上市(挂牌)企业主体', '工业总产值(当年价格)', '实际上缴税费总额',
'从业人员期末人数', '其中:博士', '其中:硕士', '具有大学本科学历(位)人员', '具有大学专科学历人员', '企业被认定为高新技术企业的时间', '上市挂牌时间', '其中:技术转让收入')
# 输出列名
print(df.columns)
# 删除'地址'列为'qingxiubgs2014@sina.com'或null的行
df = df.filter((F.col('地址') != 'qingxiubgs2014@sina.com') & (F.col('地址').isNotNull()))
# 填充某列空值
df = df.fillna({'本公司是否为上市(挂牌)企业主体': '0'})
df = df.fillna({'主要业务活动或主要产品1': ''})
df = df.fillna({'主要外资来源国别或地区代码': '0'})
df = df.fillna({'上市挂牌时间': '0'})
# 去除重复项
df = df.dropDuplicates()
# 根据序号排序
df = df.withColumn('序号', F.round(F.col('序号')).cast('integer'))
df = df.orderBy('序号')
# 将Spark DataFrame转换为Pandas DataFrame
pd = df.toPandas()
# 将Pandas DataFrame保存为CSV文件
pd.to_csv('./Output/数据清洗结果.csv', index=False)