Skip to content

Commit

Permalink
* Fixed error with incompleted data returen from transaction.ReadQuer…
Browse files Browse the repository at this point in the history
…yResult method

* Added option WithResponsePartLimitSizeBytes for queries with query service
  • Loading branch information
rekby committed Nov 14, 2024
1 parent 667ac49 commit 9030bbe
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 27 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* Fixed error with incompleted data returen from transaction.ReadQueryResult method
* Added option WithResponsePartLimitSizeBytes for queries with query service

## v3.92.2
* Added `table/options.WithShardNodesInfo()` experimental option to get shard nodeId for describe table call

Expand Down
2 changes: 2 additions & 0 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type executeSettings interface {
CallOptions() []grpc.CallOption
RetryOpts() []retry.Option
ResourcePool() string
ResponsePartLimitSizeBytes() int64
}

type executeScriptConfig interface {
Expand Down Expand Up @@ -77,6 +78,7 @@ func executeQueryRequest(a *allocator.Allocator, sessionID, q string, cfg execut
request.StatsMode = Ydb_Query.StatsMode(cfg.StatsMode())
request.ConcurrentResultSets = false
request.PoolId = cfg.ResourcePool()
request.ResponsePartLimitBytes = cfg.ResponsePartLimitSizeBytes()

return request, cfg.CallOptions()
}
Expand Down
34 changes: 24 additions & 10 deletions internal/query/options/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ type (

// executeSettings is a holder for execute settings
executeSettings struct {
syntax Syntax
params params.Parameters
execMode ExecMode
statsMode StatsMode
resourcePool string
statsCallback func(queryStats stats.QueryStats)
callOptions []grpc.CallOption
txControl *tx.Control
retryOptions []retry.Option
syntax Syntax
params params.Parameters
execMode ExecMode
statsMode StatsMode
resourcePool string
statsCallback func(queryStats stats.QueryStats)
callOptions []grpc.CallOption
txControl *tx.Control
retryOptions []retry.Option
responsePartLimitBytes int64
}

// Execute is an interface for execute method options
Expand All @@ -58,7 +59,8 @@ type (
mode StatsMode
callback func(stats.QueryStats)
}
execModeOption = ExecMode
execModeOption = ExecMode
responsePartLimitBytes int64
)

func (poolID resourcePool) applyExecuteOption(s *executeSettings) {
Expand Down Expand Up @@ -175,6 +177,10 @@ func (s *executeSettings) Params() *params.Parameters {
return &s.params
}

func (s *executeSettings) ResponsePartLimitSizeBytes() int64 {
return s.responsePartLimitBytes
}

func WithParameters(parameters *params.Parameters) parametersOption {
return parametersOption(*parameters)
}
Expand All @@ -201,6 +207,14 @@ func WithExecMode(mode ExecMode) execModeOption {
return mode
}

func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes {
return responsePartLimitBytes(size)
}

func (size responsePartLimitBytes) applyExecuteOption(s *executeSettings) {
s.responsePartLimitBytes = int64(size)
}

func WithSyntax(syntax Syntax) syntaxOption {
return syntax
}
Expand Down
4 changes: 1 addition & 3 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var (
errReadNextResultSet = xerrors.Wrap(errors.New("ydb: stop read the result set because see part of next result set"))
)
var errReadNextResultSet = xerrors.Wrap(errors.New("ydb: stop read the result set because see part of next result set"))

var (
_ result.Result = (*streamResult)(nil)
Expand Down
6 changes: 6 additions & 0 deletions query/execute_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ func WithStatsMode(mode options.StatsMode, callback func(Stats)) options.Execute
return options.WithStatsMode(mode, callback)
}

// WithResponsePartLimitSizeBytes limit size of each part (data portion) in stream for query service resoponse
// it isn't limit total size of answer
func WithResponsePartLimitSizeBytes(size int64) options.Execute {
return options.WithResponsePartLimitSizeBytes(size)
}

func WithCallOptions(opts ...grpc.CallOption) options.Execute {
return options.WithCallOptions(opts...)
}
Expand Down
91 changes: 91 additions & 0 deletions tests/integration/query_execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,94 @@ func TestIssue1456TooManyUnknownTransactions(t *testing.T) {
wg.Wait()
})
}

func TestQueryHelperQueryResultSet(t *testing.T) {
t.Run("FailOnSecondResultSet", func(t *testing.T) {
scope := newScope(t)

var secondRowError error
err := scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error {
rs, err := tx.QueryResultSet(ctx, "SELECT 1; SELECT 2")
if err != nil {
return err
}

_, err = rs.NextRow(ctx)
if err != nil {
return err
}

_, secondRowError = rs.NextRow(ctx)

return nil
})
require.NoError(t, err)
require.Error(t, secondRowError)
require.NotErrorIs(t, secondRowError, io.EOF)
})
}

