Skip to content

Commit

Permalink
Allow to set preferred node id to execute query
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed Nov 7, 2024
1 parent 4d80170 commit d43de94
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 58 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Allow to set preferred node id to execute query

## v3.90.1
* Small broken change: added method `ID()` into `spans.Span` interface (need to implement in adapter)
* Fixed traceparent header for tracing grpc requests
Expand Down
5 changes: 5 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ func WithOperationTimeout(ctx context.Context, operationTimeout time.Duration) c
func WithOperationCancelAfter(ctx context.Context, operationCancelAfter time.Duration) context.Context {
return operation.WithCancelAfter(ctx, operationCancelAfter)
}

// WithPreferredNodeID allows to set preferred node to get session from
func WithPreferredNodeID(ctx context.Context, nodeID uint32) context.Context {
return operation.WithPreferredNodeID(ctx, nodeID)
}
12 changes: 12 additions & 0 deletions internal/conn/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

"google.golang.org/grpc"

balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
)

var _ grpc.ClientConnInterface = (*middleware)(nil)
Expand All @@ -30,6 +32,16 @@ func (m *middleware) NewStream(
return m.newStream(ctx, desc, method, opts...)
}

func ModifyConn(cc grpc.ClientConnInterface, nodeID uint32) grpc.ClientConnInterface {
if nodeID != 0 {
return WithContextModifier(cc, func(ctx context.Context) context.Context {
return balancerContext.WithNodeID(ctx, nodeID)
})
} else {
return cc
}
}

func WithContextModifier(
cc grpc.ClientConnInterface,
modifyCtx func(ctx context.Context) context.Context,
Expand Down
13 changes: 13 additions & 0 deletions internal/operation/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
type (
ctxOperationTimeoutKey struct{}
ctxOperationCancelAfterKey struct{}
ctxWithPreferredNodeIDKey struct{}
)

// WithTimeout returns a copy of parent context in which YDB operation timeout
Expand All @@ -33,6 +34,10 @@ func WithCancelAfter(ctx context.Context, operationCancelAfter time.Duration) co
return context.WithValue(ctx, ctxOperationCancelAfterKey{}, operationCancelAfter)
}

func WithPreferredNodeID(ctx context.Context, nodeID uint32) context.Context {
return context.WithValue(ctx, ctxWithPreferredNodeIDKey{}, nodeID)
}

// ctxTimeout returns the timeout within given context after which
// YDB should try to cancel operation and return result regardless of the cancelation.
func ctxTimeout(ctx context.Context) (d time.Duration, ok bool) {
Expand All @@ -57,3 +62,11 @@ func ctxUntilDeadline(ctx context.Context) (time.Duration, bool) {

return 0, false
}

func CtxPreferredNodeID(ctx context.Context) uint32 {
x := ctx.Value(ctxWithPreferredNodeIDKey{})
if x == nil {
return 0
}
return x.(uint32)
}
35 changes: 22 additions & 13 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pool
import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
"sync"
"time"

Expand All @@ -20,6 +21,7 @@ type (
Item interface {
IsAlive() bool
Close(ctx context.Context) error
NodeID() uint32
}
ItemConstraint[T any] interface {
*T
Expand All @@ -30,7 +32,7 @@ type (
clock clockwork.Clock
limit int
createTimeout time.Duration
createItem func(ctx context.Context) (PT, error)
createItem func(ctx context.Context, preferredNodeID uint32) (PT, error)
closeTimeout time.Duration
closeItem func(ctx context.Context, item PT)
idleTimeToLive time.Duration
Expand All @@ -48,7 +50,7 @@ type (
Pool[PT ItemConstraint[T], T any] struct {
config Config[PT, T]

createItem func(ctx context.Context) (PT, error)
createItem func(ctx context.Context, preferredNodeID uint32) (PT, error)
closeItem func(ctx context.Context, item PT)

mu xsync.RWMutex
Expand All @@ -63,7 +65,7 @@ type (
Option[PT ItemConstraint[T], T any] func(c *Config[PT, T])
)

func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] {
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(context.Context, uint32) (PT, error)) Option[PT, T] {
return func(c *Config[PT, T]) {
c.createItem = f
}
Expand Down Expand Up @@ -173,7 +175,7 @@ func New[PT ItemConstraint[T], T any](
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context, uint32) (PT, error) {
var item T

return &item, nil
Expand All @@ -182,8 +184,8 @@ func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error)
// makeAsyncCreateItemFunc wraps the createItem function with timeout handling
func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
p *Pool[PT, T],
) func(ctx context.Context) (PT, error) {
return func(ctx context.Context) (PT, error) {
) func(ctx context.Context, preferredNodeID uint32) (PT, error) {
return func(ctx context.Context, preferredNodeID uint32) (PT, error) {
if !xsync.WithLock(&p.mu, func() bool {
if len(p.index)+p.createInProgress < p.config.limit {
p.createInProgress++
Expand Down Expand Up @@ -222,7 +224,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
defer cancelCreate()
}

newItem, err := p.config.createItem(createCtx)
newItem, err := p.config.createItem(createCtx, preferredNodeID)
if newItem != nil {
p.mu.WithLock(func() {
var useCounter uint64
Expand Down Expand Up @@ -314,7 +316,7 @@ func (p *Pool[PT, T]) changeState(changeState func() Stats) {
}
}

func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) {
func (p *Pool[PT, T]) try(ctx context.Context, f func(context.Context, PT) error) (finalErr error) {
if onTry := p.config.trace.OnTry; onTry != nil {
onDone := onTry(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"),
Expand Down Expand Up @@ -460,8 +462,13 @@ func (p *Pool[PT, T]) putWaitCh(ch *chan PT) { //nolint:gocritic
}

// p.mu must be held.
func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
func (p *Pool[PT, T]) peekFirstIdle(preferredNodeID uint32) (item PT, touched time.Time) {
el := p.idle.Front()
if preferredNodeID != 0 {
for el != nil && el.Value.NodeID() != preferredNodeID {
el = el.Next()
}
}
if el == nil {
return
}
Expand All @@ -478,8 +485,8 @@ func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
// to prevent session from dying in the internalPoolGC after it was returned
// to be used only in outgoing functions that make session busy.
// p.mu must be held.
func (p *Pool[PT, T]) removeFirstIdle() PT {
idle, _ := p.peekFirstIdle()
func (p *Pool[PT, T]) removeFirstIdle(preferredNodeID uint32) PT {
idle, _ := p.peekFirstIdle(preferredNodeID)
if idle != nil {
info := p.removeIdle(idle)
p.index[idle] = info
Expand Down Expand Up @@ -585,6 +592,8 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}
}

preferredNodeID := operation.CtxPreferredNodeID(ctx)

for ; attempt < maxAttempts; attempt++ {
select {
case <-p.done:
Expand All @@ -593,7 +602,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}

if item := xsync.WithLock(&p.mu, func() PT { //nolint:nestif
return p.removeFirstIdle()
return p.removeFirstIdle(preferredNodeID)
}); item != nil {
if item.IsAlive() {
info := xsync.WithLock(&p.mu, func() itemInfo[PT, T] {
Expand Down Expand Up @@ -625,7 +634,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}
}

item, err := p.createItem(ctx)
item, err := p.createItem(ctx, preferredNodeID)
if item != nil {
return item, nil
}
Expand Down
Loading

0 comments on commit d43de94

Please sign in to comment.