Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Query service implementations for database/sql interfaces #1552

Open
wants to merge 48 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d242af3
fix(sql): tmp queryservice default option
vladDotH Nov 10, 2024
ef54737
feat(internal/query/xsql): conn boilerplate
vladDotH Oct 30, 2024
152fd71
feat(internal/query/conn): driver initial implemetations
vladDotH Nov 10, 2024
6ee564b
feat(internal/query/conn): normalize & execContext fns
vladDotH Nov 10, 2024
76921eb
Merge remote-tracking branch 'ydb/master'
vladDotH Nov 10, 2024
0732759
feat(internal/query/xsql): tx options transform
vladDotH Nov 10, 2024
4ca5144
feat(internal/query/xsql): tx initials
vladDotH Nov 10, 2024
2678662
feat(intenral/query/xsql): rows impls
vladDotH Nov 13, 2024
e69f310
Merge branch 'ydb-platform:master' into master
vladDotH Nov 13, 2024
daa6a68
Merge branch 'ydb-platform:master' into master
vladDotH Nov 15, 2024
ff26c0b
feat(internal/query/conn): stmts
vladDotH Nov 15, 2024
29d44eb
feat(internal/query/conn/tx): ExecContext, PrepareContext impls
vladDotH Nov 15, 2024
2d20df5
feat(internal/query/conn): PrepareContext impl
vladDotH Nov 15, 2024
e750be4
fix(sql): xsql renamed
vladDotH Nov 17, 2024
f756a76
fix(internal/query/conn): currentTx interface
vladDotH Nov 17, 2024
c1d27ac
fix(internal/query): xsql rm
vladDotH Nov 17, 2024
ed723c4
feat(internal/query/conn): QueryContext impl
vladDotH Nov 17, 2024
10c3bd4
chore(internal/query/conn): linted
vladDotH Nov 17, 2024
28cd198
feat(internal/query/conn): remained impls
vladDotH Nov 17, 2024
fc89935
chore(internal/query/conn): linted
vladDotH Nov 17, 2024
691008c
Merge remote-tracking branch 'origin' into fork/vladDotH/master
asmyasnikov Nov 19, 2024
f479b3e
added YDB_DATABASE_SQL_OVER_QUERY_SERVICE env for run database/sql ov…
asmyasnikov Nov 19, 2024
1370a72
chore(internal/query/conn/rows): linted
vladDotH Nov 19, 2024
f976dd6
fix(internal/query/conn/tx): nil txc rm
vladDotH Nov 19, 2024
6ad28a9
fix(internal/query/conn/rows/Next): correct dst pointers array
vladDotH Nov 19, 2024
380baac
fix(internal/query/conn/rows): nextResultSets check & columns cache
vladDotH Nov 19, 2024
a0cc5a0
refactor(internal/table/context): public QueryModeFromContext
vladDotH Nov 19, 2024
b292289
feat(internal/query/conn): explain mode support
vladDotH Nov 19, 2024
8e5eaab
fix(internal/query/conn/rows): pure eof returning & NextResultSet fix
vladDotH Nov 19, 2024
9f78824
chore(internal/query/conn): linted
vladDotH Nov 19, 2024
a9f83c1
added YDB_DATABASE_SQL_OVER_QUERY_SERVICE env to experimental tests
asmyasnikov Nov 19, 2024
cc37fad
feat(internal/value): driver.Value force cast
vladDotH Nov 20, 2024
c7e7a04
fix(internal/query): resultSetIndex overflow check
vladDotH Nov 20, 2024
f4a2325
Apply suggestions from code review
asmyasnikov Nov 20, 2024
dbac1d7
fix(internal/value): cast error joining
vladDotH Nov 20, 2024
2f1071a
fix(internal/value/castTo): inner castTo calling revert
vladDotH Nov 20, 2024
410e804
fix(internal/value/castTo): Value.CastTo error wrapping
vladDotH Nov 20, 2024
dc27c84
refactor: QueryModes & querymodes funtions transfered to xcontext
vladDotH Nov 23, 2024
2ffc490
chore(internal/table/conn): linted
vladDotH Nov 23, 2024
b7e83f1
fix(internal/query/conn): badconn rm
vladDotH Nov 23, 2024
a948510
refactor(internal/value): force ptr value set in missing type casts
vladDotH Nov 24, 2024
60f2b54
fix(internal/query/conn): yql type & castResult
vladDotH Nov 24, 2024
75c7061
feat(interna/value/tupleValue.castTo): driver.Value switch
vladDotH Nov 24, 2024
5c47229
feat(internal/query/conn/rows): discarded columns support
vladDotH Nov 24, 2024
756153b
ci(database_sql_with_tx_control_test): temporary query service suppor…
vladDotH Nov 24, 2024
d974d4c
Merge branch 'ydb-platform:master' into master
vladDotH Nov 24, 2024
9f8f706
fix(internal/table/conn): query modes definitions
vladDotH Nov 24, 2024
85da81d
chore(internal/query/conn): linted
vladDotH Nov 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ jobs:
YDB_CONNECTION_STRING_SECURE: grpcs://localhost:2135/local
YDB_SSL_ROOT_CERTIFICATES_FILE: /tmp/ydb_certs/ca.pem
YDB_SESSIONS_SHUTDOWN_URLS: http://localhost:8765/actors/kqp_proxy?force_shutdown=all
YDB_DATABASE_SQL_OVER_QUERY_SERVICE: 1
HIDE_APPLICATION_OUTPUT: 1
steps:
- name: Checkout code
Expand Down
7 changes: 4 additions & 3 deletions dsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/connector"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn"
tableSql "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
)

