diff --git a/go.mod b/go.mod index 89f343aa..5d75e67a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( golang.org/x/net v0.29.0 golang.org/x/sync v0.8.0 google.golang.org/grpc v1.66.0 - google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0 google.golang.org/protobuf v1.34.2 ) diff --git a/go.sum b/go.sum index d75c5d4e..1a5ca3a6 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0 h1:9SxA29VM43MF5Z9dQu694wmY5t8E/Gxr7s+RSxiIDmc= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0/go.mod h1:yZOK5zhQMiALmuweVdIVoQPa6eIJyXn2B9g5dJDhqX4= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/apis/proto/sourcetransform/v1/transform.pb.go b/pkg/apis/proto/sourcetransform/v1/transform.pb.go index 479247d9..e851979e 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform.pb.go @@ -22,22 +22,18 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// * -// SourceTransformerRequest represents a request element. -type SourceTransformRequest struct { +// Handshake message between client and server to indicate the start of transmission. +type Handshake struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` - Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` - Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Required field indicating the start of transmission. + Sot bool `protobuf:"varint,1,opt,name=sot,proto3" json:"sot,omitempty"` } -func (x *SourceTransformRequest) Reset() { - *x = SourceTransformRequest{} +func (x *Handshake) Reset() { + *x = Handshake{} if protoimpl.UnsafeEnabled { mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -45,13 +41,13 @@ func (x *SourceTransformRequest) Reset() { } } -func (x *SourceTransformRequest) String() string { +func (x *Handshake) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SourceTransformRequest) ProtoMessage() {} +func (*Handshake) ProtoMessage() {} -func (x *SourceTransformRequest) ProtoReflect() protoreflect.Message { +func (x *Handshake) ProtoReflect() protoreflect.Message { mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -63,42 +59,71 @@ func (x *SourceTransformRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SourceTransformRequest.ProtoReflect.Descriptor instead. -func (*SourceTransformRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use Handshake.ProtoReflect.Descriptor instead. +func (*Handshake) Descriptor() ([]byte, []int) { return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{0} } -func (x *SourceTransformRequest) GetKeys() []string { +func (x *Handshake) GetSot() bool { if x != nil { - return x.Keys + return x.Sot } - return nil + return false } -func (x *SourceTransformRequest) GetValue() []byte { - if x != nil { - return x.Value +// * +// SourceTransformerRequest represents a request element. +type SourceTransformRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Request *SourceTransformRequest_Request `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` + Handshake *Handshake `protobuf:"bytes,2,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` +} + +func (x *SourceTransformRequest) Reset() { + *x = SourceTransformRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return nil } -func (x *SourceTransformRequest) GetEventTime() *timestamppb.Timestamp { - if x != nil { - return x.EventTime +func (x *SourceTransformRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SourceTransformRequest) ProtoMessage() {} + +func (x *SourceTransformRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms } - return nil + return mi.MessageOf(x) } -func (x *SourceTransformRequest) GetWatermark() *timestamppb.Timestamp { +// Deprecated: Use SourceTransformRequest.ProtoReflect.Descriptor instead. +func (*SourceTransformRequest) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{1} +} + +func (x *SourceTransformRequest) GetRequest() *SourceTransformRequest_Request { if x != nil { - return x.Watermark + return x.Request } return nil } -func (x *SourceTransformRequest) GetHeaders() map[string]string { +func (x *SourceTransformRequest) GetHandshake() *Handshake { if x != nil { - return x.Headers + return x.Handshake } return nil } @@ -111,12 +136,16 @@ type SourceTransformResponse struct { unknownFields protoimpl.UnknownFields Results []*SourceTransformResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + // This ID is used to refer the responses to the request it corresponds to. + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + // Handshake message between client and server to indicate the start of transmission. + Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } func (x *SourceTransformResponse) Reset() { *x = SourceTransformResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -129,7 +158,7 @@ func (x *SourceTransformResponse) String() string { func (*SourceTransformResponse) ProtoMessage() {} func (x *SourceTransformResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -142,7 +171,7 @@ func (x *SourceTransformResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SourceTransformResponse.ProtoReflect.Descriptor instead. func (*SourceTransformResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{1} + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{2} } func (x *SourceTransformResponse) GetResults() []*SourceTransformResponse_Result { @@ -152,6 +181,20 @@ func (x *SourceTransformResponse) GetResults() []*SourceTransformResponse_Result return nil } +func (x *SourceTransformResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *SourceTransformResponse) GetHandshake() *Handshake { + if x != nil { + return x.Handshake + } + return nil +} + // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -165,7 +208,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -178,7 +221,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -191,7 +234,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{2} + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{3} } func (x *ReadyResponse) GetReady() bool { @@ -201,6 +244,94 @@ func (x *ReadyResponse) GetReady() bool { return false } +type SourceTransformRequest_Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` + Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // This ID is used to uniquely identify a transform request + Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *SourceTransformRequest_Request) Reset() { + *x = SourceTransformRequest_Request{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SourceTransformRequest_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SourceTransformRequest_Request) ProtoMessage() {} + +func (x *SourceTransformRequest_Request) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SourceTransformRequest_Request.ProtoReflect.Descriptor instead. +func (*SourceTransformRequest_Request) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *SourceTransformRequest_Request) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *SourceTransformRequest_Request) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *SourceTransformRequest_Request) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +func (x *SourceTransformRequest_Request) GetWatermark() *timestamppb.Timestamp { + if x != nil { + return x.Watermark + } + return nil +} + +func (x *SourceTransformRequest_Request) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *SourceTransformRequest_Request) GetId() string { + if x != nil { + return x.Id + } + return "" +} + type SourceTransformResponse_Result struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -215,7 +346,7 @@ type SourceTransformResponse_Result struct { func (x *SourceTransformResponse_Result) Reset() { *x = SourceTransformResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -228,7 +359,7 @@ func (x *SourceTransformResponse_Result) String() string { func (*SourceTransformResponse_Result) ProtoMessage() {} func (x *SourceTransformResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -241,7 +372,7 @@ func (x *SourceTransformResponse_Result) ProtoReflect() protoreflect.Message { // Deprecated: Use SourceTransformResponse_Result.ProtoReflect.Descriptor instead. func (*SourceTransformResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{1, 0} + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{2, 0} } func (x *SourceTransformResponse_Result) GetKeys() []string { @@ -283,63 +414,83 @@ var file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, - 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc8, 0x02, 0x0a, 0x16, 0x53, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, + 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x03, 0x73, 0x6f, 0x74, 0x22, 0x8e, 0x04, 0x0a, 0x16, 0x53, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x4e, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, + 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x42, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, + 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, + 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0xd1, 0x02, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, + 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, + 0x12, 0x5b, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x41, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, + 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x3a, 0x0a, + 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, + 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0xcf, 0x02, 0x0a, 0x17, 0x53, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x4e, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x42, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, + 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, + 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0x81, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, - 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, - 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, - 0x6b, 0x12, 0x53, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, - 0x38, 0x01, 0x22, 0xed, 0x01, 0x0a, 0x17, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4e, - 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x34, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, - 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x1a, 0x81, - 0x01, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, - 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, - 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0xcb, 0x01, 0x0a, 0x0f, 0x53, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x70, 0x0a, - 0x11, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, - 0x46, 0x6e, 0x12, 0x2c, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x2d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, - 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x46, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x23, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, - 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, - 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, + 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, + 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, + 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, + 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, + 0x32, 0xcf, 0x01, 0x0a, 0x0f, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x74, 0x0a, 0x11, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, + 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x46, 0x6e, 0x12, 0x2c, 0x2e, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, + 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x46, 0x0a, 0x07, 0x49, 0x73, + 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x23, 0x2e, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, + 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, + 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, + 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -354,31 +505,36 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP() []byte return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescData } -var file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_pkg_apis_proto_sourcetransform_v1_transform_proto_goTypes = []any{ - (*SourceTransformRequest)(nil), // 0: sourcetransformer.v1.SourceTransformRequest - (*SourceTransformResponse)(nil), // 1: sourcetransformer.v1.SourceTransformResponse - (*ReadyResponse)(nil), // 2: sourcetransformer.v1.ReadyResponse - nil, // 3: sourcetransformer.v1.SourceTransformRequest.HeadersEntry - (*SourceTransformResponse_Result)(nil), // 4: sourcetransformer.v1.SourceTransformResponse.Result - (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*Handshake)(nil), // 0: sourcetransformer.v1.Handshake + (*SourceTransformRequest)(nil), // 1: sourcetransformer.v1.SourceTransformRequest + (*SourceTransformResponse)(nil), // 2: sourcetransformer.v1.SourceTransformResponse + (*ReadyResponse)(nil), // 3: sourcetransformer.v1.ReadyResponse + (*SourceTransformRequest_Request)(nil), // 4: sourcetransformer.v1.SourceTransformRequest.Request + nil, // 5: sourcetransformer.v1.SourceTransformRequest.Request.HeadersEntry + (*SourceTransformResponse_Result)(nil), // 6: sourcetransformer.v1.SourceTransformResponse.Result + (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 8: google.protobuf.Empty } var file_pkg_apis_proto_sourcetransform_v1_transform_proto_depIdxs = []int32{ - 5, // 0: sourcetransformer.v1.SourceTransformRequest.event_time:type_name -> google.protobuf.Timestamp - 5, // 1: sourcetransformer.v1.SourceTransformRequest.watermark:type_name -> google.protobuf.Timestamp - 3, // 2: sourcetransformer.v1.SourceTransformRequest.headers:type_name -> sourcetransformer.v1.SourceTransformRequest.HeadersEntry - 4, // 3: sourcetransformer.v1.SourceTransformResponse.results:type_name -> sourcetransformer.v1.SourceTransformResponse.Result - 5, // 4: sourcetransformer.v1.SourceTransformResponse.Result.event_time:type_name -> google.protobuf.Timestamp - 0, // 5: sourcetransformer.v1.SourceTransform.SourceTransformFn:input_type -> sourcetransformer.v1.SourceTransformRequest - 6, // 6: sourcetransformer.v1.SourceTransform.IsReady:input_type -> google.protobuf.Empty - 1, // 7: sourcetransformer.v1.SourceTransform.SourceTransformFn:output_type -> sourcetransformer.v1.SourceTransformResponse - 2, // 8: sourcetransformer.v1.SourceTransform.IsReady:output_type -> sourcetransformer.v1.ReadyResponse - 7, // [7:9] is the sub-list for method output_type - 5, // [5:7] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 4, // 0: sourcetransformer.v1.SourceTransformRequest.request:type_name -> sourcetransformer.v1.SourceTransformRequest.Request + 0, // 1: sourcetransformer.v1.SourceTransformRequest.handshake:type_name -> sourcetransformer.v1.Handshake + 6, // 2: sourcetransformer.v1.SourceTransformResponse.results:type_name -> sourcetransformer.v1.SourceTransformResponse.Result + 0, // 3: sourcetransformer.v1.SourceTransformResponse.handshake:type_name -> sourcetransformer.v1.Handshake + 7, // 4: sourcetransformer.v1.SourceTransformRequest.Request.event_time:type_name -> google.protobuf.Timestamp + 7, // 5: sourcetransformer.v1.SourceTransformRequest.Request.watermark:type_name -> google.protobuf.Timestamp + 5, // 6: sourcetransformer.v1.SourceTransformRequest.Request.headers:type_name -> sourcetransformer.v1.SourceTransformRequest.Request.HeadersEntry + 7, // 7: sourcetransformer.v1.SourceTransformResponse.Result.event_time:type_name -> google.protobuf.Timestamp + 1, // 8: sourcetransformer.v1.SourceTransform.SourceTransformFn:input_type -> sourcetransformer.v1.SourceTransformRequest + 8, // 9: sourcetransformer.v1.SourceTransform.IsReady:input_type -> google.protobuf.Empty + 2, // 10: sourcetransformer.v1.SourceTransform.SourceTransformFn:output_type -> sourcetransformer.v1.SourceTransformResponse + 3, // 11: sourcetransformer.v1.SourceTransform.IsReady:output_type -> sourcetransformer.v1.ReadyResponse + 10, // [10:12] is the sub-list for method output_type + 8, // [8:10] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() } @@ -388,7 +544,7 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } if !protoimpl.UnsafeEnabled { file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*SourceTransformRequest); i { + switch v := v.(*Handshake); i { case 0: return &v.state case 1: @@ -400,7 +556,7 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } } file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*SourceTransformResponse); i { + switch v := v.(*SourceTransformRequest); i { case 0: return &v.state case 1: @@ -412,6 +568,18 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } } file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*SourceTransformResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*ReadyResponse); i { case 0: return &v.state @@ -424,6 +592,18 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } } file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*SourceTransformRequest_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[6].Exporter = func(v any, i int) any { switch v := v.(*SourceTransformResponse_Result); i { case 0: return &v.state @@ -436,13 +616,15 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } } } + file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1].OneofWrappers = []any{} + file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/sourcetransform/v1/transform.proto b/pkg/apis/proto/sourcetransform/v1/transform.proto index 8885db08..7eec50c9 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.proto +++ b/pkg/apis/proto/sourcetransform/v1/transform.proto @@ -11,21 +11,35 @@ service SourceTransform { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse); + rpc SourceTransformFn(stream SourceTransformRequest) returns (stream SourceTransformResponse); // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); } +/* + * Handshake message between client and server to indicate the start of transmission. + */ + message Handshake { + // Required field indicating the start of transmission. + bool sot = 1; +} + /** * SourceTransformerRequest represents a request element. */ message SourceTransformRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; - map headers = 5; + message Request { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + map headers = 5; + // This ID is used to uniquely identify a transform request + string id = 6; + } + Request request = 1; + optional Handshake handshake = 2; } /** @@ -39,6 +53,10 @@ message SourceTransformResponse { repeated string tags = 4; } repeated Result results = 1; + // This ID is used to refer the responses to the request it corresponds to. + string id = 2; + // Handshake message between client and server to indicate the start of transmission. + optional Handshake handshake = 3; } /** diff --git a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go index 9fd6bd4a..52995c34 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go @@ -31,7 +31,7 @@ type SourceTransformClient interface { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - SourceTransformFn(ctx context.Context, in *SourceTransformRequest, opts ...grpc.CallOption) (*SourceTransformResponse, error) + SourceTransformFn(ctx context.Context, opts ...grpc.CallOption) (SourceTransform_SourceTransformFnClient, error) // IsReady is the heartbeat endpoint for gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) } @@ -44,14 +44,36 @@ func NewSourceTransformClient(cc grpc.ClientConnInterface) SourceTransformClient return &sourceTransformClient{cc} } -func (c *sourceTransformClient) SourceTransformFn(ctx context.Context, in *SourceTransformRequest, opts ...grpc.CallOption) (*SourceTransformResponse, error) { +func (c *sourceTransformClient) SourceTransformFn(ctx context.Context, opts ...grpc.CallOption) (SourceTransform_SourceTransformFnClient, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(SourceTransformResponse) - err := c.cc.Invoke(ctx, SourceTransform_SourceTransformFn_FullMethodName, in, out, cOpts...) + stream, err := c.cc.NewStream(ctx, &SourceTransform_ServiceDesc.Streams[0], SourceTransform_SourceTransformFn_FullMethodName, cOpts...) if err != nil { return nil, err } - return out, nil + x := &sourceTransformSourceTransformFnClient{ClientStream: stream} + return x, nil +} + +type SourceTransform_SourceTransformFnClient interface { + Send(*SourceTransformRequest) error + Recv() (*SourceTransformResponse, error) + grpc.ClientStream +} + +type sourceTransformSourceTransformFnClient struct { + grpc.ClientStream +} + +func (x *sourceTransformSourceTransformFnClient) Send(m *SourceTransformRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *sourceTransformSourceTransformFnClient) Recv() (*SourceTransformResponse, error) { + m := new(SourceTransformResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } func (c *sourceTransformClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { @@ -71,7 +93,7 @@ type SourceTransformServer interface { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - SourceTransformFn(context.Context, *SourceTransformRequest) (*SourceTransformResponse, error) + SourceTransformFn(SourceTransform_SourceTransformFnServer) error // IsReady is the heartbeat endpoint for gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) mustEmbedUnimplementedSourceTransformServer() @@ -81,8 +103,8 @@ type SourceTransformServer interface { type UnimplementedSourceTransformServer struct { } -func (UnimplementedSourceTransformServer) SourceTransformFn(context.Context, *SourceTransformRequest) (*SourceTransformResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method SourceTransformFn not implemented") +func (UnimplementedSourceTransformServer) SourceTransformFn(SourceTransform_SourceTransformFnServer) error { + return status.Errorf(codes.Unimplemented, "method SourceTransformFn not implemented") } func (UnimplementedSourceTransformServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") @@ -100,22 +122,30 @@ func RegisterSourceTransformServer(s grpc.ServiceRegistrar, srv SourceTransformS s.RegisterService(&SourceTransform_ServiceDesc, srv) } -func _SourceTransform_SourceTransformFn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(SourceTransformRequest) - if err := dec(in); err != nil { +func _SourceTransform_SourceTransformFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SourceTransformServer).SourceTransformFn(&sourceTransformSourceTransformFnServer{ServerStream: stream}) +} + +type SourceTransform_SourceTransformFnServer interface { + Send(*SourceTransformResponse) error + Recv() (*SourceTransformRequest, error) + grpc.ServerStream +} + +type sourceTransformSourceTransformFnServer struct { + grpc.ServerStream +} + +func (x *sourceTransformSourceTransformFnServer) Send(m *SourceTransformResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *sourceTransformSourceTransformFnServer) Recv() (*SourceTransformRequest, error) { + m := new(SourceTransformRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(SourceTransformServer).SourceTransformFn(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: SourceTransform_SourceTransformFn_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SourceTransformServer).SourceTransformFn(ctx, req.(*SourceTransformRequest)) - } - return interceptor(ctx, in, info, handler) + return m, nil } func _SourceTransform_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { @@ -143,15 +173,18 @@ var SourceTransform_ServiceDesc = grpc.ServiceDesc{ ServiceName: "sourcetransformer.v1.SourceTransform", HandlerType: (*SourceTransformServer)(nil), Methods: []grpc.MethodDesc{ - { - MethodName: "SourceTransformFn", - Handler: _SourceTransform_SourceTransformFn_Handler, - }, { MethodName: "IsReady", Handler: _SourceTransform_IsReady_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SourceTransformFn", + Handler: _SourceTransform_SourceTransformFn_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "pkg/apis/proto/sourcetransform/v1/transform.proto", } diff --git a/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go b/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go index fbc66555..8b347d06 100644 --- a/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go +++ b/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go @@ -58,21 +58,21 @@ func (mr *MockSourceTransformClientMockRecorder) IsReady(arg0, arg1 interface{}, } // SourceTransformFn mocks base method. -func (m *MockSourceTransformClient) SourceTransformFn(arg0 context.Context, arg1 *v1.SourceTransformRequest, arg2 ...grpc.CallOption) (*v1.SourceTransformResponse, error) { +func (m *MockSourceTransformClient) SourceTransformFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.SourceTransform_SourceTransformFnClient, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { + varargs := []interface{}{arg0} + for _, a := range arg1 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "SourceTransformFn", varargs...) - ret0, _ := ret[0].(*v1.SourceTransformResponse) + ret0, _ := ret[0].(v1.SourceTransform_SourceTransformFnClient) ret1, _ := ret[1].(error) return ret0, ret1 } // SourceTransformFn indicates an expected call of SourceTransformFn. -func (mr *MockSourceTransformClientMockRecorder) SourceTransformFn(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockSourceTransformClientMockRecorder) SourceTransformFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) + varargs := append([]interface{}{arg0}, arg1...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SourceTransformFn", reflect.TypeOf((*MockSourceTransformClient)(nil).SourceTransformFn), varargs...) } diff --git a/pkg/sourcetransformer/examples/event_time_filter/go.mod b/pkg/sourcetransformer/examples/event_time_filter/go.mod index d8c44a3a..c91400a8 100644 --- a/pkg/sourcetransformer/examples/event_time_filter/go.mod +++ b/pkg/sourcetransformer/examples/event_time_filter/go.mod @@ -15,6 +15,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/sourcetransformer/examples/event_time_filter/go.sum b/pkg/sourcetransformer/examples/event_time_filter/go.sum index 47607cf0..15b67493 100644 --- a/pkg/sourcetransformer/examples/event_time_filter/go.sum +++ b/pkg/sourcetransformer/examples/event_time_filter/go.sum @@ -15,6 +15,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/sourcetransformer/examples/event_time_filter/impl/filter_test.go b/pkg/sourcetransformer/examples/event_time_filter/impl/filter_test.go index fb8b5b06..2ddd5cd7 100644 --- a/pkg/sourcetransformer/examples/event_time_filter/impl/filter_test.go +++ b/pkg/sourcetransformer/examples/event_time_filter/impl/filter_test.go @@ -23,6 +23,10 @@ func (d beforeYear2022Datum) Watermark() time.Time { return time.Now() } +func (d beforeYear2022Datum) Headers() map[string]string { + return map[string]string{} +} + type withinYear2022Datum struct{} func (d withinYear2022Datum) ID() string { @@ -39,6 +43,9 @@ func (d withinYear2022Datum) EventTime() time.Time { func (d withinYear2022Datum) Watermark() time.Time { return time.Now() } +func (d withinYear2022Datum) Headers() map[string]string { + return map[string]string{} +} type afterYear2022Datum struct{} @@ -52,6 +59,9 @@ func (d afterYear2022Datum) EventTime() time.Time { func (d afterYear2022Datum) Watermark() time.Time { return time.Now() } +func (d afterYear2022Datum) Headers() map[string]string { + return map[string]string{} +} func Test_FilterEventTime(t *testing.T) { testKeys := []string{"test-key"} diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 1651f825..bbd91f19 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -2,9 +2,16 @@ package sourcetransformer import ( "context" + "errors" + "fmt" + "io" "log" "runtime/debug" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -31,10 +38,12 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, return &v1.ReadyResponse{Ready: true}, nil } +var errTransformerPanic = errors.New("transformer function panicked") + // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. -func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformRequest) (*v1.SourceTransformResponse, error) { +func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFnServer) error { // handle panic defer func() { if r := recover(); r != nil { @@ -42,19 +51,105 @@ func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformR fs.shutdownCh <- struct{}{} } }() - var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers) - messageTs := fs.Transformer.Transform(ctx, d.GetKeys(), hd) - var results []*v1.SourceTransformResponse_Result - for _, m := range messageTs.Items() { - results = append(results, &v1.SourceTransformResponse_Result{ - EventTime: timestamppb.New(m.EventTime()), - Keys: m.Keys(), - Value: m.Value(), - Tags: m.Tags(), + + req, err := stream.Recv() + if err != nil { + return fmt.Errorf("reading handshake message from stream: %w", err) + } + + if req.Handshake == nil || !req.Handshake.Sot { + return fmt.Errorf("invalid handshake message: %+v", req) + } + + handshakeResponse := &v1.SourceTransformResponse{ + Handshake: &v1.Handshake{ + Sot: true, + }, + } + if err := stream.Send(handshakeResponse); err != nil { + return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err) + } + + ctx := stream.Context() + // We depend on grpContext to cancel all goroutines, since it will be automatically closed when the first function returns a non-nil error. + // This error will be caught later with grp.Wait() + grp, grpCtx := errgroup.WithContext(ctx) + + senderCh := make(chan *v1.SourceTransformResponse, 500) // FIXME: identify the right buffer size + // goroutine to send the response to the stream + grp.Go(func() error { + for { + var resp *v1.SourceTransformResponse + select { + case <-grpCtx.Done(): + return grpCtx.Err() + case resp = <-senderCh: + } + if err := stream.Send(resp); err != nil { + return fmt.Errorf("failed to send response to client: %w", err) + } + } + }) + +outer: + for { + // Stop reading new messages when we are shutting down + select { + case <-grpCtx.Done(): + // If the context was cancelled while this loop is running, it will be caught and returned in one of the errgroup's goroutines. + break outer + default: + } + + d, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + + req := d.Request + grp.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + log.Printf("Panic inside source transform handler: %v %v", r, string(debug.Stack())) + // We only listen for 1 message on the shutdown channel. If multiple requests panic, only the first one will succeed. + // The one that succeds returns the errTransformerPanic. This causes grpCtx to be cancelled. + select { + case fs.shutdownCh <- struct{}{}: + case <-grpCtx.Done(): + } + err = errTransformerPanic + } + }() + var hd = NewHandlerDatum(req.GetValue(), req.EventTime.AsTime(), req.Watermark.AsTime(), req.Headers) + messageTs := fs.Transformer.Transform(grpCtx, req.GetKeys(), hd) + var results []*v1.SourceTransformResponse_Result + for _, m := range messageTs.Items() { + results = append(results, &v1.SourceTransformResponse_Result{ + EventTime: timestamppb.New(m.EventTime()), + Keys: m.Keys(), + Value: m.Value(), + Tags: m.Tags(), + }) + } + resp := &v1.SourceTransformResponse{ + Results: results, + Id: req.GetId(), + } + select { + case senderCh <- resp: + case <-grpCtx.Done(): + return grpCtx.Err() + } + return nil }) } - responseList := &v1.SourceTransformResponse{ - Results: results, + + if err := grp.Wait(); err != nil { + statusErr := status.Errorf(codes.Internal, err.Error()) + return statusErr } - return responseList, nil + return nil } diff --git a/pkg/sourcetransformer/service_test.go b/pkg/sourcetransformer/service_test.go index d57d937e..79449c9a 100644 --- a/pkg/sourcetransformer/service_test.go +++ b/pkg/sourcetransformer/service_test.go @@ -2,28 +2,83 @@ package sourcetransformer import ( "context" - "reflect" + "errors" + "fmt" + "net" "testing" "time" - "google.golang.org/protobuf/types/known/timestamppb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" - stpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" + proto "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" + "google.golang.org/protobuf/types/known/timestamppb" ) +func newTestServer(t *testing.T, register func(server *grpc.Server)) *grpc.ClientConn { + lis := bufconn.Listen(1024 * 1024) + t.Cleanup(func() { + _ = lis.Close() + }) + + server := grpc.NewServer() + t.Cleanup(func() { + server.Stop() + }) + + register(server) + + errChan := make(chan error, 1) + go func() { + // t.Fatal should only be called from the goroutine running the test + if err := server.Serve(lis); err != nil { + errChan <- err + } + }() + + dialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + conn, err := grpc.NewClient("passthrough://", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials())) + t.Cleanup(func() { + _ = conn.Close() + }) + if err != nil { + t.Fatalf("Creating new gRPC client connection: %v", err) + } + + var grpcServerErr error + select { + case grpcServerErr = <-errChan: + case <-time.After(500 * time.Millisecond): + grpcServerErr = errors.New("gRPC server didn't start in 500ms") + } + if err != nil { + t.Fatalf("Failed to start gRPC server: %v", grpcServerErr) + } + + return conn +} + +var testTime = time.Date(2021, 8, 15, 14, 30, 45, 100, time.Local) + func TestService_sourceTransformFn(t *testing.T) { type args struct { ctx context.Context - d *stpb.SourceTransformRequest + d *proto.SourceTransformRequest } - testTime := time.Date(2021, 8, 15, 14, 30, 45, 100, time.Local) tests := []struct { name string handler SourceTransformer args args - want *stpb.SourceTransformResponse - wantErr bool + want *proto.SourceTransformResponse }{ { name: "sourceTransform_fn_forward_msg", @@ -33,15 +88,17 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + d: &proto.SourceTransformRequest{ + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Keys: []string{"client_test"}, @@ -49,7 +106,6 @@ func TestService_sourceTransformFn(t *testing.T) { }, }, }, - wantErr: false, }, { name: "sourceTransform_fn_forward_msg_forward_to_all", @@ -59,22 +115,23 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + d: &proto.SourceTransformRequest{ + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Value: []byte(`test`), }, }, }, - wantErr: false, }, { name: "sourceTransform_fn_forward_msg_drop_msg", @@ -83,42 +140,159 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + d: &proto.SourceTransformRequest{ + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Tags: []string{DROP}, - Value: []byte{}, + Value: nil, }, }, }, - wantErr: false, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - fs := &Service{ + svc := &Service{ Transformer: tt.handler, } - // here's a trick for testing: - // because we are not using gRPC, we directly set a new incoming ctx - // instead of the regular outgoing context in the real gRPC connection. - ctx := context.Background() - got, err := fs.SourceTransformFn(ctx, tt.args.d) - if (err != nil) != tt.wantErr { - t.Errorf("SourceTransformFn() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("SourceTransformFn() got = %v, want %v", got, tt.want) - } + + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterSourceTransformServer(server, svc) + }) + + client := proto.NewSourceTransformClient(conn) + stream, err := client.SourceTransformFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + err = stream.Send(tt.args.d) + require.NoError(t, err, "Sending message over the stream") + + got, err := stream.Recv() + require.NoError(t, err, "Receiving message from the stream") + + assert.Equal(t, got.Results, tt.want.Results) }) } } + +func doHandshake(t *testing.T, stream proto.SourceTransform_SourceTransformFnClient) { + t.Helper() + handshakeReq := &proto.SourceTransformRequest{ + Handshake: &proto.Handshake{Sot: true}, + } + err := stream.Send(handshakeReq) + require.NoError(t, err, "Sending handshake request to the stream") + + handshakeResp, err := stream.Recv() + require.NoError(t, err, "Receiving handshake response") + + require.Empty(t, handshakeResp.Results, "Invalid handshake response") + require.Empty(t, handshakeResp.Id, "Invalid handshake response") + require.NotNil(t, handshakeResp.Handshake, "Invalid handshake response") + require.True(t, handshakeResp.Handshake.Sot, "Invalid handshake response") +} + +func TestService_SourceTransformFn_Multiple_Messages(t *testing.T) { + svc := &Service{ + Transformer: SourceTransformFunc(func(ctx context.Context, keys []string, datum Datum) Messages { + msg := datum.Value() + return MessagesBuilder().Append(NewMessage(msg, testTime).WithKeys([]string{keys[0] + "_test"})) + }), + } + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterSourceTransformServer(server, svc) + }) + + client := proto.NewSourceTransformClient(conn) + stream, err := client.SourceTransformFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + const msgCount = 10 + for i := 0; i < msgCount; i++ { + msg := proto.SourceTransformRequest{ + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + } + err = stream.Send(&msg) + require.NoError(t, err, "Sending message over the stream") + } + err = stream.CloseSend() + require.NoError(t, err, "Closing the send direction of the stream") + + expectedResults := make([][]*proto.SourceTransformResponse_Result, msgCount) + for i := 0; i < msgCount; i++ { + expectedResults[i] = []*proto.SourceTransformResponse_Result{ + { + EventTime: timestamppb.New(testTime), + Keys: []string{"client_test"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + }, + } + } + + results := make([][]*proto.SourceTransformResponse_Result, msgCount) + for i := 0; i < msgCount; i++ { + got, err := stream.Recv() + require.NoError(t, err, "Receiving message from the stream") + results[i] = got.Results + } + require.ElementsMatch(t, results, expectedResults) +} + +func TestService_SourceTransformFn_Panic(t *testing.T) { + svc := &Service{ + Transformer: SourceTransformFunc(func(ctx context.Context, keys []string, datum Datum) Messages { + panic("transformer panicked") + }), + // panic in the transformer causes the server to send a shutdown signal to shutdownCh channel. + // The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else. + // Since we don't listen for shutdown signal in the tests, we use buffered channel to unblock the server function. + shutdownCh: make(chan<- struct{}, 1), + } + conn := newTestServer(t, func(server *grpc.Server) { + proto.RegisterSourceTransformServer(server, svc) + }) + + client := proto.NewSourceTransformClient(conn) + stream, err := client.SourceTransformFn(context.Background()) + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) + + msg := proto.SourceTransformRequest{ + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte("test"), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + } + err = stream.Send(&msg) + require.NoError(t, err, "Sending message over the stream") + err = stream.CloseSend() + require.NoError(t, err, "Closing the send direction of the stream") + _, err = stream.Recv() + require.Error(t, err, "Expected error while receiving message from the stream") + gotStatus, _ := status.FromError(err) + expectedStatus := status.Convert(status.Errorf(codes.Internal, errTransformerPanic.Error())) + require.Equal(t, expectedStatus, gotStatus) +}