Skip to content

Commit

Permalink
* Added table.options.WithIgnoreTruncated option for `session.Execu…
Browse files Browse the repository at this point in the history
…te` method

* Added `table.result.ErrTruncated` error for check it with `errors.Is()` outside of `ydb-go-sdk`
  • Loading branch information
asmyasnikov committed Aug 2, 2023
1 parent 66a967f commit 5c10c95
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* Added `table.options.WithIgnoreTruncated` option for `session.Execute` method
* Added `table.result.ErrTruncated` error for check it with `errors.Is()` outside of `ydb-go-sdk`

## v3.49.0
* Added `table.Session.ReadRows` method for getting rows by keys
* Added `table/options.ChangefeedFormatDynamoDBStreamsJSON` format of `DynamoDB` change feeds
Expand Down
3 changes: 2 additions & 1 deletion internal/table/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/indexed"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
Expand Down Expand Up @@ -224,7 +225,7 @@ func (s *scanner) Err() error {
}
if !s.ignoreTruncated && s.truncated() {
err := xerrors.Wrap(
fmt.Errorf("truncated result (more than %d rows)", len(s.set.GetRows())),
fmt.Errorf("more than %d rows: %w", len(s.set.GetRows()), result.ErrTruncated),
)
if s.markTruncatedAsRetryable {
err = xerrors.Retryable(err)
Expand Down
21 changes: 13 additions & 8 deletions internal/table/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,12 @@ func (s *session) Execute(
txr table.Transaction, r result.Result, err error,
) {
var (
a = allocator.New()
q = queryFromText(query)
request = a.TableExecuteDataQueryRequest()
a = allocator.New()
q = queryFromText(query)
request = options.ExecuteDataQueryDesc{

Check failure on line 655 in internal/table/session.go

View workflow job for this annotation

GitHub Actions / golangci-lint

composites: github.com/ydb-platform/ydb-go-sdk/v3/table/options.ExecuteDataQueryDesc struct literal uses unkeyed fields (govet)
a.TableExecuteDataQueryRequest(),
s.config.IgnoreTruncated(),
}
callOptions []grpc.CallOption
)
defer a.Free()
Expand All @@ -671,7 +674,7 @@ func (s *session) Execute(

for _, opt := range opts {
if opt != nil {
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption((*options.ExecuteDataQueryDesc)(request), a)...)
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption(&request, a)...)
}
}

Expand All @@ -683,18 +686,20 @@ func (s *session) Execute(
onDone(txr, false, r, err)
}()

result, err := s.executeDataQuery(ctx, a, request, callOptions...)
result, err := s.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
}

return s.executeQueryResult(result, request.TxControl)
return s.executeQueryResult(result, request.TxControl, request.IgnoreTruncated)
}

// executeQueryResult returns Transaction and result built from received
// result.
func (s *session) executeQueryResult(
res *Ydb_Table.ExecuteQueryResult, txControl *Ydb_Table.TransactionControl,
res *Ydb_Table.ExecuteQueryResult,
txControl *Ydb_Table.TransactionControl,
ignoreTruncated bool,
) (
table.Transaction, result.Result, error,
) {
Expand All @@ -711,7 +716,7 @@ func (s *session) executeQueryResult(
return tx, scanner.NewUnary(
res.GetResultSets(),
res.GetQueryStats(),
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
scanner.WithIgnoreTruncated(ignoreTruncated),
), nil
}

Expand Down
17 changes: 10 additions & 7 deletions internal/table/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ func (s *statement) Execute(
txr table.Transaction, r result.Result, err error,
) {
var (
a = allocator.New()
request = a.TableExecuteDataQueryRequest()
a = allocator.New()
request = options.ExecuteDataQueryDesc{

Check failure on line 35 in internal/table/statement.go

View workflow job for this annotation

GitHub Actions / golangci-lint

composites: github.com/ydb-platform/ydb-go-sdk/v3/table/options.ExecuteDataQueryDesc struct literal uses unkeyed fields (govet)
a.TableExecuteDataQueryRequest(),
s.session.config.IgnoreTruncated(),
}
callOptions []grpc.CallOption
)
defer a.Free()
Expand All @@ -51,7 +54,7 @@ func (s *statement) Execute(

for _, opt := range opts {
if opt != nil {
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption((*options.ExecuteDataQueryDesc)(request), a)...)
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption(&request, a)...)
}
}

Expand All @@ -63,22 +66,22 @@ func (s *statement) Execute(
onDone(txr, true, r, err)
}()

return s.execute(ctx, a, request, request.TxControl, callOptions...)
return s.execute(ctx, a, &request, request.TxControl, callOptions...)
}

