Skip to content

Commit

Permalink
Merge pull request #801 from ydb-platform/truncated
Browse files Browse the repository at this point in the history
`table.options.WithIgnoreTruncated` + `table.result.ErrTruncated`
  • Loading branch information
asmyasnikov authored Aug 3, 2023
2 parents 66a967f + 4ca1e2c commit d2be4cd
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* Added `table.options.WithIgnoreTruncated` option for `session.Execute` method
* Added `table.result.ErrTruncated` error for check it with `errors.Is()` outside of `ydb-go-sdk`

## v3.49.0
* Added `table.Session.ReadRows` method for getting rows by keys
* Added `table/options.ChangefeedFormatDynamoDBStreamsJSON` format of `DynamoDB` change feeds
Expand Down
3 changes: 2 additions & 1 deletion internal/table/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/indexed"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
Expand Down Expand Up @@ -224,7 +225,7 @@ func (s *scanner) Err() error {
}
if !s.ignoreTruncated && s.truncated() {
err := xerrors.Wrap(
fmt.Errorf("truncated result (more than %d rows)", len(s.set.GetRows())),
fmt.Errorf("more than %d rows: %w", len(s.set.GetRows()), result.ErrTruncated),
)
if s.markTruncatedAsRetryable {
err = xerrors.Retryable(err)
Expand Down
21 changes: 13 additions & 8 deletions internal/table/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,12 @@ func (s *session) Execute(
txr table.Transaction, r result.Result, err error,
) {
var (
a = allocator.New()
q = queryFromText(query)
request = a.TableExecuteDataQueryRequest()
a = allocator.New()
q = queryFromText(query)
request = options.ExecuteDataQueryDesc{
ExecuteDataQueryRequest: a.TableExecuteDataQueryRequest(),
IgnoreTruncated: s.config.IgnoreTruncated(),
}
callOptions []grpc.CallOption
)
defer a.Free()
Expand All @@ -671,7 +674,7 @@ func (s *session) Execute(

for _, opt := range opts {
if opt != nil {
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption((*options.ExecuteDataQueryDesc)(request), a)...)
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption(&request, a)...)
}
}

Expand All @@ -683,18 +686,20 @@ func (s *session) Execute(
onDone(txr, false, r, err)
}()

result, err := s.executeDataQuery(ctx, a, request, callOptions...)
result, err := s.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
}

return s.executeQueryResult(result, request.TxControl)
return s.executeQueryResult(result, request.TxControl, request.IgnoreTruncated)
}

// executeQueryResult returns Transaction and result built from received
// result.
func (s *session) executeQueryResult(
res *Ydb_Table.ExecuteQueryResult, txControl *Ydb_Table.TransactionControl,
res *Ydb_Table.ExecuteQueryResult,
txControl *Ydb_Table.TransactionControl,
ignoreTruncated bool,
) (
table.Transaction, result.Result, error,
) {
Expand All @@ -711,7 +716,7 @@ func (s *session) executeQueryResult(
return tx, scanner.NewUnary(
res.GetResultSets(),
res.GetQueryStats(),
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
scanner.WithIgnoreTruncated(ignoreTruncated),
), nil
}

Expand Down
17 changes: 10 additions & 7 deletions internal/table/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ func (s *statement) Execute(
txr table.Transaction, r result.Result, err error,
) {
var (
a = allocator.New()
request = a.TableExecuteDataQueryRequest()
a = allocator.New()
request = options.ExecuteDataQueryDesc{
ExecuteDataQueryRequest: a.TableExecuteDataQueryRequest(),
IgnoreTruncated: s.session.config.IgnoreTruncated(),
}
callOptions []grpc.CallOption
)
defer a.Free()
Expand All @@ -51,7 +54,7 @@ func (s *statement) Execute(

for _, opt := range opts {
if opt != nil {
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption((*options.ExecuteDataQueryDesc)(request), a)...)
callOptions = append(callOptions, opt.ApplyExecuteDataQueryOption(&request, a)...)
}
}

Expand All @@ -63,22 +66,22 @@ func (s *statement) Execute(
onDone(txr, true, r, err)
}()

return s.execute(ctx, a, request, request.TxControl, callOptions...)
return s.execute(ctx, a, &request, request.TxControl, callOptions...)
}

// execute executes prepared query without any tracing.
func (s *statement) execute(
ctx context.Context, a *allocator.Allocator,
request *Ydb_Table.ExecuteDataQueryRequest, txControl *Ydb_Table.TransactionControl,
request *options.ExecuteDataQueryDesc, txControl *Ydb_Table.TransactionControl,
callOptions ...grpc.CallOption,
) (
txr table.Transaction, r result.Result, err error,
) {
res, err := s.session.executeDataQuery(ctx, a, request, callOptions...)
res, err := s.session.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
}
return s.session.executeQueryResult(res, txControl)
return s.session.executeQueryResult(res, txControl, request.IgnoreTruncated)
}

