跳到主要内容
Python diskcache 磁盘缓存工具使用指南 | 极客日志
Python
Python diskcache 磁盘缓存工具使用指南 综述由AI生成 Python 的 diskcache 库,这是一个基于 SQLite 的持久化内存缓存工具。相比纯内存的 cachetools,diskcache 支持数据持久化,重启后不丢失,且不受内存大小限制。文章详细讲解了安装方法、淘汰策略(如 LRS、LRU)、基本存取操作、过期机制、清理方法(清空、按 Tag、按 Key)、自动缓存装饰器、队列操作、事务处理以及多进程分片缓存(FanoutCache)等功能。通过示例代码展示了如何配置、使用及监控缓存状态,适用于需要长期存储或大数据量缓存的场景。
云朵棉花糖 发布于 2026/3/23 更新于 2026/5/9 9.6K 浏览之前写了 cachetools 的缓存工具,那个是纯内存的,性能上确实有优势,但重启后缓存数据会丢失。diskcache 则利用轻量级的 sqlite 数据库,该数据库不需要单独的服务器进程,并可以持久化数据结构,且可以突破内存的限制,针对大量数据的缓存时,不会因为内存溢出而丢失数据。
特性 diskcache cachetools 存储位置 磁盘为主(内存为辅) 纯内存 持久化 ✅ 支持(重启后数据还在) ❌ 不支持 数据大小 适合大数据(受磁盘限制) 适合小数据(受内存限制) 速度 磁盘 I/O 较慢 纯内存很快 使用场景 长期缓存、大数据 短期缓存、小数据
安装
pip install diskcache
淘汰策略
从源码的 EVICTION_POLICY 值可以看出,淘汰策略主要有以下几种。
'least-recently-stored' - 默认 ,按存储时间淘汰
'least-recently-used' - 按访问时间淘汰(每次访问都写数据库)
'least-frequently-used' - 按访问频率淘汰(每次访问都写数据库)
'none' - 禁用自动淘汰
默认则是按照 LRS 按缓存存储的先后时间进行淘汰的淘汰策略。
简单存取
from diskcache import Cache
上面代码执行之后,我们可以在当前位置的下发现有个 cache 目录,其中有个 cache.db 文件。因为这个 cache.db 是个 sqlite 数据库文件,我们可以尝试使用 pandas 读取一下。
import pandas as pd from sqlalchemy import create_engine engine = create_engine('sqlite:///cache/cache.db' ) pd.set_option('display.max_columns' , None )
rowid key raw store_time expire_time access_time access_count tag size mode filename value 0 1 name 1 1.770605e+09 1.770605e+09 1.770605e+09 0 姓名 0 1 None 张三
从查询结果中可以看出,数据库除了常规的 key 和 value,还有过期时间戳 expire_time,tag, access_count 等参数。
缓存获取
获取指定 正常使用 get() 方法获取缓存,但这种方法如果缓存不存或者缓存已过期的话,就只是返回 None 而不会报错。
from diskcache import Cache cache = Cache("cache" ) print (cache.get('key_not_exist' ))
还有就是使用 read() 方法获取缓存,Key 不存在或已过期则直接报 KeyError 的错。具体使用哪种看自己的使用场景。
from diskcache import Cache cache = Cache("cache" ) print (cache.read('key_not_exist' ))
获取最近 有时候,我们想知道最近(最远)存储的缓存是哪个,则可以使用 peekitem() 方法获取最近(最远)存储的缓存数据。
**注意:**该操作会自动清空数据库中已过期的缓存。
from diskcache import Cache cache = Cache("cache" ) print (cache.peekitem())
缓存过期 设置缓存的时候(无论是使用 set() 还是 add() 方法),都有一个过期时长参数 expire 指定该缓存多久后过期。
from diskcache import Cache cache = Cache("cache" ) result = cache.add("key1" , "value1" , expire=60 ) print (result)
如果缓存已过期,则返回的对应的 value 为 None(当然,如果指定了默认值 default,则返回默认值,比如下面的 False)。
from diskcache import Cache cache = Cache("cache" ) name = cache.get('name' , default=False , expire_time=True , tag=True ) print (name)
但其实这个时候,数据仍然在数据库里的,只不过因为当前时间超过了过期时间 expire_time,所以不会将这个数据取回来。
import pandas as pd from sqlalchemy import create_engine engine = create_engine('sqlite:///cache/cache.db' ) pd.set_option('display.max_columns' , None )
缓存清理
清空过期 使用 expire() 方法可以清空过期缓存,并返回清空的缓存数。
import time from diskcache import Cache cache = Cache("cache" ) cache.set ('name' , '张三' , expire=60 , read=True , tag='姓名' , retry=True ) cache.set ('age' , 30 , expire=30 , read=True , tag='年龄' , retry=True ) time.sleep(35 ) count = cache.expire() print (count)
清空所有 使用 clear() 方法可以清空所有缓存,并返回清空的缓存数。
from diskcache import Cache cache = Cache("cache" ) cache.set ('name' , '张三' , expire=60 , read=True , tag='姓名' , retry=True ) cache.set ('age' , 30 , expire=60 , read=True , tag='年龄' , retry=True ) count = cache.clear() print (count)
强制清理 使用 cull() 方法会先清理过期缓存,再按照给定的缓存淘汰策略,清理缓存直到磁盘缓存容量小于 size_limit 的大小。
from diskcache import Cache cache = Cache("cache" ) cache.set ('name' , '张三' , expire=60 , read=True , tag='姓名' , retry=True ) cache.set ('age' , 30 , expire=60 , read=True , tag='年龄' , retry=True ) count = cache.cull() print (count)
按 tag 清理 使用 evict() 方法可以手动按照 tag 的名称清理缓存,并返回清空的缓存数。
from diskcache import Cache cache = Cache("cache" ) cache.set ('name' , '张三' , expire=60 , read=True , tag='姓名' , retry=True ) cache.set ('age' , 30 , expire=60 , read=True , tag='年龄' , retry=True ) count = cache.evict("年龄" ) print (count)
按 key 清理 from diskcache import Cache cache = Cache("cache" ) cache.set ('name' , '张三' , expire=60 , read=True , tag='姓名' , retry=True ) cache.set ('age' , 30 , expire=60 , read=True , tag='年龄' , retry=True ) result = cache.delete("name" ) print (result)
from diskcache import Cache cache = Cache("cache" ) cache.set ('name' , '张三' , expire=60 , read=True , tag='姓名' , retry=True ) cache.set ('age' , 30 , expire=60 , read=True , tag='年龄' , retry=True ) result = cache.pop("name" ) print (result)
缓存添加
只有键不存在时 才会存储
如果键已存在,不会覆盖 ,返回 False
相当于 setdefault() 的行为
from diskcache import Cache cache = Cache("cache" ) result = cache.add("key1" , "value1" ) print (result)
当然,set() 方法也可以设置缓存。与 add() 的区别如下。
总是会存储 值,无论键是否已存在
如果键已存在,会覆盖 旧值
相当于字典的 dict[key] = value
缓存刷新 针对未过期的 key,重新刷新 key 的过期时间。如果 key 已过期,则会刷新失败。
from diskcache import Cache cache = Cache("cache" ) result = cache.touch("key1" , 60 ) print (result)
缓存判断 Cache 中定义了 contains 魔法方法,可以用 in 的方式判断对应的 key 是否在缓存中。
from diskcache import Cache cache = Cache("cache" ) cache.set ("key1" , "value1" ) print ("key1" in cache)
也可以使用 get() 方法,看返回的结果是不是 None 来进行判断(但这种判断并不精确,如果 value 值本身是 None 的话,则会造成误判)。
当然,还可以使用 read() 方法,Key 不存在的话直接报 KeyError,这种需要自己处理这种异常。
缓存修改 针对整型的缓存数据类型,可以使用 incr() 和 decr() 对 value 进行加减指定 delta 的数值。
from diskcache import Cache cache = Cache("cache" ) cache.set ('age' , 30 , expire=60 , read=True , tag='年龄' , retry=True ) cache.incr("age" , delta=5 ) print (cache.get("age" ))
缓存检查 check() 方法会对数据库和文件系统的一致性进行检查,如果有告警,则返回告警信息。
from diskcache import Cache cache = Cache("cache" ) warnings = cache.check() print (warnings)
自动缓存 像之前的 cachetools 或标准库的 lru_cache,都是可以直接装饰在函数上,实现对函数调用自动缓存的,diskcache 也可以使用缓存实例的 memoize() 方法来装饰函数,实现调用结果自动缓存。
from diskcache import Cache cache = Cache('cache' )
看源码可知,它会自动将输入参数等按照指定流程转换成元组(这样能确保相同输入转换后 的 key 始终一致)作为缓存的 key,每次函数调用时,如果已经有获取到缓存结果,则直接返回;如果还没有缓存结果,则调用函数,并将结果缓存并返回。
def wrapper (*args, **kwargs ):
"""Wrapper for callable to cache arguments and return values."""
key = wrapper.__cache_key__(*args, **kwargs)
result = self .get(key, default=ENOVAL, retry=True )
if result is ENOVAL:
result = func(*args, **kwargs)
if expire is None or expire > 0 :
self .set (key, result, expire, tag=tag, retry=True )
return result
队列操作 我们可以使用相同前缀的 key 来约定一个队列(简单理解就是相同前缀的 key 同属于一个队列),然后通过 push() 和 pull() 方法来实现入列和出列。
队列推送 可以使用 push() 方法往指定前缀的队列推送 value(key 则由方法根据前缀及缓存中已有的相同前缀的 key 来自动生成)。
from diskcache import Cache cache = Cache("cache" ) result = cache.push("first" , prefix="test" ) print (result)
队列拉取 出队的顺序与入队顺序相同(遵从 FIFO 先进先出的规则)。
from diskcache import Cache cache = Cache("cache" ) result = cache.pull("test" ) print (result)
当然,我们也可以通过参数 side 指定从头部出列还是尾部出列。
from diskcache import Cache cache = Cache("cache" ) result = cache.pull("test" ) print (result)
队列查看 我们可以使用 peek() 方法,查看指定前缀的队列的队首和队尾的数据。
**注意:**不同于 pull() 方法,peek() 只是查看而不会删除未过期的缓存(过期缓存仍然会自动清理),毕竟从 peek 这个单词的意思本身就是 窥视,偷看的意思。
from diskcache import Cache cache = Cache("cache" ) print (cache.peek(prefix="test" ))
连接关闭 使用 close() 方法可以关闭与底层 sqlite 数据库的连接。
from diskcache import Cache cache = Cache("cache" ) cache.close()
配置重置 SELECT key, value FROM Settings;
key value 0 count 0 1 size 0 2 hits 0 3 misses 0 4 statistics 0 5 tag_index 0 6 eviction_policy least-recently-stored 7 size_limit 1073741824 8 cull_limit 10 9 sqlite_auto_vacuum 1 10 sqlite_cache_size 8192 11 sqlite_journal_mode wal 12 sqlite_mmap_size 67108864 13 sqlite_synchronous 1 14 disk_min_file_size 32768 15 disk_pickle_protocol 5
我们可以使用 reset() 方法重置 Settings 中对应 key 的 value。比如我们设置 cull_limit 值为 30。
from diskcache import Cache cache = Cache("cache" ) value = cache.reset("cull_limit" , 30 ) print (value)
再次查看 Settings 表时,可以看到 cull_limit 的值已被修改。
key value 0 count 2 1 size 0 2 hits 0 3 misses 0 4 statistics 0 5 tag_index 0 6 eviction_policy least-recently-stored 7 size_limit 1073741824 8 cull_limit 30 ...
配置查看 如果我么仅仅是想查看配置的值,而不需要修改的话,则在使用 reset() 方法时,不需要传入对应的值,且 update 置为 False 即可。
from diskcache import Cache cache = Cache("cache" ) value = cache.reset("cull_limit" , update=False ) print (value)
事务操作 from diskcache import Cache cache = Cache("cache" ) items = {"key1" : "value1" , "key2" : "value2" , "key3" : "value3" } with cache.transact():
命中统计 from diskcache import Cache cache = Cache("cache" )
Deque 队列缓存 估计是怕我们直接用 Cache 原生的 push() 和 pull() 方法来操作队列太不方便,diskcache 还帮我们封装实现了一个 Deque 类(其中大部分操作也是基于 Cache 的 push() 和 pull() 方法来实现)。
from diskcache import Deque cache = Deque([0 , 1 , 2 , 3 , 4 ], "cache" ) cache.append(5 ) print (list (cache))
其 key 值则是无前缀的 key。其他操作则与标准库 collections 中的 deque 基本类似,就不赘述了。不同之处在于每个修改操作都会持久化到数据库,感兴趣的可以看 Deque 中方法的源码实现。
rowid key raw store_time expire_time access_time access_count tag size mode filename value 0 1 500000000000000 1 1.770714 e+09 None 1.770714 e+09 0 None 0 1 None 0 1 2 500000000000001 1 1.770714 e+09 None 1.770714 e+09 0 None 0 1 None 1 2 3 500000000000002 1 1.770714 e+09 None 1.770714 e+09 0 None 0 1 None 2 3 4 500000000000003 1 1.770714 e+09 None 1.770714 e+09 0 None 0 1 None 3 4 5 500000000000004 1 1.770714 e+09 None 1.770714 e+09 0 None 0 1 None 4 5 6 500000000000005 1 1.770714 e+09 None 1.770714 e+09 0 None 0 1 None 5
Index 索引缓存 Index 组件提供了类似字典的持久化存储方案(从源码中也可以看出,它默认没有设置缓存淘汰策略,也即永不过期)。它支持事务操作,确保数据一致性,同时保持高效的读写性能。
self ._cache = Cache(directory, eviction_policy='none' )
from diskcache import Index
我们也可以从通用的 Cache() 实例来创建 index。
from diskcache import Cache, Index cache = Cache("cache" ) index = Index.fromcache(cache)
Index 缓存的设置和获取和字典一样(Index 中定义了 getitem , setitem , delitem 等字典类型的魔法方法)。封装的其他方法也基本上字典一致。
from diskcache import Index index = Index("cache" ) index['f' ] = 6 print (index.get('c' ))
还可以像字典一样 setdefault,对应的 key 有值时,返回缓存的值;没有值时,使用给定的值设置缓存并返回。
from diskcache import Index index = Index("cache" ) result = index.setdefault('h' , 9 ) print (result)
Fanout 分片缓存 FanoutCache 提供了横向扩展 能力,它的一致性哈希 算法确保了数据均匀分布,将数据自动分片到多个子缓存,同时支持动态扩容和多进程操作。
from diskcache import FanoutCache
此时,diskcache 会按照分片数目,在指定目录下建立分片个数目的目录,并将缓存均匀分散存储在这些目录中的 cache.db 数据库中。源码中的 self._shards 也可以看出他新建分片缓存目录的逻辑。
self ._shards = tuple (
Cache(
directory=op.join(directory, '%03d' % num),
timeout=timeout,
disk=disk,
size_limit=size_limit,
**settings,
)
for num in range (shards)
)
注意 :上面多进程示例,进程池的 with 块需要放到 main 块中执行,不然会因为安全因素导致执行失败。
下面是其中一个 001 分片的 cache.db 中存储的缓存数据。
import pandas as pd from sqlalchemy import create_engine engine = create_engine('sqlite:///cache/001/cache.db' ) pd.set_option('display.max_columns' , None )
其他的缓存操作方法与 Cache 类似,只是针对每个 key,会通过相同的 Hash 算法先找到其所属的正确的分片,然后再进行操作。
相关免费在线工具 curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online