Python 之 diskcache 磁盘缓存工具

之前写了 cachetools 的缓存工具,那个是纯内存的,性能上确实有优势,但重启后缓存数据会丢失。diskcache 则利用轻量级的 sqlite 数据库,该数据库不需要单独的服务器进程,并可以持久化数据结构,且可以突破内存的限制,针对大量数据的缓存时,不会因为内存溢出而丢失数据。

特性diskcachecachetools
存储位置磁盘为主(内存为辅)纯内存
持久化✅ 支持(重启后数据还在)❌ 不支持
数据大小适合大数据(受磁盘限制)适合小数据(受内存限制)
速度磁盘I/O较慢纯内存很快
使用场景长期缓存、大数据短期缓存、小数据

安装

pip install diskcache

淘汰策略

从源码的 EVICTION_POLICY 值可以看出,淘汰策略主要有以下几种。

  • 'least-recently-stored' - 默认,按存储时间淘汰
  • 'least-recently-used' - 按访问时间淘汰(每次访问都写数据库)
  • 'least-frequently-used' - 按访问频率淘汰(每次访问都写数据库)
  • 'none' - 禁用自动淘汰

默认则是按照 LRS 按缓存存储的先后时间进行淘汰的淘汰策略。

简单存取

from diskcache import Cache # 1. 实例一个缓存对象 # 需要传入目录路径。如果目录路径不存在,将创建该路径,并且会在指定位置创建一个cache.db的文件。 # 如果未指定,则会自动创建一个临时目录,并且会在指定位置创建一个cache.db的文件。 cache = Cache("cache") # 2. 保存缓存 cache.set('name', '张三', expire=60, read=True, tag='姓名', retry=True) # 3. 获取缓存 expire_time为真,返回有效期时间;tag为真,返回缓存设置的tag值; name = cache.get('name', default=False, expire_time=True, tag=True) print(name) # ('张三', 1770617370.6258903, '姓名') 

上面代码执行之后,我们可以在当前位置的下发现有个 cache 目录,其中有个 cache.db 文件。因为这个 cache.db 是个 sqlite 数据库文件,我们可以尝试使用 pandas 读取一下。

import pandas as pd from sqlalchemy import create_engine engine = create_engine('sqlite:///cache/cache.db') pd.set_option('display.max_columns', None) # 不限制列数 pd.set_option('display.width', None) # 不限制列宽 if __name__ == '__main__': res = pd.read_sql('SELECT * FROM cache;', con=engine) print(res) 
 rowid key raw store_time expire_time access_time access_count tag size mode filename value 0 1 name 1 1.770605e+09 1.770605e+09 1.770605e+09 0 姓名 0 1 None 张三

从查询结果中可以看出,数据库除了常规的 key 和 value,还有过期时间戳 expire_time,tag, access_count 等参数。

缓存获取

获取指定

正常使用 get() 方法获取缓存,但这种方法如果缓存不存或者缓存已过期的话,就只是返回 None 而不会报错。

from diskcache import Cache cache = Cache("cache") print(cache.get('key_not_exist')) # None 

还有就是使用 read() 方法获取缓存,Key 不存在或已过期则直接报 KeyError 的错。具体使用哪种看自己的使用场景。

from diskcache import Cache cache = Cache("cache") print(cache.read('key_not_exist')) # Traceback (most recent call last): # File "E:\lky_project\test_project\test4.py", line 4, in <module> # print(cache.read('key_not_exist')) # ^^^^^^^^^^^^^^^^^^^^^^^^^^^ # File "E:\myenv\py11_base_env\Lib\site-packages\diskcache\core.py", line 1252, in read # raise KeyError(key) # KeyError: 'key_not_exist' 

获取最近

有时候,我们想知道最近(最远)存储的缓存是哪个,则可以使用 peekitem() 方法获取最近(最远)存储的缓存数据。

注意:该操作会自动清空数据库中已过期的缓存。

from diskcache import Cache cache = Cache("cache") print(cache.peekitem()) # ('key5', 'value5') print(cache.peekitem(last=False)) # ('key1', 'value1') 

缓存过期

设置缓存的时候(无论是使用 set() 还是 add() 方法),都有一个过期时长参数 expire 指定该缓存多久后过期。

from diskcache import Cache cache = Cache("cache") result = cache.add("key1", "value1", expire=60) print(result) # True result = cache.add("key2", "value2", expire=30) print(result) # True 

如果缓存已过期,则返回的对应的 value 为 None(当然,如果指定了默认值 default,则返回默认值,比如下面的 False)。

