from pyspark.sql import functions as F from pyspark.sql.functions import when from pyecharts.charts import Bar from pyecharts.charts import Line from pyecharts import options as opts from 大数据.读取Hbase函数 import HBaseDataLoading def create_line_bar(): # 需要的列族:列限定符 column = ['info:地址', 'info:本公司是否为上市(挂牌)企业主体', 'info:工业总产值(当年价格)', 'info:从业人员期末人数', 'info:其中:博士', 'info:其中:硕士', 'info:具有大学本科学历(位)人员', 'info:具有大学专科学历人员'] df = HBaseDataLoading(column) # 过滤出"主要外资来源国别或地区代码"列不为空的行 # df = df.filter(F.col("本公司是否为上市(挂牌)企业主体") != 0) # 上市公司数量 # print(df.count()) # 广西的14个市级名称 cities = ['南宁市', '柳州市', '桂林市', '梧州市', '北海市', '防城港市', '钦州市', '贵港市', '玉林市', '百色市', '贺州市', '河池市', '来宾市', '崇左市'] # 对每个市进行处理 for city in cities: df = df.withColumn('地址', when(df['地址'].contains(city[:-1]), city).otherwise(df['地址'])) # 广西的所有县级行政区划及其对应的市级行政区划 county_to_city = { # 南宁市 '兴宁区': '南宁市', '青秀区': '南宁市', '江南区': '南宁市', '西乡塘区': '南宁市', '良庆区': '南宁市', '邕宁区': '南宁市', '武鸣区': '南宁市', '隆安县': '南宁市', '马山县': '南宁市', '上林县': '南宁市', '宾阳县': '南宁市', '横县': '南宁市', # 柳州市 '城中区': '柳州市', '鱼峰区': '柳州市', '柳南区': '柳州市', '柳北区': '柳州市', '柳江区': '柳州市', '柳城县': '柳州市', '鹿寨县': '柳州市', '融安县': '柳州市', '融水苗族自治县': '柳州市', '三江侗族自治县': '柳州市', # 桂林市 '秀峰区': '桂林市', '叠彩区': '桂林市', '象山区': '桂林市', '七星区': '桂林市', '雁山区': '桂林市', '临桂区': '桂林市', '阳朔县': '桂林市', '灵川县': '桂林市', '全州县': '桂林市', '兴安县': '桂林市', '永福县': '桂林市', '灌阳县': '桂林市', '龙胜各族自治县': '桂林市', '资源县': '桂林市', '平乐县': '桂林市', '荔浦市': '桂林市', '恭城瑶族自治县': '桂林市', # 梧州市 '万秀区': '梧州市', '长洲区': '梧州市', '龙圩区': '梧州市', '苍梧县': '梧州市', '藤县': '梧州市', '蒙山县': '梧州市', '岑溪市': '梧州市', # 北海市 '海城区': '北海市', '银海区': '北海市', '铁山港区': '北海市', '合浦县': '北海市', # 防城港市 '港口区': '防城港市', '防城区': '防城港市', '上思县': '防城港市', '东兴市': '防城港市', # 钦州市 '钦南区': '钦州市', '钦北区': '钦州市', '灵山县': '钦州市', '浦北县': '钦州市', # 贵港市 '港北区': '贵港市', '港南区': '贵港市', '覃塘区': '贵港市', '平南县': '贵港市', '桂平市': '贵港市', # 玉林市 '玉州区': '玉林市', '福绵区': '玉林市', '容县': '玉林市', '陆川县': '玉林市', '博白县': '玉林市', '兴业县': '玉林市', '北流市': '玉林市', # 百色市 '右江区': '百色市', '田阳县': '百色市', '田东县': '百色市', '平果县': '百色市', '德保县': '百色市', '那坡县': '百色市', '凌云县': '百色市', '乐业县': '百色市', '田林县': '百色市', '西林县': '百色市', '隆林各族自治县': '百色市', '靖西市': '百色市', # 贺州市 '八步区': '贺州市', '平桂区': '贺州市', '昭平县': '贺州市', '钟山县': '贺州市', '富川瑶族自治县': '贺州市', # 河池市 '金城江区': '河池市', '南丹县': '河池市', '天峨县': '河池市', '凤山县': '河池市', '东兰县': '河池市', '罗城仫佬族自治县': '河池市', '环江毛南族自治县': '河池市', '巴马瑶族自治县': '河池市', '都安瑶族自治县': '河池市', '大化瑶族自治县': '河池市', '宜州市': '河池市', # 来宾市 '兴宾区': '来宾市', '忻城县': '来宾市', '象州县': '来宾市', '武宣县': '来宾市', '金秀瑶族自治县': '来宾市', '合山市': '来宾市', # 崇左市 '江州区': '崇左市', '扶绥县': '崇左市', '宁明县': '崇左市', '龙州县': '崇左市', '大新县': '崇左市', '天等县': '崇左市', '凭祥市': '崇左市', # 特殊值 '高新技术产业开发区': '南宁市', '南宁经济技术开发区': '南宁市', '东盟经济技术开发区': '南宁市', '高新区': '南宁市', '武鸣县': '南宁市', '隆安': '南宁市', '融水县': '柳州市', '融安': '柳州市', '柳江县': '柳州市', '柳邕': '柳州市', '平果': '百色市', '田阳': '百色市', '灵川': '桂林市', '临桂': '桂林市', '龙胜县': '桂林市', '荔浦县': '桂林市', '恭城县': '桂林市', '巴马县': '河池市', '罗城': '河池市', '合浦': '北海市', # 特殊值2 '科园西十路24号': '南宁市', '科园东四路5号': '南宁市', '秀安路13-11号': '南宁市', '科园大道31号财智时代12楼': '南宁市', '新兴工业园创业路6号': '柳州市', '洛维工业集中区': '柳州市', '中马产业园区': '钦州市', '长安工业集中区': '桂林市', '西江四路扶典上冲29号': '梧州市', '田东石化工业': '百色市', } # 对每个县级行政区划进行处理 for county, city in county_to_city.items(): df = df.withColumn('地址', when(df['地址'].contains(county), city).otherwise(df['地址'])) # 转成整型 df = df.withColumn("工业总产值(当年价格)", df["工业总产值(当年价格)"].cast('integer')) # 按市分组,计算每个市的工业总产值和人员分布 df_grouped = df.groupBy('地址').agg( {"工业总产值(当年价格)": "sum", "从业人员期末人数": "sum", "其中:博士": "sum", "其中:硕士": "sum", "具有大学本科学历(位)人员": "sum", "具有大学专科学历人员": "sum"} ) # 改名 df_grouped = df_grouped.withColumnRenamed('sum(工业总产值(当年价格))', '工业总产值') df_grouped = df_grouped.withColumnRenamed('sum(从业人员期末人数)', '从业人员期末人数') df_grouped = df_grouped.withColumnRenamed('sum(其中:博士)', '博士总数') df_grouped = df_grouped.withColumnRenamed('sum(其中:硕士)', '硕士总数') df_grouped = df_grouped.withColumnRenamed('sum(具有大学本科学历(位)人员)', '本科生总数') df_grouped = df_grouped.withColumnRenamed('sum(具有大学专科学历人员)', '专科生总数') # 排序 df_grouped_sorted1 = df_grouped.orderBy('工业总产值') df_grouped_sorted2 = df_grouped.orderBy('从业人员期末人数') # 输出 df_grouped.show() # 可视化 # 提取x轴和y轴的数据 x_data = [row['地址'] for row in df_grouped_sorted1.collect()] y_data = [round(row['工业总产值'] / 100000, 2) for row in df_grouped_sorted1.collect()] x_data1 = [row['地址'] for row in df_grouped_sorted2.collect()] y_data1 = [row['博士总数'] for row in df_grouped_sorted2.collect()] y_data2 = [row['硕士总数'] for row in df_grouped_sorted2.collect()] y_data3 = [row['本科生总数'] for row in df_grouped_sorted2.collect()] y_data4 = [row['专科生总数'] for row in df_grouped_sorted2.collect()] # 创建折线图 line = ( Line() .add_xaxis(x_data) .add_yaxis("工业总产值(亿元)", y_data) .set_global_opts( xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(interval=0)) # 调整x轴标签间隔) ) ) # 创建柱状图 bar = ( Bar() .add_xaxis(x_data1) .add_yaxis("专科", y_data4, stack="stack1") .add_yaxis("本科", y_data3, stack="stack1") .add_yaxis("硕士", y_data2, stack="stack1") .add_yaxis("博士", y_data1, stack="stack1") .set_series_opts(label_opts=opts.LabelOpts(is_show=False)) .set_global_opts( xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(interval=0)) # 调整x轴标签间隔 # xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-30)) # 调整x轴标签角度 ) ) # 返回对象 return line, bar if __name__ == "__main__": create_line_bar()