Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split huge read and write requests in TAlignedDeviceHandler #2024

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 198 additions & 51 deletions cloud/blockstore/libs/service/device_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,23 @@ struct TBlocksInfo

////////////////////////////////////////////////////////////////////////////////

static TResultOrError<bool> TryToNormalize(
// Splits off the first blockCount elements of sgList.
//
// On success the returned TGuardedSgList (built via sgList.Create()) holds
// those head elements, and sgList is updated to hold only the remaining tail.
//
// Returns a default-constructed TGuardedSgList and leaves sgList unchanged
// when:
//   - the list cannot be acquired, or
//   - the list holds fewer than blockCount elements.
// Callers are expected to detect this through Empty() on the result.
TGuardedSgList TakeHeadBlocks(TGuardedSgList& sgList, ui32 blockCount)
{
    auto guard = sgList.Acquire();
    if (!guard) {
        return {};
    }

    const TSgList& blockList = guard.Get();
    if (blockCount > blockList.size()) {
        // Advancing begin() past end() would be undefined behaviour, so
        // report the mismatch the same way as a failed acquire.
        return {};
    }

    // Build the head first: SetSgList() replaces the list that blockList
    // refers to, so the head must be copied out before that happens.
    auto result =
        sgList.Create({blockList.begin(), blockList.begin() + blockCount});
    sgList.SetSgList({blockList.begin() + blockCount, blockList.end()});
    return result;
}

TResultOrError<bool> TryToNormalize(
TGuardedSgList& guardedSgList,
const TBlocksInfo& blocksInfo,
ui64 length,
Expand Down Expand Up @@ -82,7 +98,7 @@ static TResultOrError<bool> TryToNormalize(

////////////////////////////////////////////////////////////////////////////////

static TBlocksInfo ConvertRangeToBlocks(
TBlocksInfo ConvertRangeToBlocks(
ui64 from,
ui64 length,
ui32 blockSize)
Expand Down Expand Up @@ -176,7 +192,7 @@ class TDeviceHandler final
}

private:
TReadBlocksResponseFuture ExecuteReadBlocks(
TReadBlocksResponseFuture ExecuteAlignedReadRequest(
TCallContextPtr ctx,
const TBlocksInfo& blocksInfo,
TGuardedSgList sgList,
Expand Down Expand Up @@ -419,7 +435,7 @@ TReadBlocksResponseFuture TDeviceHandler::Read(
checkpointId);
}

return ExecuteReadBlocks(
return ExecuteAlignedReadRequest(
std::move(ctx),
blocksInfo,
std::move(sgList),
Expand Down Expand Up @@ -611,7 +627,7 @@ void TDeviceHandler::PrepareRequests(
}
}

TReadBlocksResponseFuture TDeviceHandler::ExecuteReadBlocks(
TReadBlocksResponseFuture TDeviceHandler::ExecuteAlignedReadRequest(
TCallContextPtr ctx,
const TBlocksInfo& blocksInfo,
TGuardedSgList sgList,
Expand Down Expand Up @@ -810,27 +826,28 @@ TFuture<NProto::TError> TDeviceHandler::CreateResponseFuture(

////////////////////////////////////////////////////////////////////////////////

class TAlignedDeviceHandler
class TAlignedDeviceHandler final
: public IDeviceHandler
, public std::enable_shared_from_this<TAlignedDeviceHandler>
{
private:
const IStoragePtr Storage;
const TString ClientId;
const ui32 BlockSize;
const ui32 ZeroBlocksCountLimit;
const ui32 MaxBlockCount;

public:
TAlignedDeviceHandler(
IStoragePtr storage,
TString clientId,
ui32 blockSize,
ui32 zeroBlocksCountLimit)
ui32 maxBlockCount)
: Storage(std::move(storage))
, ClientId(std::move(clientId))
, BlockSize(blockSize)
, ZeroBlocksCountLimit(zeroBlocksCountLimit)
, MaxBlockCount(maxBlockCount)
{
Y_ABORT_UNLESS(ZeroBlocksCountLimit > 0);
Y_ABORT_UNLESS(MaxBlockCount > 0);
}

TFuture<NProto::TReadBlocksLocalResponse> Read(
Expand All @@ -853,17 +870,11 @@ class TAlignedDeviceHandler
TErrorResponse(E_ARGUMENT, "Request is not aligned"));
}

auto request = std::make_shared<NProto::TReadBlocksLocalRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetCheckpointId(checkpointId);
request->SetStartIndex(blocksInfo.Range.Start);
request->SetBlocksCount(blocksInfo.Range.Size());
request->BlockSize = BlockSize;
request->Sglist = std::move(sgList);

return Storage->ReadBlocksLocal(std::move(ctx), std::move(request));
return ExecuteReadRequest(
std::move(ctx),
blocksInfo,
std::move(sgList),
checkpointId);
}

TFuture<NProto::TWriteBlocksLocalResponse> Write(
Expand All @@ -885,16 +896,10 @@ class TAlignedDeviceHandler
TErrorResponse(E_ARGUMENT, "Request is not aligned"));
}

auto request = std::make_shared<NProto::TWriteBlocksLocalRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetStartIndex(blocksInfo.Range.Start);
request->BlocksCount = blocksInfo.Range.Size();
request->BlockSize = BlockSize;
request->Sglist = std::move(sgList);

return Storage->WriteBlocksLocal(std::move(ctx), std::move(request));
return ExecuteWriteRequest(
std::move(ctx),
blocksInfo,
std::move(sgList));
}

TFuture<NProto::TZeroBlocksResponse> Zero(
Expand Down Expand Up @@ -925,34 +930,177 @@ class TAlignedDeviceHandler
}

private:
// Reads the blocks of blocksInfo.Range into sgList, splitting the work into
// sub-requests of at most MaxBlockCount blocks.
//
// The first sub-request is sent immediately; when it completes without error
// the remainder is submitted by calling this method again from the
// continuation, so sub-requests run one after another. The first error stops
// the chain and is returned as the result.
//
// Returns E_CANCELLED if the head of sgList cannot be taken for a sub-request
// or if the handler has been destroyed before the continuation runs.
TReadBlocksResponseFuture ExecuteReadRequest(
    TCallContextPtr ctx,
    TBlocksInfo blocksInfo,
    TGuardedSgList sgList,
    TString checkpointId) const
{
    // Compare in the range's own type and narrow afterwards. Narrowing
    // Range.Size() to ui32 first would turn a size that is a multiple of 2^32
    // into 0 and produce an empty sub-request that never makes progress.
    // NOTE(review): assumes TBlockRange64::Size() is wider than ui32 - confirm.
    const auto totalBlockCount = blocksInfo.Range.Size();
    const ui32 requestBlockCount = totalBlockCount < MaxBlockCount
        ? static_cast<ui32>(totalBlockCount)
        : MaxBlockCount;

    auto request = std::make_shared<NProto::TReadBlocksLocalRequest>();
    request->MutableHeaders()->SetRequestId(ctx->RequestId);
    request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
    request->MutableHeaders()->SetClientId(ClientId);
    request->SetCheckpointId(checkpointId);
    request->SetStartIndex(blocksInfo.Range.Start);
    request->SetBlocksCount(requestBlockCount);
    request->BlockSize = BlockSize;

    if (requestBlockCount == totalBlockCount) {
        // The whole range fits into a single request: no splitting needed.
        request->Sglist = std::move(sgList);
        return Storage->ReadBlocksLocal(std::move(ctx), std::move(request));
    }

    // Take the list of blocks that we will execute in the first
    // sub-request and leave the rest in original sgList.
    request->Sglist = TakeHeadBlocks(sgList, requestBlockCount);
    if (request->Sglist.Empty()) {
        return MakeFuture<NProto::TReadBlocksResponse>(TErrorResponse(
            E_CANCELLED,
            "failed to acquire sglist in DeviceHandler"));
    }

    // ctx is passed by copy here because the continuation below still needs
    // it for the follow-up sub-requests.
    auto result = Storage->ReadBlocksLocal(ctx, std::move(request));

    // Advance the range past the part that has just been submitted.
    blocksInfo.Range = TBlockRange64::WithLength(
        blocksInfo.Range.Start + requestBlockCount,
        blocksInfo.Range.Size() - requestBlockCount);
    Y_DEBUG_ABORT_UNLESS(blocksInfo.Range.Size());

    return result.Apply(
        [ctx = std::move(ctx),
         weakPtr = weak_from_this(),
         blocksInfo = blocksInfo,
         sgList = std::move(sgList),
         checkpointId = std::move(checkpointId)](const auto& future) mutable
        {
            auto response = future.GetValue();
            if (HasError(response)) {
                return MakeFuture(response);
            }

            // Only a weak reference is held, so a pending chain does not
            // keep the handler alive.
            if (auto self = weakPtr.lock()) {
                return self->ExecuteReadRequest(
                    std::move(ctx),
                    blocksInfo,
                    std::move(sgList),
                    std::move(checkpointId));
            }
            return MakeFuture<NProto::TReadBlocksResponse>(
                TErrorResponse(E_CANCELLED));
        });
}

// Writes the data in sgList to the blocks of blocksInfo.Range, splitting the
// work into sub-requests of at most MaxBlockCount blocks.
//
// The first sub-request is sent immediately; when it completes without error
// the remainder is submitted by calling this method again from the
// continuation, so sub-requests run one after another. The first error stops
// the chain and is returned as the result.
//
// Returns E_CANCELLED if the head of sgList cannot be taken for a sub-request
// or if the handler has been destroyed before the continuation runs.
TWriteBlocksResponseFuture ExecuteWriteRequest(
    TCallContextPtr ctx,
    TBlocksInfo blocksInfo,
    TGuardedSgList sgList) const
{
    // Compare in the range's own type and narrow afterwards. Narrowing
    // Range.Size() to ui32 first would turn a size that is a multiple of 2^32
    // into 0 and produce an empty sub-request that never makes progress.
    // NOTE(review): assumes TBlockRange64::Size() is wider than ui32 - confirm.
    const auto totalBlockCount = blocksInfo.Range.Size();
    const ui32 requestBlockCount = totalBlockCount < MaxBlockCount
        ? static_cast<ui32>(totalBlockCount)
        : MaxBlockCount;

    auto request = std::make_shared<NProto::TWriteBlocksLocalRequest>();
    request->MutableHeaders()->SetRequestId(ctx->RequestId);
    request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
    request->MutableHeaders()->SetClientId(ClientId);
    request->SetStartIndex(blocksInfo.Range.Start);
    request->BlocksCount = requestBlockCount;
    request->BlockSize = BlockSize;

    if (requestBlockCount == totalBlockCount) {
        // The whole range fits into a single request: no splitting needed.
        request->Sglist = std::move(sgList);
        return Storage->WriteBlocksLocal(
            std::move(ctx),
            std::move(request));
    }

    // Take the list of blocks that we will execute in the first
    // sub-request and leave the rest in original sgList.
    request->Sglist = TakeHeadBlocks(sgList, requestBlockCount);
    if (request->Sglist.Empty()) {
        return MakeFuture<NProto::TWriteBlocksResponse>(TErrorResponse(
            E_CANCELLED,
            "failed to acquire sglist in DeviceHandler"));
    }

    // ctx is passed by copy here because the continuation below still needs
    // it for the follow-up sub-requests.
    auto result = Storage->WriteBlocksLocal(ctx, std::move(request));

    // Advance the range past the part that has just been submitted.
    blocksInfo.Range = TBlockRange64::WithLength(
        blocksInfo.Range.Start + requestBlockCount,
        blocksInfo.Range.Size() - requestBlockCount);
    Y_DEBUG_ABORT_UNLESS(blocksInfo.Range.Size());

    return result.Apply(
        [ctx = std::move(ctx),
         weakPtr = weak_from_this(),
         blocksInfo = blocksInfo,
         sgList = std::move(sgList)](const auto& future) mutable
        {
            auto response = future.GetValue();
            if (HasError(response)) {
                return MakeFuture(response);
            }

            // Only a weak reference is held, so a pending chain does not
            // keep the handler alive.
            if (auto self = weakPtr.lock()) {
                return self->ExecuteWriteRequest(
                    std::move(ctx),
                    blocksInfo,
                    std::move(sgList));
            }
            return MakeFuture<NProto::TWriteBlocksResponse>(
                TErrorResponse(E_CANCELLED));
        });
}

TZeroBlocksResponseFuture ExecuteZeroRequest(
TCallContextPtr ctx,
ui64 startIndex,
ui32 blocksCount)
ui32 blockCount) const
{
auto requestBlocksCount = std::min(blocksCount, ZeroBlocksCountLimit);
auto requestBlockCount = std::min(blockCount, MaxBlockCount);

auto request = std::make_shared<NProto::TZeroBlocksRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetStartIndex(startIndex);
request->SetBlocksCount(requestBlocksCount);

auto result = Storage->ZeroBlocks(ctx, std::move(request));

return result.Apply([=] (const auto& future) mutable {
auto response = future.GetValue();
request->SetBlocksCount(requestBlockCount);

startIndex += requestBlocksCount;
blocksCount -= requestBlocksCount;
if (requestBlockCount == blockCount) {
// The request size is quite small. We do all work at once.
return Storage->ZeroBlocks(std::move(ctx), std::move(request));
}

if (blocksCount == 0 || HasError(response)) {
return MakeFuture(response);
}
auto result = Storage->ZeroBlocks(ctx, std::move(request));

return ExecuteZeroRequest(std::move(ctx), startIndex, blocksCount);
});
return result.Apply(
[ctx = std::move(ctx),
weakPtr = weak_from_this(),
startIndex = startIndex + requestBlockCount,
blocksCount =
blockCount - requestBlockCount](const auto& future) mutable
{
// Only part of the request was completed. Continue doing the
// rest of the work

auto response = future.GetValue();
if (HasError(response)) {
return MakeFuture(response);
}

if (auto self = weakPtr.lock()) {
return self->ExecuteZeroRequest(
std::move(ctx),
startIndex,
blocksCount);
}
return MakeFuture<NProto::TZeroBlocksResponse>(
TErrorResponse(E_CANCELLED));
});
}
};

Expand All @@ -965,23 +1113,22 @@ struct TDefaultDeviceHandlerFactory final
IStoragePtr storage,
TString clientId,
ui32 blockSize,
ui32 zeroBlocksCountLimit,
bool unalignedRequestsDisabled) \
override
ui32 maxBlockCount,
bool unalignedRequestsDisabled) override
{
if (unalignedRequestsDisabled) {
return std::make_shared<TAlignedDeviceHandler>(
std::move(storage),
std::move(clientId),
blockSize,
zeroBlocksCountLimit);
maxBlockCount);
}

return std::make_shared<TDeviceHandler>(
std::move(storage),
std::move(clientId),
blockSize,
zeroBlocksCountLimit);
maxBlockCount);
}
};

Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/libs/service/device_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct IDeviceHandlerFactory
IStoragePtr storage,
TString clientId,
ui32 blockSize,
ui32 zeroBlocksCountLimit,
ui32 maxBlockCount,
bool unalignedRequestsDisabled) = 0;
};

Expand Down
Loading
Loading