跳到主要内容C++ 高性能内存池设计与实现 | 极客日志C++算法
C++ 高性能内存池设计与实现
综述由AI生成C++ 高性能内存池的设计与实现。内容涵盖内存池的基本概念、池化技术原理、内存碎片(内外碎片)分析以及 malloc 底层机制。重点讲解了基于 TCMalloc 模型的高并发内存池架构,包括 Thread Cache(线程缓存)、Central Cache(中心缓存)和 Page Cache(页缓存)三层结构。文章提供了定长内存池的基础实现代码,深入剖析了 placement new 在内存池中的应用,以及 Span 管理、桶锁机制、对象对齐映射规则和内存回收策略。通过对比原生 new/delete 与自定义内存池的性能测试,展示了内存池在提升分配效率和减少锁竞争方面的优势。
路由之心15K 浏览 项目介绍
当前项目是实现一个高并发的内存池,原型参考 Google 开源项目 TCMalloc(Thread-Caching Malloc),即线程缓存的 malloc。它实现了高效的多线程内存管理,用于替代系统的内存分配相关函数(malloc、free)。
本项目涉及 C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等知识。
基础知识补充
定位 new
new(obj) T; 是 placement new(定位 new)的使用,它是 C++ 中的一种特殊的内存分配方式。
在 C++ 中,new 通常用于动态分配内存并构造对象,例如:
T* obj = new T();
然而,new(obj) T; 则是定位 new 的用法。它并不分配内存,而是在指定的内存位置(由 obj 指向的地址)上构造对象。也就是说,obj 必须是一个指向已经分配内存的指针,这块内存已经存在并且足够大来容纳一个 T 类型的对象。
语法结构:
new (pointer) T(args);
- pointer:指向已分配内存的位置。这块内存已经预先分配好,通常由
malloc、内存池或者其他方式分配。
- T:对象的类型。
- args:传递给
T 类型构造函数的参数。
为什么需要 Placement New?
在常规的 new 操作中,new 关键字不仅分配内存,还会调用构造函数来初始化对象。这样会额外的进行一次内存分配。然而,在很多情况下,你可能已经分配了内存,并且只想在这些内存位置上创建对象。这时,placement new 就非常有用了。
工作原理:
- 不进行内存分配:
placement new 不会调用 operator new(即不会分配内存)。它假定内存已经由你提供,且这块内存足够大。
- 调用构造函数:
placement new 只是用来调用指定内存位置的构造函数。
- 返回对象的指针:
new(obj) T; 会返回指向 obj 的指针,指向已经初始化的对象。
销毁对象:
使用 placement new 构造的对象并不使用普通的 delete 来销毁。普通 delete 会调用析构函数并释放内存,而 placement new 并不负责释放内存。因此,如果你通过 placement new 构造了对象,你需要显式调用对象的析构函数,并确保内存不会泄漏:
obj->~T();
总结:
new (obj) T; 是 placement new,用于在已分配的内存上构造对象。
- 它不会分配内存,而是直接在指定的内存位置上调用构造函数。
- 适用于内存池、缓存池等需要手动管理内存的场景。
术语表
- cache: 高速缓冲存储器
- Align: 对齐
- concurrent: 并发
- batch: 批量
- fetch: 拿来
- actual: 实际
- range: 范围
- benchmark: 基准;标准;基准测试
什么是内存池
1. 池化技术
所谓'池化技术',就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用'池'这种技术的地方,除了内存池,还有连接池、线程池、对象池等。
2. 内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
3. 内存池主要解决的问题
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。
3.1 效率问题
打个通俗的比方就是:问爸爸妈妈要钱买东西,一次次的要效率太低,如果估算好一个月要用到生活费,直接给一个月的,就不用一次一次的要,可以提交很多的效率。
3.2 碎片化
再需要补充说明的是内存碎片分为外碎片和内碎片。
外部碎片是一些空的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配需求。
内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
内碎片问题,我们后面项目就会看到,那会再进行更准确的理解。(创建时申请空间连续,释放时不按申请的顺序释放,会导致这些内存空间不连续)
3.2.1 外碎片
4. 了解一下 malloc
C/C++ 中我们要动态申请内存都是通过 malloc 去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的。
而 malloc 就是一个内存池。malloc 相当于向操作系统'批发'了一块较大的内存空间,然后'零售'给程序用。当全部'售完'或程序有大量的内存需求时,再根据实际需求向操作系统'进货'。
malloc 的实现方式有很多种,一般不同编译器平台用的都是不同的。比如 windows 的 vs 系列用的微软自己写的一套,linux gcc 用的 glibc 中的 ptmalloc。
先设计一个定长的内存池
作为程序员(C/C++)我们知道申请内存使用的是 malloc,malloc 其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能。下面我们就先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学习他目的有两层,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。
创建一个并发内存池 (ConcurrentMemoryPool) 项目
分配与释放
如果申请的大块内存空间没有用完,freeList 又不为空,可以先将还回来的内存块对象,再次重复利用
性能测试
脱离 malloc 直接在堆中
ObjectPool.h
#pragma once
#include<iostream>
#include<vector>
#include <new>
#include <time.h>
using std::cout;
using std::endl;
#ifdef _WIN32
#include <Windows.h>
#else
#endif
inline static void* SystemAlloc(size_t kpage) {
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
#endif
if (ptr == nullptr) throw std::bad_alloc();
return ptr;
}
template<class T>
class ObjectPool {
public:
T* New() {
T* obj = nullptr;
if (_freeList) {
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
} else {
if (_remainingBytes < sizeof(T)) {
_remainingBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_remainingBytes >> 13);
if (_memory == nullptr) {
throw std::bad_alloc();
}
}
obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;
_remainingBytes -= objSize;
}
new(obj)T;
return obj;
}
void Delete(T* obj) {
obj->~T();
*(void**)obj = _freeList;
_freeList = obj;
}
private:
char* _memory = nullptr;
void* _freeList = nullptr;
size_t _remainingBytes = 0;
};
struct TreeNode {
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode() :_val(0), _left(nullptr), _right(nullptr) {}
};
void TestObjectPool() {
const size_t Rounds = 3;
const size_t N = 100000;
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j) {
for (int i = 0; i < N; ++i) {
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i) {
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j) {
for (int i = 0; i < N; ++i) {
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i) {
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
Test.cpp
#include"ObjectPool.h"
int main() {
TestObjectPool();
return 0;
}
高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc 本身其实已经很优秀,那么我们项目的原型tcmalloc 就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题:
- 性能问题。
- 多线程环境下,锁竞争问题。
- 内存碎片问题
concurrent memory pool 主要由以下 3 个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于 256KB 的内存的分配,线程从这里申请内存,不需要加锁,每个线程独享一个 cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache 是按需从 central cache 中获取的对象。central cache 合适的时机回收 thread cache 中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache 是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有 thread cache 的没有内存对象时才会找 central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在 central cache 缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache 没有内存对象时,从 page cache 分配出一定数量的 page,并切割成定长大小的小块内存,分配给 central cache。当一个 span 的几个跨度页的对象都回收以后,page cache 会回收 central cache 满足条件的 span 对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
高并发内存池--thread cache
thread cache 是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个 thread cache 对象,这样每个线程在这里获取对象和释放对象时是无锁的。
申请内存:
- 当内存申请 size
- 如果自由链表_freeLists[i] 中有对象,则直接 Pop 一个内存对象返回。
- 如果_freeLists[i] 中没有对象时,则批量从 central cache 中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
- 当释放内存小于 256k 时将内存释放回 thread cache,计算 size 映射自由链表桶位置 i,将对象 Push 到_freeLists[i]。
- 当链表的长度过长,则回收一部分内存对象到 central cache。
计算对象大小的对齐映射规则
class SizeClass {
public:
static inline size_t _RoundUp(size_t bytes, size_t AlignNum) {
return (((bytes)+AlignNum - 1) & ~(AlignNum - 1));
}
static inline size_t RoundUp(size_t size) {
if (size <= 128) {
return _RoundUp(size, 8);
} else if (size <= 1024) {
return _RoundUp(size, 16);
} else if (size <= 8*1024) {
return _RoundUp(size, 128);
} else if (size <= 64*1024) {
return _RoundUp(size, 1024);
} else if (size <= 256*1024) {
return _RoundUp(size, 8*1024);
} else {
assert(false);
return -1;
}
};
};
Common.h
#pragma once
#include<iostream>
#include<vector>
#include <new>
#include <time.h>
#include <assert.h>
using std::cout;
using std::endl;
void*& NextObj(void* obj) {
return *(void**)obj;
}
class FreeList {
public:
void Push(void* obj) {
assert(obj);
NextObj(obj) = _freeList;
_freeList = obj;
}
void* Pop() {
assert(_freeList);
void* obj = _freeList;
_freeList = NextObj(obj);
}
private:
void* _freeList;
};
ThreadCache.h
#pragma once
#include"Common.h"
class ThreadCache {
public:
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
private:
FreeList _freeLists[];
};
解决 thread cache 的锁问题
高并发内存池--central cache
central cache 也是一个哈希桶结构,他的哈希桶的映射关系跟 thread cache 是一样的。不同的是他的每个哈希桶位置挂是 SpanList 链表结构,不过每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个小内存块对象挂在 span 的自由链表中。
申请内存:
**1. 当 thread cache 中没有内存时,就会批量向 central cache 申请一些内存对象,这里的批量获取对象 的数量使用了类似网络 tcp 协议拥塞控制的慢开始算法;central cache 也有一个哈希映射的 spanlist,spanlist 中挂着 span,从 span 中取出对象给 thread cache,这个过程是需要加锁的,不 过这里使用的是一个桶锁,尽可能提高效率。
-
central cache 映射的 spanlist 中所有 span 的都没有内存以后,则需要向 page cache 申请一个新的 span 对象,拿到 span 以后将 span 管理的内存按大小切好作为自由链表链接到一起。然后从 span 中 取对象给 thread cache。
-
central cache 的中挂的 span 中 use_count 记录分配了多少个对象出去,分配一个对象给 thread cache,就++use_count**
释放内存:
1. 当 thread_cache 过长或者线程销毁,则会将内存释放回 central cache 中的,释放回来时-- use_count。当 use_count 减到 0 时表示所有对象都回到了 span,则将 span 释放回 page cache,page cache 中会对前后相邻的空闲页进行合并。
#pragma once
#include"Common.h"
class CentralCache {
public:
static CentralCache* GetInstance() {
return &_sInst;
}
Span* GetOneSpan(SpanList& list, size_t byte_size);
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t byte_size);
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache() {}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
#pragma once
#include<iostream>
#include<thread>
#include<vector>
#include<algorithm>
#include <new>
#include <time.h>
#include <assert.h>
#include <mutex>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256;
static const size_t NFREELIST = 256;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#elif
#endif
static void*& NextObj(void* obj) {
return *(void**)obj;
}
class FreeList {
public:
void Push(void* obj) {
assert(obj);
NextObj(obj) = _freeList;
_freeList = obj;
}
void PushRange(void* start, void* end) {
NextObj(end) = _freeList;
_freeList = start;
}
void* Pop() {
assert(_freeList);
void* obj = _freeList;
_freeList = NextObj(obj);
return obj;
}
bool Empty() {
return _freeList == nullptr;
}
size_t& MaxSize() {
return _maxSize;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
};
class SizeClass {
public:
static inline size_t _RoundUp(size_t bytes, size_t AlignNum) {
return (((bytes)+AlignNum - 1) & ~(AlignNum - 1));
}
static inline size_t RoundUp(size_t size) {
if (size <= 128) {
return _RoundUp(size, 8);
} else if (size <= 1024) {
return _RoundUp(size, 16);
} else if (size <= 8*1024) {
return _RoundUp(size, 128);
} else if (size <= 64*1024) {
return _RoundUp(size, 1024);
} else if (size <= 256*1024) {
return _RoundUp(size, 8*1024);
} else {
assert(false);
return -1;
}
}
static inline size_t _Index(size_t bytes, size_t align_shift) {
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
static inline size_t Index(size_t bytes) {
assert(bytes <= MAX_BYTES);
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128) {
return _Index(bytes, 3);
} else if (bytes <= 1024) {
return _Index(bytes - 128, 4) + group_array[0];
} else if (bytes <= 81024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
} else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
} else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
} else {
assert(false);
}
return -1;
}
static size_t NumMoveSize(size_t size) {
assert(size > 0);
int num = MAX_BYTES / size;
if (num < 2) num = 2;
if (num > 512) num = 512;
return num;
}
};
struct Span {
PAGE_ID _pageId = 0;
size_t _n = 0;
Span* _next = nullptr;
Span* _prev = nullptr;
size_t _useCount = 0;
void* _freeList = nullptr;
};
class SpanList {
public:
SpanList() {
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
void Insert(Span*pos,Span*newSpan) {
assert(newSpan);
assert(pos);
Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos) {
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx;
};
#include "CentralCache.h"
CentralCache CentralCache::_sInst;
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size) {
return nullptr;
}
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) {
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;
while ((i < batchNum - 1 && NextObj(end) != nullptr)) {
end = NextObj(end);
++i;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return actualNum;
}
#include"ThreadCache.h"
#include"CentralCache.h"
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) {
size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum) {
_freeLists[index].MaxSize() += 1;
}
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 1);
if (actualNum == 1) {
assert(start == end);
return start;
} else {
_freeLists[index].PushRange(NextObj(start), end);
return start;
}
}
void* ThreadCache::Allocate(size_t size) {
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
if (!_freeLists[index].Empty()) {
return _freeLists[index].Pop();
} else {
return FetchFromCentralCache(index, alignSize);
}
}
void ThreadCache::Deallocate(void* ptr, size_t size) {
assert(size <= MAX_BYTES);
assert(ptr);
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
}
高并发内存池--page cache
申请内存:
-
当 central cache 向 page cache 申请内存时,page cache 先检查对应位置有没有 span,如果没有则 向更大页寻找一个 span,如果找到则分裂成两个。比如:申请的是 4 页 page,4 页 page 后面没有挂 span,则向后 面寻找更大的 span,假设在 10 页 page 位置找到一个 span,则将 10 页 page span 分裂 为一个 4 页 page span 和一个 6 页 page span。
-
如果找到_spanList[128] 都没有合适的 span,则向系统使用 mmap、brk 或者是 VirtualAlloc 等方式 申请 128 页 page span 挂在自由链表中,再重复 1 中的过程。
-
需要注意的是 central cache 和 page cache 的核心结构都是 spanlist 的哈希桶,但是他们是有本质区别的,central cache 中哈希桶,是按跟 thread cache 一样的大小对齐关系映射的,他的 spanlist 中 挂的 span 中的内存都被按映射关系切好链接成小块内存的自由链表。而 page cache 中的 spanlist 则 是按下标桶号映射的,也就是说第 i 号桶中挂的 span 都是 i 页内存。
释放内存:
- 如果 central cache 释放回一个 span,则依次寻找 span 的前后 page id 的没有在使用的空闲 span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的 span,减少内存碎片。
从堆中申请内存,页号和地址的关系
在操作系统中,堆内存管理通常涉及将内存分为多个页面,每个页面通常是 4KB 大小(这个大小可能会根据不同的系统架构而有所不同)。页号和地址之间的关系是直接的,可以通过以下方式计算:
PAGE_SIZE 是每个页的大小,通常是 4KB(即 4096 字节)。
page_number 是页号,一个非负整数。
address 是内存中的某个地址。
address = page_number * PAGE_SIZE
反过来,如果你有一个内存地址,你可以通过以下公式计算它所在的页号:
page_number = address / PAGE_SIZE
这里的除法是整数除法,它会丢弃任何余数,只保留结果的整数部分。
以下是一个简单的示例,假设 PAGE_SIZE 是 4096:
#include <iostream>
#define PAGE_SIZE 4096
uintptr_t getPageAddress(int page_number) {
return static_cast<uintptr_t>(page_number) * PAGE_SIZE;
}
int getAddressPageNumber(uintptr_t address) {
return static_cast<int>(address / PAGE_SIZE);
}
int main() {
int page_number = 10;
uintptr_t address = getPageAddress(page_number);
std::cout << "Page " << page_number << " starts at address " << address << std::endl;
int calculated_page_number = getAddressPageNumber(address);
std::cout << "Address " << address << " is on page " << calculated_page_number << std::endl;
return 0;
}
在这个例子中,如果你调用 getPageAddress(10),它将返回页号为 10 的起始地址,即 10 * 4096。如果你有一个地址,比如 0x10000(它是 4096 的十六进制表示),调用 getAddressPageNumber(0x10000) 将返回页号 2,因为 0x10000 / 4096 = 2。
内存回收
测试
高并发内存池 - 使用定长内存池配合脱离使用 new
释放对象时优化为不传对象大小
测试 + 改错
性能优化
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online