Spark+Flask新能源车数据分析与推荐系统实战:从0到1搭建完整项目

Spark+Flask新能源车数据分析与推荐系统实战:从0到1搭建完整项目

Spark+Flask新能源车数据分析与推荐系统实战:从0到1搭建完整项目

在碳中和政策驱动与新能源汽车产业爆发的双重背景下,车联网数据、用户行为数据、市场交易数据的价值愈发凸显。本文将带大家从零开始,基于Spark完成新能源车数据的分布式分析,再通过Flask搭建轻量级Web服务,结合协同过滤算法实现个性化车辆推荐,最终打造一套完整的新能源车数据分析与推荐系统。全程聚焦实战,所有代码均可直接复用,是大数据工程师、数据分析师的优质实战项目。

在这里插入图片描述
在这里插入图片描述


一、项目整体架构与技术栈解析

1. 核心技术栈

本次项目的核心技术组合为 Spark + Flask + Python,各组件分工明确:

  • Spark:负责海量新能源车数据的清洗、转换、统计分析(如用户充电行为、车型销量分布),利用分布式计算提升处理效率;
  • Flask:搭建轻量级Web服务,提供数据查询API和推荐结果展示界面;
  • 协同过滤算法:基于用户-车型评分矩阵,实现个性化车辆推荐;
  • Echarts:前端可视化,展示数据分析结果(如销量趋势、地区分布)。

2. 项目流程

  1. 数据准备:获取新能源车公开数据集(含用户信息、车型数据、用户评分/行为数据);
  2. Spark数据处理:清洗脏数据、统计核心指标、构建用户-车型特征矩阵;
  3. 推荐算法实现:基于Spark MLlib实现协同过滤推荐;
  4. Flask接口开发:封装分析结果和推荐算法,提供REST API;
  5. 前端展示:调用API渲染可视化图表和推荐结果。

二、实战步骤:从数据处理到推荐系统搭建

1. 环境准备

确保本地/服务器已安装以下环境:

  • Python 3.8+
  • Spark 3.3+(配置SPARK_HOME环境变量)
  • 依赖库安装:
pip install pyspark flask pandas numpy requests flask-cors 

2. Spark数据处理实战

(1)数据集说明

本次使用的数据集包含3个核心文件(可从Kaggle/新能源汽车公开平台获取):

  • user.csv:用户ID、地区、购车预算、充电频率;
  • car.csv:车型ID、品牌、续航里程、价格、能耗;
  • user_car_score.csv:用户ID、车型ID、评分(1-5分)、浏览时长。
(2)Spark数据清洗与分析

编写spark_data_analysis.py,实现数据清洗和核心指标统计:

from pyspark.sql import SparkSession from pyspark.sql.functions import col, avg, count,sum# 初始化SparkSession spark = SparkSession.builder \ .appName("NewEnergyCarAnalysis") \ .master("local[*]")# 本地运行,分布式环境可改为yarn.getOrCreate()# 1. 加载数据 user_df = spark.read.csv("data/user.csv", header=True, inferSchema=True) car_df = spark.read.csv("data/car.csv", header=True, inferSchema=True) score_df = spark.read.csv("data/user_car_score.csv", header=True, inferSchema=True)# 2. 数据清洗:过滤空值、异常值# 过滤用户表空值 user_clean_df = user_df.filter( col("user_id").isNotNull()& col("region").isNotNull()& col("budget").between(50000,500000)# 过滤预算异常值)# 过滤车型表空值,续航里程>0 car_clean_df = car_df.filter( col("car_id").isNotNull()& col("range_mileage")>0)# 过滤评分表,评分1-5分 score_clean_df = score_df.filter( col("score").between(1,5))# 3. 核心指标分析# 3.1 各地区用户充电频率统计 region_charge_freq = user_clean_df.groupBy("region") \ .agg(avg("charge_frequency").alias("avg_charge_freq")) \ .orderBy(col("avg_charge_freq").desc())print("各地区用户平均充电频率:") region_charge_freq.show()# 3.2 不同价格区间车型的平均续航 price_range_car = car_clean_df.withColumn("price_range",# 划分价格区间 when(col("price")<=100000,"10万以下").when((col("price")>100000)&(col("price")<=200000),"10-20万").when((col("price")>200000)&(col("price")<=300000),"20-30万").otherwise("30万以上")).groupBy("price_range") \ .agg(avg("range_mileage").alias("avg_range")) \ .orderBy(col("avg_range").desc())print("不同价格区间车型平均续航:") price_range_car.show()# 3.3 构建用户-车型评分矩阵(用于推荐算法) user_car_matrix = score_clean_df.join(user_clean_df, on="user_id") \ .join(car_clean_df, on="car_id") \ .select("user_id","car_id","score","range_mileage","price")# 保存处理后的数据为Parquet格式(Spark高效格式) user_car_matrix.write.mode("overwrite").parquet("data/user_car_matrix.parquet")# 停止SparkSession spark.stop()
(3)运行Spark脚本
python spark_data_analysis.py 

