Skip to content

Commit

Permalink
Add streaming handling
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Oct 29, 2024
1 parent 2455a57 commit 9737e35
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 12 deletions.
60 changes: 48 additions & 12 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,19 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb.
if err != nil {
return tabletconn.ErrorFromGRPC(err)
}
if fields == nil {
fields = ser.Result.Fields
var result *sqltypes.Result
if options.RawMysqlPackets {
result, err = mysql.ParseResult(ser.Result, true)
if err != nil {
return err
}
} else {
if fields == nil {
fields = ser.Result.Fields
}
result = sqltypes.CustomProto3ToResult(fields, ser.Result)
}
if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil {
if err := callback(result); err != nil {
if err == io.EOF {
return nil
}
Expand Down Expand Up @@ -560,10 +569,19 @@ func (conn *gRPCQueryClient) BeginStreamExecute(ctx context.Context, target *que
return state, nil
}

if fields == nil {
fields = ser.Result.Fields
var result *sqltypes.Result
if options.RawMysqlPackets {
result, err = mysql.ParseResult(ser.Result, true)
if err != nil {
return state, err
}
} else {
if fields == nil {
fields = ser.Result.Fields
}
result = sqltypes.CustomProto3ToResult(fields, ser.Result)
}
if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil {
if err := callback(result); err != nil {
if err == io.EOF {
return state, nil
}
Expand Down Expand Up @@ -963,10 +981,19 @@ func (conn *gRPCQueryClient) ReserveBeginStreamExecute(ctx context.Context, targ
return state, nil
}

if fields == nil {
fields = ser.Result.Fields
var result *sqltypes.Result
if options.RawMysqlPackets {
result, err = mysql.ParseResult(ser.Result, true)
if err != nil {
return state, err
}
} else {
if fields == nil {
fields = ser.Result.Fields
}
result = sqltypes.CustomProto3ToResult(fields, ser.Result)
}
if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil {
if err := callback(result); err != nil {
if err == io.EOF {
return state, nil
}
Expand Down Expand Up @@ -1071,10 +1098,19 @@ func (conn *gRPCQueryClient) ReserveStreamExecute(ctx context.Context, target *q
return state, nil
}

if fields == nil {
fields = ser.Result.Fields
var result *sqltypes.Result
if options.RawMysqlPackets {
result, err = mysql.ParseResult(ser.Result, true)
if err != nil {
return state, err
}
} else {
if fields == nil {
fields = ser.Result.Fields
}
result = sqltypes.CustomProto3ToResult(fields, ser.Result)
}
if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil {
if err := callback(result); err != nil {
if err == io.EOF {
return state, nil
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
}
if qr.CachedProto == nil { // If we're not passing on the raw MySQL packets
// Do we still need to figure out a way to do this?
// I don't think so, since when using raw packets we enforce this when
// parsing it in ReadQueryResultAsProto.
if err := qre.verifyRowCount(int64(len(qr.Rows)), maxrows); err != nil {
return nil, err
}
Expand Down

0 comments on commit 9737e35

Please sign in to comment.