import happybase
from pyspark.sql import SparkSession

# Connect to HBase (via the Thrift server on the default port)
connection = happybase.Connection('192.168.142.144')

# Print all table names
print(connection.tables())

# If the table already exists, disable and delete it
if b'mytest_table' in connection.tables():
    connection.disable_table('mytest_table')
    connection.delete_table('mytest_table')

# Create the table with a single column family 'info'
connection.create_table(
    'mytest_table',
    {'info': dict(max_versions=10)}
)

# Get a handle to the table
table = connection.table('mytest_table')

# Create a Spark session
spark = SparkSession.builder.appName("ExcelDataCleaning").master('local').getOrCreate()

# Read the CSV file into a Spark DataFrame
df = spark.read.csv("./Output/数据清洗结果.csv", header=True, inferSchema=True)

# Convert the Spark DataFrame to a pandas DataFrame
# (named `pdf` rather than `pd` to avoid shadowing the conventional pandas alias)
pdf = df.toPandas()

# Insert the pandas DataFrame rows into HBase,
# using the '序号' (serial number) column as the row key
for index, row in pdf.iterrows():
    data = {
        f'info:{column}'.encode(): str(row[column]).encode()
        for column in row.index
    }
    table.put(str(row['序号']).encode(), data)

# Scan all rows (uncomment to dump the whole table)
# for key, data in table.scan():
#     key = key.decode()
#     data = {k.decode(): v.decode() for k, v in data.items()}
#     print(key, data)

# Scan and print the first 20 rows (the limit is applied by the scanner)
for key, data in table.scan(limit=20):
    key = key.decode()
    data = {k.decode(): v.decode() for k, v in data.items()}
    print(key, data)

# Stop the Spark session and close the HBase connection
spark.stop()
connection.close()
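
# Note on write performance: the per-row table.put() loop above issues one
# Thrift round trip per record, which is slow for large DataFrames. happybase
# offers Table.batch(), which buffers mutations and flushes them in chunks.
# The commented-out sketch below is an optional alternative to the put loop,
# reusing the same `table` and `pdf` objects from this script; batch_size=1000
# is an illustrative value, not a tuned one.
#
# with table.batch(batch_size=1000) as batch:
#     for index, row in pdf.iterrows():
#         data = {
#             f'info:{column}'.encode(): str(row[column]).encode()
#             for column in row.index
#         }
#         batch.put(str(row['序号']).encode(), data)
#     # leaving the `with` block sends any remaining buffered mutations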