You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

282 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import happybase
from pyspark.sql import SparkSession, Row
import os
from pyspark.sql import functions as F
from pyspark.sql.functions import when, regexp_extract
from pyecharts.charts import Bar
from pyecharts import options as opts
# 指定python环境
os.environ['PYSPARK_PYTHON'] = 'D:\python\python3.11.4\python.exe'
os.environ['JAVA_HOME'] = "D:\jdk\jdk-17.0.8" # 记得把地址改成自己的
# -投资潜力: 通过比较不同地域的企业数量和其资产总计、净利润,你可以评估各地的投资潜力。一些地区可能因为较高的盈利能力或者潜在市场需求而更具吸引力。
def lyh_code03():
# 创建Spark会话
spark = SparkSession.builder.appName("HBaseDataLoading").master('local').getOrCreate()
# 连接到HBase
connection = happybase.Connection('192.168.23.128')
# 获取表
table = connection.table('bigdata')
# 定义列名
columns = ['info:地址', 'info:净利润', 'info:年末资产总计']
# 查询数据
data = []
for key, row in table.scan(columns=[col.encode('utf-8') for col in columns]):
row_data = {
'序号': key.decode(),
'地址': row['info:地址'.encode('utf-8')].decode(),
'净利润': row['info:净利润'.encode('utf-8')].decode(),
'年末资产总计': row['info:年末资产总计'.encode('utf-8')].decode()
}
data.append(row_data)
# 关闭连接
connection.close()
# 将数据转换为Spark DataFrame
df = spark.createDataFrame([Row(**{k: str(v) for k, v in i.items()}) for i in data])
df = df.filter(~df['地址'].isin(['NULL', 'qingxiubgs2014@sina.com', '防城市', 'saddress']))
# 去除重复项
df = df.dropDuplicates()
# 需要的列名
df = df.select('地址', '净利润', '年末资产总计') # 修改这里的列名为实际的列名
# 创建一个正则表达式,匹配"市"及其之前的所有字符
pattern = "(.*市)"
# 使用regexp_extract函数提取地址中的城市名如果没有匹配到则保持原始地址不变
df = df.withColumn('地址',
when(df['地址'].rlike(pattern), regexp_extract(df['地址'], pattern, 1)).otherwise(df['地址']))
# 检查"地址"列是否以"广西"开头,如果是,则去掉前两个字符,否则保持原始地址不变
df = df.withColumn('地址', when(df['地址'].startswith('广西'), df['地址'].substr(3, 50)).otherwise(df['地址']))
# 广西的14个市级名称
cities = ['南宁市', '柳州市', '桂林市', '梧州市', '北海市', '防城港市', '钦州市', '贵港市', '玉林市', '百色市',
'贺州市', '河池市', '来宾市', '崇左市']
# 对每个市进行处理
for city in cities:
df = df.withColumn('地址', when(df['地址'].contains(city[:-1]), city).otherwise(df['地址']))
# 广西的所有县级行政区划及其对应的市级行政区划
county_to_city = {
# 南宁市
'兴宁区': '南宁市',
'青秀区': '南宁市',
'江南区': '南宁市',
'西乡塘区': '南宁市',
'良庆区': '南宁市',
'邕宁区': '南宁市',
'武鸣区': '南宁市',
'隆安县': '南宁市',
'马山县': '南宁市',
'上林县': '南宁市',
'宾阳县': '南宁市',
'横县': '南宁市',
# 柳州市
'城中区': '柳州市',
'鱼峰区': '柳州市',
'柳南区': '柳州市',
'柳北区': '柳州市',
'柳江区': '柳州市',
'柳城县': '柳州市',
'鹿寨县': '柳州市',
'融安县': '柳州市',
'融水苗族自治县': '柳州市',
'三江侗族自治县': '柳州市',
# 桂林市
'秀峰区': '桂林市',
'叠彩区': '桂林市',
'象山区': '桂林市',
'七星区': '桂林市',
'雁山区': '桂林市',
'临桂区': '桂林市',
'阳朔县': '桂林市',
'灵川县': '桂林市',
'全州县': '桂林市',
'兴安县': '桂林市',
'永福县': '桂林市',
'灌阳县': '桂林市',
'龙胜各族自治县': '桂林市',
'资源县': '桂林市',
'平乐县': '桂林市',
'荔浦市': '桂林市',
'恭城瑶族自治县': '桂林市',
# 梧州市
'万秀区': '梧州市',
'长洲区': '梧州市',
'龙圩区': '梧州市',
'苍梧县': '梧州市',
'藤县': '梧州市',
'蒙山县': '梧州市',
'岑溪市': '梧州市',
# 北海市
'海城区': '北海市',
'银海区': '北海市',
'铁山港区': '北海市',
'合浦县': '北海市',
# 防城港市
'港口区': '防城港市',
'防城区': '防城港市',
'上思县': '防城港市',
'东兴市': '防城港市',
# 钦州市
'钦南区': '钦州市',
'钦北区': '钦州市',
'灵山县': '钦州市',
'浦北县': '钦州市',
# 贵港市
'港北区': '贵港市',
'港南区': '贵港市',
'覃塘区': '贵港市',
'平南县': '贵港市',
'桂平市': '贵港市',
# 玉林市
'玉州区': '玉林市',
'福绵区': '玉林市',
'容县': '玉林市',
'陆川县': '玉林市',
'博白县': '玉林市',
'兴业县': '玉林市',
'北流市': '玉林市',
# 百色市
'右江区': '百色市',
'田阳县': '百色市',
'田东县': '百色市',
'平果县': '百色市',
'德保县': '百色市',
'那坡县': '百色市',
'凌云县': '百色市',
'乐业县': '百色市',
'田林县': '百色市',
'西林县': '百色市',
'隆林各族自治县': '百色市',
'靖西市': '百色市',
# 贺州市
'八步区': '贺州市',
'平桂区': '贺州市',
'昭平县': '贺州市',
'钟山县': '贺州市',
'富川瑶族自治县': '贺州市',
# 河池市
'金城江区': '河池市',
'南丹县': '河池市',
'天峨县': '河池市',
'凤山县': '河池市',
'东兰县': '河池市',
'罗城仫佬族自治县': '河池市',
'环江毛南族自治县': '河池市',
'巴马瑶族自治县': '河池市',
'都安瑶族自治县': '河池市',
'大化瑶族自治县': '河池市',
'宜州市': '河池市',
# 来宾市
'兴宾区': '来宾市',
'忻城县': '来宾市',
'象州县': '来宾市',
'武宣县': '来宾市',
'金秀瑶族自治县': '来宾市',
'合山市': '来宾市',
# 崇左市
'江州区': '崇左市',
'扶绥县': '崇左市',
'宁明县': '崇左市',
'龙州县': '崇左市',
'大新县': '崇左市',
'天等县': '崇左市',
'凭祥市': '崇左市',
# 特殊值
'高新技术产业开发区': '南宁市',
'南宁经济技术开发区': '南宁市',
'东盟经济技术开发区': '南宁市',
'高新区': '南宁市',
'武鸣县': '南宁市',
'隆安': '南宁市',
'融水县': '柳州市',
'融安': '柳州市',
'柳江县': '柳州市',
'柳邕': '柳州市',
'平果': '百色市',
'田阳': '百色市',
'灵川': '桂林市',
'临桂': '桂林市',
'龙胜县': '桂林市',
'荔浦县': '桂林市',
'恭城县': '桂林市',
'巴马县': '河池市',
'罗城': '河池市',
'合浦': '北海市',
# 特殊值2
'科园西十路24号': '南宁市',
'科园东四路5号': '南宁市',
'秀安路13-11号': '南宁市',
'科园大道31号财智时代12楼': '南宁市',
'新兴工业园创业路6号': '柳州市',
'洛维工业集中区': '柳州市',
'中马产业园区': '钦州市',
'长安工业集中区': '桂林市',
'西江四路扶典上冲29号': '梧州市',
'田东石化工业': '百色市',
'防城市': '防城港市'
}
# 对每个县级行政区划进行处理
for county, city in county_to_city.items():
df = df.withColumn('地址', when(df['地址'].contains(county), city).otherwise(df['地址']))
# 地域投资潜力分析(按照给定的权重)
df_investment_potential = df.groupBy('地址').agg(
F.count('净利润').alias('企业数量'),
F.round(F.sum('净利润')).alias('净利润'),
F.round(F.sum('年末资产总计')).alias('年末资产总计'),
F.round(F.avg('年末资产总计')).alias('平均年末资产总计')
)
# 将结果转换为Pandas DataFrame
pandas_geo_df = df_investment_potential.toPandas()
# 定义权重
weight_count = 0.4
weight_income = 0.4
weight_profit = 0.2
# 归一化数据
max_count = df_investment_potential.agg(F.max('企业数量')).collect()[0][0]
max_income = df_investment_potential.agg(F.max('年末资产总计')).collect()[0][0]
max_profit = df_investment_potential.agg(F.max('净利润')).collect()[0][0]
# 计算地域经济活力指数
df_geo_analysis = df_investment_potential.withColumn(
'投资潜力',
F.round(
weight_count * (F.col('企业数量') / max_count) +
weight_income * (F.col('年末资产总计') / max_income) +
weight_profit * (F.col('净利润') / max_profit),
2)
)
# 按地域经济活力指数降序排列
df_geo_analysis = df_geo_analysis.orderBy('投资潜力', ascending=False)
pandas_geo_df = df_geo_analysis.toPandas()
# 排序 DataFrame按照投资潜力降序排列
sorted_pandas_invest_df = pandas_geo_df.sort_values(by='投资潜力', ascending=False)
# 使用过滤后的数据创建 Pyecharts 条形图
lyh_code03_bar = (
Bar()
.add_xaxis(sorted_pandas_invest_df['地址'].tolist())
.add_yaxis('投资潜力', sorted_pandas_invest_df['投资潜力'].tolist(),
label_opts=opts.LabelOpts(position='inside'))
.set_series_opts(label_opts=opts.LabelOpts(is_show=True, position='top', formatter='{c}'))
.set_global_opts(
title_opts=opts.TitleOpts(title="地域投资潜力"),
xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-30, font_size=10)),
)
)
return lyh_code03_bar