func (s *statement) NumInput() int {
Expand Down
2 changes: 1 addition & 1 deletion internal/xsql/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ type ydbDriver interface {

type nopPathNormalizer struct{}

func (nopPathNormalizer) NormalizePath(folderOrTable string) string {
func (nopPathNormalizer) NormalizePath(_ string) string {
return tablePathPrefixTransformer
}

Expand Down
14 changes: 13 additions & 1 deletion table/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,11 @@ type (
)

type (
ExecuteDataQueryDesc Ydb_Table.ExecuteDataQueryRequest
ExecuteDataQueryDesc struct {
*Ydb_Table.ExecuteDataQueryRequest

IgnoreTruncated bool
}
ExecuteDataQueryOption interface {
ApplyExecuteDataQueryOption(d *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption
}
Expand Down Expand Up @@ -856,6 +860,14 @@ func WithCommit() ExecuteDataQueryOption {
})
}

// WithIgnoreTruncated mark truncated result as good (without error)
func WithIgnoreTruncated() ExecuteDataQueryOption {
return executeDataQueryOptionFunc(func(desc *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption {
desc.IgnoreTruncated = true
return nil
})
}

// WithQueryCachePolicyKeepInCache manages keep-in-cache policy
//
// Deprecated: data queries always executes with enabled keep-in-cache policy.
Expand Down
7 changes: 7 additions & 0 deletions table/result/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package result

import (
"errors"
)

var ErrTruncated = errors.New("truncated result")
12 changes: 8 additions & 4 deletions tests/integration/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,11 @@ func (scope *scopeT) Driver(opts ...ydb.Option) *ydb.Driver {
}).(*ydb.Driver)
}

func (scope *scopeT) SQLDriverWithFolder(opts ...ydb.ConnectorOption) *sql.DB {
func (scope *scopeT) SQLDriver(opts ...ydb.ConnectorOption) *sql.DB {
return scope.Cache(nil, nil, func() (res interface{}, err error) {
driver := scope.Driver()
scope.Logf("Create sql db connector")
connector, err := ydb.Connector(driver,
append([]ydb.ConnectorOption{ydb.WithTablePathPrefix(scope.Folder())}, opts...)...,
)
connector, err := ydb.Connector(driver, opts...)
if err != nil {
return nil, err
}
Expand All @@ -124,6 +122,12 @@ func (scope *scopeT) SQLDriverWithFolder(opts ...ydb.ConnectorOption) *sql.DB {
}).(*sql.DB)
}

func (scope *scopeT) SQLDriverWithFolder(opts ...ydb.ConnectorOption) *sql.DB {
return scope.SQLDriver(
append([]ydb.ConnectorOption{ydb.WithTablePathPrefix(scope.Folder())}, opts...)...,
)
}

func (scope *scopeT) Folder() string {
return scope.CacheWithCleanup(nil, nil, func() (res interface{}, cleanup fixenv.FixtureCleanupFunc, err error) {
driver := scope.Driver()
Expand Down
Loading

0 comments on commit d2be4cd

Please sign in to comment.