Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions include/tasks/batch_write_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,9 @@ class BatchWriteTask : public WriteTask
KvError LoadApplyingPage(PageId page_id);
std::pair<MemIndexPage *, KvError> Pop();

struct DirtyIndexPage
{
~DirtyIndexPage();
std::string key_;
MemIndexPage *page_{nullptr};
PageId page_id_{MaxPageId};
};
KvError FinishIndexPage(DirtyIndexPage &prev_page,
KvError FinishIndexPage(MemIndexPage *&prev_page,
std::string &prev_key,
PageId &prev_page_id,
std::string cur_page_key);
KvError FlushIndexPage(MemIndexPage *new_page,
std::string idx_page_key,
Expand Down Expand Up @@ -170,4 +165,4 @@ class BatchWriteTask : public WriteTask
static void AdvanceIndexPageIter(IndexPageIter &iter, bool &is_valid);
};

} // namespace eloqstore
} // namespace eloqstore
67 changes: 28 additions & 39 deletions src/tasks/batch_write_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,14 +771,16 @@ std::pair<MemIndexPage *, KvError> BatchWriteTask::Pop()
// We keep the previous built page in the pipeline before flushing it to
// storage. This is to redistribute between last two pages in case the last
// page is sparse.
DirtyIndexPage prev_page;
MemIndexPage *prev_page = nullptr;
std::string prev_key;
PageId prev_page_id = MaxPageId;
std::string_view page_key =
stack_.size() == 1 ? std::string_view{}
: stack_[stack_.size() - 2]->idx_page_iter_.Key();
std::string curr_page_key{page_key};
if (stack_page != nullptr)
{
prev_page.page_id_ = stack_page->GetPageId();
prev_page_id = stack_page->GetPageId();
}

auto add_to_page = [&](std::string_view new_key,
Expand All @@ -788,7 +790,8 @@ std::pair<MemIndexPage *, KvError> BatchWriteTask::Pop()
idx_page_builder_.Add(new_key, new_page_id, is_leaf_index);
if (!success)
{
err = FinishIndexPage(prev_page, std::move(curr_page_key));
err = FinishIndexPage(
prev_page, prev_key, prev_page_id, std::move(curr_page_key));
CHECK_KV_ERR(err);
curr_page_key = new_key;
idx_page_builder_.Reset();
Expand Down Expand Up @@ -908,30 +911,29 @@ std::pair<MemIndexPage *, KvError> BatchWriteTask::Pop()
IndexStackEntry *parent = stack_[stack_.size() - 2].get();
std::string_view page_key = parent->idx_page_iter_.Key();
parent->changes_.emplace_back(
std::string(page_key), prev_page.page_id_, WriteOp::Delete);
std::string(page_key), prev_page_id, WriteOp::Delete);
}
}
else
{
bool splited = prev_page.page_ != nullptr;
err = FinishIndexPage(prev_page, std::move(curr_page_key));
bool splited = prev_page != nullptr;
err = FinishIndexPage(
prev_page, prev_key, prev_page_id, std::move(curr_page_key));
if (err != KvError::NoError)
{
return {nullptr, err};
}
err = FlushIndexPage(prev_page.page_,
std::move(prev_page.key_),
prev_page.page_id_,
splited);
err = FlushIndexPage(
prev_page, std::move(prev_key), prev_page_id, splited);
if (err != KvError::NoError)
{
return {nullptr, err};
}
if (!splited)
{
new_root = prev_page.page_;
new_root = prev_page;
}
prev_page.page_ = nullptr;
prev_page = nullptr;
}

if (stack_page != nullptr)
Expand All @@ -942,58 +944,45 @@ std::pair<MemIndexPage *, KvError> BatchWriteTask::Pop()
return {new_root, KvError::NoError};
}

KvError BatchWriteTask::FinishIndexPage(DirtyIndexPage &prev,
KvError BatchWriteTask::FinishIndexPage(MemIndexPage *&prev_page,
std::string &prev_key,
PageId &prev_page_id,
std::string cur_page_key)
{
assert(!idx_page_builder_.IsEmpty());
const uint16_t cur_page_len = idx_page_builder_.CurrentSizeEstimate();
std::string_view page_view = idx_page_builder_.Finish();
if (prev.page_ != nullptr)
if (prev_page != nullptr)
{
// Redistributing only if the current page length is smaller than
// one quarter of data_page_size and the previous page length is
// bigger than three quarter of data_page_size.
const uint16_t one_quarter = Options()->data_page_size >> 2;
const uint16_t three_quarter = Options()->data_page_size - one_quarter;
if (cur_page_len < one_quarter && prev.page_ != nullptr &&
prev.page_->RestartNum() > 1 &&
prev.page_->ContentLength() > three_quarter)
if (cur_page_len < one_quarter && prev_page != nullptr &&
prev_page->RestartNum() > 1 &&
prev_page->ContentLength() > three_quarter)
{
page_view = Redistribute(prev.page_, page_view, cur_page_key);
page_view = Redistribute(prev_page, page_view, cur_page_key);
}

KvError err = FlushIndexPage(
prev.page_, std::move(prev.key_), prev.page_id_, true);
KvError err =
FlushIndexPage(prev_page, std::move(prev_key), prev_page_id, true);
CHECK_KV_ERR(err);
prev.page_ = nullptr;
prev.page_id_ = MaxPageId;
prev_page = nullptr;
prev_page_id = MaxPageId;
}
MemIndexPage *cur_page = shard->IndexManager()->AllocIndexPage();
if (cur_page == nullptr)
{
return KvError::OutOfMem;
}
memcpy(cur_page->PagePtr(), page_view.data(), page_view.size());
prev.page_ = cur_page;
prev.key_ = std::move(cur_page_key);
prev_page = cur_page;
prev_key = std::move(cur_page_key);
return KvError::NoError;
}

BatchWriteTask::DirtyIndexPage::~DirtyIndexPage()
{
if (page_ != nullptr)
{
if (page_->InFreeList())
{
page_ = nullptr;
return;
}
assert(page_->IsDetached());
shard->IndexManager()->FreeIndexPage(page_);
page_ = nullptr;
}
}

KvError BatchWriteTask::FlushIndexPage(MemIndexPage *idx_page,
std::string idx_page_key,
PageId page_id,
Expand Down