Skip to content

Commit

Permalink
Split huge read and write requests in TAlignedDeviceHandler (#2024)
Browse files Browse the repository at this point in the history
  • Loading branch information
drbasic committed Sep 19, 2024
1 parent 7ea55e9 commit 84dfcb2
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 61 deletions.
249 changes: 198 additions & 51 deletions cloud/blockstore/libs/service/device_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,23 @@ struct TBlocksInfo

////////////////////////////////////////////////////////////////////////////////

static TResultOrError<bool> TryToNormalize(
// Removes the first blockCount elements from the sgList. Returns these removed
// items in TGuardedSgList.
TGuardedSgList TakeHeadBlocks(TGuardedSgList& sgList, ui32 blockCount)
{
auto guard = sgList.Acquire();
if (!guard) {
return {};
}

const TSgList& blockList = guard.Get();
auto result =
sgList.Create({blockList.begin(), blockList.begin() + blockCount});
sgList.SetSgList({blockList.begin() + blockCount, blockList.end()});
return result;
}

TResultOrError<bool> TryToNormalize(
TGuardedSgList& guardedSgList,
const TBlocksInfo& blocksInfo,
ui64 length,
Expand Down Expand Up @@ -82,7 +98,7 @@ static TResultOrError<bool> TryToNormalize(

////////////////////////////////////////////////////////////////////////////////

static TBlocksInfo ConvertRangeToBlocks(
TBlocksInfo ConvertRangeToBlocks(
ui64 from,
ui64 length,
ui32 blockSize)
Expand Down Expand Up @@ -176,7 +192,7 @@ class TDeviceHandler final
}

private:
TReadBlocksResponseFuture ExecuteReadBlocks(
TReadBlocksResponseFuture ExecuteAlignedReadRequest(
TCallContextPtr ctx,
const TBlocksInfo& blocksInfo,
TGuardedSgList sgList,
Expand Down Expand Up @@ -419,7 +435,7 @@ TReadBlocksResponseFuture TDeviceHandler::Read(
checkpointId);
}

return ExecuteReadBlocks(
return ExecuteAlignedReadRequest(
std::move(ctx),
blocksInfo,
std::move(sgList),
Expand Down Expand Up @@ -611,7 +627,7 @@ void TDeviceHandler::PrepareRequests(
}
}

TReadBlocksResponseFuture TDeviceHandler::ExecuteReadBlocks(
TReadBlocksResponseFuture TDeviceHandler::ExecuteAlignedReadRequest(
TCallContextPtr ctx,
const TBlocksInfo& blocksInfo,
TGuardedSgList sgList,
Expand Down Expand Up @@ -810,27 +826,28 @@ TFuture<NProto::TError> TDeviceHandler::CreateResponseFuture(

////////////////////////////////////////////////////////////////////////////////

class TAlignedDeviceHandler
class TAlignedDeviceHandler final
: public IDeviceHandler
, public std::enable_shared_from_this<TAlignedDeviceHandler>
{
private:
const IStoragePtr Storage;
const TString ClientId;
const ui32 BlockSize;
const ui32 ZeroBlocksCountLimit;
const ui32 MaxBlockCount;

public:
TAlignedDeviceHandler(
IStoragePtr storage,
TString clientId,
ui32 blockSize,
ui32 zeroBlocksCountLimit)
ui32 maxBlockCount)
: Storage(std::move(storage))
, ClientId(std::move(clientId))
, BlockSize(blockSize)
, ZeroBlocksCountLimit(zeroBlocksCountLimit)
, MaxBlockCount(maxBlockCount)
{
Y_ABORT_UNLESS(ZeroBlocksCountLimit > 0);
Y_ABORT_UNLESS(MaxBlockCount > 0);
}

TFuture<NProto::TReadBlocksLocalResponse> Read(
Expand All @@ -853,17 +870,11 @@ class TAlignedDeviceHandler
TErrorResponse(E_ARGUMENT, "Request is not aligned"));
}

auto request = std::make_shared<NProto::TReadBlocksLocalRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetCheckpointId(checkpointId);
request->SetStartIndex(blocksInfo.Range.Start);
request->SetBlocksCount(blocksInfo.Range.Size());
request->BlockSize = BlockSize;
request->Sglist = std::move(sgList);

return Storage->ReadBlocksLocal(std::move(ctx), std::move(request));
return ExecuteReadRequest(
std::move(ctx),
blocksInfo,
std::move(sgList),
checkpointId);
}

TFuture<NProto::TWriteBlocksLocalResponse> Write(
Expand All @@ -885,16 +896,10 @@ class TAlignedDeviceHandler
TErrorResponse(E_ARGUMENT, "Request is not aligned"));
}

auto request = std::make_shared<NProto::TWriteBlocksLocalRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetStartIndex(blocksInfo.Range.Start);
request->BlocksCount = blocksInfo.Range.Size();
request->BlockSize = BlockSize;
request->Sglist = std::move(sgList);

return Storage->WriteBlocksLocal(std::move(ctx), std::move(request));
return ExecuteWriteRequest(
std::move(ctx),
blocksInfo,
std::move(sgList));
}

TFuture<NProto::TZeroBlocksResponse> Zero(
Expand Down Expand Up @@ -925,34 +930,177 @@ class TAlignedDeviceHandler
}

private:
TReadBlocksResponseFuture ExecuteReadRequest(
TCallContextPtr ctx,
TBlocksInfo blocksInfo,
TGuardedSgList sgList,
TString checkpointId) const
{
auto requestBlockCount =
std::min<ui32>(blocksInfo.Range.Size(), MaxBlockCount);

auto request = std::make_shared<NProto::TReadBlocksLocalRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetCheckpointId(checkpointId);
request->SetStartIndex(blocksInfo.Range.Start);
request->SetBlocksCount(requestBlockCount);
request->BlockSize = BlockSize;

if (requestBlockCount == blocksInfo.Range.Size()) {
// The request size is quite small. We do all work at once.
request->Sglist = std::move(sgList);
return Storage->ReadBlocksLocal(std::move(ctx), std::move(request));
}

// Take the list of blocks that we will execute in the first
// sub-request and leave the rest in original sgList.
request->Sglist = TakeHeadBlocks(sgList, requestBlockCount);
if (request->Sglist.Empty()) {
return MakeFuture<NProto::TReadBlocksResponse>(TErrorResponse(
E_CANCELLED,
"failed to acquire sglist in DeviceHandler"));
}

auto result = Storage->ReadBlocksLocal(ctx, std::move(request));

blocksInfo.Range = TBlockRange64::WithLength(
blocksInfo.Range.Start + requestBlockCount,
blocksInfo.Range.Size() - requestBlockCount);
Y_DEBUG_ABORT_UNLESS(blocksInfo.Range.Size());

return result.Apply(
[ctx = std::move(ctx),
weakPtr = weak_from_this(),
blocksInfo = blocksInfo,
sgList = std::move(sgList),
checkpointId = std::move(checkpointId)](const auto& future) mutable
{
auto response = future.GetValue();
if (HasError(response)) {
return MakeFuture(response);
}

if (auto self = weakPtr.lock()) {
return self->ExecuteReadRequest(
std::move(ctx),
blocksInfo,
std::move(sgList),
std::move(checkpointId));
}
return MakeFuture<NProto::TReadBlocksResponse>(
TErrorResponse(E_CANCELLED));
});
}

TWriteBlocksResponseFuture ExecuteWriteRequest(
TCallContextPtr ctx,
TBlocksInfo blocksInfo,
TGuardedSgList sgList) const
{
auto requestBlockCount =
std::min<ui32>(blocksInfo.Range.Size(), MaxBlockCount);

auto request = std::make_shared<NProto::TWriteBlocksLocalRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetStartIndex(blocksInfo.Range.Start);
request->BlocksCount = requestBlockCount;
request->BlockSize = BlockSize;

if (requestBlockCount == blocksInfo.Range.Size()) {
// The request size is quite small. We do all work at once.
request->Sglist = std::move(sgList);
return Storage->WriteBlocksLocal(
std::move(ctx),
std::move(request));
}

// Take the list of blocks that we will execute in the first
// sub-request and leave the rest in original sgList.
request->Sglist = TakeHeadBlocks(sgList, requestBlockCount);
if (request->Sglist.Empty()) {
return MakeFuture<NProto::TWriteBlocksResponse>(TErrorResponse(
E_CANCELLED,
"failed to acquire sglist in DeviceHandler"));
}

auto result = Storage->WriteBlocksLocal(ctx, std::move(request));

blocksInfo.Range = TBlockRange64::WithLength(
blocksInfo.Range.Start + requestBlockCount,
blocksInfo.Range.Size() - requestBlockCount);
Y_DEBUG_ABORT_UNLESS(blocksInfo.Range.Size());

return result.Apply(
[ctx = std::move(ctx),
weakPtr = weak_from_this(),
blocksInfo = blocksInfo,
sgList = std::move(sgList)](const auto& future) mutable
{
auto response = future.GetValue();
if (HasError(response)) {
return MakeFuture(response);
}

if (auto self = weakPtr.lock()) {
return self->ExecuteWriteRequest(
std::move(ctx),
blocksInfo,
std::move(sgList));
}
return MakeFuture<NProto::TWriteBlocksResponse>(
TErrorResponse(E_CANCELLED));
});
}

TZeroBlocksResponseFuture ExecuteZeroRequest(
TCallContextPtr ctx,
ui64 startIndex,
ui32 blocksCount)
ui32 blockCount) const
{
auto requestBlocksCount = std::min(blocksCount, ZeroBlocksCountLimit);
auto requestBlockCount = std::min(blockCount, MaxBlockCount);

auto request = std::make_shared<NProto::TZeroBlocksRequest>();
request->MutableHeaders()->SetRequestId(ctx->RequestId);
request->MutableHeaders()->SetTimestamp(TInstant::Now().MicroSeconds());
request->MutableHeaders()->SetClientId(ClientId);
request->SetStartIndex(startIndex);
request->SetBlocksCount(requestBlocksCount);

auto result = Storage->ZeroBlocks(ctx, std::move(request));

return result.Apply([=] (const auto& future) mutable {
auto response = future.GetValue();
request->SetBlocksCount(requestBlockCount);

startIndex += requestBlocksCount;
blocksCount -= requestBlocksCount;
if (requestBlockCount == blockCount) {
// The request size is quite small. We do all work at once.
return Storage->ZeroBlocks(std::move(ctx), std::move(request));
}

if (blocksCount == 0 || HasError(response)) {
return MakeFuture(response);
}
auto result = Storage->ZeroBlocks(ctx, std::move(request));

return ExecuteZeroRequest(std::move(ctx), startIndex, blocksCount);
});
return result.Apply(
[ctx = std::move(ctx),
weakPtr = weak_from_this(),
startIndex = startIndex + requestBlockCount,
blocksCount =
blockCount - requestBlockCount](const auto& future) mutable
{
// Only part of the request was completed. Continue doing the
// rest of the work

auto response = future.GetValue();
if (HasError(response)) {
return MakeFuture(response);
}

if (auto self = weakPtr.lock()) {
return self->ExecuteZeroRequest(
std::move(ctx),
startIndex,
blocksCount);
}
return MakeFuture<NProto::TZeroBlocksResponse>(
TErrorResponse(E_CANCELLED));
});
}
};

Expand All @@ -965,23 +1113,22 @@ struct TDefaultDeviceHandlerFactory final
IStoragePtr storage,
TString clientId,
ui32 blockSize,
ui32 zeroBlocksCountLimit,
bool unalignedRequestsDisabled) \
override
ui32 maxBlockCount,
bool unalignedRequestsDisabled) override
{
if (unalignedRequestsDisabled) {
return std::make_shared<TAlignedDeviceHandler>(
std::move(storage),
std::move(clientId),
blockSize,
zeroBlocksCountLimit);
maxBlockCount);
}

return std::make_shared<TDeviceHandler>(
std::move(storage),
std::move(clientId),
blockSize,
zeroBlocksCountLimit);
maxBlockCount);
}
};

Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/libs/service/device_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct IDeviceHandlerFactory
IStoragePtr storage,
TString clientId,
ui32 blockSize,
ui32 zeroBlocksCountLimit,
ui32 maxBlockCount,
bool unalignedRequestsDisabled) = 0;
};

Expand Down
Loading

0 comments on commit 84dfcb2

Please sign in to comment.