import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from django.http import JsonResponse
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
import matplotlib.pyplot as plt
import pymysql
parent_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
dbtype, host, port, user, passwd, dbName, charset, hasHadoop = config_read(os.path.join(parent_directory, "config.ini"))
mysql_config = {
'host': host,
'user': user,
'password': passwd,
'database': dbName,
'port': port
}
def auto_figsize(x_data, base_width=8, base_height=6, width_per_point=0.2):
"""根据数据点数量自动调整画布宽度"""
num_points = len(x_data)
dynamic_width = base_width + width_per_point * num_points
return (dynamic_width, base_height)
def create_dataset(data, time_step=1):
"""定义函数创建时间序列数据集"""
X, Y = [], []
for i in range(len(data) - time_step - 1):
a = data[i:(i + time_step), :]
X.append(a)
Y.append(data[i + time_step, :])
return np.array(X), np.array(Y)
def learningdataforecast_forecast(request):
if request.method in ["POST", "GET"]:
msg = {'code': normal_code, "msg": mes.normal_code}
connection = pymysql.connect(**mysql_config)
query = "SELECT date, student, subject, regulargrade, midtermresults, learningattitude, finalgrade FROM learningdata ORDER BY date ASC"
data = pd.read_sql(query, connection).dropna()
date_format = data['date'].iloc[0]
if isinstance(date_format, (datetime.date, datetime.datetime)):
pass
elif "年" in date_format and "月" in date_format and "日" in date_format:
date_format = '%Y年%m月%d日'
elif "年" in date_format and "月" in date_format:
date_format = '%Y年%m月'
elif "年" in date_format:
date_format = '%Y年'
else:
if date_format == "" or date_format is None:
data['date'] = pd.to_datetime(data['date'])
else:
data['date'] = pd.to_datetime(data['date'], format=date_format)
data.set_index('date', inplace=True)
student_encoder = LabelEncoder()
data['student'] = student_encoder.fit_transform(data['student'])
subject_encoder = LabelEncoder()
data['subject'] = subject_encoder.fit_transform(data['subject'])
learningattitude_encoder = LabelEncoder()
data['learningattitude'] = learningattitude_encoder.fit_transform(data['learningattitude'])
data = data[['student', 'subject', 'regulargrade', 'midtermresults', 'learningattitude', 'finalgrade']]
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)
time_step = int(len(data)/10)
if time_step > 30: time_step = 30
if time_step <= 0: time_step = 1
X, y = create_dataset(scaled_data, time_step)
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
print(f'X_train shape: {X_train.shape}, y_train shape: {y_train.shape}')
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dropout(0.2))
model.add(LSTM(50, return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(len(data.columns), activation='relu'))
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, epochs=100, batch_size=32, verbose=1)
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)
train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
last_data_days = scaled_data[-time_step:]
future_predictions = []
for _ in range(3):
last_data_days = last_data_days.reshape((1, time_step, len(data.columns)))
prediction = model.predict(last_data_days)
future_predictions.append(prediction[0])
last_data_days = np.append(last_data_days[:, 1:, :], [prediction], axis=1)
future_predictions = scaler.inverse_transform(future_predictions)
last_date = data.index[-1]
future_dates = [last_date + timedelta(days=31*i) for i in range(1, 3+1)]
df = pd.DataFrame(columns=['date', 'student', 'subject', 'regulargrade', 'midtermresults', 'learningattitude', 'finalgrade'])
df['date'] = [str(date.year)+"-"+str(date.month) for date in future_dates]
df['student'] = future_predictions[:, 1-1]
df['subject'] = future_predictions[:, 2-1]
df['regulargrade'] = future_predictions[:, 3-1]
df['midtermresults'] = future_predictions[:, 4-1]
df['learningattitude'] = future_predictions[:, 5-1]
df['finalgrade'] = future_predictions[:, 6-1]
df['student'] = student_encoder.inverse_transform(df['student'].astype(int))
df['subject'] = subject_encoder.inverse_transform(df['subject'].astype(int))
df['learningattitude'] = learningattitude_encoder.inverse_transform(df['learningattitude'].astype(int))
connection_string = f"mysql+pymysql://{mysql_config['user']}:{mysql_config['password']}@{mysql_config['host']}:{mysql_config['port']}/{mysql_config['database']}"
engine = create_engine(connection_string)
try:
df.to_sql('learningdataforecast', con=engine, if_exists='append', index=False)
print("数据更新成功!")
except Exception as e:
print(f"发生错误:{e}")
finally:
engine.dispose()
return JsonResponse(msg, encoder=CustomJsonEncoder)