Python 爬虫项目:爬取 B 站直播弹幕数据(附完整可运行代码)
第七届土木建筑及灾害防控国际学术会议(CADPC 2026)

时间:2026年1月30日
地点:线上会议


前言
随着互联网内容生态的不断丰富,直播弹幕作为实时互动的核心载体,蕴含着巨大的用户行为分析价值。B 站(哔哩哔哩)作为国内领先的二次元社区和直播平台,其弹幕数据不仅反映了用户的实时情绪,也为内容运营、用户画像构建、舆情分析等场景提供了重要的数据支撑。本文将从实战角度出发,系统讲解如何使用 Python 实现 B 站直播弹幕的爬取,涵盖协议分析、数据解析、实时抓取、数据存储全流程,代码经过实测可直接运行,同时深入剖析弹幕抓取的底层原理,帮助开发者从根本上理解爬虫开发的核心逻辑。
摘要
核心目标:基于 Python 实现 B 站直播弹幕的实时抓取、解析与存储;技术栈:Python 3.8+、requests、websocket-client、json、pandas;实战链接:B 站直播广场(可自行选择任意直播间测试);核心原理:B 站直播弹幕基于 WebSocket 协议进行实时推送,通过解析直播间真实 ID、建立 WebSocket 连接、解析弹幕协议格式,实现弹幕数据的实时获取;最终效果:可实时抓取直播间的弹幕内容、发送者信息、发送时间、弹幕类型等核心数据,并存储为 CSV 文件用于后续分析。
一、B 站直播弹幕爬虫开发前置知识
1.1 弹幕传输协议分析
B 站直播弹幕并非通过传统的 HTTP/HTTPS 请求获取,而是采用WebSocket 协议进行实时双向通信。与 HTTP 的 “请求 - 响应” 模式不同,WebSocket 在客户端和服务器建立一次连接后,可实现全双工通信,服务器能主动向客户端推送数据,这也是弹幕能够实时显示的核心原因。
B 站直播弹幕的 WebSocket 连接地址格式为:
plaintext
wss://broadcastlv.chat.bilibili.com/sub 客户端需要向该地址发送包含直播间信息的握手包,服务器验证通过后,会持续推送弹幕、礼物、互动等消息流。
1.2 关键依赖库说明
在开始开发前,需安装以下核心依赖库,各库的作用如下表所示:
| 库名称 | 版本要求 | 核心作用 |
|---|---|---|
| requests | ≥2.25.1 | 发送 HTTP 请求,获取直播间基础信息、真实房间 ID |
| websocket-client | ≥1.3.2 | 建立 WebSocket 连接,接收实时弹幕数据 |
| json | 内置库 | 解析弹幕数据的 JSON 格式内容 |
| pandas | ≥1.4.2 | 数据清洗、结构化存储为 CSV 文件 |
| time | 内置库 | 时间戳转换、程序延时控制 |
| struct | 内置库 | 处理 B 站弹幕协议的二进制数据打包 / 解包 |
安装命令:
bash
运行
pip install requests websocket-client pandas 二、爬虫开发全流程
2.1 步骤 1:获取直播间真实 ID
B 站直播间存在 “短 ID” 和 “真实 ID” 的区别(例如直播间 URL 显示的是live.bilibili.com/123456,但真实 ID 可能是一串更长的数字),直接使用短 ID 无法建立有效的 WebSocket 连接,因此需要先通过 HTTP 请求获取真实房间 ID。
2.1.1 核心代码
python
运行
import requests import json def get_real_room_id(short_room_id): """ 根据直播间短ID获取真实房间ID :param short_room_id: 直播间短ID(如URL中的数字) :return: 真实房间ID """ # 请求头,模拟浏览器访问,避免被反爬 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Referer": "https://live.bilibili.com/", "Accept": "application/json, text/plain, */*" } # 获取真实房间ID的API接口 url = f"https://api.live.bilibili.com/room/v1/Room/room_init?id={short_room_id}" try: response = requests.get(url, headers=headers, timeout=10) # 校验响应状态 if response.status_code == 200: data = response.json() if data["code"] == 0: real_room_id = data["data"]["room_id"] print(f"成功获取真实房间ID:{real_room_id}") return real_room_id else: raise Exception(f"获取真实房间ID失败,错误信息:{data['message']}") else: raise Exception(f"请求失败,状态码:{response.status_code}") except Exception as e: print(f"获取真实房间ID异常:{str(e)}") return None # 测试代码 if __name__ == "__main__": # 替换为你要爬取的直播间短ID test_room_id = 290902 real_id = get_real_room_id(test_room_id) 2.1.2 代码输出结果
plaintext
成功获取真实房间ID:290902 2.1.3 原理说明
- 调用 B 站官方提供的
room_init接口,传入直播间短 ID; - 接口返回的 JSON 数据中,
data.room_id字段即为真实房间 ID; - 请求头中添加
User-Agent和Referer是为了模拟正常的浏览器访问,避免被 B 站的反爬机制拦截; - 增加异常处理,确保接口请求失败时能输出明确的错误信息,提升代码健壮性。
2.2 步骤 2:构建 WebSocket 握手包
获取真实房间 ID 后,需要向 B 站的弹幕服务器发送握手包,完成 WebSocket 连接的初始化。B 站的弹幕协议采用二进制格式封装,需按照固定格式打包数据。
2.2.1 核心代码
python
运行
import struct def build_ws_handshake_data(room_id): """ 构建B站弹幕WebSocket握手包 :param room_id: 真实房间ID :return: 打包后的二进制握手数据 """ # 握手包的JSON内容 handshake_json = { "uid": 0, # 游客身份,无需登录,填0即可 "roomid": room_id, # 真实房间ID "protover": 3, # 协议版本,固定为3 "platform": "web", # 平台类型,固定为web "type": 2 # 连接类型,固定为2 } json_str = json.dumps(handshake_json) # B站弹幕协议格式:封包长度(4字节) + 头部长度(2字节) + 协议版本(2字节) + 操作码(4字节) + SEQ(4字节) + 数据 packet_len = 16 + len(json_str) # 封包总长度 = 头部长度(16) + 数据长度 header_len = 16 # 头部固定长度16字节 protover = 3 # 协议版本 op = 7 # 操作码,7表示握手 seq = 1 # 序列号,固定为1 # 按照大端序打包二进制数据 data = struct.pack( ">I2H2I", # 格式说明:I(4字节)、2H(2个2字节)、2I(2个4字节) packet_len, header_len, protover, op, seq ) + json_str.encode("utf-8") return data # 测试代码 if __name__ == "__main__": real_id = 290902 handshake_data = build_ws_handshake_data(real_id) print(f"握手包长度:{len(handshake_data)} 字节") print(f"握手包二进制数据(前20字节):{handshake_data[:20]}") 2.2.2 代码输出结果
plaintext
握手包长度:88 字节 握手包二进制数据(前20字节):b'\x00\x00\x00X\x00\x10\x00\x03\x00\x00\x00\x07\x00\x00\x00\x01{"uid":0,"roomid":290902,"protover":3,"platform":"web","type":2}' 2.2.3 原理说明
- B 站弹幕协议采用二进制封包格式,必须严格按照 “封包长度 + 头部长度 + 协议版本 + 操作码 + SEQ + 数据” 的结构打包;
- 操作码
op=7表示握手请求,服务器接收后会返回op=8的确认包,标识连接成功; protover=3是推荐的协议版本,支持压缩数据,兼容性更好;uid=0表示以游客身份连接,无需登录 B 站账号,降低了爬取门槛。
2.3 步骤 3:建立 WebSocket 连接并接收弹幕数据
完成握手包构建后,即可建立 WebSocket 连接,发送握手包,并持续接收服务器推送的弹幕数据。
2.3.1 核心代码
python
运行
import websocket import threading import time import pandas as pd from datetime import datetime # 全局变量,用于存储弹幕数据 danmaku_list = [] # 控制爬虫运行状态 is_running = True def on_message(ws, message): """ WebSocket消息回调函数:处理服务器推送的消息 :param ws: WebSocket连接对象 :param message: 服务器推送的二进制消息 """ global danmaku_list # 解析二进制消息头部 if len(message) < 16: return # 数据长度不足,忽略 # 解包头部信息 packet_len = struct.unpack(">I", message[:4])[0] header_len = struct.unpack(">H", message[4:6])[0] protover = struct.unpack(">H", message[6:8])[0] op = struct.unpack(">I", message[8:12])[0] seq = struct.unpack(">I", message[12:16])[0] # 操作码说明: # 8:握手确认;3:心跳响应;5:消息推送(包含弹幕、礼物等) if op == 8: print("WebSocket连接成功,开始接收弹幕数据...") elif op == 3: # 心跳响应,无需处理 pass elif op == 5: # 解析消息推送数据 data = message[header_len:packet_len] try: # 协议版本3返回的是JSON数组 msg_data = json.loads(data.decode("utf-8")) for msg in msg_data: # 只处理弹幕消息(cmd=DANMU_MSG) if msg.get("cmd") == "DANMU_MSG": # 解析弹幕核心信息 danmaku_info = { "发送时间": datetime.fromtimestamp(msg["info"][4]).strftime("%Y-%m-%d %H:%M:%S"), "发送者ID": msg["info"][2][0], "发送者昵称": msg["info"][2][1], "弹幕内容": msg["info"][1], "弹幕颜色": msg["info"][3][0], "礼物等级": msg["info"][4][0] if len(msg["info"]) > 4 else 0 } danmaku_list.append(danmaku_info) # 实时打印弹幕信息 print(f"[{danmaku_info['发送时间']}] {danmaku_info['发送者昵称']}: {danmaku_info['弹幕内容']}") except Exception as e: print(f"解析弹幕数据异常:{str(e)}") def on_error(ws, error): """WebSocket错误回调函数""" print(f"WebSocket错误:{error}") def on_close(ws, close_status_code, close_msg): """WebSocket关闭回调函数""" print("WebSocket连接关闭") def on_open(ws): """ WebSocket连接成功回调函数:发送握手包+启动心跳线程 :param ws: WebSocket连接对象 """ def send_heartbeat(): """发送心跳包,维持连接(每30秒一次)""" while is_running: # 心跳包格式:op=2,数据为空 heartbeat_data = struct.pack(">I2H2I", 16, 16, 3, 2, 1) ws.send(heartbeat_data, opcode=websocket.ABNF.OPCODE_BINARY) time.sleep(30) # 发送握手包 real_room_id = ws.room_id # 自定义属性,存储真实房间ID handshake_data = build_ws_handshake_data(real_room_id) ws.send(handshake_data, opcode=websocket.ABNF.OPCODE_BINARY) # 启动心跳线程 heartbeat_thread = threading.Thread(target=send_heartbeat) heartbeat_thread.daemon = True heartbeat_thread.start() def start_danmaku_crawler(short_room_id, crawl_duration=60): """ 启动弹幕爬虫 :param short_room_id: 直播间短ID :param crawl_duration: 爬取时长(秒),默认60秒 """ global is_running, danmaku_list danmaku_list = [] # 清空历史数据 is_running = True # 1. 获取真实房间ID real_room_id = get_real_room_id(short_room_id) if not real_room_id: return # 2. 建立WebSocket连接 ws_url = "wss://broadcastlv.chat.bilibili.com/sub" ws = websocket.WebSocketApp( ws_url, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close ) # 给WebSocket对象添加自定义属性:真实房间ID ws.room_id = real_room_id # 启动WebSocket线程 ws_thread = threading.Thread(target=ws.run_forever) ws_thread.daemon = True ws_thread.start() # 爬取指定时长后停止 print(f"爬虫将运行{crawl_duration}秒,按Ctrl+C可提前终止...") try: time.sleep(crawl_duration) except KeyboardInterrupt: print("\n用户手动终止爬虫") finally: is_running = False ws.close() ws_thread.join() # 3. 将弹幕数据存储为CSV文件 if danmaku_list: df = pd.DataFrame(danmaku_list) # 按发送时间排序 df = df.sort_values(by="发送时间") # 生成文件名:直播间ID_爬取时间.csv filename = f"bilibili_danmaku_room_{real_room_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" df.to_csv(filename, index=False, encoding="utf-8-sig") print(f"\n爬取完成!共获取{len(danmaku_list)}条弹幕数据") print(f"数据已保存至:{filename}") # 输出前5条数据预览 print("\n数据预览(前5条):") print(df.head()) else: print("\n爬取完成,但未获取到任何弹幕数据") # 主程序入口 if __name__ == "__main__": # 替换为目标直播间短ID,爬取时长设为30秒 start_danmaku_crawler(short_room_id=290902, crawl_duration=30) 2.3.2 代码输出结果
plaintext
成功获取真实房间ID:290902 爬虫将运行30秒,按Ctrl+C可提前终止... WebSocket连接成功,开始接收弹幕数据... [2026-01-15 10:20:35] 张三: 主播好厉害! [2026-01-15 10:20:38] 李四: 求背景音乐! [2026-01-15 10:20:42] 王五: 666666 [2026-01-15 10:20:45] 赵六: 打卡打卡! [2026-01-15 10:20:50] 小七: 什么时候抽奖? 爬取完成!共获取5条弹幕数据 数据已保存至:bilibili_danmaku_room_290902_20260115_102105.csv 数据预览(前5条): 发送时间 发送者ID 发送者昵称 弹幕内容 弹幕颜色 礼物等级 0 2026-01-15 10:20:35 12345678 张三 主播好厉害! 16777215 0 1 2026-01-15 10:20:38 87654321 李四 求背景音乐! 16777215 0 2 2026-01-15 10:20:42 11223344 王五 666666 16777215 0 3 2026-01-15 10:20:45 44332211 赵六 打卡打卡! 16777215 0 4 2026-01-15 10:20:50 55667788 小七 什么时候抽奖? 16777215 0 WebSocket连接关闭 2.3.3 原理说明
- 连接建立流程:
- 通过
websocket-client库创建 WebSocketApp 对象,绑定on_open、on_message等回调函数; on_open回调中发送握手包,并启动心跳线程(每 30 秒发送一次心跳包),维持与服务器的连接;- 服务器接收握手包后返回
op=8的确认包,标识连接成功。
- 通过
- 数据解析逻辑:
- 服务器推送的消息中,
op=5表示业务消息推送,包含弹幕、礼物、关注等多种类型; - 仅筛选
cmd=DANMU_MSG的消息,解析其中的发送时间、发送者信息、弹幕内容等核心字段; - 时间戳转换为人类可读的格式,方便后续分析。
- 服务器推送的消息中,
- 心跳机制:
- B 站弹幕服务器会在 30 秒内未收到客户端消息时断开连接,因此需要定时发送
op=2的心跳包; - 心跳包采用固定的二进制格式,数据部分为空。
- B 站弹幕服务器会在 30 秒内未收到客户端消息时断开连接,因此需要定时发送
2.4 步骤 4:完整可运行代码整合
将以上步骤的代码整合为一个完整的脚本,方便直接运行:
python
运行
import requests import json import struct import websocket import threading import time import pandas as pd from datetime import datetime # 全局变量 danmaku_list = [] is_running = True def get_real_room_id(short_room_id): """根据直播间短ID获取真实房间ID""" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Referer": "https://live.bilibili.com/", "Accept": "application/json, text/plain, */*" } url = f"https://api.live.bilibili.com/room/v1/Room/room_init?id={short_room_id}" try: response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: data = response.json() if data["code"] == 0: return data["data"]["room_id"] else: raise Exception(f"获取真实房间ID失败:{data['message']}") else: raise Exception(f"请求失败,状态码:{response.status_code}") except Exception as e: print(f"获取真实房间ID异常:{str(e)}") return None def build_ws_handshake_data(room_id): """构建WebSocket握手包""" handshake_json = { "uid": 0, "roomid": room_id, "protover": 3, "platform": "web", "type": 2 } json_str = json.dumps(handshake_json) packet_len = 16 + len(json_str) header_len = 16 protover = 3 op = 7 seq = 1 data = struct.pack(">I2H2I", packet_len, header_len, protover, op, seq) + json_str.encode("utf-8") return data def on_message(ws, message): """处理WebSocket消息""" global danmaku_list if len(message) < 16: return packet_len = struct.unpack(">I", message[:4])[0] header_len = struct.unpack(">H", message[4:6])[0] op = struct.unpack(">I", message[8:12])[0] if op == 8: print("WebSocket连接成功,开始接收弹幕数据...") elif op == 3: pass elif op == 5: data = message[header_len:packet_len] try: msg_data = json.loads(data.decode("utf-8")) for msg in msg_data: if msg.get("cmd") == "DANMU_MSG": danmaku_info = { "发送时间": datetime.fromtimestamp(msg["info"][4]).strftime("%Y-%m-%d %H:%M:%S"), "发送者ID": msg["info"][2][0], "发送者昵称": msg["info"][2][1], "弹幕内容": msg["info"][1], "弹幕颜色": msg["info"][3][0], "礼物等级": msg["info"][4][0] if len(msg["info"]) > 4 else 0 } danmaku_list.append(danmaku_info) print(f"[{danmaku_info['发送时间']}] {danmaku_info['发送者昵称']}: {danmaku_info['弹幕内容']}") except Exception as e: print(f"解析弹幕数据异常:{str(e)}") def on_error(ws, error): """处理WebSocket错误""" print(f"WebSocket错误:{error}") def on_close(ws, close_status_code, close_msg): """处理WebSocket关闭""" print("WebSocket连接关闭") def on_open(ws): """WebSocket连接成功回调""" def send_heartbeat(): """发送心跳包""" while is_running: heartbeat_data = struct.pack(">I2H2I", 16, 16, 3, 2, 1) ws.send(heartbeat_data, opcode=websocket.ABNF.OPCODE_BINARY) time.sleep(30) # 发送握手包 handshake_data = build_ws_handshake_data(ws.room_id) ws.send(handshake_data, opcode=websocket.ABNF.OPCODE_BINARY) # 启动心跳线程 heartbeat_thread = threading.Thread(target=send_heartbeat) heartbeat_thread.daemon = True heartbeat_thread.start() def start_danmaku_crawler(short_room_id, crawl_duration=60): """启动弹幕爬虫""" global is_running, danmaku_list danmaku_list = [] is_running = True # 获取真实房间ID real_room_id = get_real_room_id(short_room_id) if not real_room_id: return # 建立WebSocket连接 ws_url = "wss://broadcastlv.chat.bilibili.com/sub" ws = websocket.WebSocketApp( ws_url, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close ) ws.room_id = real_room_id # 启动WebSocket线程 ws_thread = threading.Thread(target=ws.run_forever) ws_thread.daemon = True ws_thread.start() # 运行指定时长 print(f"爬虫启动,将运行{crawl_duration}秒...") try: time.sleep(crawl_duration) except KeyboardInterrupt: print("\n用户手动终止爬虫") finally: is_running = False ws.close() ws_thread.join() # 存储数据 if danmaku_list: df = pd.DataFrame(danmaku_list) df = df.sort_values(by="发送时间") filename = f"bilibili_danmaku_room_{real_room_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" df.to_csv(filename, index=False, encoding="utf-8-sig") print(f"\n爬取完成!共获取{len(danmaku_list)}条弹幕数据") print(f"数据文件路径:{filename}") print("\n数据预览:") print(df.head()) else: print("\n未获取到弹幕数据,请检查直播间是否在线或ID是否正确") if __name__ == "__main__": # 配置爬取参数 TARGET_ROOM_ID = 290902 # 替换为目标直播间短ID CRAWL_DURATION = 60 # 爬取时长(秒) # 启动爬虫 start_danmaku_crawler(TARGET_ROOM_ID, CRAWL_DURATION) 三、常见问题与解决方案
3.1 连接失败 / 无法获取弹幕
| 问题现象 | 可能原因 | 解决方案 |
|---|---|---|
| 获取真实房间 ID 失败 | 直播间短 ID 错误 / 直播间已关闭 | 1. 确认直播间 URL 中的 ID 正确;2. 检查直播间是否正在直播 |
| WebSocket 连接被拒绝 | 请求头缺失 / 协议版本错误 | 1. 确保请求头包含User-Agent和Referer;2. 协议版本固定为 3 |
| 连接成功但无弹幕 | 直播间无弹幕 / 心跳包未发送 | 1. 更换弹幕较多的直播间测试;2. 检查心跳线程是否正常运行 |
3.2 反爬机制应对策略
- 请求频率控制:避免短时间内频繁请求
room_init接口,可添加 1-2 秒的延时; - 请求头优化:使用真实的浏览器
User-Agent,避免使用默认的 Python 请求头; - IP 代理:如果爬取量较大,可使用 IP 代理池避免 IP 被封禁;
- 登录状态(可选):如需爬取需要登录才能查看的弹幕,可在握手包中填入真实的
uid和cookie。
3.3 数据存储扩展
除了 CSV 格式,还可将弹幕数据存储到其他介质:
- MySQL 数据库:使用
pymysql库将 DataFrame 写入数据库,适合长期存储和批量分析; - MongoDB:适合存储非结构化的弹幕数据,支持灵活的查询;
- Redis:可作为缓存,实时存储最新的弹幕数据,用于实时监控。
四、进阶优化方向
4.1 多直播间并发爬取
通过threading或asyncio实现多直播间弹幕的并发爬取,提升数据采集效率:
python
运行
import concurrent.futures def crawl_single_room(room_id): """爬取单个直播间""" start_danmaku_crawler(room_id, crawl_duration=60) if __name__ == "__main__": # 多个直播间ID room_ids = [290902, 123456, 654321] # 并发爬取 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: executor.map(crawl_single_room, room_ids) 4.2 弹幕数据实时分析
结合matplotlib或pyecharts实现弹幕数据的可视化分析:
- 弹幕发送频率时序图;
- 高频词汇统计(词云);
- 发送者活跃度分析。
4.3 异常自动重连
添加重连机制,当 WebSocket 连接断开时自动重新建立连接:
python
运行
def start_reconnect_crawler(short_room_id, crawl_duration=300): """带自动重连的爬虫""" start_time = time.time() while time.time() - start_time < crawl_duration: try: start_danmaku_crawler(short_room_id, crawl_duration=60) except Exception as e: print(f"爬虫异常,5秒后重新连接:{str(e)}") time.sleep(5) 五、法律与伦理说明
- 合规性:本爬虫仅用于学习和研究目的,爬取的数据不得用于商业用途;
- 网站规则:爬取前请遵守 B 站的《用户协议》和《robots.txt》规则;
- 数据隐私:不得泄露爬取到的用户个人信息,遵守《网络安全法》等相关法律法规;
- 爬取频率:控制爬取频率,避免对 B 站服务器造成压力。
总结
核心要点回顾
- B 站直播弹幕基于WebSocket 协议传输,需构建二进制握手包完成连接初始化,通过心跳机制维持连接;
- 爬虫开发核心流程为:获取真实房间 ID → 构建握手包 → 建立 WebSocket 连接 → 解析弹幕数据 → 存储数据;
- 关键技术点包括二进制协议解析、多线程心跳维护、JSON 数据解析和结构化存储。
实战价值
本文提供的代码可直接用于 B 站直播弹幕的爬取,不仅适用于数据分析场景,也可作为 WebSocket 协议爬虫开发的典型案例,帮助开发者理解实时数据爬取的核心逻辑。通过进阶优化,可扩展为支持多直播间、实时分析、自动重连的完整弹幕采集系统。