Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reshuffle kmeans actor scan #9725

Merged
merged 7 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ message TEvLocalKMeansRequest {

// id of parent cluster
optional uint32 Parent = 15;
// [Child ... Child + K] ids reserved for our clusters
// [Child ... Child + K) ids reserved for this kmeans clusters
optional uint32 Child = 16;

optional string LevelName = 17;
Expand All @@ -1532,7 +1532,7 @@ message TEvLocalKMeansRequest {
repeated string DataColumns = 20;
}

message TEvLocalKMeansProgressResponse {
message TEvLocalKMeansResponse {
optional uint64 Id = 1;

optional uint64 TabletId = 2;
Expand All @@ -1552,6 +1552,53 @@ message TEvLocalKMeansProgressResponse {
// optional uint32 DoneRounds = 11;
}

message TEvReshuffleKMeansRequest {
optional uint64 Id = 1;

optional uint64 TabletId = 2;
optional NKikimrProto.TPathID PathId = 3;

optional uint64 SnapshotTxId = 4;
optional uint64 SnapshotStep = 5;

optional uint64 SeqNoGeneration = 6;
optional uint64 SeqNoRound = 7;

optional Ydb.Table.VectorIndexSettings Settings = 8;

optional TEvLocalKMeansRequest.EState Upload = 9;

// id of parent cluster
optional uint32 Parent = 10;
// [Child ... Child + ClustersSize) ids of this kmeans clusters
optional uint32 Child = 11;
// centroids of clusters
repeated string Clusters = 12;

optional string PostingName = 13;

optional string EmbeddingColumn = 14;
repeated string DataColumns = 15;
}

message TEvReshuffleKMeansResponse {
optional uint64 Id = 1;

optional uint64 TabletId = 2;
optional NKikimrProto.TPathID PathId = 3;

optional uint64 RequestSeqNoGeneration = 4;
optional uint64 RequestSeqNoRound = 5;

optional NKikimrIndexBuilder.EBuildStatus Status = 6;
repeated Ydb.Issue.IssueMessage Issues = 7;

// TODO(mbkkt) implement slow-path (reliable-path)
// optional uint64 RowsDelta = 8;
// optional uint64 BytesDelta = 9;
// optional last written primary key
}

message TEvCdcStreamScanRequest {
message TLimits {
optional uint32 BatchMaxBytes = 1 [default = 512000];
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/buffer_data.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include "ydb/core/scheme/scheme_tablecell.h"
#include "ydb/core/tx/datashard/upload_stats.h"
#include "ydb/core/tx/tx_proxy/upload_rows.h"
Expand Down
25 changes: 20 additions & 5 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,10 @@ struct TEvDataShard {
EvSampleKResponse,

EvLocalKMeansRequest,
EvLocalKMeansProgressResponse,
EvLocalKMeansResponse,

EvReshuffleKMeansRequest,
EvReshuffleKMeansResponse,

EvEnd
};
Expand Down Expand Up @@ -1457,16 +1460,28 @@ struct TEvDataShard {
TEvDataShard::EvSampleKResponse> {
};

struct TEvReshuffleKMeansRequest
: public TEventPB<TEvReshuffleKMeansRequest,
NKikimrTxDataShard::TEvReshuffleKMeansRequest,
TEvDataShard::EvReshuffleKMeansRequest> {
};

struct TEvReshuffleKMeansResponse
: public TEventPB<TEvReshuffleKMeansResponse,
NKikimrTxDataShard::TEvReshuffleKMeansResponse,
TEvDataShard::EvReshuffleKMeansResponse> {
};

struct TEvLocalKMeansRequest
: public TEventPB<TEvLocalKMeansRequest,
NKikimrTxDataShard::TEvLocalKMeansRequest,
TEvDataShard::EvLocalKMeansRequest> {
};

struct TEvLocalKMeansProgressResponse
: public TEventPB<TEvLocalKMeansProgressResponse,
NKikimrTxDataShard::TEvLocalKMeansProgressResponse,
TEvDataShard::EvLocalKMeansProgressResponse> {
struct TEvLocalKMeansResponse
: public TEventPB<TEvLocalKMeansResponse,
NKikimrTxDataShard::TEvLocalKMeansResponse,
TEvDataShard::EvLocalKMeansResponse> {
};

struct TEvKqpScan
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class TDataShard
class TTxHandleSafeBuildIndexScan;
class TTxHandleSafeSampleKScan;
class TTxHandleSafeLocalKMeansScan;
class TTxHandleSafeReshuffleKMeansScan;
class TTxHandleSafeStatisticsScan;

class TTxMediatorStateRestored;
Expand Down Expand Up @@ -1325,6 +1326,8 @@ class TDataShard
void HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx);
void HandleSafe(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvReshuffleKMeansRequest::TPtr& ev, const TActorContext& ctx);
void HandleSafe(TEvDataShard::TEvReshuffleKMeansRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx);
void HandleSafe(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -3117,6 +3120,7 @@ class TDataShard
HFunc(TEvDataShard::TEvDiscardVolatileSnapshotRequest, Handle);
HFuncTraced(TEvDataShard::TEvBuildIndexCreateRequest, Handle);
HFunc(TEvDataShard::TEvSampleKRequest, Handle);
HFunc(TEvDataShard::TEvReshuffleKMeansRequest, Handle);
HFunc(TEvDataShard::TEvLocalKMeansRequest, Handle);
HFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle);
HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle);
Expand Down
Loading
Loading