跳到主要内容
Python + MySQL + Web 构建私有 Apple 设备监控系统 | 极客日志
Python Node.js 大前端
Python + MySQL + Web 构建私有 Apple 设备监控系统 综述由AI生成 > 一个完整的全栈项目实战:从 iCloud 获取设备信息,存储在 MySQL 数据库,通过 RESTful API 提供数据接口,并打造美观的 Web 监控大屏。 作为 Apple 生态的重度用户,我拥有 iPhone、iPad、MacBook 等多台设备。日常使用中,我希望能在一个统一的界面上查看所有设备的电量、在线状态等信息。虽然 Apple 提供了"查找"应用,但我想要: **私有化部署*…
剑仙 发布于 2026/4/6 更新于 2026/5/20 33K 浏览
一个完整的全栈项目实战:从 iCloud 获取设备信息,存储在 MySQL 数据库,通过 RESTful API 提供数据接口,并打造美观的 Web 监控大屏。
作为 Apple 生态的重度用户,我拥有 iPhone、iPad、MacBook 等多台设备。日常使用中,我希望能在一个统一的界面上查看所有设备的电量、在线状态等信息。虽然 Apple 提供了"查找"应用,但我想要:
私有化部署 :数据存储在自己控制的服务器上
历史记录 :可以查看设备状态的历史变化
自定义展示 :根据需求定制化的监控界面
API 接口 :方便与其他系统集成
于是,我决定自己动手搭建一个完整的设备监控系统。
项目功能
✅ 自动从 iCloud 获取设备信息(电量、状态、位置等)
✅ 数据持久化存储到 MySQL 数据库
✅ RESTful API 提供灵活的数据查询接口
✅ 美观的 Web 监控大屏,支持实时刷新
✅ 按设备类型分类展示(手机、平板、电脑)
✅ 设备统计信息(总数、在线数、平均电量等)
✅ 定时任务自动更新设备状态
技术栈
Python 3 + pyicloud - 数据采集
Node.js + Koa2 - API 服务
MySQL - 数据存储
HTML5 + Tailwind CSS + Vanilla JavaScript - 前端展示
架构设计
iCloud API → Python 脚本 → MySQL 数据库 → Koa2 API → Web 前端
完整代码实现
1. Python 数据采集脚本
安装依赖
pip3 install pyicloud mysql-connector-python
完整代码(update_devices.py)
""" iCloud 设备信息同步脚本
用途:定时拉取设备状态(电量等)存入 MySQL
"""
import os
import sys
import json
import logging
from datetime import datetime
ICLOUD_EMAIL = "[email protected] "
ICLOUD_PASSWORD = "your_app_specific_password"
CHINA_MAINLAND =
MYSQL_CONFIG = {
: ,
: ,
: ,
: ,
: ,
:
}
LOG_FILE =
LOG_FILE:
log_dir = os.path.dirname(LOG_FILE)
log_dir:
:
os.makedirs(log_dir, exist_ok= )
(OSError, PermissionError) e:
( )
LOG_FILE =
LOG_FILE:
logging.basicConfig(
level=logging.INFO,
= ,
handlers=[
logging.FileHandler(LOG_FILE, encoding= ),
logging.StreamHandler(sys.stdout)
]
)
:
logging.basicConfig(
level=logging.INFO,
= ,
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
():
:
logger.info( )
pyicloud PyiCloudService
pyicloud.exceptions PyiCloudFailedLoginException, PyiCloudServiceUnavailable
:
api = PyiCloudService(ICLOUD_EMAIL, ICLOUD_PASSWORD, china_mainland=CHINA_MAINLAND)
PyiCloudFailedLoginException e:
logger.error( )
sys.exit( )
IS_INTERACTIVE = os.isatty(sys.stdin.fileno()) (sys.stdin, )
api.requires_2fa:
IS_INTERACTIVE:
logger.error( )
logger.error( )
sys.exit( )
logger.info( )
security_key_names = api.security_key_names
security_key_names:
logger.info( )
devices = api.fido2_devices
logger.info( )
idx, dev (devices, start= ):
logger.info( )
choice = ( )
choice:
choice =
:
choice = (choice)
selected_device = devices[choice - ]
logger.info( )
api.confirm_security_key(selected_device)
:
logger.info( )
code = ( )
result = api.validate_2fa_code(code)
logger.info( )
result:
logger.error( )
sys.exit( )
api.is_trusted_session:
logger.info( )
result = api.trust_session()
logger.info( )
api.requires_2sa:
IS_INTERACTIVE:
logger.error( )
sys.exit( )
logger.info( )
logger.info( )
devices = api.trusted_devices
i, device (devices):
device_name = device.get( , )
logger.info( )
device_choice = ( )
device_choice:
device_choice =
:
device_choice = (device_choice)
device = devices[device_choice]
api.send_verification_code(device):
logger.error( )
sys.exit( )
code = ( )
api.validate_verification_code(device, code):
logger.error( )
sys.exit( )
logger.info( )
mysql.connector
db = mysql.connector.connect(**MYSQL_CONFIG)
cursor = db.cursor()
cursor.execute( )
:
devices = api.devices
PyiCloudServiceUnavailable e:
logger.error( )
sys.exit( )
devices:
logger.warning( )
count =
dev devices:
:
requested_properties = [ , , , , , , , , , , , , , , , , , , , , , , ]
:
status = dev.status(requested_properties)
TypeError:
status = dev.status()
device_id = (status.get( ) status.get( ) status.get( ) status.get( ) (dev. ) (dev, ) (dev))
name = status.get( ) status.get( , )
model = status.get( ) status.get( , )
device_class = status.get( ) status.get( , )
battery_level = status.get( )
battery_status = status.get( , )
os_version = (status.get( ) status.get( ) status.get( ) )
location = status.get( )
location (dev, ):
:
location_data = dev.location()
location_data:
location = location_data
Exception:
last_seen = (status.get( ) status.get( ) status.get( ))
is_online = status.get( ) ==
(last_seen, ( , )):
last_seen = datetime.fromtimestamp(last_seen / )
:
last_seen =
loc_str = json.dumps(location, ensure_ascii= ) location
sql =
cursor.execute(sql, (
device_id, name, model, device_class, battery_level, battery_status, os_version, loc_str, last_seen, is_online, json.dumps(status, ensure_ascii= )
))
count +=
logger.info( )
Exception e:
logger.error( , exc_info= )
db.commit()
logger.info( )
Exception e:
logger.error( , exc_info= )
sys.exit( )
:
:
cursor.close()
db.close()
:
__name__ == :
main()
True
"host"
"localhost"
"port"
3306
"user"
"your_username"
"password"
"your_password"
"database"
"your_database"
"charset"
"utf8mb4"
""
if
if
try
True
except
as
print
f"警告:无法创建日志目录 {log_dir} : {e} "
""
if
format
'%(asctime)s | %(levelname)s | %(message)s'
'utf-8'
else
format
'%(asctime)s | %(levelname)s | %(message)s'
def
main
try
"▶ 开始同步 iCloud 设备信息..."
from
import
from
import
try
except
as
f"❌ 登录失败:{e} "
1
if
hasattr
'fileno'
else
False
if
if
not
"❌ 需要双重认证(2FA),但当前运行在非交互模式下"
"请先手动运行一次脚本来完成认证"
2
"需要进行两步验证(2FA)"
if
f"需要安全密钥确认。请插入以下密钥之一:{', ' .join(security_key_names)} "
"可用的 FIDO2 设备:"
for
in
enumerate
1
f" {idx} : {dev} "
input
"请选择 FIDO2 设备编号(直接回车使用第一个): "
if
not
1
else
int
1
"请使用安全密钥确认操作"
else
"验证码已发送到你已批准的设备上。"
input
"请输入收到的验证码:"
f"验证码验证结果:{result} "
if
not
"验证码验证失败"
1
if
not
"会话未被信任。正在请求信任..."
f"会话信任结果:{result} "
elif
if
not
"❌ 需要两步认证(2SA),但当前运行在非交互模式下"
2
"需要进行两步认证(2SA)"
"你的可信任设备:"
for
in
enumerate
'deviceName'
f"SMS to {device.get('phoneNumber' , '未知' )} "
f" {i} : {device_name} "
input
'请选择要使用的设备编号(直接回车使用第一个): '
if
not
0
else
int
if
not
"发送验证码失败"
1
input
'请输入验证码:'
if
not
"验证码验证失败"
1
"✓ 认证完成"
import
"""
CREATE TABLE IF NOT EXISTS `devices` (
`id` INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
`device_id` VARCHAR(100) NOT NULL UNIQUE,
`name` VARCHAR(100),
`model` VARCHAR(60),
`device_class` VARCHAR(20),
`battery_level` DECIMAL(3,2),
`battery_status` VARCHAR(20),
`os_version` VARCHAR(30),
`last_location` TEXT,
`last_seen` DATETIME,
`is_online` BOOLEAN,
`raw_data` JSON,
`updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
try
except
as
f"❌ Find My iPhone 服务不可用:{e} "
1
if
not
"⚠ 未找到任何设备"
return
0
for
in
try
'deviceDisplayName'
'name'
'deviceStatus'
'batteryLevel'
'batteryStatus'
'deviceModel'
'model'
'modelDisplayName'
'deviceClass'
'deviceClassDisplay'
'deviceType'
'osVersion'
'deviceOSType'
'osVersionDisplay'
'systemVersion'
'serialNumber'
'id'
'deviceId'
'location'
'deviceStatusTime'
'timestamp'
'locationTimeStamp'
'timeStamp'
try
except
'serialNumber'
or
'id'
or
'deviceId'
or
'deviceDisplayName'
or
str
id
if
hasattr
'id'
else
str
'deviceDisplayName'
or
'name'
'Unknown'
'deviceModel'
or
'model'
'Unknown'
'deviceClass'
or
'deviceClassDisplay'
'Unknown'
'batteryLevel'
'batteryStatus'
'Unknown'
'osVersion'
or
'deviceOSType'
or
'osVersionDisplay'
or
'Unknown'
'location'
if
not
and
hasattr
'location'
try
if
except
pass
'deviceStatusTime'
or
'timestamp'
or
'locationTimeStamp'
'deviceStatus'
'200'
if
isinstance
int
float
1000.0
else
None
False
if
else
None
"""
INSERT INTO `devices` (
device_id, name, model, device_class, battery_level, battery_status, os_version, last_location, last_seen, is_online, raw_data
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
model = VALUES(model),
battery_level = VALUES(battery_level),
battery_status = VALUES(battery_status),
os_version = VALUES(os_version),
last_location = VALUES(last_location),
last_seen = VALUES(last_seen),
is_online = VALUES(is_online),
raw_data = VALUES(raw_data),
updated_at = NOW()
"""
False
1
f"✓ 已同步:{name} | 电量:{battery_level or 'N/A' } "
except
as
f"⚠ 设备同步失败 ({dev} ): {e} "
True
f"✅ 同步完成!共 {count} 台设备"
except
as
f"❌ 全局错误:{e} "
True
1
finally
try
except
pass
if
"__main__"
2. Node.js API 服务
安装依赖 npm install koa koa-router koa-bodyparser mysql2 dotenv
完整代码(app.js) const Koa = require ('koa' );
const Router = require ('koa-router' );
const bodyParser = require ('koa-bodyparser' );
require ('dotenv' ).config ();
const app = new Koa ();
const router = new Router ();
const dbConfig = {
host : process.env .DB_HOST || 'localhost' ,
port : process.env .DB_PORT || 3306 ,
user : process.env .DB_USER || 'root' ,
password : process.env .DB_PASSWORD || '' ,
database : process.env .DB_NAME || 'devices' ,
charset : 'utf8mb4'
};
const mysql = require ('mysql2/promise' );
app.use (async (ctx, next) => {
ctx.set ('Access-Control-Allow-Origin' , '*' );
ctx.set ('Access-Control-Allow-Methods' , 'GET, POST, PUT, DELETE, OPTIONS' );
ctx.set ('Access-Control-Allow-Headers' , 'Content-Type, Authorization, X-Requested-With' );
ctx.set ('Access-Control-Allow-Credentials' , 'true' );
if (ctx.method === 'OPTIONS' ) {
ctx.status = 204 ;
return ;
}
await next ();
});
app.use (bodyParser ());
let pool = null ;
async function getPool ( ) {
if (!pool) {
pool = mysql.createPool ({
...dbConfig,
waitForConnections : true ,
connectionLimit : 10 ,
queueLimit : 0
});
}
return pool;
}
router.get ('/api/devices/all' , async (ctx) => {
try {
const pool = await getPool ();
const [rows] = await pool.execute (`
SELECT id, device_id, name, model, device_class, battery_level, battery_status, os_version, last_location, last_seen, is_online, raw_data, updated_at
FROM devices ORDER BY updated_at DESC
` );
const devices = rows.map (device => {
let parsedData = {};
try {
if (device.raw_data ) {
parsedData = typeof device.raw_data === 'string' ? JSON .parse (device.raw_data ) : device.raw_data ;
}
} catch (e) {
console .error ('解析 raw_data 失败:' , e);
}
let parsedLocation = null ;
try {
if (device.last_location ) {
parsedLocation = typeof device.last_location === 'string' ? JSON .parse (device.last_location ) : device.last_location ;
}
} catch (e) {
console .error ('解析 last_location 失败:' , e);
}
return { ...device, raw_data : parsedData, last_location : parsedLocation };
});
ctx.body = { code : 200 , message : 'success' , data : devices, total : devices.length , timestamp : new Date ().toISOString () };
} catch (error) {
ctx.status = 500 ;
ctx.body = { code : 500 , message : '查询失败' , error : error.message };
}
});
router.get ('/api/devices' , async (ctx) => {
try {
const pool = await getPool ();
const [rows] = await pool.execute (`
SELECT id, device_id, name, model, device_class, battery_level, battery_status, os_version, last_location, last_seen, is_online, updated_at
FROM devices ORDER BY updated_at DESC
` );
ctx.body = { code : 200 , message : 'success' , data : rows, total : rows.length };
} catch (error) {
ctx.status = 500 ;
ctx.body = { code : 500 , message : '查询失败' , error : error.message };
}
});
router.get ('/api/devices/online' , async (ctx) => {
try {
const pool = await getPool ();
const [rows] = await pool.execute (`
SELECT * FROM devices WHERE is_online = 1 ORDER BY updated_at DESC
` );
ctx.body = { code : 200 , message : 'success' , data : rows, total : rows.length };
} catch (error) {
ctx.status = 500 ;
ctx.body = { code : 500 , message : '查询失败' , error : error.message };
}
});
router.get ('/api/devices/low-battery' , async (ctx) => {
try {
const pool = await getPool ();
const [rows] = await pool.execute (`
SELECT * FROM devices WHERE battery_level < 0.3 AND battery_level IS NOT NULL ORDER BY battery_level ASC
` );
ctx.body = { code : 200 , message : 'success' , data : rows, total : rows.length };
} catch (error) {
ctx.status = 500 ;
ctx.body = { code : 500 , message : '查询失败' , error : error.message };
}
});
router.get ('/api/devices/stats' , async (ctx) => {
try {
const pool = await getPool ();
const [totalRows] = await pool.execute ('SELECT COUNT(*) as total FROM devices' );
const [onlineRows] = await pool.execute ('SELECT COUNT(*) as count FROM devices WHERE is_online = 1' );
const [lowBatteryRows] = await pool.execute ('SELECT COUNT(*) as count FROM devices WHERE battery_level < 0.3 AND battery_level IS NOT NULL' );
const [avgBatteryRows] = await pool.execute ('SELECT AVG(battery_level) as avg FROM devices WHERE battery_level IS NOT NULL' );
const [typeRows] = await pool.execute (`
SELECT device_class, COUNT(*) as count FROM devices WHERE device_class IS NOT NULL GROUP BY device_class
` );
ctx.body = {
code : 200 ,
message : 'success' ,
data : {
total : totalRows[0 ].total ,
online : onlineRows[0 ].count ,
offline : totalRows[0 ].total - onlineRows[0 ].count ,
lowBattery : lowBatteryRows[0 ].count ,
avgBattery : avgBatteryRows[0 ].avg ? parseFloat (avgBatteryRows[0 ].avg ).toFixed (2 ) : null ,
byType : typeRows
}
};
} catch (error) {
ctx.status = 500 ;
ctx.body = { code : 500 , message : '查询失败' , error : error.message };
}
});
router.get ('/api/health' , async (ctx) => {
try {
const pool = await getPool ();
await pool.execute ('SELECT 1' );
ctx.body = { code : 200 , message : '服务正常' , timestamp : new Date ().toISOString () };
} catch (error) {
ctx.status = 500 ;
ctx.body = { code : 500 , message : '数据库连接失败' , error : error.message };
}
});
app.use (router.routes ()).use (router.allowedMethods ());
const PORT = process.env .PORT || 3000 ;
app.listen (PORT , () => {
console .log (`🚀 服务器运行在 http://localhost:${PORT} ` );
});
3. Web 前端页面
完整代码(index.html) 由于 HTML 代码较长,这里提供关键部分。完整代码请查看项目仓库或根据以下结构自行实现:
设备卡片渲染 :根据设备类型(手机/平板/电脑)渲染不同的卡片样式
电量状态判断 :自动识别电量格式(小数/百分比),正确显示状态
自动刷新 :每 30 秒自动更新设备数据
响应式布局 :使用 Tailwind CSS 实现美观的界面
const API_BASE_URL = 'http://your-api-server:3000' ;
function normalizeBatteryLevel (batteryLevel ) {
if (batteryLevel === null || batteryLevel === undefined ) return null ;
if (batteryLevel > 1 ) {
return batteryLevel / 100 ;
}
return batteryLevel;
}
function getBatteryColor (batteryLevel ) {
const normalized = normalizeBatteryLevel (batteryLevel);
if (normalized === null ) return 'bg-gray-400' ;
if (normalized >= 0.8 ) return 'bg-green-500' ;
if (normalized >= 0.3 ) return 'bg-orange-500' ;
return 'bg-red-500' ;
}
function getStatusBadge (device ) {
const batteryLevel = device.battery_level ;
const isOnline = device.is_online ;
if (!isOnline) {
return '<span>离线</span>' ;
}
if (batteryLevel === null || batteryLevel === undefined ) {
return '<span>在线</span>' ;
}
const normalized = normalizeBatteryLevel (batteryLevel);
if (normalized >= 0.8 ) {
return '<span>电量充足</span>' ;
} else if (normalized >= 0.3 ) {
return '<span>电量中等</span>' ;
} else {
return '<span>电量不足</span>' ;
}
}
async function loadDevices ( ) {
try {
const response = await fetch (`${API_BASE_URL} /api/devices/all` );
const result = await response.json ();
if (result.code === 200 && result.data ) {
const devices = result.data ;
const phoneDevices = devices.filter (d => d.device_class ?.toLowerCase ().includes ('iphone' ) || d.name ?.toLowerCase ().includes ('iphone' ));
const tabletDevices = devices.filter (d => d.device_class ?.toLowerCase ().includes ('ipad' ) || d.name ?.toLowerCase ().includes ('ipad' ));
const computerDevices = devices.filter (d => d.device_class ?.toLowerCase ().includes ('mac' ) || d.name ?.toLowerCase ().includes ('mac' ));
document .getElementById ('phoneDevices' ).innerHTML = phoneDevices.map (device => renderDeviceCard (device)).join ('' );
updateStats (devices);
}
} catch (error) {
console .error ('加载设备失败:' , error);
}
}
document .addEventListener ('DOMContentLoaded' , () => {
loadDevices ();
setInterval (loadDevices, 30000 );
});
完整 HTML 代码请参考项目中的 web/index.html 文件。
配置说明
环境变量配置 DB_HOST=localhost
DB_PORT=3306
DB_USER=your_username
DB_PASSWORD=your_password
DB_NAME=your_database
PORT=3000
Python 脚本配置 修改 update_devices.py 中的配置:
ICLOUD_EMAIL = "[email protected] "
ICLOUD_PASSWORD = "your_app_specific_password"
CHINA_MAINLAND = True
MYSQL_CONFIG = {
"host" : "localhost" ,
"port" : 3306 ,
"user" : "your_username" ,
"password" : "your_password" ,
"database" : "your_database" ,
"charset" : "utf8mb4"
}
关键要点
1. batteryLevel 格式 根据 pyicloud 文档 ,batteryLevel 是 0-1 之间的小数(如 0.85 表示 85%)。前端代码已兼容两种格式(小数和百分比),会自动识别并转换。
2. 设备状态码
deviceStatus = "200" - 设备在线
deviceStatus = "201" - 设备离线
3. 离线设备限制 离线时 iCloud API 只返回基本信息(名称、电量),无法获取型号、系统版本等详细信息。这是 iCloud API 的限制,不是代码问题。
4. 会话管理 认证后的会话保存在 .pyicloud/ 目录,有效期约 2 个月。过期后需要重新手动运行脚本完成认证。
使用方法
1. 首次运行
python3 update_devices.py
cd api
npm install
npm start
open index.html
2. 设置定时任务
launchctl load ~/Library/LaunchAgents/com.icloud.devices.update.plist
crontab -e
*/5 * * * * cd /path/to/project && /usr/bin/python3 update_devices.py >> logs/cron.log 2>&1
参考资源 相关免费在线工具 curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online