CMU 15-445 Fall 2022 Project1: Buffer Pool
Lapin Gris Lv3

这个项目是实现 bpm,简而言之是实现一个磁盘缓冲区,用于加速数据库应用的 IO 操作。

背景

众所周知,磁盘(SSD/HDD)的访问速度是很慢的,内存 DRAM 访问相比较很快了。我们希望为磁盘经常访问的数据在内存建立对应的缓存来加速数据库应用的访问延迟。

那我们把所有的数据都搬到内存里不就行了?

说的有道理,但我们不能这样做,一个原因是我们不是内存数据库(e.g. redis),我们是类似 MySQL / PostgreSQL 的 disk-based 磁盘数据库,主要解决数据的持久化存储问题,因此数据是存储在磁盘上的。同时因为数据量会很大,如果将持久性数据库的所有数据全部搬到内存里对内存需求会很大,需要购买更多的更大的内存才能满足需求。(注:内存比磁盘贵,贵很多。。。会有经济压力)

另外一个原因是,数据访问是具有局部性特点的,并不是所有数据都是热点数据,我们只需要在内存里缓存热点数据,保证绝大多数磁盘访问请求可以得到加速即可。

所以我们不能把所有磁盘的数据都搬到内存里来加速,我们请求一个固定大小的一块内存区域作为的 buffer pool,把最常用的页在内存里建立缓存。

image.png

图:不同存储介质的访问延迟

回到项目,项目将实现 buffer pool manager 分为 3 个子任务,

  • 可扩展哈希,是一种增强型哈希。解决最简单取余 hash 的缺点。
  • LRU-K 置换,是一种增强型 LRU。解决单队列 LRU 不能应对突发访问的,导致缓存被冲掉的问题。
  • 实现 bpm,利用前两个实现的数据结构。集合在一起实现 bpm 的逻辑。

这个项目本身不难,主要时间在实现数据结构(雾),所以需要我们能够理解项目的设计,画出草图。是在画不出来边 debug 边画也是可以的。同时因为可以使用 STL 库,数据结构的增删查改操作显得很轻易。

Task 1 Extendible Hash Table

Task 1 是需要实现一个 Extendible Hashtable,原理很好懂,难点在于实现,有不少细节需要注意。
实现步骤:首先实现 Bucket 增删查改,然后实现 ExtendibleHashTable 增删查改。

Bucket

Find & Remove & Insert

属于 std:list 数据结构的增删查改的一部分。
值得欣喜的是可以使用 C++17 的结构化绑定语法了,会比写 first、second 优雅很多。

// Find
template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Bucket::Find(const K &key, V &value) -> bool {
// Structured Binding make code looks pretty
for (auto &[k, v] : list_) {
if (k == key) {
value = v;
return true;
}
}
return false;
}

// Remove
template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Bucket::Remove(const K &key) -> bool {
for (auto it = list_.begin(); it != list_.end(); it++) {
if (it->first == key) {
list_.erase(it);
return true;
}
}
return false;
}

// Insert
template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Bucket::Insert(const K &key, const V &value) -> bool {
// Check if a key already exists in the bucket.
for (auto &[k, v] : list_) {
if (k == key) {
// Key already exists, update the value and return true.
v = value;
return true;
}
}
// Key doesn't exist, add a new pair if the bucket isn't full.
if (list_.size() < size_) {
list_.emplace_back(key, value);
return true;
}
// Bucket is full, return false.
return false;
}

ExtendibleHashTable

有了前面对 Bucket 增删查改的支持后,现在我们可以实现对 ExtendibleHashTable 的增删查改了。

构造函数

需要实现 ExtendibleHashTable 的构造函数,使用emplace_back 代替 push_back获得更好的性能。push_back 会将对象复制一遍。

template <typename K, typename V>
ExtendibleHashTable<K, V>::ExtendibleHashTable(size_t bucket_size)
: global_depth_(0), bucket_size_(bucket_size), num_buckets_(1) {
auto b = std::make_shared<Bucket>(bucket_size_, 0);
dir_.emplace_back(b);
}

Find

查找到 key 对应的bucket,然后到对应的 bucket 里面找,这里可以调用 Bucket 实现的 Find。

template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Find(const K &key, V &value) -> bool {
std::scoped_lock<std::mutex> lock(latch_);

size_t index = IndexOf(key);
auto bucket = dir_[index];
return bucket->Find(key, value);
}