from diskcache import Cache cache = Cache("cache") name = cache.get('name', default=False, expire_time=True, tag=True) print(name) # (False, None, None)

但其实这个时候,数据仍然在数据库里的,只不过因为当前时间超过了过期时间 expire_time,所以不会将这个数据取回来。

import pandas as pd from sqlalchemy import create_engine engine = create_engine('sqlite:///cache/cache.db') pd.set_option('display.max_columns', None) # 不限制列数 pd.set_option('display.width', None) # 不限制列宽 if __name__ == '__main__': res = pd.read_sql('SELECT * FROM cache;', con=engine) print(res) # rowid key raw store_time expire_time access_time access_count tag size mode filename value # 0 1 name 1 1.770617e+09 1.770617e+09 1.770617e+09 0 姓名 0 1 None 张三 

缓存清理

清空过期

使用 expire() 方法可以清空过期缓存,并返回清空的缓存数。

import time from diskcache import Cache cache = Cache("cache") cache.set('name', '张三', expire=60, read=True, tag='姓名', retry=True) cache.set('age', 30, expire=30, read=True, tag='年龄', retry=True) time.sleep(35) count = cache.expire() print(count) # 1 

清空所有

使用 clear() 方法可以清空所有缓存,并返回清空的缓存数。

from diskcache import Cache cache = Cache("cache") cache.set('name', '张三', expire=60, read=True, tag='姓名', retry=True) cache.set('age', 30, expire=60, read=True, tag='年龄', retry=True) count = cache.clear() print(count) # 2

强制清理

使用 cull() 方法会先清理过期缓存,再按照给定的缓存淘汰策略,清理缓存直到磁盘缓存容量小于size_limit 的大小。

from diskcache import Cache cache = Cache("cache") cache.set('name', '张三', expire=60, read=True, tag='姓名', retry=True) cache.set('age', 30, expire=60, read=True, tag='年龄', retry=True) count = cache.cull() print(count) 

按 tag 清理

使用 evict() 方法可以手动按照 tag 的名称清理缓存,并返回清空的缓存数。

from diskcache import Cache cache = Cache("cache") cache.set('name', '张三', expire=60, read=True, tag='姓名', retry=True) cache.set('age', 30, expire=60, read=True, tag='年龄', retry=True) count = cache.evict("年龄") print(count) # 1 

按 key 清理

delete() 删除并返回是否删除成功。

from diskcache import Cache cache = Cache("cache") cache.set('name', '张三', expire=60, read=True, tag='姓名', retry=True) cache.set('age', 30, expire=60, read=True, tag='年龄', retry=True) result = cache.delete("name") print(result) # True 

pop() 删除并返回缓存的 value。

from diskcache import Cache cache = Cache("cache") cache.set('name', '张三', expire=60, read=True, tag='姓名', retry=True) cache.set('age', 30, expire=60, read=True, tag='年龄', retry=True) result = cache.pop("name") print(result) # 张三 

缓存添加

可以使用 add() 方法仅添加缓存。

  • 只有键不存在时才会存储
  • 如果键已存在,不会覆盖,返回 False
  • 相当于 setdefault() 的行为
from diskcache import Cache cache = Cache("cache") result = cache.add("key1", "value1") print(result) # True 

当然,set() 方法也可以设置缓存。与 add() 的区别如下。

  • 总是会存储值,无论键是否已存在
  • 如果键已存在,会覆盖旧值
  • 相当于字典的 dict[key] = value

缓存刷新

针对未过期的 key,重新刷新 key 的过期时间。如果 key 已过期,则会刷新失败。

from diskcache import Cache cache = Cache("cache") result = cache.touch("key1", 60) print(result) # True 

缓存判断

Cache 中定义了 __contains__ 魔法方法,可以用 in 的方式判断对应的 key 是否在缓存中。

from diskcache import Cache cache = Cache("cache") cache.set("key1", "value1") print("key1" in cache) # True 

也可以使用 get() 方法,看返回的结果是不是 None 来进行判断(但这种判断并不精确,如果 value 值本身是 None 的话,则会造成误判)。

当然,还可以使用 read() 方法,Key 不存在的话直接报 KeyError,这种需要自己处理这种异常。

缓存修改

针对整型的缓存数据类型,可以使用 incr() 和 decr() 对 value 进行加减指定 delta 的数值。

from diskcache import Cache cache = Cache("cache") cache.set('age', 30, expire=60, read=True, tag='年龄', retry=True) cache.incr("age", delta=5) print(cache.get("age")) # 35 cache.decr("age", delta=3) print(cache.get("age")) # 32

缓存检查

