import happybase from pyecharts.charts import Bar, Pie, Page, Map from pyecharts.commons.utils import JsCode from pyspark.sql import functions as F from pyspark.sql import SparkSession, Row import os from pyspark.sql.functions import when, count from pyecharts import options as opts # 指定python环境 os.environ['PYSPARK_PYTHON'] = 'D:\python\python3.11.4\python.exe' os.environ['JAVA_HOME'] = "D:\jdk\jdk-17.0.8" # 记得把地址改成自己的 def cky_code03(): # 创建Spark会话 spark = SparkSession.builder.appName("HBaseDataLoading").master('local').getOrCreate() # 连接到HBase connection = happybase.Connection('192.168.23.128') # 获取表 table = connection.table('bigdata') # 定义列名 columns = ['info:企业注册地址', 'info:主要业务活动或主要产品1'] # 查询数据 data = [] for key, row in table.scan(columns=[col.encode('utf-8') for col in columns]): row_data = { '序号': key.decode(), '企业注册地址': row['info:企业注册地址'.encode('utf-8')].decode(), '主要业务活动或主要产品1': row['info:主要业务活动或主要产品1'.encode('utf-8')].decode() } data.append(row_data) # 关闭连接 connection.close() # 将数据转换为Spark DataFrame df = spark.createDataFrame([Row(**{k: str(v) for k, v in i.items()}) for i in data]) # 广西的14个市级名称 cities = ['南宁市', '柳州市', '桂林市', '梧州市', '北海市', '防城港市', '钦州市', '贵港市', '玉林市', '百色市', '贺州市', '河池市', '来宾市', '崇左市'] # 对每个市进行处理 for city in cities: df = df.withColumn('企业注册地址', when(df['企业注册地址'].contains(city[:-1]), city).otherwise(df['企业注册地址'])) # 广西的所有县级行政区划及其对应的市级行政区划 county_to_city = { # 南宁市 '兴宁区': '南宁市', '青秀区': '南宁市', '江南区': '南宁市', '西乡塘区': '南宁市', '良庆区': '南宁市', '邕宁区': '南宁市', '武鸣区': '南宁市', '隆安县': '南宁市', '马山县': '南宁市', '上林县': '南宁市', '宾阳县': '南宁市', '横县': '南宁市', # 柳州市 '城中区': '柳州市', '鱼峰区': '柳州市', '柳南区': '柳州市', '柳北区': '柳州市', '柳江区': '柳州市', '柳城县': '柳州市', '鹿寨县': '柳州市', '融安县': '柳州市', '融水苗族自治县': '柳州市', '三江侗族自治县': '柳州市', # 桂林市 '秀峰区': '桂林市', '叠彩区': '桂林市', '象山区': '桂林市', '七星区': '桂林市', '雁山区': '桂林市', '临桂区': '桂林市', '阳朔县': '桂林市', '灵川县': '桂林市', '全州县': '桂林市', '兴安县': '桂林市', '永福县': '桂林市', '灌阳县': '桂林市', '龙胜各族自治县': '桂林市', '资源县': '桂林市', '平乐县': '桂林市', '荔浦市': '桂林市', '恭城瑶族自治县': '桂林市', # 梧州市 '万秀区': '梧州市', '长洲区': '梧州市', '龙圩区': '梧州市', '苍梧县': '梧州市', '藤县': '梧州市', '蒙山县': '梧州市', '岑溪市': '梧州市', # 北海市 '海城区': '北海市', '银海区': '北海市', '铁山港区': '北海市', '合浦县': '北海市', # 防城港市 '港口区': '防城港市', '防城区': '防城港市', '上思县': '防城港市', '东兴市': '防城港市', # 钦州市 '钦南区': '钦州市', '钦北区': '钦州市', '灵山县': '钦州市', '浦北县': '钦州市', # 贵港市 '港北区': '贵港市', '港南区': '贵港市', '覃塘区': '贵港市', '平南县': '贵港市', '桂平市': '贵港市', # 玉林市 '玉州区': '玉林市', '福绵区': '玉林市', '容县': '玉林市', '陆川县': '玉林市', '博白县': '玉林市', '兴业县': '玉林市', '北流市': '玉林市', # 百色市 '右江区': '百色市', '田阳县': '百色市', '田东县': '百色市', '平果县': '百色市', '德保县': '百色市', '那坡县': '百色市', '凌云县': '百色市', '乐业县': '百色市', '田林县': '百色市', '西林县': '百色市', '隆林各族自治县': '百色市', '靖西市': '百色市', # 贺州市 '八步区': '贺州市', '平桂区': '贺州市', '昭平县': '贺州市', '钟山县': '贺州市', '富川瑶族自治县': '贺州市', # 河池市 '金城江区': '河池市', '南丹县': '河池市', '天峨县': '河池市', '凤山县': '河池市', '东兰县': '河池市', '罗城仫佬族自治县': '河池市', '环江毛南族自治县': '河池市', '巴马瑶族自治县': '河池市', '都安瑶族自治县': '河池市', '大化瑶族自治县': '河池市', '宜州市': '河池市', # 来宾市 '兴宾区': '来宾市', '忻城县': '来宾市', '象州县': '来宾市', '武宣县': '来宾市', '金秀瑶族自治县': '来宾市', '合山市': '来宾市', # 崇左市 '江州区': '崇左市', '扶绥县': '崇左市', '宁明县': '崇左市', '龙州县': '崇左市', '大新县': '崇左市', '天等县': '崇左市', '凭祥市': '崇左市', # 特殊值 '高新技术产业开发区': '南宁市', '南宁经济技术开发区': '南宁市', '东盟经济技术开发区': '南宁市', '高新区': '南宁市', '武鸣县': '南宁市', '隆安': '南宁市', '融水县': '柳州市', '融安': '柳州市', '柳江县': '柳州市', '柳邕': '柳州市', '平果': '百色市', '田阳': '百色市', '灵川': '桂林市', '临桂': '桂林市', '龙胜县': '桂林市', '荔浦县': '桂林市', '恭城县': '桂林市', '巴马县': '河池市', '罗城': '河池市', '合浦': '北海市', # 特殊值2 '科园西十路24号': '南宁市', '科园东四路5号': '南宁市', '秀安路13-11号': '南宁市', '科园大道31号财智时代12楼': '南宁市', '新兴工业园创业路6号': '柳州市', '洛维工业集中区': '柳州市', '中马产业园区': '钦州市', '长安工业集中区': '桂林市', '西江四路扶典上冲29号': '梧州市', '田东石化工业': '百色市', '柳太路': '柳州市', '黎塘工业集中区东部产业园': '南宁市', '广西壮族自治区工商行政管理局': '南宁市', '三江县职业中学': '柳州市', '心圩街道办事处高新工业园社区居委会': '南宁市', '司能石油化工有限公司': '柳州市', # 数字代号 '530003': '南宁市' } # 对每个县级行政区划进行处理 for county, city in county_to_city.items(): df = df.withColumn('企业注册地址', when(df['企业注册地址'].contains(county), city).otherwise(df['企业注册地址'])) # 制造业 filter_df1 = df.filter( df["主要业务活动或主要产品1"].like("%制造%") | df["主要业务活动或主要产品1"].like("%生产%") | df["主要业务活动或主要产品1"].like("%产品%") | df["主要业务活动或主要产品1"].like("%工厂%") ) grouped_df1 = filter_df1.groupBy("企业注册地址") result_df1 = grouped_df1.agg( F.count("*").alias("制造业数量") ) # 服务业 filter_df2 = df.filter( df["主要业务活动或主要产品1"].like("%服务%") | df["主要业务活动或主要产品1"].like("%咨询%") | df["主要业务活动或主要产品1"].like("%提供%") | df["主要业务活动或主要产品1"].like("%客户%") | df["主要业务活动或主要产品1"].like("%顾客%") ) grouped_df2 = filter_df2.groupBy("企业注册地址") result_df2 = grouped_df2.agg( F.count("*").alias("服务业数量") ) # 农业 filter_df3 = df.filter( df["主要业务活动或主要产品1"].like("%农业%") | df["主要业务活动或主要产品1"].like("%农田%") | df["主要业务活动或主要产品1"].like("%种植%") | df["主要业务活动或主要产品1"].like("%养殖%") ) grouped_df3 = filter_df3.groupBy("企业注册地址") result_df3 = grouped_df3.agg( F.count("*").alias("农业数量") ) # 金融业 filter_df4 = df.filter( df["主要业务活动或主要产品1"].like("%金融%") | df["主要业务活动或主要产品1"].like("%银行%") | df["主要业务活动或主要产品1"].like("%贷款%") | df["主要业务活动或主要产品1"].like("%投资%") | df["主要业务活动或主要产品1"].like("%金融投资%") ) grouped_df4 = filter_df4.groupBy("企业注册地址") result_df4 = grouped_df4.agg( F.count("*").alias("金融业数量") ) # IT行业 filter_df5 = df.filter( df["主要业务活动或主要产品1"].like("%技术%") | df["主要业务活动或主要产品1"].like("%IT%") | df["主要业务活动或主要产品1"].like("%软件%") | df["主要业务活动或主要产品1"].like("%信息技术%") | df["主要业务活动或主要产品1"].like("%开发%") | df["主要业务活动或主要产品1"].like("%应用程序%") ) grouped_df5 = filter_df5.groupBy("企业注册地址") result_df5 = grouped_df5.agg( F.count("*").alias("IT行业数量") ) # 零售业 filter_df6 = df.filter( df["主要业务活动或主要产品1"].like("%零售%") | df["主要业务活动或主要产品1"].like("%销售%") | df["主要业务活动或主要产品1"].like("%商店%") | df["主要业务活动或主要产品1"].like("%商品%") | df["主要业务活动或主要产品1"].like("%购物%") ) grouped_df6 = filter_df6.groupBy("企业注册地址") result_df6 = grouped_df6.agg( F.count("*").alias("零售业数量") ) # 建筑和房地产 filter_df7 = df.filter( df["主要业务活动或主要产品1"].like("%建筑%") | df["主要业务活动或主要产品1"].like("%房地产%") | df["主要业务活动或主要产品1"].like("%工程%") | df["主要业务活动或主要产品1"].like("%租赁%") | df["主要业务活动或主要产品1"].like("%房产%") ) grouped_df7 = filter_df7.groupBy("企业注册地址") result_df7 = grouped_df7.agg( F.count("*").alias("建筑和房地产数量") ) # 媒体和娱乐业 filter_df8 = df.filter( df["主要业务活动或主要产品1"].like("%媒体%") | df["主要业务活动或主要产品1"].like("%广告%") | df["主要业务活动或主要产品1"].like("%娱乐%") | df["主要业务活动或主要产品1"].like("%节目%") | df["主要业务活动或主要产品1"].like("%体育%") ) grouped_df8 = filter_df8.groupBy("企业注册地址") result_df8 = grouped_df8.agg( F.count("*").alias("媒体和娱乐业数量") ) # 制造业 data1 = result_df1.select("企业注册地址", "制造业数量").collect() region_names1 = [row["企业注册地址"] for row in data1] enterprise_counts1 = [row["制造业数量"] for row in data1] # 服务业 data2 = result_df2.select("企业注册地址", "服务业数量").collect() region_names2 = [row["企业注册地址"] for row in data2] enterprise_counts2 = [row["服务业数量"] for row in data2] # 农业 data3 = result_df3.select("企业注册地址", "农业数量").collect() region_names3 = [row["企业注册地址"] for row in data3] enterprise_counts3 = [row["农业数量"] for row in data3] # 金融业 data4 = result_df4.select("企业注册地址", "金融业数量").collect() region_names4 = [row["企业注册地址"] for row in data4] enterprise_counts4 = [row["金融业数量"] for row in data4] # IT行业 data5 = result_df5.select("企业注册地址", "IT行业数量").collect() region_names5 = [row["企业注册地址"] for row in data5] enterprise_counts5 = [row["IT行业数量"] for row in data5] # 零售业 data6 = result_df6.select("企业注册地址", "零售业数量").collect() region_names6 = [row["企业注册地址"] for row in data6] enterprise_counts6 = [row["零售业数量"] for row in data6] # 建筑和房地产 data7 = result_df7.select("企业注册地址", "建筑和房地产数量").collect() region_names7 = [row["企业注册地址"] for row in data7] enterprise_counts7 = [row["建筑和房地产数量"] for row in data7] # 媒体和娱乐业 data8 = result_df8.select("企业注册地址", "媒体和娱乐业数量").collect() region_names8 = [row["企业注册地址"] for row in data8] enterprise_counts8 = [row["媒体和娱乐业数量"] for row in data8] # 创建一个 Map 图表对象,并设置地图的基本属性 map_chart1 = ( Map() .add("制造业", list(zip(region_names1, enterprise_counts1)), "广西") .set_global_opts( title_opts=opts.TitleOpts(title="2018年广西制造业企业数量分布图"), visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts1)), ) ) map_chart2 = ( Map() .add("服务业", list(zip(region_names2, enterprise_counts2)), "广西") .set_global_opts( title_opts=opts.TitleOpts(title="2018年广西服务业企业数量分布图"), visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts2)), ) ) map_chart3 = ( Map() .add("农业", list(zip(region_names3, enterprise_counts3)), "广西") .set_global_opts( title_opts=opts.TitleOpts(title="2018年广西农业企业数量分布图"), visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts3)), ) ) map_chart4 = ( Map() .add("金融业", list(zip(region_names4, enterprise_counts4)), "广西") .set_global_opts( title_opts=opts.TitleOpts(title="2018年广西金融业企业数量分布图"), visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts4)), ) ) map_chart5 = ( Map() .add("IT行业", list(zip(region_names5, enterprise_counts5)), "广西") .set_global_opts( title_opts=opts.TitleOpts(title="2018年广西IT行业企业数量分布图"), visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts5)), ) ) map_chart6 = ( Map() .add("零售业", list(zip(region_names6, enterprise_counts6)), "广西") .set_global_opts( title_opts=opts.TitleOpts(title="2018年广西零售业企业数量分布图"), visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts6)), ) ) map_chart7 = ( Map() .add("建筑和房地产", list(zip(region_names7, enterprise_counts7)), "广西") .set_global_opts( title_opts=opts.TitleOpts(title="2018年广西建筑和房地产企业数量分布图"), visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts7)), ) ) map_chart8 = ( Map() .add("媒体和娱乐业", list(zip(region_names8, enterprise_counts8)), "广西") .set_global_opts( title_opts=opts.TitleOpts(title="2018年广西媒体和娱乐业企业数量分布图"), visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts8)), ) ) page = Page(layout=Page.SimplePageLayout) page.add(map_chart1) page.add(map_chart2) page.add(map_chart3) page.add(map_chart4) page.add(map_chart5) page.add(map_chart6) page.add(map_chart7) page.add(map_chart8) return page