You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

69 lines
3.0 KiB

import happybase
from pyspark.sql import SparkSession
import os
os.environ['JAVA_HOME'] = "D:\jdk\jdk-17.0.8" # 记得把地址改成自己的
# 创建HBase连接
connection = happybase.Connection('192.168.23.128')
# 检查表是否存在
if b'bigdata' in connection.tables():
# 如果表存在,删除表
connection.disable_table('bigdata')
connection.delete_table('bigdata')
# 创建表
connection.create_table(
'bigdata',
{'info': dict(max_versions=10)}
)
# 获取表
table = connection.table('bigdata')
# 创建Spark会话
spark = SparkSession.builder.appName("ExcelDataCleaning").master('local').getOrCreate()
# 读取CSV文件为Spark DataFrame
df = spark.read.csv("washData.csv", header=True, inferSchema=True)
# 将Spark DataFrame转换为Pandas DataFrame
pd = df.toPandas()
# 将Pandas DataFrame的数据插入到HBase
for index, row in pd.iterrows():
data = {
f'info:地址'.encode(): str(row['地址']).encode(),
f'info:企业注册地址'.encode(): str(row['企业注册地址']).encode(),
f'info:营业收入'.encode(): str(row['营业收入']).encode(),
f'info:净利润'.encode(): str(row['净利润']).encode(),
f'info:其中:研发、试验检验费'.encode(): str(row['其中:研发、试验检验费']).encode(),
f'info:其中:技术(研究)开发费'.encode(): str(row['其中:技术(研究)开发费']).encode(),
f'info:其中:技术收入'.encode(): str(row['其中:技术收入']).encode(),
f'info:其中:技术转让收入'.encode(): str(row['其中:技术转让收入']).encode(),
f'info:技术承包收入'.encode(): str(row['技术承包收入']).encode(),
f'info:技术咨询与服务收入'.encode(): str(row['技术咨询与服务收入']).encode(),
f'info:年末资产总计'.encode(): str(row['年末资产总计']).encode(),
f'info:主要业务活动或主要产品1'.encode(): str(row['主要业务活动或主要产品1']).encode(),
f'info:主要外资来源国别或地区代码'.encode(): str(row['主要外资来源国别或地区代码']).encode(),
f'info:企业注册地是否在国家高新区内'.encode(): str(row['企业注册地是否在国家高新区内']).encode(),
f'info:企业主要生产经营活动是否在国家高新区内'.encode(): str(row['企业主要生产经营活动是否在国家高新区内']).encode(),
f'info:其中:支付科研人员的工资及福利费'.encode(): str(row['其中:支付科研人员的工资及福利费']).encode(),
f'info:营业成本'.encode(): str(row['营业成本']).encode()
}
table.put(str(row['序号']).encode(), data)
# 查询数据
# for key, data in table.scan():
# key = key.decode()
# data = {k.decode(): v.decode() for k, v in data.items()}
# print(key, data)
# 查询前20行数据
for i, (key, data) in enumerate(table.scan()):
if i >= 20:
break
key = key.decode()
data = {k.decode(): v.decode() for k, v in data.items()}
print(key, data)
# 关闭连接
connection.close()