Expand Down Expand Up @@ -60,21 +61,21 @@ func parseConnectionString(dataSourceName string) (opts []Option, _ error) {
opts = append(opts, WithBalancer(balancers.FromConfig(balancer)))
}
if queryMode := info.Params.Get("go_query_mode"); queryMode != "" {
mode := tableSql.QueryModeFromString(queryMode)
mode := xcontext.QueryModeFromString(queryMode)
if mode == tableSql.UnknownQueryMode {
return nil, xerrors.WithStackTrace(fmt.Errorf("unknown query mode: %s", queryMode))
}
opts = append(opts, withConnectorOptions(connector.WithDefaultQueryMode(mode)))
} else if queryMode := info.Params.Get("query_mode"); queryMode != "" {
mode := tableSql.QueryModeFromString(queryMode)
mode := xcontext.QueryModeFromString(queryMode)
if mode == tableSql.UnknownQueryMode {
return nil, xerrors.WithStackTrace(fmt.Errorf("unknown query mode: %s", queryMode))
}
opts = append(opts, withConnectorOptions(connector.WithDefaultQueryMode(mode)))
}
if fakeTx := info.Params.Get("go_fake_tx"); fakeTx != "" {
for _, queryMode := range strings.Split(fakeTx, ",") {
mode := tableSql.QueryModeFromString(queryMode)
mode := xcontext.QueryModeFromString(queryMode)
if mode == tableSql.UnknownQueryMode {
return nil, xerrors.WithStackTrace(fmt.Errorf("unknown query mode: %s", queryMode))
}
Expand Down
13 changes: 10 additions & 3 deletions internal/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql/driver"
"io"
"os"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -184,9 +185,15 @@ func (c *Connector) Close() error {

func Open(parent ydbDriver, balancer grpc.ClientConnInterface, opts ...Option) (_ *Connector, err error) {
c := &Connector{
parent: parent,
balancer: balancer,
queryProcessor: TABLE_SERVICE,
parent: parent,
balancer: balancer,
queryProcessor: func() queryProcessor {
if v, has := os.LookupEnv("YDB_DATABASE_SQL_OVER_QUERY_SERVICE"); has && v != "" {
return QUERY_SERVICE
}

return TABLE_SERVICE
}(),
clock: clockwork.NewRealClock(),
done: make(chan struct{}),
trace: &trace.DatabaseSQL{},
Expand Down
250 changes: 174 additions & 76 deletions internal/query/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,132 +2,230 @@ package conn

import (
"context"
"database/sql"
"database/sql/driver"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/bind"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/conn/badconn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

type (
Parent interface {
Query() *query.Client
Trace() *trace.DatabaseSQL
TraceRetry() *trace.Retry
RetryBudget() budget.Budget
Bindings() bind.Bindings
Clock() clockwork.Clock
}
currentTx interface {
Rollback() error
}
Conn struct {
ctx context.Context //nolint:containedctx
parent Parent
session *query.Session
onClose []func()
closed atomic.Bool
currentTx
}
)
type resultNoRows struct{}

func (c *Conn) ID() string {
return c.session.ID()
}
func (resultNoRows) LastInsertId() (int64, error) { return 0, ErrUnsupported }
func (resultNoRows) RowsAffected() (int64, error) { return 0, ErrUnsupported }

func (c *Conn) IsValid() bool {
panic("implement me")
}
var _ driver.Result = resultNoRows{}

func (c *Conn) CheckNamedValue(value *driver.NamedValue) error {
panic("implement me")
type Parent interface {
Query() *query.Client
Trace() *trace.DatabaseSQL
TraceRetry() *trace.Retry
RetryBudget() budget.Budget
Bindings() bind.Bindings
Clock() clockwork.Clock
}

func (c *Conn) Ping(ctx context.Context) error {
panic("implement me")
type currentTx interface {
tx.Identifier
driver.Tx
driver.ExecerContext
driver.QueryerContext
driver.ConnPrepareContext
Rollback() error
}

func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
panic("implement me")
type Conn struct {
currentTx
ctx context.Context //nolint:containedctx
parent Parent
session *query.Session
onClose []func()
closed atomic.Bool
lastUsage atomic.Int64
}

func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
panic("implement me")
}
func New(ctx context.Context, parent Parent, s *query.Session, opts ...Option) *Conn {
cc := &Conn{
ctx: ctx,
parent: parent,
session: s,
}

for _, opt := range opts {
if opt != nil {
opt(cc)
}
}

func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
panic("implement me")
return cc
}

func (c *Conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
panic("implement me")
func (c *Conn) isReady() bool {
return c.session.Status() == session.StatusIdle.String()
}

func (c *Conn) Prepare(query string) (driver.Stmt, error) {
panic("implement me")
func (c *Conn) normalize(q string, args ...driver.NamedValue) (query string, _ params.Parameters, _ error) {
queryArgs := make([]any, len(args))
for i := range args {
queryArgs[i] = args[i]
}

return c.parent.Bindings().RewriteQuery(q, queryArgs...)
}

func (c *Conn) Close() (finalErr error) {
if !c.closed.CompareAndSwap(false, true) {
return badconn.Map(xerrors.WithStackTrace(errConnClosedEarly))
func (c *Conn) beginTx(ctx context.Context, txOptions driver.TxOptions) (tx currentTx, finalErr error) {
onDone := trace.DatabaseSQLOnConnBegin(c.parent.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query/conn.(*Conn).beginTx"),
)
defer func() {
onDone(tx, finalErr)
}()

if c.currentTx != nil {
return nil, xerrors.WithStackTrace(xerrors.AlreadyHasTx(c.currentTx.ID()))
}

tx, err := beginTx(ctx, c, txOptions)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

c.currentTx = tx

return tx, nil
}

func (c *Conn) execContext(
ctx context.Context,
query string,
args []driver.NamedValue,
) (_ driver.Result, finalErr error) {
defer func() {
for _, onClose := range c.onClose {
onClose()
}
c.lastUsage.Store(c.parent.Clock().Now().Unix())
}()

var (
ctx = c.ctx
onDone = trace.DatabaseSQLOnConnClose(
c.parent.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query/conn.(*Conn).Close"),
)
if !c.isReady() {
return nil, xerrors.WithStackTrace(errNotReadyConn)
}

if c.currentTx != nil {
return c.currentTx.ExecContext(ctx, query, args)
}

onDone := trace.DatabaseSQLOnConnExec(c.parent.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query/conn.(*Conn).execContext"),
query, xcontext.UnknownQueryMode.String(), xcontext.IsIdempotent(ctx), c.parent.Clock().Since(c.LastUsage()),
)
defer func() {
onDone(finalErr)
}()
if c.currentTx != nil {
_ = c.currentTx.Rollback()

normalizedQuery, params, err := c.normalize(query, args...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
err := c.session.Close(xcontext.ValueOnly(ctx))

err = c.session.Exec(ctx, normalizedQuery, options.WithParameters(&params))
if err != nil {
return badconn.Map(xerrors.WithStackTrace(err))
return nil, xerrors.WithStackTrace(err)
}

return nil
return resultNoRows{}, nil
}

func (c *Conn) Begin() (driver.Tx, error) {
panic("implement me")
}
func (c *Conn) queryContext(ctx context.Context, queryString string, args []driver.NamedValue) (
_ driver.Rows, finalErr error,
) {
defer func() {
c.lastUsage.Store(c.parent.Clock().Now().Unix())
}()

if !c.isReady() {
return nil, xerrors.WithStackTrace(errNotReadyConn)
}

if c.currentTx != nil {
return c.currentTx.QueryContext(ctx, queryString, args)
}

onDone := trace.DatabaseSQLOnConnQuery(c.parent.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query/conn.(*Conn).queryContext"),
queryString, xcontext.UnknownQueryMode.String(), xcontext.IsIdempotent(ctx), c.parent.Clock().Since(c.LastUsage()),
)

defer func() {
onDone(finalErr)
}()

normalizedQuery, parameters, err := c.normalize(queryString, args...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

func (c *Conn) LastUsage() time.Time {
panic("implement me")
queryMode := xcontext.QueryModeFromContext(ctx, xcontext.UnknownQueryMode)

if queryMode == xcontext.ExplainQueryMode {
return c.queryContextExplain(ctx, normalizedQuery, parameters)
}

return c.queryContextOther(ctx, normalizedQuery, parameters)
}

func New(ctx context.Context, parent Parent, s *query.Session, opts ...Option) *Conn {
cc := &Conn{
ctx: ctx,
parent: parent,
session: s,
func (c *Conn) queryContextOther(
ctx context.Context,
queryString string,
parameters params.Parameters,
) (driver.Rows, error) {
res, err := c.session.Query(
ctx, queryString,
options.WithParameters(&parameters),
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

for _, opt := range opts {
if opt != nil {
opt(cc)
}
return &rows{
conn: c,
result: res,
}, nil
}

func (c *Conn) queryContextExplain(
ctx context.Context,
queryString string,
parameters params.Parameters,
) (driver.Rows, error) {
var ast, plan string
_, err := c.session.Query(
ctx, queryString,
options.WithParameters(&parameters),
options.WithExecMode(options.ExecModeExplain),
options.WithStatsMode(options.StatsModeNone, func(stats stats.QueryStats) {
ast = stats.QueryAST()
plan = stats.QueryPlan()
}),
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return cc
return &single{
values: []sql.NamedArg{
sql.Named("AST", ast),
sql.Named("Plan", plan),
},
}, nil
}
Loading
Loading