You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

440 lines
17 KiB

import happybase
from pyecharts.charts import Bar, Pie, Page, Map
from pyecharts.commons.utils import JsCode
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, Row
import os
from pyspark.sql.functions import when, count
from pyecharts import options as opts
# 指定python环境
os.environ['PYSPARK_PYTHON'] = 'D:\python\python3.11.4\python.exe'
os.environ['JAVA_HOME'] = "D:\jdk\jdk-17.0.8" # 记得把地址改成自己的
def cky_code03():
# 创建Spark会话
spark = SparkSession.builder.appName("HBaseDataLoading").master('local').getOrCreate()
# 连接到HBase
connection = happybase.Connection('192.168.23.128')
# 获取表
table = connection.table('bigdata')
# 定义列名
columns = ['info:企业注册地址', 'info:主要业务活动或主要产品1']
# 查询数据
data = []
for key, row in table.scan(columns=[col.encode('utf-8') for col in columns]):
row_data = {
'序号': key.decode(),
'企业注册地址': row['info:企业注册地址'.encode('utf-8')].decode(),
'主要业务活动或主要产品1': row['info:主要业务活动或主要产品1'.encode('utf-8')].decode()
}
data.append(row_data)
# 关闭连接
connection.close()
# 将数据转换为Spark DataFrame
df = spark.createDataFrame([Row(**{k: str(v) for k, v in i.items()}) for i in data])
# 广西的14个市级名称
cities = ['南宁市', '柳州市', '桂林市', '梧州市', '北海市', '防城港市', '钦州市', '贵港市', '玉林市', '百色市',
'贺州市', '河池市', '来宾市', '崇左市']
# 对每个市进行处理
for city in cities:
df = df.withColumn('企业注册地址',
when(df['企业注册地址'].contains(city[:-1]), city).otherwise(df['企业注册地址']))
# 广西的所有县级行政区划及其对应的市级行政区划
county_to_city = {
# 南宁市
'兴宁区': '南宁市',
'青秀区': '南宁市',
'江南区': '南宁市',
'西乡塘区': '南宁市',
'良庆区': '南宁市',
'邕宁区': '南宁市',
'武鸣区': '南宁市',
'隆安县': '南宁市',
'马山县': '南宁市',
'上林县': '南宁市',
'宾阳县': '南宁市',
'横县': '南宁市',
# 柳州市
'城中区': '柳州市',
'鱼峰区': '柳州市',
'柳南区': '柳州市',
'柳北区': '柳州市',
'柳江区': '柳州市',
'柳城县': '柳州市',
'鹿寨县': '柳州市',
'融安县': '柳州市',
'融水苗族自治县': '柳州市',
'三江侗族自治县': '柳州市',
# 桂林市
'秀峰区': '桂林市',
'叠彩区': '桂林市',
'象山区': '桂林市',
'七星区': '桂林市',
'雁山区': '桂林市',
'临桂区': '桂林市',
'阳朔县': '桂林市',
'灵川县': '桂林市',
'全州县': '桂林市',
'兴安县': '桂林市',
'永福县': '桂林市',
'灌阳县': '桂林市',
'龙胜各族自治县': '桂林市',
'资源县': '桂林市',
'平乐县': '桂林市',
'荔浦市': '桂林市',
'恭城瑶族自治县': '桂林市',
# 梧州市
'万秀区': '梧州市',
'长洲区': '梧州市',
'龙圩区': '梧州市',
'苍梧县': '梧州市',
'藤县': '梧州市',
'蒙山县': '梧州市',
'岑溪市': '梧州市',
# 北海市
'海城区': '北海市',
'银海区': '北海市',
'铁山港区': '北海市',
'合浦县': '北海市',
# 防城港市
'港口区': '防城港市',
'防城区': '防城港市',
'上思县': '防城港市',
'东兴市': '防城港市',
# 钦州市
'钦南区': '钦州市',
'钦北区': '钦州市',
'灵山县': '钦州市',
'浦北县': '钦州市',
# 贵港市
'港北区': '贵港市',
'港南区': '贵港市',
'覃塘区': '贵港市',
'平南县': '贵港市',
'桂平市': '贵港市',
# 玉林市
'玉州区': '玉林市',
'福绵区': '玉林市',
'容县': '玉林市',
'陆川县': '玉林市',
'博白县': '玉林市',
'兴业县': '玉林市',
'北流市': '玉林市',
# 百色市
'右江区': '百色市',
'田阳县': '百色市',
'田东县': '百色市',
'平果县': '百色市',
'德保县': '百色市',
'那坡县': '百色市',
'凌云县': '百色市',
'乐业县': '百色市',
'田林县': '百色市',
'西林县': '百色市',
'隆林各族自治县': '百色市',
'靖西市': '百色市',
# 贺州市
'八步区': '贺州市',
'平桂区': '贺州市',
'昭平县': '贺州市',
'钟山县': '贺州市',
'富川瑶族自治县': '贺州市',
# 河池市
'金城江区': '河池市',
'南丹县': '河池市',
'天峨县': '河池市',
'凤山县': '河池市',
'东兰县': '河池市',
'罗城仫佬族自治县': '河池市',
'环江毛南族自治县': '河池市',
'巴马瑶族自治县': '河池市',
'都安瑶族自治县': '河池市',
'大化瑶族自治县': '河池市',
'宜州市': '河池市',
# 来宾市
'兴宾区': '来宾市',
'忻城县': '来宾市',
'象州县': '来宾市',
'武宣县': '来宾市',
'金秀瑶族自治县': '来宾市',
'合山市': '来宾市',
# 崇左市
'江州区': '崇左市',
'扶绥县': '崇左市',
'宁明县': '崇左市',
'龙州县': '崇左市',
'大新县': '崇左市',
'天等县': '崇左市',
'凭祥市': '崇左市',
# 特殊值
'高新技术产业开发区': '南宁市',
'南宁经济技术开发区': '南宁市',
'东盟经济技术开发区': '南宁市',
'高新区': '南宁市',
'武鸣县': '南宁市',
'隆安': '南宁市',
'融水县': '柳州市',
'融安': '柳州市',
'柳江县': '柳州市',
'柳邕': '柳州市',
'平果': '百色市',
'田阳': '百色市',
'灵川': '桂林市',
'临桂': '桂林市',
'龙胜县': '桂林市',
'荔浦县': '桂林市',
'恭城县': '桂林市',
'巴马县': '河池市',
'罗城': '河池市',
'合浦': '北海市',
# 特殊值2
'科园西十路24号': '南宁市',
'科园东四路5号': '南宁市',
'秀安路13-11号': '南宁市',
'科园大道31号财智时代12楼': '南宁市',
'新兴工业园创业路6号': '柳州市',
'洛维工业集中区': '柳州市',
'中马产业园区': '钦州市',
'长安工业集中区': '桂林市',
'西江四路扶典上冲29号': '梧州市',
'田东石化工业': '百色市',
'柳太路': '柳州市',
'黎塘工业集中区东部产业园': '南宁市',
'广西壮族自治区工商行政管理局': '南宁市',
'三江县职业中学': '柳州市',
'心圩街道办事处高新工业园社区居委会': '南宁市',
'司能石油化工有限公司': '柳州市',
# 数字代号
'530003': '南宁市'
}
# 对每个县级行政区划进行处理
for county, city in county_to_city.items():
df = df.withColumn('企业注册地址',
when(df['企业注册地址'].contains(county), city).otherwise(df['企业注册地址']))
# 制造业
filter_df1 = df.filter(
df["主要业务活动或主要产品1"].like("%制造%") |
df["主要业务活动或主要产品1"].like("%生产%") |
df["主要业务活动或主要产品1"].like("%产品%") |
df["主要业务活动或主要产品1"].like("%工厂%")
)
grouped_df1 = filter_df1.groupBy("企业注册地址")
result_df1 = grouped_df1.agg(
F.count("*").alias("制造业数量")
)
# 服务业
filter_df2 = df.filter(
df["主要业务活动或主要产品1"].like("%服务%") |
df["主要业务活动或主要产品1"].like("%咨询%") |
df["主要业务活动或主要产品1"].like("%提供%") |
df["主要业务活动或主要产品1"].like("%客户%") |
df["主要业务活动或主要产品1"].like("%顾客%")
)
grouped_df2 = filter_df2.groupBy("企业注册地址")
result_df2 = grouped_df2.agg(
F.count("*").alias("服务业数量")
)
# 农业
filter_df3 = df.filter(
df["主要业务活动或主要产品1"].like("%农业%") |
df["主要业务活动或主要产品1"].like("%农田%") |
df["主要业务活动或主要产品1"].like("%种植%") |
df["主要业务活动或主要产品1"].like("%养殖%")
)
grouped_df3 = filter_df3.groupBy("企业注册地址")
result_df3 = grouped_df3.agg(
F.count("*").alias("农业数量")
)
# 金融业
filter_df4 = df.filter(
df["主要业务活动或主要产品1"].like("%金融%") |
df["主要业务活动或主要产品1"].like("%银行%") |
df["主要业务活动或主要产品1"].like("%贷款%") |
df["主要业务活动或主要产品1"].like("%投资%") |
df["主要业务活动或主要产品1"].like("%金融投资%")
)
grouped_df4 = filter_df4.groupBy("企业注册地址")
result_df4 = grouped_df4.agg(
F.count("*").alias("金融业数量")
)
# IT行业
filter_df5 = df.filter(
df["主要业务活动或主要产品1"].like("%技术%") |
df["主要业务活动或主要产品1"].like("%IT%") |
df["主要业务活动或主要产品1"].like("%软件%") |
df["主要业务活动或主要产品1"].like("%信息技术%") |
df["主要业务活动或主要产品1"].like("%开发%") |
df["主要业务活动或主要产品1"].like("%应用程序%")
)
grouped_df5 = filter_df5.groupBy("企业注册地址")
result_df5 = grouped_df5.agg(
F.count("*").alias("IT行业数量")
)
# 零售业
filter_df6 = df.filter(
df["主要业务活动或主要产品1"].like("%零售%") |
df["主要业务活动或主要产品1"].like("%销售%") |
df["主要业务活动或主要产品1"].like("%商店%") |
df["主要业务活动或主要产品1"].like("%商品%") |
df["主要业务活动或主要产品1"].like("%购物%")
)
grouped_df6 = filter_df6.groupBy("企业注册地址")
result_df6 = grouped_df6.agg(
F.count("*").alias("零售业数量")
)
# 建筑和房地产
filter_df7 = df.filter(
df["主要业务活动或主要产品1"].like("%建筑%") |
df["主要业务活动或主要产品1"].like("%房地产%") |
df["主要业务活动或主要产品1"].like("%工程%") |
df["主要业务活动或主要产品1"].like("%租赁%") |
df["主要业务活动或主要产品1"].like("%房产%")
)
grouped_df7 = filter_df7.groupBy("企业注册地址")
result_df7 = grouped_df7.agg(
F.count("*").alias("建筑和房地产数量")
)
# 媒体和娱乐业
filter_df8 = df.filter(
df["主要业务活动或主要产品1"].like("%媒体%") |
df["主要业务活动或主要产品1"].like("%广告%") |
df["主要业务活动或主要产品1"].like("%娱乐%") |
df["主要业务活动或主要产品1"].like("%节目%") |
df["主要业务活动或主要产品1"].like("%体育%")
)
grouped_df8 = filter_df8.groupBy("企业注册地址")
result_df8 = grouped_df8.agg(
F.count("*").alias("媒体和娱乐业数量")
)
# 制造业
data1 = result_df1.select("企业注册地址", "制造业数量").collect()
region_names1 = [row["企业注册地址"] for row in data1]
enterprise_counts1 = [row["制造业数量"] for row in data1]
# 服务业
data2 = result_df2.select("企业注册地址", "服务业数量").collect()
region_names2 = [row["企业注册地址"] for row in data2]
enterprise_counts2 = [row["服务业数量"] for row in data2]
# 农业
data3 = result_df3.select("企业注册地址", "农业数量").collect()
region_names3 = [row["企业注册地址"] for row in data3]
enterprise_counts3 = [row["农业数量"] for row in data3]
# 金融业
data4 = result_df4.select("企业注册地址", "金融业数量").collect()
region_names4 = [row["企业注册地址"] for row in data4]
enterprise_counts4 = [row["金融业数量"] for row in data4]
# IT行业
data5 = result_df5.select("企业注册地址", "IT行业数量").collect()
region_names5 = [row["企业注册地址"] for row in data5]
enterprise_counts5 = [row["IT行业数量"] for row in data5]
# 零售业
data6 = result_df6.select("企业注册地址", "零售业数量").collect()
region_names6 = [row["企业注册地址"] for row in data6]
enterprise_counts6 = [row["零售业数量"] for row in data6]
# 建筑和房地产
data7 = result_df7.select("企业注册地址", "建筑和房地产数量").collect()
region_names7 = [row["企业注册地址"] for row in data7]
enterprise_counts7 = [row["建筑和房地产数量"] for row in data7]
# 媒体和娱乐业
data8 = result_df8.select("企业注册地址", "媒体和娱乐业数量").collect()
region_names8 = [row["企业注册地址"] for row in data8]
enterprise_counts8 = [row["媒体和娱乐业数量"] for row in data8]
# 创建一个 Map 图表对象,并设置地图的基本属性
map_chart1 = (
Map()
.add("制造业", list(zip(region_names1, enterprise_counts1)), "广西")
.set_global_opts(
title_opts=opts.TitleOpts(title="2018年广西制造业企业数量分布图"),
visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts1)),
)
)
map_chart2 = (
Map()
.add("服务业", list(zip(region_names2, enterprise_counts2)), "广西")
.set_global_opts(
title_opts=opts.TitleOpts(title="2018年广西服务业企业数量分布图"),
visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts2)),
)
)
map_chart3 = (
Map()
.add("农业", list(zip(region_names3, enterprise_counts3)), "广西")
.set_global_opts(
title_opts=opts.TitleOpts(title="2018年广西农业企业数量分布图"),
visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts3)),
)
)
map_chart4 = (
Map()
.add("金融业", list(zip(region_names4, enterprise_counts4)), "广西")
.set_global_opts(
title_opts=opts.TitleOpts(title="2018年广西金融业企业数量分布图"),
visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts4)),
)
)
map_chart5 = (
Map()
.add("IT行业", list(zip(region_names5, enterprise_counts5)), "广西")
.set_global_opts(
title_opts=opts.TitleOpts(title="2018年广西IT行业企业数量分布图"),
visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts5)),
)
)
map_chart6 = (
Map()
.add("零售业", list(zip(region_names6, enterprise_counts6)), "广西")
.set_global_opts(
title_opts=opts.TitleOpts(title="2018年广西零售业企业数量分布图"),
visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts6)),
)
)
map_chart7 = (
Map()
.add("建筑和房地产", list(zip(region_names7, enterprise_counts7)), "广西")
.set_global_opts(
title_opts=opts.TitleOpts(title="2018年广西建筑和房地产企业数量分布图"),
visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts7)),
)
)
map_chart8 = (
Map()
.add("媒体和娱乐业", list(zip(region_names8, enterprise_counts8)), "广西")
.set_global_opts(
title_opts=opts.TitleOpts(title="2018年广西媒体和娱乐业企业数量分布图"),
visualmap_opts=opts.VisualMapOpts(max_=max(enterprise_counts8)),
)
)
page = Page(layout=Page.SimplePageLayout)
page.add(map_chart1)
page.add(map_chart2)
page.add(map_chart3)
page.add(map_chart4)
page.add(map_chart5)
page.add(map_chart6)
page.add(map_chart7)
page.add(map_chart8)
return page