import os, pymysql, pandas as pd, numpy as np
from datetime import datetime, timedelta
from keras.models import Sequential
from keras.layers import LSTM, Dropout, Dense
from sklearn.preprocessing import MinMaxScaler
from django.http import JsonResponse
parent_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
dbtype, host, port, user, passwd, dbName, charset, hasHadoop = config_read(os.path.join(parent_directory,"config.ini"))
mysql_config = {
'host': host,
'user': user,
'password': passwd,
'database': dbName,
'port': port
}
def create_dataset(data, time_step=1):
"""将时序数据转换为监督学习格式"""
X, Y = [], []
for i in range(len(data) - time_step - 1):
a = data[i:(i + time_step), :]
X.append(a)
Y.append(data[i + time_step, :])
return np.array(X), np.array(Y)
def learningdataforecast_forecast(request):
if request.method in ["POST", "GET"]:
msg = {'code': normal_code, "msg": mes.normal_code}
connection = pymysql.connect(**mysql_config)
query = "SELECT date, student, subject, regulargrade, midtermresults, learningattitude, finalgrade FROM learningdata ORDER BY date ASC"
data = pd.read_sql(query, connection).dropna()
date_format = data['date'].iloc[0]
if isinstance(date_format, (datetime.date, datetime.datetime)):
pass
elif "年" in date_format and "月" in date_format and "日" in date_format:
date_format='%Y年%m月%d日'
elif "年" in date_format and "月" in date_format:
date_format='%Y年%m月'
else:
date_format='%Y-%m-%d'
data['date'] = pd.to_datetime(data['date'], format=date_format)
data.set_index('date', inplace=True)
student_encoder = LabelEncoder()
data['student'] = student_encoder.fit_transform(data['student'])
subject_encoder = LabelEncoder()
data['subject'] = subject_encoder.fit_transform(data['subject'])
learningattitude_encoder = LabelEncoder()
data['learningattitude'] = learningattitude_encoder.fit_transform(data['learningattitude'])
data = data[['student', 'subject', 'regulargrade', 'midtermresults', 'learningattitude', 'finalgrade']]
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)
time_step = int(len(data)/10)
if time_step > 30: time_step = 30
if time_step <= 0: time_step = 1
X, y = create_dataset(scaled_data, time_step)
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
print(f'X_train shape: {X_train.shape}, y_train shape: {y_train.shape}')
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dropout(0.2))
model.add(LSTM(50, return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(len(data.columns), activation='relu'))
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, epochs=100, batch_size=32, verbose=1)
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)
train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.figure(figsize=(12, 6), dpi=80)
plt.plot(data.index[:len(train_predict)], train_predict[:, 1-1], label='训练 student 预测', color='blue')
plt.plot(data.index[len(train_predict) + time_step + 1:], test_predict[:, 1-1], label='测试 student 预测', color='red')
plt.plot(data.index, data['student'], label='实际 student', color='green')
plt.title('student 预测')
plt.xlabel('Date')
plt.ylabel('student')
plt.legend()
directory = os.path.join(parent_directory, "templates", "upload", "learningdataforecast", "student_prediction.png")
os.makedirs(os.path.dirname(directory), exist_ok=True)
plt.savefig(directory)
plt.clf()
plt.close()
last_data_days = scaled_data[-time_step:]
future_predictions = []
for _ in range(3):
last_data_days = last_data_days.reshape((1, time_step, len(data.columns)))
prediction = model.predict(last_data_days)
future_predictions.append(prediction[0])
last_data_days = np.append(last_data_days[:, 1:, :], [prediction], axis=1)
future_predictions = scaler.inverse_transform(future_predictions)
last_date = data.index[-1]
future_dates = [last_date + timedelta(days=31*i) for i in range(1, 3+1)]
df = pd.DataFrame(columns=['date', 'student', 'subject', 'regulargrade', 'midtermresults', 'learningattitude', 'finalgrade'])
df['date'] = [str(date.year)+"-"+str(date.month) for date in future_dates]
df['student'] = future_predictions[:, 1-1]
df['subject'] = future_predictions[:, 2-1]
df['regulargrade'] = future_predictions[:, 3-1]
df['midtermresults'] = future_predictions[:, 4-1]
df['learningattitude'] = future_predictions[:, 5-1]
df['finalgrade'] = future_predictions[:, 6-1]
df['student'] = student_encoder.inverse_transform(df['student'].astype(int))
df['subject'] = subject_encoder.inverse_transform(df['subject'].astype(int))
df['learningattitude'] = learningattitude_encoder.inverse_transform(df['learningattitude'].astype(int))
connection_string = f"mysql+pymysql://{mysql_config['user']}:{mysql_config['password']}@{mysql_config['host']}:{mysql_config['port']}/{mysql_config['database']}"
engine = create_engine(connection_string)
try:
df.to_sql('learningdataforecast', con=engine, if_exists='append', index=False)
print("数据更新成功!")
except Exception as e:
print(f"发生错误:{e}")
finally:
engine.dispose()
return JsonResponse(msg, encoder=CustomJsonEncoder)