// execute executes prepared query without any tracing.
func (s *statement) execute(
ctx context.Context, a *allocator.Allocator,
request *Ydb_Table.ExecuteDataQueryRequest, txControl *Ydb_Table.TransactionControl,
request *options.ExecuteDataQueryDesc, txControl *Ydb_Table.TransactionControl,
callOptions ...grpc.CallOption,
) (
txr table.Transaction, r result.Result, err error,
) {
res, err := s.session.executeDataQuery(ctx, a, request, callOptions...)
res, err := s.session.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
}
return s.session.executeQueryResult(res, txControl)
return s.session.executeQueryResult(res, txControl, request.IgnoreTruncated)
}

func (s *statement) NumInput() int {
Expand Down
14 changes: 13 additions & 1 deletion table/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,11 @@ type (
)

type (
ExecuteDataQueryDesc Ydb_Table.ExecuteDataQueryRequest
ExecuteDataQueryDesc struct {
*Ydb_Table.ExecuteDataQueryRequest

IgnoreTruncated bool
}
ExecuteDataQueryOption interface {
ApplyExecuteDataQueryOption(d *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption
}
Expand Down Expand Up @@ -856,6 +860,14 @@ func WithCommit() ExecuteDataQueryOption {
})
}

// WithIgnoreTruncated mark truncated result as good (without error)
func WithIgnoreTruncated() ExecuteDataQueryOption {
return executeDataQueryOptionFunc(func(desc *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption {
desc.IgnoreTruncated = true
return nil
})
}

// WithQueryCachePolicyKeepInCache manages keep-in-cache policy
//
// Deprecated: data queries always executes with enabled keep-in-cache policy.
Expand Down
7 changes: 7 additions & 0 deletions table/result/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package result

import (
"errors"
)

var ErrTruncated = errors.New("truncated result")
122 changes: 122 additions & 0 deletions tests/integration/table_truncated_err_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//go:build integration
// +build integration

package integration

import (
"context"
"fmt"
"strconv"
"testing"

"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
)

// https://github.com/ydb-platform/ydb-go-sdk/issues/798
func TestIssue798TruncatedError(t *testing.T) {
const rowsLimit = 1000
var (
scope = newScope(t)
driver = scope.Driver()
tablePath = scope.TablePath()
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// upsert rows
{
rows := make([]types.Value, rowsLimit)
for i := range rows {
rows[i] = types.StructValue(
types.StructFieldValue("id", types.Int64Value(int64(i))),
types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))),
)
}
err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
return s.BulkUpsert(ctx, tablePath, types.ListValue(rows...))
}, table.WithIdempotent())
scope.Require.NoError(err)
}

// select rows without truncated error
{
err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
_, results, err := s.Execute(ctx,
table.DefaultTxControl(),
fmt.Sprintf("SELECT * FROM `%s`;", tablePath),
nil,
)
if err != nil {
return err
}
if err = results.NextResultSetErr(ctx); err != nil {
return fmt.Errorf("no result sets: %w", err)
}
if results.CurrentResultSet().RowCount() != rowsLimit {
return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount())
}
return results.Err()
}, table.WithIdempotent())
scope.Require.NoError(err)
}

// upsert 1 row for get 1001 rows and truncated error
{
err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
return s.BulkUpsert(ctx, tablePath, types.ListValue(types.StructValue(
types.StructFieldValue("id", types.Int64Value(rowsLimit)),
types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))),
)))
}, table.WithIdempotent())
scope.Require.NoError(err)
}

// select all rows with truncated error
{
err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
_, results, err := s.Execute(ctx,
table.DefaultTxControl(),
fmt.Sprintf("SELECT * FROM `%s`;", tablePath),
nil,
)
if err != nil {
return err
}
if err = results.NextResultSetErr(ctx); err != nil {
return fmt.Errorf("no result sets: %w", err)
}
if results.CurrentResultSet().RowCount() != rowsLimit {
return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount())
}
return results.Err() // expected
}, table.WithIdempotent())
scope.Require.ErrorIs(err, result.ErrTruncated)
}

// select all rows with truncated error
{
err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
_, results, err := s.Execute(ctx,
table.DefaultTxControl(),
fmt.Sprintf("SELECT * FROM `%s`;", tablePath),
nil,
options.WithIgnoreTruncated(),
)
if err != nil {
return err
}
if err = results.NextResultSetErr(ctx); err != nil {
return fmt.Errorf("no result sets: %w", err)
}
if results.CurrentResultSet().RowCount() != rowsLimit {
return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount())
}
return results.Err() // expected
}, table.WithIdempotent())
scope.Require.NoError(err)
}
}

0 comments on commit 5c10c95

Please sign in to comment.