Skip to content

Commit

Permalink
TEvRecords helpers, little style guide changes (#8421)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Aug 28, 2024
1 parent dee88c3 commit e5fc449
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 71 deletions.
5 changes: 0 additions & 5 deletions ydb/core/change_exchange/change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
}

// TEvRecords
TEvChangeExchange::TEvRecords::TEvRecords(const TChangeRecordVector& records)
: Records(records)
{
}

TEvChangeExchange::TEvRecords::TEvRecords(TChangeRecordVector&& records)
: Records(std::move(records))
{
Expand Down
26 changes: 19 additions & 7 deletions ydb/core/change_exchange/change_exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <util/string/builder.h>
#include <util/string/join.h>

#include <memory>
#include <variant>

namespace NKikimr {

namespace NDataShard {
Expand Down Expand Up @@ -51,12 +54,6 @@ struct TChangeRecordContainer {};

namespace NKikimr::NChangeExchange {

using TChangeRecordVector = std::variant<
std::shared_ptr<TChangeRecordContainer<NDataShard::TChangeRecord>>,
std::shared_ptr<TChangeRecordContainer<NReplication::NService::TChangeRecord>>,
std::shared_ptr<TChangeRecordContainer<NBackup::NImpl::TChangeRecord>>
>;

struct TEvChangeExchange {
enum EEv {
// Enqueue for sending
Expand Down Expand Up @@ -120,11 +117,26 @@ struct TEvChangeExchange {
};

struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> {
using TChangeRecordVector = std::variant<
std::shared_ptr<TChangeRecordContainer<NDataShard::TChangeRecord>>,
std::shared_ptr<TChangeRecordContainer<NReplication::NService::TChangeRecord>>,
std::shared_ptr<TChangeRecordContainer<NBackup::NImpl::TChangeRecord>>
>;

TChangeRecordVector Records;

explicit TEvRecords(const TChangeRecordVector& records);
explicit TEvRecords(TChangeRecordVector&& records);
TString ToString() const override;

template <typename T>
static TEvRecords* New(TVector<typename T::TPtr>&& records) {
return new TEvRecords(std::make_shared<TChangeRecordContainer<T>>(std::move(records)));
}

template <typename T>
inline auto& GetRecords() {
return std::get<std::shared_ptr<TChangeRecordContainer<T>>>(Records)->Records;
}
};

struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> {
Expand Down
11 changes: 5 additions & 6 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include "change_exchange.h"
#include "change_sender_resolver.h"
#include "change_sender_partitioner.h"
#include "change_sender_resolver.h"

#include <ydb/core/change_exchange/change_sender_monitoring.h>

Expand All @@ -18,8 +18,6 @@
#include <util/generic/set.h>
#include <util/string/builder.h>

#include <concepts>

namespace NKikimr::NChangeExchange {

struct TEvChangeExchangePrivate {
Expand Down Expand Up @@ -72,7 +70,7 @@ class ISenderFactory {
virtual IActor* CreateSender(ui64 partitionId) const = 0;
};

template <class TChangeRecord>
template <typename TChangeRecord>
class TBaseChangeSender {
using TIncompleteRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
// we need this to safely cast and call Out on a container
Expand Down Expand Up @@ -291,7 +289,7 @@ class TBaseChangeSender {
}

Y_ABORT_UNLESS(sender.ActorId);
ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::make_shared<TChangeRecordContainer<TChangeRecord>>(std::exchange(sender.Prepared, {}))));
ActorOps->Send(sender.ActorId, TEvChangeExchange::TEvRecords::New<TChangeRecord>(std::exchange(sender.Prepared, {})));
}

void ReEnqueueRecords(const TSender& sender) {
Expand Down Expand Up @@ -767,6 +765,8 @@ class TBaseChangeSender {
IActorOps* const ActorOps;
IChangeSenderResolver* const Resolver;
ISenderFactory* const SenderFactory;
THolder<IChangeSenderPartitioner<TChangeRecord>> Partitioner;

protected:
TActorId ChangeServer;
const TPathId PathId;
Expand All @@ -783,7 +783,6 @@ class TBaseChangeSender {

TVector<ui64> GonePartitions;

THolder<IChangeSenderPartitioner<TChangeRecord>> Partitioner;
}; // TBaseChangeSender

}
47 changes: 23 additions & 24 deletions ydb/core/change_exchange/change_sender_partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,29 @@
namespace NKikimr::NChangeExchange {

ui64 ResolveSchemaBoundaryPartitionId(const NKikimr::TKeyDesc& keyDesc, TConstArrayRef<TCell> key) {
const auto& partitions = keyDesc.GetPartitions();
Y_ABORT_UNLESS(partitions);
const auto& schema = keyDesc.KeyColumnTypes;

const auto range = TTableRange(key);
Y_ABORT_UNLESS(range.Point);

const auto it = LowerBound(
partitions.cbegin(), partitions.cend(), true,
[&](const auto& partition, bool) {
Y_ABORT_UNLESS(partition.Range);
const int compares = CompareBorders<true, false>(
partition.Range->EndKeyPrefix.GetCells(), range.From,
partition.Range->IsInclusive || partition.Range->IsPoint,
range.InclusiveFrom || range.Point, schema
);

return (compares < 0);
}
);

Y_ABORT_UNLESS(it != partitions.cend());
return it->ShardId;
const auto& partitions = keyDesc.GetPartitions();
Y_ABORT_UNLESS(partitions);
const auto& schema = keyDesc.KeyColumnTypes;

const auto range = TTableRange(key);
Y_ABORT_UNLESS(range.Point);

const auto it = LowerBound(
partitions.cbegin(), partitions.cend(), true,
[&](const auto& partition, bool) {
Y_ABORT_UNLESS(partition.Range);
const int compares = CompareBorders<true, false>(
partition.Range->EndKeyPrefix.GetCells(), range.From,
partition.Range->IsInclusive || partition.Range->IsPoint,
range.InclusiveFrom || range.Point, schema
);

return (compares < 0);
}
);

Y_ABORT_UNLESS(it != partitions.cend());
return it->ShardId;
}


} // NKikimr::NChangeExchange
20 changes: 8 additions & 12 deletions ydb/core/change_exchange/change_sender_partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,24 @@
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/scheme_types/scheme_type_info.h>

#include <util/generic/vector.h>

namespace NKikimr::NChangeExchange {

template<typename TChangeRecord>
template <typename TChangeRecord>
class IChangeSenderPartitioner {
public:
virtual ~IChangeSenderPartitioner() = default;

virtual ui64 ResolvePartitionId(const typename TChangeRecord::TPtr& record) const = 0;
};


ui64 ResolveSchemaBoundaryPartitionId(const NKikimr::TKeyDesc& keyDesc, TConstArrayRef<TCell> key);

template<typename TChangeRecord>
class SchemaBoundaryPartitioner final : public IChangeSenderPartitioner<TChangeRecord> {
template <typename TChangeRecord>
class TSchemaBoundaryPartitioner final : public IChangeSenderPartitioner<TChangeRecord> {
public:
SchemaBoundaryPartitioner(const NKikimr::TKeyDesc& keyDesc)
: KeyDesc(keyDesc) {
TSchemaBoundaryPartitioner(const NKikimr::TKeyDesc& keyDesc)
: KeyDesc(keyDesc)
{
}

ui64 ResolvePartitionId(const typename TChangeRecord::TPtr& record) const override {
Expand All @@ -34,11 +32,9 @@ class SchemaBoundaryPartitioner final : public IChangeSenderPartitioner<TChangeR
const NKikimr::TKeyDesc& KeyDesc;
};


template<typename TChangeRecord>
template <typename TChangeRecord>
IChangeSenderPartitioner<TChangeRecord>* CreateSchemaBoundaryPartitioner(const NKikimr::TKeyDesc& keyDesc) {
return new SchemaBoundaryPartitioner<TChangeRecord>(keyDesc);
return new TSchemaBoundaryPartitioner<TChangeRecord>(keyDesc);
}


} // NKikimr::NChangeExchange
2 changes: 0 additions & 2 deletions ydb/core/change_exchange/change_sender_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/scheme_types/scheme_type_info.h>

#include <util/generic/vector.h>

namespace NKikimr::NChangeExchange {

class IChangeSenderResolver {
Expand Down
7 changes: 2 additions & 5 deletions ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
records->Record.SetOrigin(DataShard.TabletId);
records->Record.SetGeneration(DataShard.Generation);

auto& evRecords = std::get<std::shared_ptr<TChangeRecordContainer<NKikimr::NDataShard::TChangeRecord>>>(ev->Get()->Records)->Records;

for (auto& recordPtr : evRecords) {
for (auto recordPtr : ev->Get()->GetRecords<TChangeRecord>()) {
const auto& record = *recordPtr;

if (record.GetOrder() <= LastRecordOrder) {
Expand Down Expand Up @@ -728,8 +726,7 @@ class TAsyncIndexChangeSenderMain

void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
auto& records = std::get<std::shared_ptr<TChangeRecordContainer<NKikimr::NDataShard::TChangeRecord>>>(ev->Get()->Records)->Records;
ProcessRecords(std::move(records));
ProcessRecords(std::move(ev->Get()->GetRecords<TChangeRecord>()));
}

void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) {
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
LOG_D("Handle " << ev->Get()->ToString());
NKikimrClient::TPersQueueRequest request;

auto& records = std::get<std::shared_ptr<TChangeRecordContainer<NKikimr::NDataShard::TChangeRecord>>>(ev->Get()->Records)->Records;
for (auto recordPtr : records) {
for (auto recordPtr : ev->Get()->GetRecords<TChangeRecord>()) {
const auto& record = *recordPtr;

if (record.GetSeqNo() <= MaxSeqNo) {
Expand Down Expand Up @@ -699,8 +698,7 @@ class TCdcChangeSenderMain

void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
auto& records = std::get<std::shared_ptr<TChangeRecordContainer<NKikimr::NDataShard::TChangeRecord>>>(ev->Get()->Records)->Records;
ProcessRecords(std::move(records));
ProcessRecords(std::move(ev->Get()->GetRecords<TChangeRecord>()));
}

void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_change_sending.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Send " << records.size() << " change records"
<< ": to# " << to
<< ", at tablet# " << Self->TabletID());
ctx.Send(to, new NChangeExchange::TEvChangeExchange::TEvRecords(std::make_shared<TChangeRecordContainer<NKikimr::NDataShard::TChangeRecord>>(std::move(records))));
ctx.Send(to, NChangeExchange::TEvChangeExchange::TEvRecords::New<TChangeRecord>(std::move(records)));
}

size_t forgotten = 0;
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/tx/replication/service/table_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace NKikimr::NReplication::NService {

template <class TChangeRecord>
template <typename TChangeRecord>
class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TChangeRecord>> {
using TBase = TActorBootstrapped<TTablePartitionWriter<TChangeRecord>>;
using TThis = TTablePartitionWriter;
Expand Down Expand Up @@ -83,9 +83,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh

TString source;

auto& records = std::get<std::shared_ptr<TChangeRecordContainer<TChangeRecord>>>(ev->Get()->Records)->Records;

for (auto recordPtr : records) {
for (auto recordPtr : ev->Get()->GetRecords<TChangeRecord>()) {
const auto& record = *recordPtr;
record.Serialize(*event->Record.AddChanges(), BuilderContext);

Expand Down Expand Up @@ -213,7 +211,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh

}; // TTablePartitionWriter

template <class TChangeRecord>
template <typename TChangeRecord>
class TLocalTableWriter
: public TActor<TLocalTableWriter<TChangeRecord>>
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
Expand Down

0 comments on commit e5fc449

Please sign in to comment.