check() 方法会对数据库和文件系统的一致性进行检查,如果有告警,则返回告警信息。

from diskcache import Cache cache = Cache("cache") warnings = cache.check() print(warnings) # [] 

自动缓存

像之前的 cachetools 或标准库的 lru_cache,都是可以直接装饰在函数上,实现对函数调用自动缓存的,diskcache 也可以使用缓存实例的 memoize() 方法来装饰函数,实现调用结果自动缓存。

from diskcache import Cache cache = Cache('cache') # 使用cache.memoize()装饰器自动缓存函数结果 @cache.memoize(expire=60) def compute_expensive_operation(x): # 模拟耗时计算 print("call function") return x * x * x # 第一次调用,计算结果并缓存 result = compute_expensive_operation(3) print(result) # 输出: call function 和 27 # 第二次调用,直接从缓存获取结果,不会再真正执行函数 result = compute_expensive_operation(3) print(result) # 输出: 27 

看源码可知,它会自动将输入参数等按照指定流程转换成元组(这样能确保相同输入转换后 的 key 始终一致)作为缓存的 key,每次函数调用时,如果已经有获取到缓存结果,则直接返回;如果还没有缓存结果,则调用函数,并将结果缓存并返回。

 def wrapper(*args, **kwargs): """Wrapper for callable to cache arguments and return values.""" key = wrapper.__cache_key__(*args, **kwargs) result = self.get(key, default=ENOVAL, retry=True) if result is ENOVAL: result = func(*args, **kwargs) if expire is None or expire > 0: self.set(key, result, expire, tag=tag, retry=True) return result

队列操作

我们可以使用相同前缀的 key 来约定一个队列(简单理解就是相同前缀的 key 同属于一个队列),然后通过 push() 和 pull() 方法来实现入列和出列。

队列推送

可以使用 push() 方法往指定前缀的队列推送 value(key 则由方法根据前缀及缓存中已有的相同前缀的 key 来自动生成)。

from diskcache import Cache cache = Cache("cache") result = cache.push("first", prefix="test") print(result) # test-500000000000000 result = cache.push("second", prefix="test") print(result) # test-500000000000001 result = cache.push("third", prefix="test", side="front") # 从队列前面插入 print(result) # test-499999999999999 

队列拉取

出队的顺序与入队顺序相同(遵从 FIFO 先进先出的规则)。

from diskcache import Cache cache = Cache("cache") result = cache.pull("test") print(result) # ('test-499999999999999', 'third') result = cache.pull("test") print(result) # ('test-500000000000000', 'first') result = cache.pull("test") print(result) # ('test-500000000000001', 'second') 

当然,我们也可以通过参数 side 指定从头部出列还是尾部出列。

from diskcache import Cache cache = Cache("cache") result = cache.pull("test") print(result) # ('test-499999999999999', 'third') result = cache.pull("test", side="back") print(result) # ('test-500000000000001', 'second') result = cache.pull("test") print(result) # ('test-500000000000000', 'first') 

队列查看

我们可以使用 peek() 方法,查看指定前缀的队列的队首和队尾的数据。

注意:不同于 pull() 方法,peek() 只是查看而不会删除未过期的缓存(过期缓存仍然会自动清理),毕竟从 peek 这个单词的意思本身就是 窥视,偷看的意思。

from diskcache import Cache cache = Cache("cache") print(cache.peek(prefix="test")) # ('test-499999999999999', 'third') print(cache.peek(prefix="test", side="back")) # ('test-500000000000001', 'second') 

连接关闭

使用 close() 方法可以关闭与底层 sqlite 数据库的连接。

from diskcache import Cache cache = Cache("cache") cache.close() 

配置重置

缓存的配置存储在 Settings 表中。

SELECT key, value FROM Settings;
 key value 0 count 0 1 size 0 2 hits 0 3 misses 0 4 statistics 0 5 tag_index 0 6 eviction_policy least-recently-stored 7 size_limit 1073741824 8 cull_limit 10 9 sqlite_auto_vacuum 1 10 sqlite_cache_size 8192 11 sqlite_journal_mode wal 12 sqlite_mmap_size 67108864 13 sqlite_synchronous 1 14 disk_min_file_size 32768 15 disk_pickle_protocol 5

我们可以使用 reset() 方法重置 Settings 中对应 key 的 value。比如我们设置 cull_limit 值为 30。

from diskcache import Cache cache = Cache("cache") value = cache.reset("cull_limit", 30) print(value) # 30 