运行后会输出各地区充电频率、价格区间-续航等核心指标,同时生成user_car_matrix.parquet文件,为后续推荐算法提供数据基础。

3. 基于Spark MLlib实现协同过滤推荐

编写recommendation_algorithm.py,利用Spark MLlib的ALS(交替最小二乘法)实现协同过滤推荐:

from pyspark.sql import SparkSession from pyspark.ml.recommendation import ALS from pyspark.ml.evaluation import RegressionEvaluator # 初始化SparkSession spark = SparkSession.builder \ .appName("CarRecommendation") \ .master("local[*]") \ .getOrCreate()# 加载预处理后的用户-车型矩阵 user_car_matrix = spark.read.parquet("data/user_car_matrix.parquet")# 划分训练集和测试集(8:2)(training_data, test_data)= user_car_matrix.randomSplit([0.8,0.2], seed=42)# 构建ALS模型 als = ALS( maxIter=10,# 迭代次数 regParam=0.01,# 正则化参数,防止过拟合 userCol="user_id",# 用户ID列 itemCol="car_id",# 车型ID列 ratingCol="score",# 评分列 coldStartStrategy="drop"# 忽略冷启动(新用户/新车)的NaN值)# 训练模型 model = als.fit(training_data)# 模型评估 predictions = model.transform(test_data) evaluator = RegressionEvaluator( metricName="rmse",# 均方根误差 labelCol="score", predictionCol="prediction") rmse = evaluator.evaluate(predictions)print(f"模型RMSE(均方根误差):{rmse:.4f}")# 越小说明模型越准确# 为指定用户推荐Top5车型defrecommend_cars_for_user(user_id, top_n=5):# 生成推荐结果 user_recs = model.recommendForAllUsers(top_n)# 筛选指定用户的推荐结果 user_rec = user_recs.filter(col("user_id")== user_id).collect()if user_rec:# 解析推荐结果 rec_list = user_rec[0]["recommendations"] rec_cars =[(item["car_id"], item["rating"])for item in rec_list]return rec_cars else:return[]# 测试:为用户ID=1001推荐5款车型 rec_result = recommend_cars_for_user(1001,5)print(f"为用户1001推荐的车型ID及预测评分:{rec_result}")# 保存模型 model.save("model/car_recommendation_model") spark.stop()

运行脚本后,会输出模型的RMSE(理想值<1),并为用户1001推荐5款车型,同时保存训练好的模型到model/目录。

4. Flask搭建Web服务与API开发

编写flask_app.py,封装数据分析结果和推荐算法,提供REST API:

