NumPy 与 Pandas 数据分析基础
NumPy 和 Pandas 是 Python 数据分析的核心库。本文介绍了 NumPy 数组的创建、操作、向量化运算及广播机制,涵盖数学函数应用。Pandas 部分讲解了 Series 和 DataFrame 结构,包括数据读写、清洗(缺失值、重复值处理)、索引切片、过滤排序及分组聚合。最后通过实战项目演示了基于 Tkinter 的数据分析系统开发流程,包含需求分析、架构设计、代码实现及测试,帮助读者掌握数据处理全流程技能。

NumPy 和 Pandas 是 Python 数据分析的核心库。本文介绍了 NumPy 数组的创建、操作、向量化运算及广播机制,涵盖数学函数应用。Pandas 部分讲解了 Series 和 DataFrame 结构,包括数据读写、清洗(缺失值、重复值处理)、索引切片、过滤排序及分组聚合。最后通过实战项目演示了基于 Tkinter 的数据分析系统开发流程,包含需求分析、架构设计、代码实现及测试,帮助读者掌握数据处理全流程技能。

💡 掌握 NumPy 数组的基本操作和运算 💡 理解 NumPy 的广播机制和向量化运算 💡 学会使用 Pandas 进行数据读取、清洗和处理 💡 掌握 Pandas 的数据索引、切片和聚合操作 💡 通过实战项目,提升数据分析能力
NumPy 是 Python 中用于科学计算的核心库。它提供了高效的多维数组对象和大量的数学函数,可以用于处理各种科学计算任务。
NumPy 可以通过 pip 安装:
pip install numpy
以下是一个简单的 NumPy 使用示例:
import numpy as np
# 创建 NumPy 数组
arr = np.array([1,2,3,4,5])
# 打印数组
print(arr)
# 打印数组形状
print(arr.shape)
# 打印数组数据类型
print(arr.dtype)
NumPy 提供了多种方法创建数组,包括:
import numpy as np
# 从 Python 列表创建数组
arr1 = np.array([1,2,3,4,5])
# 创建全 0 数组
arr2 = np.zeros((2,3))
# 创建全 1 数组
arr3 = np.ones((2,3))
# 创建等差数组
arr4 = np.arange(0,10,2)
# 创建均匀分布数组
arr5 = np.linspace(0,1,5)
# 创建随机数组
arr6 = np.random.rand(2,3)
arr7 = np.random.randint(0,10,(2,3))
NumPy 提供了多种数组操作方法,包括:
import numpy as np
arr = np.array([[1,2,3],[4,5,6]])
# 数组形状操作
print(arr.reshape(3,2)) # 重塑数组形状
print(arr.flatten()) # 扁平化数组
print(arr.ravel()) # 扁平化数组
# 数组连接操作
arr1 = np.array([[1,2],[3,4]])
arr2 = np.array([[5,6],[7,8]])
print(np.concatenate((arr1, arr2), axis=0)) # 垂直连接数组
print(np.concatenate((arr1, arr2), axis=1)) # 水平连接数组
print(np.vstack((arr1, arr2))) # 垂直连接数组
print(np.hstack((arr1, arr2))) # 水平连接数组
# 数组分割操作
arr3 = np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]])
print(np.split(arr3,2, axis=0)) # 垂直分割数组
print(np.split(arr3,3, axis=1)) # 水平分割数组
print(np.vsplit(arr3,2)) # 垂直分割数组
print(np.hsplit(arr3,3)) # 水平分割数组
# 数组复制操作
arr4 = np.array([1,2,3,4,5])
arr5 = arr4.copy() # 深度复制
arr6 = arr4.view() # 浅拷贝
arr4[0]=10
print(arr5) # [1 2 3 4 5]
print(arr6) # [10 2 3 4 5]
NumPy 提供了向量化运算功能,可以对数组中的每个元素进行快速运算,避免使用 Python 的循环,提高运算效率。
import numpy as np
arr = np.array([1,2,3,4,5])
# 向量化加法
arr1 = arr + 1
print(arr1) # [2 3 4 5 6]
# 向量化乘法
arr2 = arr * 2
print(arr2) # [ 2 4 6 8 10]
# 向量化平方
arr3 = arr ** 2
print(arr3) # [ 1 4 9 16 25]
# 向量化三角函数
arr4 = np.sin(arr)
print(arr4) # [ 0.84147098 0.90929743 0.14112001 -0.7568025 -0.95892427]
NumPy 提供了广播机制,可以对形状不同的数组进行运算,只要它们的形状满足一定的条件。
import numpy as np
arr1 = np.array([[1,2,3],[4,5,6]])
arr2 = np.array([10,20,30])
# 广播机制
arr3 = arr1 + arr2
print(arr3) # [[11 22 33] [44 55 66]]
NumPy 提供了丰富的数学函数,包括:
import numpy as np
arr = np.array([[1,2,3],[4,5,6],[7,8,9]])
# 算术函数
print(np.add(arr,1)) # 加法
print(np.subtract(arr,1))# 减法
print(np.multiply(arr,2))# 乘法
print(np.divide(arr,2)) # 除法
print(np.sqrt(arr)) # 平方根
print(np.power(arr,2)) # 平方
# 三角函数
print(np.sin(arr)) # 正弦
print(np.cos(arr)) # 余弦
print(np.tan(arr)) # 正切
print(np.arcsin(arr)) # 反正弦
print(np.arccos(arr)) # 反余弦
print(np.arctan(arr)) # 反正切
# 统计函数
print(np.mean(arr)) # 平均值
print(np.median(arr)) # 中位数
print(np.std(arr)) # 标准差
print(np.var(arr)) # 方差
print(np.min(arr)) # 最小值
print(np.max(arr)) # 最大值
# 线性代数函数
arr1 = np.array([[1,2],[3,4]])
arr2 = np.array([[5,6],[7,8]])
print(np.dot(arr1, arr2)) # 矩阵乘法
print(arr1.transpose()) # 矩阵转置
print(np.linalg.inv(arr1)) # 矩阵逆
print(np.linalg.det(arr1)) # 矩阵行列式
Pandas 是 Python 中用于数据分析和处理的核心库。它提供了两种主要的数据结构:Series 和 DataFrame,可以用于处理各种类型的数据。
Pandas 可以通过 pip 安装:
pip install pandas
以下是一个简单的 Pandas 使用示例:
import pandas as pd
# 创建 Series
s = pd.Series([1,2,3,4,5], index=["a","b","c","d","e"])
# 打印 Series
print(s)
# 创建 DataFrame
data = {"name":["张三","李四","王五","赵六","钱七"],"age":[18,20,19,21,18],"gender":["男","女","男","女","男"],"score":[85,90,88,92,87]}
df = pd.DataFrame(data)
# 打印 DataFrame
print(df)
# 打印 DataFrame 形状
print(df.shape)
# 打印 DataFrame 数据类型
print(df.dtypes)
Pandas 支持多种数据格式的读取,包括 CSV、Excel、SQL、JSON 等。
import pandas as pd
# 读取 CSV 文件
df_csv = pd.read_csv("data.csv")
# 读取 Excel 文件
df_excel = pd.read_excel("data.xlsx")
# 读取 SQL 文件
import sqlite3
conn = sqlite3.connect("data.db")
df_sql = pd.read_sql("SELECT * FROM table", conn)
# 读取 JSON 文件
df_json = pd.read_json("data.json")
Pandas 支持多种数据格式的写入,包括 CSV、Excel、SQL、JSON 等。
import pandas as pd
data = {"name":["张三","李四","王五","赵六","钱七"],"age":[18,20,19,21,18],"gender":["男","女","男","女","男"],"score":[85,90,88,92,87]}
df = pd.DataFrame(data)
# 写入 CSV 文件
df.to_csv("data.csv", index=False)
# 写入 Excel 文件
df.to_excel("data.xlsx", index=False)
# 写入 SQL 文件
import sqlite3
conn = sqlite3.connect("data.db")
df.to_sql("table", conn, if_exists="replace", index=False)
# 写入 JSON 文件
df.to_json("data.json", orient="records")
数据清洗是数据分析过程中的重要环节,主要包括:
import pandas as pd
import numpy as np
data = {"name":["张三","李四","王五","赵六","钱七"],"age":[18, np.nan,19,21,18],"gender":["男","女","男","女","男"],"score":[85,90,88,92, np.nan]}
df = pd.DataFrame(data)
# 处理缺失值
print(df.dropna()) # 删除包含缺失值的行
print(df.fillna({"age":18,"score":85})) # 填充缺失值
# 处理重复值
df_duplicate = pd.DataFrame({"name":["张三","李四","张三","赵六","钱七"],"age":[18,20,18,21,18],"gender":["男","女","男","女","男"],"score":[85,90,85,92,87]})
print(df_duplicate.drop_duplicates()) # 删除重复值
# 处理异常值
df_outlier = pd.DataFrame({"name":["张三","李四","王五","赵六","钱七"],"age":[18,20,19,21,18],"gender":["男","女","男","女","男"],"score":[85,90,88,92,1000]})
print(df_outlier[df_outlier["score"]<100]) # 删除分数大于等于 100 的异常值
数据处理是数据分析过程中的另一个重要环节,主要包括:
import pandas as pd
data = {"name":["张三","李四","王五","赵六","钱七"],"age":[18,20,19,21,18],"gender":["男","女","男","女","男"],"score":[85,90,88,92,87]}
df = pd.DataFrame(data)
# 数据索引与切片
print(df.loc[0:2]) # 索引为 0 到 2 的行
print(df.loc[0:2,["name","age"]]) # 索引为 0 到 2 的行,列名为 name 和 age
print(df.iloc[0:3]) # 第 0 到 2 行
print(df.iloc[0:3,0:2]) # 第 0 到 2 行,第 0 到 1 列
# 数据过滤
print(df[df["score"]>88]) # 分数大于 88 的行
print(df[(df["score"]>88)&(df["gender"]=="女")]) # 分数大于 88 且性别为女的行
# 数据排序
print(df.sort_values("score")) # 按分数升序排序
print(df.sort_values("score", ascending=False)) # 按分数降序排序
# 数据分组与聚合
print(df.groupby("gender").mean()) # 按性别分组,计算平均值
print(df.groupby("gender").agg({"age":"mean","score":"max"})) # 按性别分组,计算年龄平均值和分数最大值
构建一个数据分析系统,能够读取数据文件,进行数据清洗、处理和分析,然后输出结果。
该数据分析系统的架构采用分层设计,分为以下几个层次:
该系统的数据存储方案包括以下几个部分:
首先,需要搭建开发环境。该系统使用 Python 作为开发语言,使用 Pandas 库进行数据处理和分析。
# 安装 Pandas 库
pip install pandas
# 安装 OpenPyXL 库(用于读取和写入 Excel 文件)
pip install openpyxl
# 安装 SQLAlchemy 库(用于读取和写入 SQL 文件)
pip install sqlalchemy
# 安装 PyMySQL 库(用于连接 MySQL 数据库)
pip install pymysql
数据文件读取是系统的基础功能。以下是数据文件读取的实现代码:
import pandas as pd
import sqlite3
from sqlalchemy import create_engine
def read_data(file_path, file_type):
try:
if file_type == "csv":
df = pd.read_csv(file_path)
elif file_type == "excel":
df = pd.read_excel(file_path)
elif file_type == "sql":
conn = sqlite3.connect(file_path)
df = pd.read_sql("SELECT * FROM table", conn)
elif file_type == "json":
df = pd.read_json(file_path)
else:
print("不支持的文件格式")
return None
return df
except Exception as e:
print(f"读取数据文件失败:{e}")
return None
数据清洗是系统的核心功能。以下是数据清洗的实现代码:
import pandas as pd
import numpy as np
def clean_data(df, dropna_columns, fillna_values, drop_duplicates):
try:
# 处理缺失值
if dropna_columns:
df = df.dropna(subset=dropna_columns)
if fillna_values:
df = df.fillna(fillna_values)
# 处理重复值
if drop_duplicates:
df = df.drop_duplicates()
return df
except Exception as e:
print(f"数据清洗失败:{e}")
return None
数据处理是系统的另一个核心功能。以下是数据处理的实现代码:
import pandas as pd
def process_data(df, index_columns, filter_conditions, sort_columns, groupby_columns, agg_functions):
try:
# 数据索引与切片
if index_columns:
df = df.set_index(index_columns)
# 数据过滤
if filter_conditions:
for condition in filter_conditions:
df = df.query(condition)
# 数据排序
if sort_columns:
sort_ascending = []
for column in sort_columns:
if column.endswith("asc"):
sort_ascending.append(True)
elif column.endswith("desc"):
sort_ascending.append(False)
else:
sort_ascending.append(True)
df = df.sort_values([column.split(":")[0] for column in sort_columns], ascending=sort_ascending)
# 数据分组与聚合
if groupby_columns and agg_functions:
agg_dict = {}
for function in agg_functions:
column, agg_func = function.split(":")
agg_dict[column] = agg_func
df = df.groupby(groupby_columns).agg(agg_dict)
return df
except Exception as e:
print(f"数据处理失败:{e}")
return None
数据分析是系统的另一个核心功能。以下是数据分析的实现代码:
import pandas as pd
def analyze_data(df):
try:
# 数据描述性统计
describe = df.describe()
# 数据相关性分析
corr = df.corr()
return {"describe": describe, "corr": corr}
except Exception as e:
print(f"数据分析失败:{e}")
return None
分析结果导出是系统的另一个核心功能。以下是分析结果导出的实现代码:
import pandas as pd
import sqlite3
from sqlalchemy import create_engine
def export_data(df, output_path, output_type):
try:
if output_type == "csv":
df.to_csv(output_path, index=False)
elif output_type == "excel":
df.to_excel(output_path, index=False)
elif output_type == "sql":
conn = sqlite3.connect(output_path)
df.to_sql("table", conn, if_exists="replace", index=False)
elif output_type == "json":
df.to_json(output_path, orient="records")
else:
print("不支持的文件格式")
return False
return True
except Exception as e:
print(f"导出数据失败:{e}")
return False
用户界面是系统的交互部分。以下是用户界面的实现代码:
import tkinter as tk
from tkinter import filedialog, messagebox
import pandas as pd
import os
from data_reader import read_data
from data_cleaner import clean_data
from data_processor import process_data
from data_analyzer import analyze_data
from data_exporter import export_data
class DataAnalysisApp:
def __init__(self, root):
self.root = root
self.root.title("数据分析系统")
# 创建控件
self.file_label = tk.Label(root, text="数据文件:")
self.file_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
self.file_entry = tk.Entry(root, width=50)
self.file_entry.grid(row=0, column=1, padx=10, pady=10)
self.browse_button = tk.Button(root, text="浏览", command=self.browse_file)
self.browse_button.grid(row=0, column=2, padx=10, pady=10)
self.file_type_label = tk.Label(root, text="文件格式:")
self.file_type_label.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)
self.file_type_var = tk.StringVar()
self.file_type_var.set("csv")
self.file_type_menu = tk.OptionMenu(root, self.file_type_var, "csv", "excel", "sql", "json")
self.file_type_menu.grid(row=1, column=1, padx=10, pady=10, sticky=tk.W)
self.clean_button = tk.Button(root, text="数据清洗", command=self.clean_data)
self.clean_button.grid(row=2, column=0, columnspan=3, padx=10, pady=10)
self.process_button = tk.Button(root, text="数据处理", command=self.process_data)
self.process_button.grid(row=3, column=0, columnspan=3, padx=10, pady=10)
self.analyze_button = tk.Button(root, text="数据分析", command=self.analyze_data)
self.analyze_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
self.export_label = tk.Label(root, text="导出路径:")
self.export_label.grid(row=5, column=0, padx=10, pady=10, sticky=tk.W)
self.export_entry = tk.Entry(root, width=50)
self.export_entry.grid(row=5, column=1, padx=10, pady=10)
self.browse_export_button = tk.Button(root, text="浏览", command=self.browse_export_path)
self.browse_export_button.grid(row=5, column=2, padx=10, pady=10)
self.export_type_label = tk.Label(root, text="导出格式:")
self.export_type_label.grid(row=6, column=0, padx=10, pady=10, sticky=tk.W)
self.export_type_var = tk.StringVar()
self.export_type_var.set("csv")
self.export_type_menu = tk.OptionMenu(root, self.export_type_var, "csv", "excel", "sql", "json")
self.export_type_menu.grid(row=6, column=1, padx=10, pady=10, sticky=tk.W)
self.export_button = tk.Button(root, text="导出数据", command=self.export_data)
self.export_button.grid(row=7, column=0, columnspan=3, padx=10, pady=10)
def browse_file(self):
file_path = filedialog.askopenfilename(filetypes=[("所有文件", "*.*")])
if file_path:
self.file_entry.delete(0, tk.END)
self.file_entry.insert(0, file_path)
def browse_export_path(self):
export_path = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV 文件", "*.csv"), ("Excel 文件", "*.xlsx"), ("SQL 文件", "*.db"), ("JSON 文件", "*.json")])
if export_path:
self.export_entry.delete(0, tk.END)
self.export_entry.insert(0, export_path)
def clean_data(self):
file_path = self.file_entry.get()
if not file_path:
messagebox.showerror("错误", "请选择数据文件")
return
file_type = self.file_type_var.get()
df = read_data(file_path, file_type)
if df is None:
return
# 处理缺失值
dropna_columns = input("请输入删除缺失值的列名(用逗号分隔):").split(",")
fillna_values = input("请输入填充缺失值的列名和值(格式:列名 1:值 1,列名 2:值 2):").split(",")
fillna_dict = {}
for item in fillna_values:
if item:
column, value = item.split(":")
fillna_dict[column] = value
# 处理重复值
drop_duplicates = input("是否删除重复值(y/n):").lower() == "y"
df_clean = clean_data(df, dropna_columns, fillna_dict, drop_duplicates)
if df_clean is None:
return
self.df_clean = df_clean
messagebox.showinfo("成功", "数据清洗完成")
def process_data(self):
if not hasattr(self, "df_clean"):
messagebox.showerror("错误", "请先进行数据清洗")
return
# 数据索引与切片
index_columns = input("请输入索引列名(用逗号分隔):").split(",")
index_columns = [column for column in index_columns if column]
# 数据过滤
filter_conditions = input("请输入过滤条件(用逗号分隔,格式:列名>值):").split(",")
filter_conditions = [condition for condition in filter_conditions if condition]
# 数据排序
sort_columns = input("请输入排序列名(用逗号分隔,格式:列名:asc/desc):").split(",")
sort_columns = [column for column in sort_columns if column]
# 数据分组与聚合
groupby_columns = input("请输入分组列名(用逗号分隔):").split(",")
groupby_columns = [column for column in groupby_columns if column]
agg_functions = input("请输入聚合函数(用逗号分隔,格式:列名:函数名):").split(",")
agg_functions = [function for function in agg_functions if function]
df_processed = process_data(self.df_clean, index_columns, filter_conditions, sort_columns, groupby_columns, agg_functions)
if df_processed is None:
return
self.df_processed = df_processed
messagebox.showinfo("成功", "数据处理完成")
def analyze_data(self):
if not hasattr(self, "df_processed"):
messagebox.showerror("错误", "请先进行数据处理")
return
analyze_result = analyze_data(self.df_processed)
if analyze_result is None:
return
self.analyze_result = analyze_result
print("数据描述性统计:")
print(self.analyze_result["describe"])
print("\n数据相关性分析:")
print(self.analyze_result["corr"])
messagebox.showinfo("成功", "数据分析完成")
def export_data(self):
if not hasattr(self, "df_processed"):
messagebox.showerror("错误", "请先进行数据处理")
return
export_path = self.export_entry.get()
if not export_path:
messagebox.showerror("错误", "请选择导出路径")
return
export_type = self.export_type_var.get()
success = export_data(self.df_processed, export_path, export_type)
if success:
messagebox.showinfo("成功", "数据导出完成")
else:
messagebox.showerror("错误", "数据导出失败")
if __name__ == "__main__":
root = tk.Tk()
app = DataAnalysisApp(root)
root.mainloop()
运行系统时,需要执行以下步骤:
系统测试时,需要使用一些测试数据。以下是一个简单的测试数据示例:
name,age,gender,score
张三,18,男,85
李四,20,女,90
王五,19,男,88
赵六,21,女,92
钱七,18,男,87
将测试数据保存为 test_data.csv 文件,然后运行系统,选择该文件进行分析。
本章介绍了 NumPy 和 Pandas 的基本概念、重要性和应用场景,以及 NumPy 数组的创建与操作、向量化运算与广播机制,Pandas 的数据读取与写入、数据清洗与处理、数据分组与聚合操作。最后,通过实战项目,展示了如何开发一个数据分析系统。
NumPy 和 Pandas 是 Python 中用于数据分析和处理的核心库。通过学习本章的内容,读者可以掌握 NumPy 和 Pandas 的基本方法和技巧,具备数据分析和处理的能力。同时,通过实战项目,读者可以将所学知识应用到实际项目中,进一步提升自己的技能水平。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online