再次查看 Settings 表时,可以看到 cull_limit 的值已被修改。

 key value 0 count 2 1 size 0 2 hits 0 3 misses 0 4 statistics 0 5 tag_index 0 6 eviction_policy least-recently-stored 7 size_limit 1073741824 8 cull_limit 30 ...

配置查看

如果我么仅仅是想查看配置的值,而不需要修改的话,则在使用 reset() 方法时,不需要传入对应的值,且 update 置为 False 即可。

from diskcache import Cache cache = Cache("cache") value = cache.reset("cull_limit", update=False) print(value) # 10 

事务操作

可以使用事务的方式批量进行操作。

from diskcache import Cache cache = Cache("cache") items = {"key1": "value1", "key2": "value2", "key3": "value3"} with cache.transact(): # 性能提升2-5倍 for key, value in items.items(): cache[key] = value 

命中统计

from diskcache import Cache cache = Cache("cache") # 启用统计 cache.stats(enable=True) # 获取命中率 hits, misses = cache.stats() print(f"hits: {hits}; misses: {misses}") # hits: 5; misses: 2 # 检查缓存一致性 warnings = cache.check() print(warnings) # [] # 获取缓存体积(字节) size = cache.volume() print(size) # 32768 

Deque 队列缓存

估计是怕我们直接用 Cache 原生的 push() 和 pull() 方法来操作队列太不方便,diskcache 还帮我们封装实现了一个 Deque 类(其中大部分操作也是基于 Cache 的 push() 和 pull() 方法来实现)。

from diskcache import Deque cache = Deque([0, 1, 2, 3, 4], "cache") cache.append(5) print(list(cache)) # [0, 1, 2, 3, 4, 5]

其 key 值则是无前缀的 key。其他操作则与标准库 collections 中的 deque 基本类似,就不赘述了。不同之处在于每个修改操作都会持久化到数据库,感兴趣的可以看 Deque 中方法的源码实现。

 rowid key raw store_time expire_time access_time access_count tag size mode filename value 0 1 500000000000000 1 1.770714e+09 None 1.770714e+09 0 None 0 1 None 0 1 2 500000000000001 1 1.770714e+09 None 1.770714e+09 0 None 0 1 None 1 2 3 500000000000002 1 1.770714e+09 None 1.770714e+09 0 None 0 1 None 2 3 4 500000000000003 1 1.770714e+09 None 1.770714e+09 0 None 0 1 None 3 4 5 500000000000004 1 1.770714e+09 None 1.770714e+09 0 None 0 1 None 4 5 6 500000000000005 1 1.770714e+09 None 1.770714e+09 0 None 0 1 None 5

Index 索引缓存

Index 组件提供了类似字典的持久化存储方案(从源码中也可以看出,它默认没有设置缓存淘汰策略,也即永不过期)。它支持事务操作,确保数据一致性,同时保持高效的读写性能。

self._cache = Cache(directory, eviction_policy='none')
from diskcache import Index # 持久化字典,永不淘汰 index = Index("cache", {"a": 1, "b": 2}, c=3) index.update([('d', 4), ('e', 5)]) print(list(index)) # ['a', 'b', 'c', 'd', 'e'] 

我们也可以从通用的 Cache() 实例来创建 index。

from diskcache import Cache, Index cache = Cache("cache") index = Index.fromcache(cache) 

Index 缓存的设置和获取和字典一样(Index 中定义了 __getitem__,__setitem__,__delitem__ 等字典类型的魔法方法)。封装的其他方法也基本上字典一致。

from diskcache import Index index = Index("cache") index['f'] = 6 print(index.get('c')) # 3

还可以像字典一样 setdefault,对应的 key 有值时,返回缓存的值;没有值时,使用给定的值设置缓存并返回。

from diskcache import Index index = Index("cache") result = index.setdefault('h', 9) print(result) # 8 

Fanout 分片缓存

FanoutCache提供了横向扩展能力,它的一致性哈希算法确保了数据均匀分布,将数据自动分片到多个子缓存,同时支持动态扩容和多进程操作。

from diskcache import FanoutCache # 初始化分布式缓存,8个分片,每个分片1GB大小限制 distributed_cache = FanoutCache( directory='./cache', shards=8, size_limit=1024 ** 3 # 1GB per shard ) # 在多进程环境中共享使用 from multiprocessing import Pool def fetch_page(url): return f"content of {url}" def spider_worker(url): # 自动路由到对应分片 if distributed_cache.get(url, default=None) is None: data = fetch_page(url) distributed_cache.set(url, data, expire=86400) # 缓存24小时 return url url_list = [ "https://example1.com", "https://example2.com", "https://example3.com", "https://example4.com", ] if __name__ == '__main__': with Pool(processes=8) as pool: # 并行处理URL列表 results = pool.map(spider_worker, url_list) 

