You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
python_bigData/ZJS/各市企业工业总产值和人员分布.py

261 lines
9.7 KiB

from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts import options as opts
from 大数据.读取Hbase函数 import HBaseDataLoading
def create_line_bar():
# 需要的列族:列限定符
column = ['info:地址', 'info:本公司是否为上市(挂牌)企业主体', 'info:工业总产值(当年价格)', 'info:从业人员期末人数',
'info:其中:博士', 'info:其中:硕士', 'info:具有大学本科学历(位)人员', 'info:具有大学专科学历人员']
df = HBaseDataLoading(column)
# 过滤出"主要外资来源国别或地区代码"列不为空的行
# df = df.filter(F.col("本公司是否为上市(挂牌)企业主体") != 0)
# 上市公司数量
# print(df.count())
# 广西的14个市级名称
cities = ['南宁市', '柳州市', '桂林市', '梧州市', '北海市', '防城港市', '钦州市', '贵港市', '玉林市', '百色市',
'贺州市', '河池市', '来宾市', '崇左市']
# 对每个市进行处理
for city in cities:
df = df.withColumn('地址', when(df['地址'].contains(city[:-1]), city).otherwise(df['地址']))
# 广西的所有县级行政区划及其对应的市级行政区划
county_to_city = {
# 南宁市
'兴宁区': '南宁市',
'青秀区': '南宁市',
'江南区': '南宁市',
'西乡塘区': '南宁市',
'良庆区': '南宁市',
'邕宁区': '南宁市',
'武鸣区': '南宁市',
'隆安县': '南宁市',
'马山县': '南宁市',
'上林县': '南宁市',
'宾阳县': '南宁市',
'横县': '南宁市',
# 柳州市
'城中区': '柳州市',
'鱼峰区': '柳州市',
'柳南区': '柳州市',
'柳北区': '柳州市',
'柳江区': '柳州市',
'柳城县': '柳州市',
'鹿寨县': '柳州市',
'融安县': '柳州市',
'融水苗族自治县': '柳州市',
'三江侗族自治县': '柳州市',
# 桂林市
'秀峰区': '桂林市',
'叠彩区': '桂林市',
'象山区': '桂林市',
'七星区': '桂林市',
'雁山区': '桂林市',
'临桂区': '桂林市',
'阳朔县': '桂林市',
'灵川县': '桂林市',
'全州县': '桂林市',
'兴安县': '桂林市',
'永福县': '桂林市',
'灌阳县': '桂林市',
'龙胜各族自治县': '桂林市',
'资源县': '桂林市',
'平乐县': '桂林市',
'荔浦市': '桂林市',
'恭城瑶族自治县': '桂林市',
# 梧州市
'万秀区': '梧州市',
'长洲区': '梧州市',
'龙圩区': '梧州市',
'苍梧县': '梧州市',
'藤县': '梧州市',
'蒙山县': '梧州市',
'岑溪市': '梧州市',
# 北海市
'海城区': '北海市',
'银海区': '北海市',
'铁山港区': '北海市',
'合浦县': '北海市',
# 防城港市
'港口区': '防城港市',
'防城区': '防城港市',
'上思县': '防城港市',
'东兴市': '防城港市',
# 钦州市
'钦南区': '钦州市',
'钦北区': '钦州市',
'灵山县': '钦州市',
'浦北县': '钦州市',
# 贵港市
'港北区': '贵港市',
'港南区': '贵港市',
'覃塘区': '贵港市',
'平南县': '贵港市',
'桂平市': '贵港市',
# 玉林市
'玉州区': '玉林市',
'福绵区': '玉林市',
'容县': '玉林市',
'陆川县': '玉林市',
'博白县': '玉林市',
'兴业县': '玉林市',
'北流市': '玉林市',
# 百色市
'右江区': '百色市',
'田阳县': '百色市',
'田东县': '百色市',
'平果县': '百色市',
'德保县': '百色市',
'那坡县': '百色市',
'凌云县': '百色市',
'乐业县': '百色市',
'田林县': '百色市',
'西林县': '百色市',
'隆林各族自治县': '百色市',
'靖西市': '百色市',
# 贺州市
'八步区': '贺州市',
'平桂区': '贺州市',
'昭平县': '贺州市',
'钟山县': '贺州市',
'富川瑶族自治县': '贺州市',
# 河池市
'金城江区': '河池市',
'南丹县': '河池市',
'天峨县': '河池市',
'凤山县': '河池市',
'东兰县': '河池市',
'罗城仫佬族自治县': '河池市',
'环江毛南族自治县': '河池市',
'巴马瑶族自治县': '河池市',
'都安瑶族自治县': '河池市',
'大化瑶族自治县': '河池市',
'宜州市': '河池市',
# 来宾市
'兴宾区': '来宾市',
'忻城县': '来宾市',
'象州县': '来宾市',
'武宣县': '来宾市',
'金秀瑶族自治县': '来宾市',
'合山市': '来宾市',
# 崇左市
'江州区': '崇左市',
'扶绥县': '崇左市',
'宁明县': '崇左市',
'龙州县': '崇左市',
'大新县': '崇左市',
'天等县': '崇左市',
'凭祥市': '崇左市',
# 特殊值
'高新技术产业开发区': '南宁市',
'南宁经济技术开发区': '南宁市',
'东盟经济技术开发区': '南宁市',
'高新区': '南宁市',
'武鸣县': '南宁市',
'隆安': '南宁市',
'融水县': '柳州市',
'融安': '柳州市',
'柳江县': '柳州市',
'柳邕': '柳州市',
'平果': '百色市',
'田阳': '百色市',
'灵川': '桂林市',
'临桂': '桂林市',
'龙胜县': '桂林市',
'荔浦县': '桂林市',
'恭城县': '桂林市',
'巴马县': '河池市',
'罗城': '河池市',
'合浦': '北海市',
# 特殊值2
'科园西十路24号': '南宁市',
'科园东四路5号': '南宁市',
'秀安路13-11号': '南宁市',
'科园大道31号财智时代12楼': '南宁市',
'新兴工业园创业路6号': '柳州市',
'洛维工业集中区': '柳州市',
'中马产业园区': '钦州市',
'长安工业集中区': '桂林市',
'西江四路扶典上冲29号': '梧州市',
'田东石化工业': '百色市',
}
# 对每个县级行政区划进行处理
for county, city in county_to_city.items():
df = df.withColumn('地址', when(df['地址'].contains(county), city).otherwise(df['地址']))
# 转成整型
df = df.withColumn("工业总产值(当年价格)", df["工业总产值(当年价格)"].cast('integer'))
# 按市分组,计算每个市的工业总产值和人员分布
df_grouped = df.groupBy('地址').agg(
{"工业总产值(当年价格)": "sum",
"从业人员期末人数": "sum",
"其中:博士": "sum",
"其中:硕士": "sum",
"具有大学本科学历(位)人员": "sum",
"具有大学专科学历人员": "sum"}
)
# 改名
df_grouped = df_grouped.withColumnRenamed('sum(工业总产值(当年价格))', '工业总产值')
df_grouped = df_grouped.withColumnRenamed('sum(从业人员期末人数)', '从业人员期末人数')
df_grouped = df_grouped.withColumnRenamed('sum(其中:博士)', '博士总数')
df_grouped = df_grouped.withColumnRenamed('sum(其中:硕士)', '硕士总数')
df_grouped = df_grouped.withColumnRenamed('sum(具有大学本科学历(位)人员)', '本科生总数')
df_grouped = df_grouped.withColumnRenamed('sum(具有大学专科学历人员)', '专科生总数')
# 排序
df_grouped_sorted1 = df_grouped.orderBy('工业总产值')
df_grouped_sorted2 = df_grouped.orderBy('从业人员期末人数')
# 输出
df_grouped.show()
# 可视化
# 提取x轴和y轴的数据
x_data = [row['地址'] for row in df_grouped_sorted1.collect()]
y_data = [round(row['工业总产值'] / 100000, 2) for row in df_grouped_sorted1.collect()]
x_data1 = [row['地址'] for row in df_grouped_sorted2.collect()]
y_data1 = [row['博士总数'] for row in df_grouped_sorted2.collect()]
y_data2 = [row['硕士总数'] for row in df_grouped_sorted2.collect()]
y_data3 = [row['本科生总数'] for row in df_grouped_sorted2.collect()]
y_data4 = [row['专科生总数'] for row in df_grouped_sorted2.collect()]
# 创建折线图
line = (
Line()
.add_xaxis(x_data)
.add_yaxis("工业总产值(亿元)", y_data)
.set_global_opts(
xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(interval=0)) # 调整x轴标签间隔)
)
)
# 创建柱状图
bar = (
Bar()
.add_xaxis(x_data1)
.add_yaxis("专科", y_data4, stack="stack1")
.add_yaxis("本科", y_data3, stack="stack1")
.add_yaxis("硕士", y_data2, stack="stack1")
.add_yaxis("博士", y_data1, stack="stack1")
.set_series_opts(label_opts=opts.LabelOpts(is_show=False))
.set_global_opts(
xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(interval=0)) # 调整x轴标签间隔
# xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-30)) # 调整x轴标签角度
)
)
# 返回对象
return line, bar
if __name__ == "__main__":
create_line_bar()