Remove

查找到 key 对应的bucket,然后到对应的 bucket 里面删除,这里可以调用 Bucket 实现的 Remove。我记得15445 往年的删除操作是要求实现 shink 的,现在不要求实现 shink (缩容)操作了,会更加简单一些,调用实现的 Bucket 的 api 就好了。如果要求实现 shink 操作,那么代码变得像 Insert 那样长了,不会像这样简单的几行。

template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Remove(const K &key) -> bool {
std::scoped_lock<std::mutex> lock(latch_);

size_t index = IndexOf(key);
auto bucket = dir_[index];
return bucket->Remove(key);
}

Insert

Insert 操作是整个 hashtable 操作的核心。
这里有几个部分需要介绍一下,当 local_depth == global_depth_ 时候的 dir_ 扩容是使用 std::copy_n,这段代码这是为了维护扩容后,每个新扩的 dir_ 的索引项都会指向原先的 bucket。

template <typename K, typename V>
void ExtendibleHashTable<K, V>::Insert(const K &key, const V &value) {
std::scoped_lock<std::mutex> lock(latch_);

size_t index = IndexOf(key);
auto bucket = dir_[index];

if (!bucket->IsFull()) {
bucket->Insert(key, value);
return;
}

while (bucket->IsFull()) {
// If the bucket is full, split it
int local_depth = bucket->GetDepth();
if (local_depth == global_depth_) {
size_t dir_size = dir_.size();
dir_.reserve(dir_size * 2);
std::copy_n(dir_.begin(), dir_size, std::back_inserter(dir_));
global_depth_++;
}

auto b0 = std::make_shared<Bucket>(bucket_size_, local_depth + 1);
auto b1 = std::make_shared<Bucket>(bucket_size_, local_depth + 1);
num_buckets_++;

int local_mask = 1 << local_depth;

// redistribute old bucket
for (const auto &[k, v] : bucket->GetItems()) {
auto new_index_of = std::hash<K>()(k) & local_mask;
// new_index_of = {0, 1}
if (new_index_of) {
b1->Insert(k, v);
} else {
b0->Insert(k, v);
}
}

// update hashtable
size_t start_index = std::hash<K>()(key) & (local_mask - 1);
for (size_t i = start_index; i < dir_.size(); i += local_mask) {
if (static_cast<bool>(i & local_mask)) {
dir_[i] = b1;
} else {
dir_[i] = b0;
}
}

index = IndexOf(key);
bucket = dir_[index];
}
// now we have enough space to insert into
dir_[index]->Insert(key, value);
}

DEBUG

how to debug on M1?

m1 上可以用 lldb 来进行 debug。

  • f 当前栈帧,等价 gdb 中的 frame。用于查看当前执行到哪里了
  • n 下一步,等价 gdb 中的 n。执行下一步。
  • p 打印,等价 gdb 中的 p。打印变量。

举一个当时做项目 debug 的例子,看图,
image.png

image.png

debug 时候会发现, lldb 打印了结构体类型就打印了很长很长,,,,这是因为 C++ STL 标准库后面隐藏了很多细节,但这些细节在 debug 时候会暴露出来了,图中可以看到 debug 时候会 lldb 输出的一个对象结构体是一层套一层。。。
Don’t panic,不熟悉的同学可以去看看侯捷的《STL源码剖析》这本书,这本书是每一位 C++er 必看的一本书了。书中介绍了STL 标准库如何实现这些 vector, string 等高级数据结构的。b 站也有对应的视频,搭配食用,效果更好哈。

为什么我通过不了InsertMultipleSplit ?

InsertMultipleSplit,测试检测的是,某一次插入会造成 dir_ 不止一次的 split。
通常没通过的原因是判断 bucket->IsFull()只判断了一次,实际需要使用 while 循环持续判断 bucket 是否满了,直到知道找到适合的位置。
下面解释为什么,
首先,这是两个测试用例,

// 测试用例 1
table->Insert(0, "0");
table->Insert(1024, "1024");
table->Insert(4, "4"); // this causes 3 splits

// 测试用例 2
table->Insert(0, "0");
table->Insert(1024, "1024");
table->Insert(16, "16"); // this causes 5 splits

这是测试用例每个 key 的二进制表示,