from flask import Flask, jsonify, request from flask_cors import CORS from pyspark.sql import SparkSession from pyspark.ml.recommendation import ALSModel import pandas as pd app = Flask(__name__) CORS(app)# 解决跨域问题# 初始化SparkSession(用于加载模型和数据) spark = SparkSession.builder \ .appName("FlaskCarRecommendation") \ .master("local[*]") \ .getOrCreate()# 加载训练好的推荐模型 model = ALSModel.load("model/car_recommendation_model")# 加载车型基础数据(转为字典,方便查询) car_df = pd.read_csv("data/car.csv") car_info = car_df.set_index("car_id").T.to_dict()# 接口1:获取不同价格区间车型的平均续航@app.route("/api/price_range_range", methods=["GET"])defget_price_range_range():# 读取Spark分析结果(也可直接读取本地CSV) price_range_df = pd.read_csv("data/price_range_car.csv")# 转为JSON格式 result = price_range_df.to_dict("records")return jsonify({"code":200,"data": result})# 接口2:为指定用户推荐车型@app.route("/api/recommend", methods=["POST"])defrecommend_cars():# 获取请求参数 data = request.get_json() user_id = data.get("user_id") top_n = data.get("top_n",5)ifnot user_id:return jsonify({"code":400,"msg":"用户ID不能为空"})# 调用推荐函数defget_recommendations(user_id, top_n): user_recs = model.recommendForAllUsers(top_n) user_rec = user_recs.filter(col("user_id")== user_id).collect()if user_rec: rec_list = user_rec[0]["recommendations"]# 补充车型详情 rec_result =[]for item in rec_list: car_id = item["car_id"] rec_result.append({"car_id": car_id,"pred_score":round(item["rating"],2),"brand": car_info[car_id]["brand"],"range_mileage": car_info[car_id]["range_mileage"],"price": car_info[car_id]["price"]})return rec_result else:return[] rec_data = get_recommendations(user_id, top_n)return jsonify({"code":200,"data": rec_data})# 接口3:获取各地区用户充电频率@app.route("/api/region_charge", methods=["GET"])defget_region_charge(): region_df = pd.read_csv("data/region_charge_freq.csv") result = region_df.to_dict("records")return jsonify({"code":200,"data": result})if __name__ =="__main__":# 启动Flask服务 app.run(host="0.0.0.0", port=5000, debug=True)

5. 前端可视化展示(简易版)

编写index.html,调用Flask API渲染Echarts图表和推荐结果:

