版本与依赖
pyspark 在 JDK1.8 环境下是 3.5.3 版本,即:
pip install pyspark==3.5.3
pyspark 需要依赖 MySQL 驱动,有两种方式:
- 自动下载
.config("spark.jars.packages","mysql:mysql-connector-java:8.0.26") \
- 手动下载后放到项目根目录 libs 下:
mysql_jar_path = os.path.join(os.path.dirname(os.path.dirname(__file__)),'libs','mysql-connector-java-8.0.26.jar')
if os.path.exists(mysql_jar_path):
builder = builder.config("spark.jars", mysql_jar_path)
app.logger.info(f"使用本地 MySQL 驱动:{ mysql_jar_path}")
扩展注入
applications/extension/init.py 如下:
# applications/extensions/__init__.py
from flask import Flask
import os
import sys
import traceback
from .init_sqlalchemy import db, ma, init_databases
from .init_upload import init_upload
from .init_migrate import init_migrate
# Spark 相关 - 直接使用变量,不用 property
spark = None
def init_spark(app):
"""初始化 SparkSession"""
global spark
# 声明使用全局变量
try:
app.logger.info("开始初始化 SparkSession...")
# 检查 Java 环境
import subprocess
:
java_version = subprocess.check_output([,], stderr=subprocess.STDOUT).decode()
app.logger.info()
Exception e:
app.logger.error()
:
pyspark.sql SparkSession
pyspark
app.logger.info()
app.logger.info()
ImportError e:
app.logger.error()
os.environ[]= sys.executable
os.environ[]= sys.executable
os.environ[]=
app.logger.info()
app.logger.info()
app.logger.info()
builder = SparkSession.builder \
.appName() \
.master() \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \
.config(,) \

