import happybase
from pyspark.sql import SparkSession

# Connect to HBase (via the Thrift server running on the given host)
connection = happybase.Connection('192.168.142.144')
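# Note: happybase talks to HBase through the Thrift gateway, which must be
# running on the target host (default port 9090). A non-default port can be
# passed explicitly, e.g.:
# connection = happybase.Connection('192.168.142.144', port=9090)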

# Print all table names
print(connection.tables())

# Check whether the table already exists
if b'mytest_table' in connection.tables():
    # If it exists, disable and delete it so it can be recreated
    connection.disable_table('mytest_table')
    connection.delete_table('mytest_table')

# Create the table with a single column family 'info' (keeping up to 10 versions per cell)
connection.create_table(
    'mytest_table',
    {'info': dict(max_versions=10)}
)

# Get a handle to the table
table = connection.table('mytest_table')

# Create a Spark session
spark = SparkSession.builder.appName("ExcelDataCleaning").master('local').getOrCreate()
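# Note: master('local') runs Spark with a single worker thread; 'local[*]' would
# use one thread per available core, which may be preferable for larger inputs.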

# Read the cleaned CSV file into a Spark DataFrame
df = spark.read.csv("./Output/数据清洗结果.csv", header=True, inferSchema=True)
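# Note: inferSchema=True makes Spark scan the file an extra time to infer column
# types; with inferSchema=False every column would simply be read as a string.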

# Convert the Spark DataFrame to a pandas DataFrame
# (toPandas() collects the full dataset onto the driver, which is fine for a small cleaned file)
pdf = df.toPandas()

# Insert the pandas DataFrame rows into HBase, one put per row;
# the '序号' (serial number) column is used as the row key
for index, row in pdf.iterrows():
    data = {
        f'info:{column}'.encode(): str(row[column]).encode() for column in row.index
    }
    table.put(str(row['序号']).encode(), data)
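
# Optional sketch (not part of the original flow): for larger DataFrames the
# per-row puts above could be buffered with happybase's Batch API, which sends
# mutations in chunks. The batch_size value here is only illustrative.
# with table.batch(batch_size=500) as batch:
#     for index, row in pdf.iterrows():
#         batch.put(
#             str(row['序号']).encode(),
#             {f'info:{column}'.encode(): str(row[column]).encode() for column in row.index},
#         )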

# Query data (full table scan, kept commented out)
# for key, data in table.scan():
#     key = key.decode()
#     data = {k.decode(): v.decode() for k, v in data.items()}
#     print(key, data)

# Print the first 20 rows
for i, (key, data) in enumerate(table.scan()):
    if i >= 20:
        break
    key = key.decode()
    data = {k.decode(): v.decode() for k, v in data.items()}
    print(key, data)
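
# Note: table.scan() also accepts a limit parameter, so the same result could be
# obtained with table.scan(limit=20) instead of counting rows manually.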

# Close the connection
connection.close()