<!DOCTYPEhtml><htmllang="zh-CN"><head><metacharset="UTF-8"><title>新能源车数据分析与推荐系统</title><!-- 引入Echarts --><scriptsrc="https://cdn.jsdelivr.net/npm/echarts/dist/echarts.min.js"></script><!-- 引入jQuery --><scriptsrc="https://cdn.jsdelivr.net/npm/jquery/dist/jquery.min.js"></script><style>.container{width: 90%;margin: 0 auto;}.chart{width: 100%;height: 400px;margin: 20px 0;}.recommend{margin: 20px 0;padding: 20px;border: 1px solid #eee;}</style></head><body><divclass="container"><h1>新能源车数据分析与推荐系统</h1><!-- 价格区间-续航图表 --><divclass="chart"id="priceRangeChart"></div><!-- 地区充电频率图表 --><divclass="chart"id="regionChargeChart"></div><!-- 推荐模块 --><divclass="recommend"><h3>个性化车型推荐</h3><inputtype="text"id="userId"placeholder="输入用户ID(如1001)"><buttononclick="getRecommend()">获取推荐</button><divid="recResult"></div></div></div><script>// 初始化价格区间-续航图表var priceRangeChart = echarts.init(document.getElementById('priceRangeChart')); $.get("/api/price_range_range",function(res){if(res.code ===200){var xData = res.data.map(item=> item.price_range);var yData = res.data.map(item=> item.avg_range); priceRangeChart.setOption({title:{text:'不同价格区间车型平均续航'},xAxis:{type:'category',data: xData},yAxis:{type:'value',name:'续航(公里)'},series:[{type:'bar',data: yData}]});}});// 初始化地区充电频率图表var regionChargeChart = echarts.init(document.getElementById('regionChargeChart')); $.get("/api/region_charge",function(res){if(res.code ===200){var xData = res.data.map(item=> item.region);var yData = res.data.map(item=> item.avg_charge_freq); regionChargeChart.setOption({title:{text:'各地区用户平均充电频率'},xAxis:{type:'category',data: xData},yAxis:{type:'value',name:'充电频率(次/周)'},series:[{type:'line',data: yData}]});}});// 获取推荐结果functiongetRecommend(){var userId =$("#userId").val();if(!userId){alert("请输入用户ID");return;} $.ajax({url:"/api/recommend",type:"POST",contentType:"application/json",data:JSON.stringify({user_id:parseInt(userId),top_n:5}),success:function(res){if(res.code ===200){var html ="<table><tr><th>车型ID</th><th>品牌</th><th>续航(公里)</th><th>价格(元)</th><th>预测评分</th></tr>"; res.data.forEach(item=>{ html +=`<tr> <td>${item.car_id}</td> <td>${item.brand}</td> <td>${item.range_mileage}</td> <td>${item.price}</td> <td>${item.pred_score}</td> </tr>`;}); html +="</table>";$("#recResult").html(html);}}});}</script></body></html>

6. 项目运行与验证

  1. 启动Flask服务:
python flask_app.py 
  1. 打开index.html文件(直接双击或部署到Nginx),即可看到:
    • 价格区间-续航的柱状图;
    • 各地区充电频率的折线图;
    • 输入用户ID(如1001),点击“获取推荐”,展示推荐车型列表(含品牌、续航、价格、预测评分)。

三、项目优化与扩展方向

  1. 实时数据处理:集成Spark Streaming,处理车联网实时数据(如实时充电行为、车辆行驶数据),实现动态推荐;
  2. 算法优化:融合用户画像(如预算、地区、充电频率)和车型特征(续航、价格),采用FM/DeepFM等深度学习推荐算法;
  3. 部署上线:将Flask服务部署到Gunicorn+Nginx,Spark作业提交到Yarn集群,提升系统稳定性和并发能力;
  4. 数据安全:对用户隐私数据(如手机号、地理位置)进行脱敏处理,符合车联网数据安全规范。

四、项目价值与应用场景

  1. 求职加分:该项目覆盖Spark大数据分析、Flask Web开发、推荐算法、数据可视化全链路,是大数据/数据分析岗位简历的优质实战项目;
  2. 行业落地:可应用于新能源车企的用户运营(个性化推荐)、市场分析(销量/价格趋势)、产品优化(续航/价格调整);
  3. 学习提升:帮助新手掌握Python全栈数据项目的开发流程,理解大数据分析与推荐系统的结合思路。

总结

  1. 本次项目以Spark为核心完成新能源车数据的分布式分析,利用Flask搭建Web服务,结合协同过滤算法实现了个性化推荐,覆盖了数据处理、算法实现、Web开发全流程;
  2. 项目代码可直接复用,通过调整数据集和参数,可适配不同行业(如电商、影视)的推荐场景;
  3. 核心价值在于将大数据分析与实际业务场景结合,既体现了Spark的分布式计算能力,又通过Flask实现了算法的工程化落地。
    通过这个项目,你不仅能掌握Spark和Flask的核心用法,还能理解大数据分析项目从数据处理到系统部署的完整链路,是提升实战能力的绝佳案例。

✨ 坚持用清晰的图解+易懂的硬件架构 +硬件解析, 让每个知识点都简单明了!
🚀 个人主页一只大侠的侠 · ZEEKLOG💬 座右铭 :“所谓成功就是以自己的方式度过一生。”

Read more

URDF(Unified Robot Description Format)机器人领域中用于描述机器人模型的标准 XML 格式

URDF(Unified Robot Description Format),这是机器人领域中用于描述机器人模型的标准 XML 格式。 1. URDF 概述 URDF 是 ROS(Robot Operating System)中用于描述机器人结构的标准格式。它使用 XML 格式定义机器人的: * 连杆(Links):机器人的刚性部件 * 关节(Joints):连接连杆的运动副 * 运动学结构:连杆与关节的层级关系 * 物理属性:质量、惯性、碰撞体积等 * 视觉属性:3D 模型外观 2. URDF 核心元素详解 2.1 基本结构 <?xml version="1.0"?>

By Ne0inhk
Flutter 三方库 modular_core 大型应用级鸿蒙微服务化架构适配解析:纵深拆解路由控制组件化隔离网格,利用轻量级依赖注入中枢斩断应用深层耦合羁绊-适配鸿蒙 HarmonyOS ohos

Flutter 三方库 modular_core 大型应用级鸿蒙微服务化架构适配解析:纵深拆解路由控制组件化隔离网格,利用轻量级依赖注入中枢斩断应用深层耦合羁绊-适配鸿蒙 HarmonyOS ohos

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 三方库 modular_core 大型应用级鸿蒙微服务化架构适配解析:纵深拆解路由控制组件化隔离网格,利用轻量级依赖注入中枢斩断应用深层耦合羁绊 在构建超大型、多业务线的鸿蒙应用时,代码的模块化分层与解耦是决定项目成败的关键。modular_core 作为 flutter_modular 的核心逻辑库,提供了一套纯粹的依赖注入(DI)和模块生命周期管理机制。本文将深入解析该库在 OpenHarmony 上的适配与应用实践。 前言 什么是 modular_core?它不是一个 UI 框架,而是一套管理“对象如何创建”和“模块如何组织”的底层协议。在鸿蒙操作系统这种强调模块化分发(HAP/HSP)和细粒度原子化服务的生态中,利用 modular_core 可以帮助开发者构建出高内聚、低耦合的系统底座。本文将指导你如何在鸿蒙端侧实现模块的动态注入与回收。 一、

By Ne0inhk
21m/s!UZH RPG组T-RO新作AC-MPC:微分MPC赋能强化学习,实现超人级无人机竞速

21m/s!UZH RPG组T-RO新作AC-MPC:微分MPC赋能强化学习,实现超人级无人机竞速

「MPC+RL」 目录 01 主要方法  1. 整体架构:RL决策 + MPC执行  2. Actor设计:学习代价而非动作 3. Critic设计与模型预测价值扩展 02  实验结果 1.训练效率与极限性能:学得更快,飞得更猛  2.鲁棒性:无惧风扰与参数偏差  3.可解释性:打开 RL 的黑盒  4.真实世界部署:零样本迁移的 21m/s 03  总结 在机器人控制领域,长期存在着模型驱动(MPC)与数据驱动(RL)的路线之争。前者理论完备但依赖人工调参,后者探索力强却受困于黑盒不可解释性。苏黎世大学 RPG 组的这项 T-RO 最新工作,为这一争论提供了一个优雅的融合解。 论文提出的

By Ne0inhk

OpenClaw实战系列01:OpenClaw接入飞书机器人全接入指南 + Ollama本地大模型

文章目录 * 引言 * 第一步:环境准备与核心思想 * 第二步:部署Ollama——把大模型“养”在本地 * 1. 安装 Ollama * 2. 拉取并运行模型 * 3. 确认API可用性 * 第三步:安装OpenClaw——AI大脑的“躯干” * 1. 安装Node.js * 2. 一键安装 OpenClaw * 3. 验证安装 * 第四步:打通飞书——创建并配置机器人 * 1. 创建飞书应用 * 2. 配置机器人能力 * 3. 发布应用 * 第五步:OpenClaw与飞书“握手” * 方法一:使用 onboard 向导重新配置(推荐最新版) * 方法二:手动添加渠道 * 批准配对 * 第六步:实战测试与玩法拓展

By Ne0inhk