You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

287 lines
12 KiB

11 months ago
import happybase
from pyspark.sql import functions as F
from pyecharts.charts import Bar
from pyspark.sql import SparkSession, Row
import os
from pyecharts import options as opts
from pyspark.sql.functions import when, regexp_extract
# 指定python环境
os.environ['PYSPARK_PYTHON'] = 'D:\python\python3.11.4\python.exe'
os.environ['JAVA_HOME'] = "D:\jdk\jdk-17.0.8" # 记得把地址改成自己的
# - 创新能力:通过分析高新技术企业的数量,以及这些企业的研发投入和技术收入,我们可以了解当地的创新能力。
def lyh_code01():
# 创建Spark会话
spark = SparkSession.builder.appName("HBaseDataLoading").master('local').getOrCreate()
# 连接到HBase
connection = happybase.Connection('192.168.23.128')
# 获取表
table = connection.table('bigdata')
# 定义列名
columns = ['info:地址', 'info:其中:研发、试验检验费', 'info:其中:技术(研究)开发费',
'info:其中:技术转让收入','info:技术承包收入','info:技术咨询与服务收入','info:其中:技术收入']
# 查询数据
data = []
for key, row in table.scan(columns=[col.encode('utf-8') for col in columns]):
row_data = {
'序号': key.decode(),
'地址': row['info:地址'.encode('utf-8')].decode(),
'其中:研发、试验检验费': row['info:其中:研发、试验检验费'.encode('utf-8')].decode(),
'其中:技术(研究)开发费': row['info:其中:技术(研究)开发费'.encode('utf-8')].decode(),
'其中:技术转让收入': row['info:其中:技术转让收入'.encode('utf-8')].decode(),
'技术承包收入': row['info:技术承包收入'.encode('utf-8')].decode(),
'技术咨询与服务收入': row['info:技术咨询与服务收入'.encode('utf-8')].decode(),
'其中:技术收入': row['info:其中:技术收入'.encode('utf-8')].decode(),
}
data.append(row_data)
# 关闭连接
connection.close()
# 将数据转换为Spark DataFrame
df = spark.createDataFrame([Row(**{k: str(v) for k, v in i.items()}) for i in data])
df = df.filter(~df['地址'].isin(['NULL', 'qingxiubgs2014@sina.com', '防城市', 'saddress']))
# 创建一个新列 '研发投入额',包含 '其中:研发、试验检验费' 和 '其中:技术(研究)开发费' 的总和
df = df.withColumn('研发投入额', F.col('其中:研发、试验检验费') + F.col('其中:技术(研究)开发费'))
df = df.withColumn('技术收入总额',
F.col('其中:技术收入') + F.col('其中:技术转让收入') + F.col('技术承包收入') + F.col(
'技术咨询与服务收入'))
# 去除重复项
df = df.dropDuplicates()
# 需要的列名
df = df.select('地址', '研发投入额', '技术收入总额')
# 创建一个正则表达式,匹配"市"及其之前的所有字符
pattern = "(.*市)"
# 使用regexp_extract函数提取地址中的城市名如果没有匹配到则保持原始地址不变
df = df.withColumn('地址',
when(df['地址'].rlike(pattern), regexp_extract(df['地址'], pattern, 1)).otherwise(df['地址']))
# 检查"地址"列是否以"广西"开头,如果是,则去掉前两个字符,否则保持原始地址不变
df = df.withColumn('地址', when(df['地址'].startswith('广西'), df['地址'].substr(3, 50)).otherwise(df['地址']))
# 广西的14个市级名称
cities = ['南宁市', '柳州市', '桂林市', '梧州市', '北海市', '防城港市', '钦州市', '贵港市', '玉林市', '百色市',
'贺州市', '河池市', '来宾市', '崇左市']
# 对每个市进行处理
for city in cities:
df = df.withColumn('地址', when(df['地址'].contains(city[:-1]), city).otherwise(df['地址']))
# 广西的所有县级行政区划及其对应的市级行政区划
county_to_city = {
# 南宁市
'兴宁区': '南宁市',
'青秀区': '南宁市',
'江南区': '南宁市',
'西乡塘区': '南宁市',
'良庆区': '南宁市',
'邕宁区': '南宁市',
'武鸣区': '南宁市',
'隆安县': '南宁市',
'马山县': '南宁市',
'上林县': '南宁市',
'宾阳县': '南宁市',
'横县': '南宁市',
# 柳州市
'城中区': '柳州市',
'鱼峰区': '柳州市',
'柳南区': '柳州市',
'柳北区': '柳州市',
'柳江区': '柳州市',
'柳城县': '柳州市',
'鹿寨县': '柳州市',
'融安县': '柳州市',
'融水苗族自治县': '柳州市',
'三江侗族自治县': '柳州市',
# 桂林市
'秀峰区': '桂林市',
'叠彩区': '桂林市',
'象山区': '桂林市',
'七星区': '桂林市',
'雁山区': '桂林市',
'临桂区': '桂林市',
'阳朔县': '桂林市',
'灵川县': '桂林市',
'全州县': '桂林市',
'兴安县': '桂林市',
'永福县': '桂林市',
'灌阳县': '桂林市',
'龙胜各族自治县': '桂林市',
'资源县': '桂林市',
'平乐县': '桂林市',
'荔浦市': '桂林市',
'恭城瑶族自治县': '桂林市',
# 梧州市
'万秀区': '梧州市',
'长洲区': '梧州市',
'龙圩区': '梧州市',
'苍梧县': '梧州市',
'藤县': '梧州市',
'蒙山县': '梧州市',
'岑溪市': '梧州市',
# 北海市
'海城区': '北海市',
'银海区': '北海市',
'铁山港区': '北海市',
'合浦县': '北海市',
# 防城港市
'港口区': '防城港市',
'防城区': '防城港市',
'上思县': '防城港市',
'东兴市': '防城港市',
# 钦州市
'钦南区': '钦州市',
'钦北区': '钦州市',
'灵山县': '钦州市',
'浦北县': '钦州市',
# 贵港市
'港北区': '贵港市',
'港南区': '贵港市',
'覃塘区': '贵港市',
'平南县': '贵港市',
'桂平市': '贵港市',
# 玉林市
'玉州区': '玉林市',
'福绵区': '玉林市',
'容县': '玉林市',
'陆川县': '玉林市',
'博白县': '玉林市',
'兴业县': '玉林市',
'北流市': '玉林市',
# 百色市
'右江区': '百色市',
'田阳县': '百色市',
'田东县': '百色市',
'平果县': '百色市',
'德保县': '百色市',
'那坡县': '百色市',
'凌云县': '百色市',
'乐业县': '百色市',
'田林县': '百色市',
'西林县': '百色市',
'隆林各族自治县': '百色市',
'靖西市': '百色市',
# 贺州市
'八步区': '贺州市',
'平桂区': '贺州市',
'昭平县': '贺州市',
'钟山县': '贺州市',
'富川瑶族自治县': '贺州市',
# 河池市
'金城江区': '河池市',
'南丹县': '河池市',
'天峨县': '河池市',
'凤山县': '河池市',
'东兰县': '河池市',
'罗城仫佬族自治县': '河池市',
'环江毛南族自治县': '河池市',
'巴马瑶族自治县': '河池市',
'都安瑶族自治县': '河池市',
'大化瑶族自治县': '河池市',
'宜州市': '河池市',
# 来宾市
'兴宾区': '来宾市',
'忻城县': '来宾市',
'象州县': '来宾市',
'武宣县': '来宾市',
'金秀瑶族自治县': '来宾市',
'合山市': '来宾市',
# 崇左市
'江州区': '崇左市',
'扶绥县': '崇左市',
'宁明县': '崇左市',
'龙州县': '崇左市',
'大新县': '崇左市',
'天等县': '崇左市',
'凭祥市': '崇左市',
# 特殊值
'高新技术产业开发区': '南宁市',
'南宁经济技术开发区': '南宁市',
'东盟经济技术开发区': '南宁市',
'高新区': '南宁市',
'武鸣县': '南宁市',
'隆安': '南宁市',
'融水县': '柳州市',
'融安': '柳州市',
'柳江县': '柳州市',
'柳邕': '柳州市',
'平果': '百色市',
'田阳': '百色市',
'灵川': '桂林市',
'临桂': '桂林市',
'龙胜县': '桂林市',
'荔浦县': '桂林市',
'恭城县': '桂林市',
'巴马县': '河池市',
'罗城': '河池市',
'合浦': '北海市',
# 特殊值2
'科园西十路24号': '南宁市',
'科园东四路5号': '南宁市',
'秀安路13-11号': '南宁市',
'科园大道31号财智时代12楼': '南宁市',
'新兴工业园创业路6号': '柳州市',
'洛维工业集中区': '柳州市',
'中马产业园区': '钦州市',
'长安工业集中区': '桂林市',
'西江四路扶典上冲29号': '梧州市',
'田东石化工业': '百色市',
'防城市': '防城港市'
}
# 对每个县级行政区划进行处理
for county, city in county_to_city.items():
df = df.withColumn('地址', when(df['地址'].contains(county), city).otherwise(df['地址']))
# 地域经济活力分析
df_geo_analysis = df.groupBy('地址').agg(
F.count('研发投入额').alias('企业数量'),
F.round(F.sum('研发投入额')).alias('总研发投入额'),
F.round(F.sum('技术收入总额')).alias('总技术收入额') # 这里确保使用正确的列名 '技术收入总额'
)
# 按企业数量降序排列
df_geo_analysis = df_geo_analysis.orderBy('企业数量', ascending=False)
# 将结果转换为Pandas DataFrame
pandas_geo_df = df_geo_analysis.toPandas()
# 假设你想使用相等的权重,即 alpha = beta = 1
alpha = 1
beta = 1
# 计算各指标的最大值
max_count = df_geo_analysis.agg(F.max('企业数量')).collect()[0][0]
max_rd_investment = df_geo_analysis.agg(F.max('总研发投入额')).collect()[0][0]
max_tech_income = df_geo_analysis.agg(F.max('总技术收入额')).collect()[0][0]
# 添加新列 '创新能力',并保留五位小数
df_geo_analysis = df_geo_analysis.withColumn(
'创新能力',
F.round((F.col('企业数量') + alpha * F.col('总研发投入额') + beta * F.col('总技术收入额')) /
(max_count + alpha * max_rd_investment + beta * max_tech_income), 5)
)
# 将结果转换为Pandas DataFrame包括 '创新能力' 列
pandas_geo_df = df_geo_analysis.toPandas()
# 排序 DataFrame按照创新能力降序排列
sorted_pandas_geo_df = pandas_geo_df.sort_values(by='创新能力', ascending=False)
# 使用过滤后的数据创建 Pyecharts 条形图
lyh_code01_bar = (
Bar()
.add_xaxis(sorted_pandas_geo_df['地址'].tolist())
.add_yaxis('创新能力', sorted_pandas_geo_df['创新能力'].tolist(), label_opts=opts.LabelOpts(position='inside'))
.set_series_opts(label_opts=opts.LabelOpts(is_show=True, position='top', formatter='{c}'))
.set_global_opts(
title_opts=opts.TitleOpts(title="地域创新能力分析"),
xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-30, font_size=10)),
)
)
return lyh_code01_bar