# python3
>>> bin(0)
'0b0'
>>> bin(1024)
'0b10000000000'
>>> bin(4)
'0b100'
>>> bin(16)
'0b10000'

插入导致的 split 次数和我们 Indexof 函数有关系的,Indexof 函数是判断当前的 key 应该放到那个 bucket 里面的,如果 这个 bucket 满了需要继续 split 至少找到一个可以放下 key 的 bucket。
那 Indexof 是如何判断应该放到那个 bucket 里的? 比如说,现在我们有 dir_ 表项的长度是 8,代表有 8 个 bucket ,现在有一个 key 需要被放到 8 个 bucket 中的一个,那么应该选择哪一个 bucket,用最简单的 hash 函数即可,取余 index = key % 8,取余之后 key 会自然落到想要的范围内。这也是项目里 Indexof 的实现原理,项目里取余的数不是一个固定的数 8 而是 global_depth 。另外项目没有采用 % 运算,而是用位运算来实现。

那么,我什么构造 0,1024,4 就能让 bucket 分裂三次?这个 global_depth 有关系,global_depth 代表需要使用多少位才能区分出所有的 bucket 。区分的方式是通过 key 对应的二进制数低位,通过增加低位的个数,直至区分出来。看上面 0,1024,4 三个数的二进制的低位(这里只写了 3 位)分别是 [000,000,100],当进行插入的时候,bucket 大小是 2,只通过低位可以区别这两个数,那么 0,1024 会被放到第一个 bucket 里。插入 4 的时候,4 的低位第一位是 0,可以插入第一个 bucket 里的,但是 bucket 只能放 2 个 key,已经满了,这时候发生 split,需要通过更多的 bit 来区分两个 bucket,因为 4 的低位 bit 是[100],0,1024 的 低位 bit 是[000] ,能区分两个 bucket 最小都要到低位第三个 bit。这也是为什么要 split 三次的原因。
另,测试用例 2 插入 16 也是同理,至少需要到第 5 位 bit才能区分出两个 bucket,如果插入的数字不是 16 而是 512,那么需要 split 10 次才能区分出两个 bucket。(512 的二进制表示是 1000000000

Task 2 LRU-K Replacement Policy

LRU-K 是 LRU 算法的变种,LRU 置换算法的通常用于磁盘缓存(从内核角度讲,更准确的是文件系统的缓存,文件系统有页的概念,磁盘属于块设备,没有页的概念,当然这无关精要)。
回到正题,通常磁盘缓存的形态是有一个队列,存放缓存的页,有对磁盘某一页的访问,就会将这个页加入缓存。缓存都会设置一定的大小,如果缓存满了,就会将最近没有访问过的页从磁盘缓存里踢掉,但是 LRU 有一个队列,比较难以应对突发访问,称为 burst。突发访问的特点是,量很大,但是只会访问一次,因为 burst 将之前的所有缓存都刷了,确实是一个问题,可以改进。
这时候引入两个队列(LRU-K 的K通常取 2)就可以解决这个问题,一个不活跃队列,一个活跃队列(其他叫法:历史队列/缓存队列,冷队列/热队列),访问页会记录访问次数,次数达到 n 次,从不活跃队列加入活跃队列。
这样,就能解决前面提到的 burst 的问题了。
这两个队列,完全是可以采用完全不同的替换策略,各自独立。比如项目里,不活跃队列要求采用 FIFO,活跃队列采用 LRU 算法。这种实现又有另一个名字叫做 2Q 策略。

注:不活跃使用 FIFO 队列,活跃队列采用 LRU 置换策略可能是 Task 2 最大的坑点了,理清楚 LRU 是干啥的,编码不难。项目注释说的不是很清楚,很容易两个队列都使用 LRU 置换策略。

构造函数

LRUKReplacer::LRUKReplacer(size_t num_frames, size_t k) : replacer_size_(num_frames), k_(k) {}

RecordAccess

记录当前页被访问了,LRU-K 需要通过访问记录来控制缓存停留在不活跃队列,还是移入活跃队列。
LRU 算法的队列可以取队头防止最频繁访问的缓存,取队尾是新加入的缓存。也可以反过来,我采用的是队尾存放最经常访问的缓存。目的为了方便遍历的时候从队首开始,而不是从队尾开始。

void LRUKReplacer::RecordAccess(frame_id_t frame_id) {
std::scoped_lock<std::mutex> lock(latch_);

BUSTUB_ASSERT(frame_id < static_cast<frame_id_t>(replacer_size_), "Invalid frame_id, this shouldn't happen");

auto it = table_.find(frame_id);
// new, should append to inactive list and update LRU-K metadata
if (it == table_.end()) {
inactive_list_.emplace_back(frame_id);
struct FrameItem f_item = {inactive_list_.end()--, 1, false};
table_.insert({frame_id, f_item});
return;
}
// already in table
size_t accs_cnt = it->second.nr_accessed_ + 1;

if (accs_cnt < k_) {
// should still stay in inactive list

struct FrameItem *f_item = &(it->second);
f_item->nr_accessed_ = accs_cnt;
} else if (accs_cnt == k_) {
// Promote from inactive to active
inactive_list_.remove(frame_id);
active_list_.emplace_back(frame_id);

struct FrameItem *f_item = &(it->second);
f_item->iter_ = active_list_.end()--;
f_item->nr_accessed_ = accs_cnt;
} else {
// Already active, move to the active side
active_list_.remove(frame_id);
active_list_.emplace_back(frame_id);

struct FrameItem *f_item = &(it->second);
f_item->iter_ = active_list_.end()--;
f_item->nr_accessed_ = accs_cnt;
}
}

SetEvictable

将一个页标记为是否可以被 Evict 掉,2019 去年这个函数分为两个函数,叫做 pin/unpin。感觉,还是 pin/unpin 更容易理解一些。对 nr_evictable 的更新这里用了两个三元表达式简化 if 嵌套判断。不得不说,三元表达式真好用,嘿嘿。


void LRUKReplacer::SetEvictable(frame_id_t frame_id, bool set_evictable) {
std::scoped_lock<std::mutex> lock(latch_);

auto it = table_.find(frame_id);
if (it == table_.end()) {
return;
}
bool prev_status = it->second.evictable_;
/*
* Update evictable count based on status change
* status changed ? --> (yes) update evictable count --> (false -> true) increase count
* --> (true -> false) decrease count
* --> (no) doesn't need change
*/
nr_evictable_ += (prev_status != set_evictable) ? (set_evictable ? 1 : -1) : 0;
it->second.evictable_ = set_evictable;
}

Evict

驱逐,当缓存满了的时候,就要驱逐一些缓存页了。先从不活跃队列驱逐,如果不活跃队列里的缓存队列不满足驱逐条件,就从活跃队列驱逐。


auto LRUKReplacer::Evict(frame_id_t *frame_id) -> bool {
std::scoped_lock<std::mutex> lock(latch_);

// FIFO, evict should start from head
if (std::any_of(inactive_list_.begin(), inactive_list_.end(), [&](auto f) {
bool evictable = table_[f].evictable_;
if (evictable) {
inactive_list_.remove(f);
nr_evictable_--;
table_.erase(f);
*frame_id = f;
}
return evictable;
})) {
return true;
}

// LRU, stores the most recently used items at the tail(my solution),
// so evict should start from head
if (std::any_of(active_list_.begin(), active_list_.end(), [&](auto f) {
bool evictable = table_[f].evictable_;
if (evictable) {
active_list_.remove(f);
nr_evictable_--;
table_.erase(f);
*frame_id = f;
}
return evictable;
})) {
return true;
}

return false;
}

注意看,这里使用了 std::anyof 代替了 for 循环,这是 clang-tidy 要求的,Clang-Tidy: Replace loop by 'std::any_of()',原先我使用 for 循环写的,目的是可以用C++17 的结构体,另外我把 LRU 活跃缓存放在队尾也是为了能在这里使用结构化绑定,多简洁。
后来还是换成 std::anyof 了,因为不给换项目不给过。。

auto LRUKReplacer::Evict(frame_id_t *frame_id) -> bool {
for (auto f : inactive_list_) {
bool evictable = table_[f].evictable_;

if (evictable) {
inactive_list_.remove(f);
nr_evictable_--;
table_.erase(f);

*frame_id = f;
return true;
}
}

for (auto f : active_list_) {
bool evictable = table_[f].evictable_;

if (evictable) {
active_list_.remove(f);
nr_evictable_--;
table_.erase(f);

*frame_id = f;
return true;
}
}

return false;
}

Remove

将一个缓存移除。需要注意的是因为,我们引入了两个队列,所以从两个队列都要扫一遍。但是是有加速方案的,就是判断访问次数,根据次数可以判断在哪个队列,避免无脑扫两次链表。

void LRUKReplacer::Remove(frame_id_t frame_id) {
std::scoped_lock<std::mutex> lock(latch_);

auto it = table_.find(frame_id);
if (it == table_.end()) {
return;
}

if (it->second.nr_accessed_ < k_) {
inactive_list_.remove(frame_id);
nr_evictable_--;
table_.erase(frame_id);
} else {
active_list_.remove(frame_id);
nr_evictable_--;
table_.erase(frame_id);
}
}

Size

auto LRUKReplacer::Size() -> size_t {
std::scoped_lock<std::mutex> lock(latch_);
return nr_evictable_;
}

DEBUG

遇到过项目要求理解错了导致测试用例 failed 的情况,
image.png

修改成这样可以过了,项目的意思是 inactive list 用 FIFO 换出策略。active list 才采用 classic LRU 换出策略。inactive list 也采用 LRU 换出策略,会导致此测试用例 failed。

# git diff
diff --git a/src/buffer/lru_k_replacer.cpp b/src/buffer/lru_k_replacer.cpp
index d2f0eea..8b41fa0 100644
--- a/src/buffer/lru_k_replacer.cpp
+++ b/src/buffer/lru_k_replacer.cpp
@@ -67,8 +67,6 @@ void LRUKReplacer::RecordAccess(frame_id_t frame_id) {

if (accs_cnt < k_) {
// should still stay in inactive list
- inactive_list_.remove(frame_id);
- inactive_list_.emplace_back(frame_id);

struct FrameItem *f_item = &(it->second);
f_item->nr_accessed_ = accs_cnt;

Task #3 - Buffer Pool Manager Instance

实现 bpm,bpm 负责管理磁盘的页缓存。

理解构造函数

无需实现,已经实现好了。理解构造函数,对于理解 bpm 的设计具有重要意义。理解了,后面实现也会轻松很多。
bpm 负责管理磁盘缓存,自然会涉及磁盘页的读取和写入操作。这部分已经被逻辑实现了,无需实现,只要记得调用 disk_manager_ 的逻辑写和读接口即可。

观察构造函数,我们可以发现,bpm 的成员主要是这四个,

  • disk_manager 负责磁盘页的读写操作
  • page_table 记录磁盘页(page)在内存帧(frame)的映射关系,同一个东西的不同角度表示
  • replacer 缓存空间有限,缓存管理者
  • free_list bpm 的缓存空闲链表,用完了就找 replacer 要了,让他踢掉几个不常用的页
// 构造函数
BufferPoolManagerInstance::BufferPoolManagerInstance(size_t pool_size, DiskManager *disk_manager, size_t replacer_k, LogManager *log_manager)
: pool_size_(pool_size), disk_manager_(disk_manager), log_manager_(log_manager) {
// we allocate a consecutive memory space for the buffer pool
pages_ = new Page[pool_size_];
page_table_ = new ExtendibleHashTable<page_id_t, frame_id_t>(bucket_size_);
replacer_ = new LRUKReplacer(pool_size, replacer_k);

// Initially, every page is in the free list.
for (size_t i = 0; i < pool_size_; ++i) {
free_list_.emplace_back(static_cast<int>(i));
}
}

新增一个 helper 函数

将寻找一个新的 frame 的操作抽象到一个函数 TryToFindAvailFrame 中。

// src/include/buffer/buffer_pool_manager_instance.cpp
/**
* @brief Try to find usable frame in buffer pool
* @param[out] frame_id id of the page to deallocate
* @return true if found, false if all frames are pined
*/
auto TryToFindAvailFrame(frame_id_t *frame_id) -> bool;

// src/buffer/buffer_pool_manager_instance.cpp
auto BufferPoolManagerInstance::TryToFindAvailFrame(frame_id_t *frame_id) -> bool {
if (!free_list_.empty()) {
*frame_id = free_list_.back();
free_list_.pop_back();
return true;
}
if (replacer_->Evict(frame_id)) {
if (pages_[*frame_id].IsDirty()) {
disk_manager_->WritePage(pages_[*frame_id].page_id_, pages_[*frame_id].data_);
pages_[*frame_id].is_dirty_ = false;
}
page_table_->Remove(pages_[*frame_id].page_id_);
return true;
}
return false;
}

NewPgImp

在 bpm 里新申请一个 page,会通过 Page 结构的构造函数分配一个页出来(实现代码:src/include/storage/page/page.h),返回是一个指向页的指针。拿到指向页的指针,应用程序可以往页里写数据了,页的大小是固定的,是通过宏定义的,不用担心只有指针却没有大小。

auto BufferPoolManagerInstance::NewPgImp(page_id_t *page_id) -> Page * {
std::scoped_lock<std::mutex> lock(latch_);

frame_id_t frame_id;
if (TryToFindAvailFrame(&frame_id)) {
*page_id = AllocatePage();
pages_[frame_id].page_id_ = *page_id;
pages_[frame_id].pin_count_ = 1;

page_table_->Insert(*page_id, frame_id);
replacer_->RecordAccess(frame_id);
replacer_->SetEvictable(frame_id, false);

return &pages_[frame_id];
}
return nullptr;
}

FetchPgImp

fetch 操作代表我要从磁盘将一个页取到内存中,我知道这个页的 id,我需要得到这个页在内存里的地址,因为待会应用程序需要读写这个页。
这里有两种情况了,如果这个页本身就在内存中,那么直接返回它的地址即可。如果不在内存,而是被换出到磁盘上了,就需要重新将磁盘的页读到内存中,然后返回页的内存地址。

auto BufferPoolManagerInstance::FetchPgImp(page_id_t page_id) -> Page * {
std::scoped_lock<std::mutex> lock(latch_);

frame_id_t frame_id;
if (page_table_->Find(page_id, frame_id)) {
pages_[frame_id].pin_count_++;
replacer_->RecordAccess(frame_id);
replacer_->SetEvictable(frame_id, false);

return &pages_[frame_id];
}

if (TryToFindAvailFrame(&frame_id)) {
pages_[frame_id].page_id_ = page_id;
pages_[frame_id].pin_count_ = 1;
pages_[frame_id].is_dirty_ = false;
disk_manager_->ReadPage(page_id, pages_[frame_id].data_);

page_table_->Insert(page_id, frame_id);
replacer_->RecordAccess(frame_id);
replacer_->SetEvictable(frame_id, false);

return &pages_[frame_id];
}
return nullptr;
}

UnpinPgImp

对页 unpin,代表应用程序暂时不会用到这个页,指示这个页不必一直停留到内存里,可以被 bpm 换出到磁盘上,毕竟内存资源不是无限的

auto BufferPoolManagerInstance::UnpinPgImp(page_id_t page_id, bool is_dirty) -> bool {
std::scoped_lock<std::mutex> lock(latch_);

frame_id_t frame_id;
if (page_table_->Find(page_id, frame_id)) {
pages_[frame_id].pin_count_--;
if (pages_[frame_id].pin_count_ == 0) {
replacer_->SetEvictable(frame_id, true);
}
pages_[frame_id].is_dirty_ = is_dirty || pages_[frame_id].is_dirty_;

return true;
}
return false;
}

FlushPgImp

刷页,指示 bpm 将页的内存刷回到磁盘上。通常数据库应用为了防止断掉带来的 DRAM 内存里的数据丢失,会定期刷盘,将内存数据写回磁盘。

auto BufferPoolManagerInstance::FlushPgImp(page_id_t page_id) -> bool {
std::scoped_lock<std::mutex> lock(latch_);

frame_id_t frame_id;
if (page_table_->Find(page_id, frame_id)) {
if (pages_[frame_id].is_dirty_) {
disk_manager_->WritePage(page_id, pages_[frame_id].data_);
pages_[frame_id].is_dirty_ = false;
}
return true;
}
return false;
}

FlushAllPgsImp

所有页刷回磁盘。


void BufferPoolManagerInstance::FlushAllPgsImp() {
std::scoped_lock<std::mutex> lock(latch_);

for (size_t i = 0; i < pool_size_; i++) {
if (pages_[i].is_dirty_) {
disk_manager_->WritePage(pages_[i].page_id_, pages_[i].data_);
pages_[i].is_dirty_ = false;
}
}
}

Gradescope

image.png