import happybase from pyspark.sql import functions as F from pyecharts.charts import Bar from pyspark.sql import SparkSession, Row import os from pyecharts import options as opts from pyspark.sql.functions import when, regexp_extract # 指定python环境 os.environ['PYSPARK_PYTHON'] = 'D:\python\python3.11.4\python.exe' os.environ['JAVA_HOME'] = "D:\jdk\jdk-17.0.8" # 记得把地址改成自己的 # - 创新能力:通过分析高新技术企业的数量,以及这些企业的研发投入和技术收入,我们可以了解当地的创新能力。 def lyh_code01(): # 创建Spark会话 spark = SparkSession.builder.appName("HBaseDataLoading").master('local').getOrCreate() # 连接到HBase connection = happybase.Connection('192.168.23.128') # 获取表 table = connection.table('bigdata') # 定义列名 columns = ['info:地址', 'info:其中:研发、试验检验费', 'info:其中:技术(研究)开发费', 'info:其中:技术转让收入','info:技术承包收入','info:技术咨询与服务收入','info:其中:技术收入'] # 查询数据 data = [] for key, row in table.scan(columns=[col.encode('utf-8') for col in columns]): row_data = { '序号': key.decode(), '地址': row['info:地址'.encode('utf-8')].decode(), '其中:研发、试验检验费': row['info:其中:研发、试验检验费'.encode('utf-8')].decode(), '其中:技术(研究)开发费': row['info:其中:技术(研究)开发费'.encode('utf-8')].decode(), '其中:技术转让收入': row['info:其中:技术转让收入'.encode('utf-8')].decode(), '技术承包收入': row['info:技术承包收入'.encode('utf-8')].decode(), '技术咨询与服务收入': row['info:技术咨询与服务收入'.encode('utf-8')].decode(), '其中:技术收入': row['info:其中:技术收入'.encode('utf-8')].decode(), } data.append(row_data) # 关闭连接 connection.close() # 将数据转换为Spark DataFrame df = spark.createDataFrame([Row(**{k: str(v) for k, v in i.items()}) for i in data]) df = df.filter(~df['地址'].isin(['NULL', 'qingxiubgs2014@sina.com', '防城市', 'saddress'])) # 创建一个新列 '研发投入额',包含 '其中:研发、试验检验费' 和 '其中:技术(研究)开发费' 的总和 df = df.withColumn('研发投入额', F.col('其中:研发、试验检验费') + F.col('其中:技术(研究)开发费')) df = df.withColumn('技术收入总额', F.col('其中:技术收入') + F.col('其中:技术转让收入') + F.col('技术承包收入') + F.col( '技术咨询与服务收入')) # 去除重复项 df = df.dropDuplicates() # 需要的列名 df = df.select('地址', '研发投入额', '技术收入总额') # 创建一个正则表达式,匹配"市"及其之前的所有字符 pattern = "(.*市)" # 使用regexp_extract函数提取地址中的城市名,如果没有匹配到,则保持原始地址不变 df = df.withColumn('地址', when(df['地址'].rlike(pattern), regexp_extract(df['地址'], pattern, 1)).otherwise(df['地址'])) # 检查"地址"列是否以"广西"开头,如果是,则去掉前两个字符,否则保持原始地址不变 df = df.withColumn('地址', when(df['地址'].startswith('广西'), df['地址'].substr(3, 50)).otherwise(df['地址'])) # 广西的14个市级名称 cities = ['南宁市', '柳州市', '桂林市', '梧州市', '北海市', '防城港市', '钦州市', '贵港市', '玉林市', '百色市', '贺州市', '河池市', '来宾市', '崇左市'] # 对每个市进行处理 for city in cities: df = df.withColumn('地址', when(df['地址'].contains(city[:-1]), city).otherwise(df['地址'])) # 广西的所有县级行政区划及其对应的市级行政区划 county_to_city = { # 南宁市 '兴宁区': '南宁市', '青秀区': '南宁市', '江南区': '南宁市', '西乡塘区': '南宁市', '良庆区': '南宁市', '邕宁区': '南宁市', '武鸣区': '南宁市', '隆安县': '南宁市', '马山县': '南宁市', '上林县': '南宁市', '宾阳县': '南宁市', '横县': '南宁市', # 柳州市 '城中区': '柳州市', '鱼峰区': '柳州市', '柳南区': '柳州市', '柳北区': '柳州市', '柳江区': '柳州市', '柳城县': '柳州市', '鹿寨县': '柳州市', '融安县': '柳州市', '融水苗族自治县': '柳州市', '三江侗族自治县': '柳州市', # 桂林市 '秀峰区': '桂林市', '叠彩区': '桂林市', '象山区': '桂林市', '七星区': '桂林市', '雁山区': '桂林市', '临桂区': '桂林市', '阳朔县': '桂林市', '灵川县': '桂林市', '全州县': '桂林市', '兴安县': '桂林市', '永福县': '桂林市', '灌阳县': '桂林市', '龙胜各族自治县': '桂林市', '资源县': '桂林市', '平乐县': '桂林市', '荔浦市': '桂林市', '恭城瑶族自治县': '桂林市', # 梧州市 '万秀区': '梧州市', '长洲区': '梧州市', '龙圩区': '梧州市', '苍梧县': '梧州市', '藤县': '梧州市', '蒙山县': '梧州市', '岑溪市': '梧州市', # 北海市 '海城区': '北海市', '银海区': '北海市', '铁山港区': '北海市', '合浦县': '北海市', # 防城港市 '港口区': '防城港市', '防城区': '防城港市', '上思县': '防城港市', '东兴市': '防城港市', # 钦州市 '钦南区': '钦州市', '钦北区': '钦州市', '灵山县': '钦州市', '浦北县': '钦州市', # 贵港市 '港北区': '贵港市', '港南区': '贵港市', '覃塘区': '贵港市', '平南县': '贵港市', '桂平市': '贵港市', # 玉林市 '玉州区': '玉林市', '福绵区': '玉林市', '容县': '玉林市', '陆川县': '玉林市', '博白县': '玉林市', '兴业县': '玉林市', '北流市': '玉林市', # 百色市 '右江区': '百色市', '田阳县': '百色市', '田东县': '百色市', '平果县': '百色市', '德保县': '百色市', '那坡县': '百色市', '凌云县': '百色市', '乐业县': '百色市', '田林县': '百色市', '西林县': '百色市', '隆林各族自治县': '百色市', '靖西市': '百色市', # 贺州市 '八步区': '贺州市', '平桂区': '贺州市', '昭平县': '贺州市', '钟山县': '贺州市', '富川瑶族自治县': '贺州市', # 河池市 '金城江区': '河池市', '南丹县': '河池市', '天峨县': '河池市', '凤山县': '河池市', '东兰县': '河池市', '罗城仫佬族自治县': '河池市', '环江毛南族自治县': '河池市', '巴马瑶族自治县': '河池市', '都安瑶族自治县': '河池市', '大化瑶族自治县': '河池市', '宜州市': '河池市', # 来宾市 '兴宾区': '来宾市', '忻城县': '来宾市', '象州县': '来宾市', '武宣县': '来宾市', '金秀瑶族自治县': '来宾市', '合山市': '来宾市', # 崇左市 '江州区': '崇左市', '扶绥县': '崇左市', '宁明县': '崇左市', '龙州县': '崇左市', '大新县': '崇左市', '天等县': '崇左市', '凭祥市': '崇左市', # 特殊值 '高新技术产业开发区': '南宁市', '南宁经济技术开发区': '南宁市', '东盟经济技术开发区': '南宁市', '高新区': '南宁市', '武鸣县': '南宁市', '隆安': '南宁市', '融水县': '柳州市', '融安': '柳州市', '柳江县': '柳州市', '柳邕': '柳州市', '平果': '百色市', '田阳': '百色市', '灵川': '桂林市', '临桂': '桂林市', '龙胜县': '桂林市', '荔浦县': '桂林市', '恭城县': '桂林市', '巴马县': '河池市', '罗城': '河池市', '合浦': '北海市', # 特殊值2 '科园西十路24号': '南宁市', '科园东四路5号': '南宁市', '秀安路13-11号': '南宁市', '科园大道31号财智时代12楼': '南宁市', '新兴工业园创业路6号': '柳州市', '洛维工业集中区': '柳州市', '中马产业园区': '钦州市', '长安工业集中区': '桂林市', '西江四路扶典上冲29号': '梧州市', '田东石化工业': '百色市', '防城市': '防城港市' } # 对每个县级行政区划进行处理 for county, city in county_to_city.items(): df = df.withColumn('地址', when(df['地址'].contains(county), city).otherwise(df['地址'])) # 地域经济活力分析 df_geo_analysis = df.groupBy('地址').agg( F.count('研发投入额').alias('企业数量'), F.round(F.sum('研发投入额')).alias('总研发投入额'), F.round(F.sum('技术收入总额')).alias('总技术收入额') # 这里确保使用正确的列名 '技术收入总额' ) # 按企业数量降序排列 df_geo_analysis = df_geo_analysis.orderBy('企业数量', ascending=False) # 将结果转换为Pandas DataFrame pandas_geo_df = df_geo_analysis.toPandas() # 假设你想使用相等的权重,即 alpha = beta = 1 alpha = 1 beta = 1 # 计算各指标的最大值 max_count = df_geo_analysis.agg(F.max('企业数量')).collect()[0][0] max_rd_investment = df_geo_analysis.agg(F.max('总研发投入额')).collect()[0][0] max_tech_income = df_geo_analysis.agg(F.max('总技术收入额')).collect()[0][0] # 添加新列 '创新能力',并保留五位小数 df_geo_analysis = df_geo_analysis.withColumn( '创新能力', F.round((F.col('企业数量') + alpha * F.col('总研发投入额') + beta * F.col('总技术收入额')) / (max_count + alpha * max_rd_investment + beta * max_tech_income), 5) ) # 将结果转换为Pandas DataFrame,包括 '创新能力' 列 pandas_geo_df = df_geo_analysis.toPandas() # 排序 DataFrame,按照创新能力降序排列 sorted_pandas_geo_df = pandas_geo_df.sort_values(by='创新能力', ascending=False) # 使用过滤后的数据创建 Pyecharts 条形图 lyh_code01_bar = ( Bar() .add_xaxis(sorted_pandas_geo_df['地址'].tolist()) .add_yaxis('创新能力', sorted_pandas_geo_df['创新能力'].tolist(), label_opts=opts.LabelOpts(position='inside')) .set_series_opts(label_opts=opts.LabelOpts(is_show=True, position='top', formatter='{c}')) .set_global_opts( title_opts=opts.TitleOpts(title="地域创新能力分析"), xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-30, font_size=10)), ) ) return lyh_code01_bar