此时,diskcache 会按照分片数目,在指定目录下建立分片个数目的目录,并将缓存均匀分散存储在这些目录中的 cache.db 数据库中。源码中的 self._shards 也可以看出他新建分片缓存目录的逻辑。

 self._shards = tuple( Cache( directory=op.join(directory, '%03d' % num), timeout=timeout, disk=disk, size_limit=size_limit, **settings, ) for num in range(shards) )

注意:上面多进程示例,进程池的 with 块需要放到 __main__ 块中执行,不然会因为安全因素导致执行失败。

下面是其中一个 001 分片的 cache.db 中存储的缓存数据。

import pandas as pd from sqlalchemy import create_engine engine = create_engine('sqlite:///cache/001/cache.db') pd.set_option('display.max_columns', None) # 不限制列数 pd.set_option('display.width', None) # 不限制列宽 if __name__ == '__main__': res = pd.read_sql('SELECT * FROM cache;', con=engine) # res = pd.read_sql('SELECT key, value FROM Settings;', con=engine) print(res)
# rowid key raw store_time expire_time access_time access_count tag size mode filename value # 0 1 https://example4.com 1 1.770632e+09 1.770718e+09 1.770632e+09 0 None 0 1 None content of https://example4.com 

其他的缓存操作方法与 Cache 类似,只是针对每个 key,会通过相同的 Hash 算法先找到其所属的正确的分片,然后再进行操作。

Read more

最新电子电气架构(EEA)调研-3

而新一代的强实时性、高确定性,以及满足CAP定理的同步分布式协同技术(SDCT),可以实现替代TSN、DDS的应用,且此技术已经在无人车辆得到验证,同时其低成本学习曲线、无复杂二次开发工作,将开发人员的劳动强度、学习曲线极大降低,使开发人员更多的去完成算法、执行器功能完善。 五、各大车厂的EEA 我们调研策略是从公开信息中获得各大车厂的EEA信息,并在如下中进行展示。 我们集中了华为、特斯拉、大众、蔚来、小鹏、理想、东风(岚图)等有代表领先性的车辆电子电气架构厂商。        1、华为 图12 华为的CCA电子电气架构              (1)华为“计算+通信”CC架构的三个平台                         1)MDC智能驾驶平台;                         2)CDC智能座舱平台                         3)VDC整车控制平台。        联接指的是华为智能网联解决方案,解决车内、车外网络高速连接问题,云服务则是基于云计算提供的服务,如在线车主服务、娱乐和OTA等。 华

By Ne0inhk
Apache IoTDB 架构特性与 Prometheus+Grafana 监控体系部署实践

Apache IoTDB 架构特性与 Prometheus+Grafana 监控体系部署实践

Apache IoTDB 架构特性与 Prometheus+Grafana 监控体系部署实践 文章目录 * Apache IoTDB 架构特性与 Prometheus+Grafana 监控体系部署实践 * Apache IoTDB 核心特性与价值 * Apache IoTDB 监控面板完整部署方案 * 安装步骤 * 步骤一:IoTDB开启监控指标采集 * 步骤二:安装、配置Prometheus * 步骤三:安装grafana并配置数据源 * 步骤四:导入IoTDB Grafana看板 * TimechoDB(基于 Apache IoTDB)增强特性 * 总结与应用场景建议 Apache IoTDB 核心特性与价值 Apache IoTDB 专为物联网场景打造的高性能轻量级时序数据库,以 “设备 - 测点” 原生数据模型贴合物理设备与传感器关系,通过高压缩算法、百万级并发写入能力和毫秒级查询响应优化海量时序数据存储成本与处理效率,同时支持边缘轻量部署、

By Ne0inhk
SQL Server 2019安装教程(超详细图文)

SQL Server 2019安装教程(超详细图文)

SQL Server 介绍) SQL Server 是由 微软(Microsoft) 开发的一款 关系型数据库管理系统(RDBMS),支持结构化查询语言(SQL)进行数据存储、管理和分析。自1989年首次发布以来,SQL Server 已成为企业级数据管理的核心解决方案,广泛应用于金融、电商、ERP、CRM 等业务系统。它提供高可用性、安全性、事务处理(ACID)和商业智能(BI)支持,并支持 Windows 和 Linux 跨平台部署。 一、获取 SQL Server 2019 安装包 1. 官方下载方式 前往微软官网注册账号后,即可下载 SQL Server Developer 版本(

By Ne0inhk