func TestQueryPartLimiter(t *testing.T) {
scope := newScope(t)

var readPartCount int
scope.Driver(ydb.WithTraceQuery(trace.Query{
OnResultNextPart: func(info trace.QueryResultNextPartStartInfo) func(info trace.QueryResultNextPartDoneInfo) {
return func(info trace.QueryResultNextPartDoneInfo) {
if info.Error == nil {
readPartCount++
}
}
},
}))

targetCount := 1000
items := make([]types.Value, 0, targetCount)
for i := 0; i < targetCount; i++ {
item := types.StructValue(
types.StructFieldValue("val", types.Int64Value(int64(i))),
)
items = append(items, item)
}

getPartCount := func(partSize int64) int {
partCount := 0
err := scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error {
oldParts := readPartCount
rs, err := tx.QueryResultSet(ctx, `
DECLARE $arg AS List<Struct<val: Int64>>;
SELECT * FROM AS_TABLE($arg);
`,
query.WithParameters(ydb.ParamsBuilder().Param("$arg").Any(types.ListValue(items...)).Build()),
query.WithResponsePartLimitSizeBytes(partSize),
)
if err != nil {
return err
}

rowCount := 0
for {
_, err = rs.NextRow(scope.Ctx)
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
rowCount++
}
require.Equal(t, targetCount, rowCount)

partCount = readPartCount - oldParts
return nil
})

require.NoError(t, err)
return partCount
}

partsWithBigSize := getPartCount(1000000)
partsWithLittleSize := getPartCount(100)

require.Equal(t, 1, partsWithBigSize)
require.Greater(t, partsWithLittleSize, 1)
}
45 changes: 31 additions & 14 deletions tests/integration/query_regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ package integration

import (
"context"
"errors"
"fmt"
"io"
"os"
"slices"
"strconv"
"strings"
"testing"
Expand All @@ -16,11 +17,13 @@ import (
"github.com/stretchr/testify/require"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/version"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

func TestUUIDSerializationQueryServiceIssue1501(t *testing.T) {
Expand Down Expand Up @@ -311,9 +314,19 @@ SELECT CAST($arg1 AS Utf8) AS v1, CAST($arg2 AS Utf8) AS v2
func TestReadTwoPartsIntoMemoryIssue1559(t *testing.T) {
scope := newScope(t)

var readPartCount int
scope.Driver(ydb.WithTraceQuery(trace.Query{
OnResultNextPart: func(info trace.QueryResultNextPartStartInfo) func(info trace.QueryResultNextPartDoneInfo) {
return func(info trace.QueryResultNextPartDoneInfo) {
if info.Error == nil {
readPartCount++
}
}
},
}))

// prepare data
const targetCount = 100000 // must be more then returned in one part
const batchSize = 20000
const targetCount = 1000 // must be more then returned in one part
items := make([]types.Value, 0, targetCount)
for i := 0; i < targetCount; i++ {
item := types.StructValue(
Expand All @@ -323,14 +336,10 @@ func TestReadTwoPartsIntoMemoryIssue1559(t *testing.T) {
items = append(items, item)
}

t.Log("inserting items to a table")
batches := slices.Chunk(items, batchSize)
for batch := range batches {
err := scope.Driver().Table().Do(scope.Ctx, func(ctx context.Context, s table.Session) error {
return s.BulkUpsert(ctx, scope.TablePath(), types.ListValue(batch...))
})
require.NoError(t, err)
}
err := scope.Driver().Table().Do(scope.Ctx, func(ctx context.Context, s table.Session) error {
return s.BulkUpsert(ctx, scope.TablePath(), types.ListValue(items...))
})
require.NoError(t, err)

q := fmt.Sprintf("SELECT COUNT(*) FROM `%s`", scope.TablePath())

Expand All @@ -347,21 +356,29 @@ func TestReadTwoPartsIntoMemoryIssue1559(t *testing.T) {
var rows []query.Row

// reproduce the problem
var partReaded int
scope.Driver().Query().DoTx(scope.Ctx, func(ctx context.Context, tx query.TxActor) error {
rs, err := tx.QueryResultSet(scope.Ctx, q)
oldCOunt := readPartCount
rs, err := tx.QueryResultSet(scope.Ctx, q, options.WithResponsePartLimitSizeBytes(100))
if err != nil {
return err
}

rows = make([]query.Row, 0, targetCount)
for row, err := range rs.Rows(scope.Ctx) {
for {
row, err := rs.NextRow(ctx)
if errors.Is(err, io.EOF) {
break
}

require.NoError(t, err)
rows = append(rows, row)
}

partReaded = readPartCount - oldCOunt
return nil
})
require.NoError(t, err)

require.Equal(t, targetCount, len(rows))
require.Greater(t, partReaded, 1)
}

0 comments on commit 9030bbe

Please sign in to comment.