Skip to content

Commit

Permalink
Use one grpc status error instead of joining them together. Round up …
Browse files Browse the repository at this point in the history
…in rate limiting.
  • Loading branch information
ggreer committed Sep 11, 2024
1 parent b2318d0 commit a9c9d7c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 66 deletions.
11 changes: 9 additions & 2 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"strconv"
"time"
Expand Down Expand Up @@ -108,14 +109,20 @@ func shouldWaitAndRetry(ctx context.Context, err error) bool {
for _, detail := range details {
if rlData, ok := detail.(*v2.RateLimitDescription); ok {
wait = time.Until(rlData.ResetAt.AsTime())
wait /= time.Duration(rlData.Limit)
// Round up to the nearest second to make sure we don't hit the rate limit again
wait = time.Duration(math.Ceil(wait.Seconds())) * time.Second
}
}
l.Debug("details from status error", zap.Any("details", details), zap.Int("len of details", len(details)), zap.Error(err))
} else {
l.Debug("unable to parse rate limit description from error", zap.Error(err), zap.Any("status", st))
}

l.Error("RETRYING OPERATION", zap.Error(err), zap.Duration("wait", wait))
if wait == time.Duration(attempts)*time.Second {
l.Error("normal retry :(", zap.Error(err), zap.Duration("wait", wait))
} else {
l.Error("SUCCESS RETRYING OPERATION", zap.Error(err), zap.Duration("wait", wait))
}

for {
select {
Expand Down
91 changes: 27 additions & 64 deletions pkg/uhttp/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"net/url"
"os"
"strconv"
"time"

v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
"github.com/conductorone/baton-sdk/pkg/ratelimit"
Expand Down Expand Up @@ -214,18 +213,22 @@ func WithResponse(response interface{}) DoOption {
}
}

func wrapRetryAfterInStatus(httpStatus int, headers *http.Header, preferredCode codes.Code) (*status.Status, error) {
description, err := ratelimit.ExtractRateLimitData(httpStatus, headers)
func wrapErrors(preferredCode codes.Code, resp *http.Response, errs ...error) error {
description, err := ratelimit.ExtractRateLimitData(resp.StatusCode, &resp.Header)
if err != nil {
return nil, err
return err
}

st := status.New(preferredCode, http.StatusText(httpStatus))
st := status.New(preferredCode, resp.Status)
st, err = st.WithDetails(description)
if err != nil {
return nil, err
return err
}
return st, nil

allErrs := []error{st.Err()}
allErrs = append(allErrs, errs...)

return errors.Join(allErrs...)
}

func (c *BaseHttpClient) Do(req *http.Request, options ...DoOption) (*http.Response, error) {
Expand Down Expand Up @@ -284,86 +287,46 @@ func (c *BaseHttpClient) Do(req *http.Request, options ...DoOption) (*http.Respo
Body: body,
}

var optErr error
var optErrs []error
for _, option := range options {
err = option(&wresp)
if err != nil {
optErr = errors.Join(optErr, err)
optErr := option(&wresp)
if optErr != nil {
optErrs = append(optErrs, optErr)
}
}

st, err := wrapRetryAfterInStatus(resp.StatusCode, &resp.Header, codes.Unavailable)
if err != nil {
l.Debug("error getting rate limit data from response", zap.Error(err))
err = nil
}
var stErr error
if st != nil {
stErr = st.Err()
}

switch resp.StatusCode {
case http.StatusRequestTimeout:
return resp, errors.Join(status.Error(codes.DeadlineExceeded, resp.Status), optErr, stErr)
return resp, wrapErrors(codes.DeadlineExceeded, resp, optErrs...)
case http.StatusTooManyRequests, http.StatusServiceUnavailable:
return resp, errors.Join(status.Error(codes.Unavailable, resp.Status), optErr, stErr)
return resp, wrapErrors(codes.Unavailable, resp, optErrs...)
case http.StatusNotFound:
return resp, errors.Join(status.Error(codes.NotFound, resp.Status), optErr, stErr)
return resp, wrapErrors(codes.NotFound, resp, optErrs...)
case http.StatusUnauthorized:
return resp, errors.Join(status.Error(codes.Unauthenticated, resp.Status), optErr, stErr)
return resp, wrapErrors(codes.Unauthenticated, resp, optErrs...)
case http.StatusForbidden:
return resp, errors.Join(status.Error(codes.PermissionDenied, resp.Status), optErr, stErr)
return resp, wrapErrors(codes.PermissionDenied, resp, optErrs...)
case http.StatusNotImplemented:
return resp, errors.Join(status.Error(codes.Unimplemented, resp.Status), optErr, stErr)
return resp, wrapErrors(codes.Unimplemented, resp, optErrs...)
}

if resp.StatusCode >= 500 && resp.StatusCode <= 599 {
return resp, errors.Join(status.Error(codes.Unavailable, resp.Status), optErr, stErr)
}

allErrs := errors.Join(stErr, err, optErr)
l.Error("allErrs", zap.Error(allErrs))
type grpcstatus interface{ GRPCStatus() *status.Status }

var grpcStatus grpcstatus
ok := errors.As(allErrs, &grpcStatus)
if ok {
realStatus := grpcStatus.GRPCStatus()
l.Error("GRPC_STATUS", zap.Any("grpcStatus", realStatus), zap.Any("details", realStatus.Details()))
} else {
l.Error("NO_GRPC_STATUS", zap.Error(allErrs))
}
if stfe, ok := status.FromError(allErrs); ok {
details := stfe.Details()
if len(details) == 0 {
l.Error("OMG NO DETAILS", zap.Any("stfe", stfe))
if allErrs == nil {
l.Error("ALL ERRS IS NIL")
}
}
for _, detail := range details {
if rlData, ok := detail.(*v2.RateLimitDescription); ok {
wait := time.Until(rlData.ResetAt.AsTime())
l.Debug("RL_DATA_SUCCESS", zap.Duration("reset_at", wait))
} else {
l.Debug("RL_DATA_FAIL", zap.Any("detail", detail))
}
}
return resp, wrapErrors(codes.Unavailable, resp, optErrs...)
}

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return resp, errors.Join(stErr, status.Error(codes.Unknown, fmt.Sprintf("unexpected status code: %d", resp.StatusCode)), optErr)
// TODO: add opterrs here
return resp, wrapErrors(codes.Unknown, resp, fmt.Errorf("unexpected status code: %d", resp.StatusCode))
}

if req.Method == http.MethodGet && resp.StatusCode == http.StatusOK {
err := c.baseHttpCache.Set(cacheKey, resp)
if err != nil {
l.Warn("error setting cache", zap.String("cacheKey", cacheKey), zap.String("url", req.URL.String()), zap.Error(err))
err = nil
cacheErr := c.baseHttpCache.Set(cacheKey, resp)
if cacheErr != nil {
l.Warn("error setting cache", zap.String("cacheKey", cacheKey), zap.String("url", req.URL.String()), zap.Error(cacheErr))
}
}

return resp, errors.Join(err, optErr)
return resp, errors.Join(optErrs...)
}

func WithHeader(key, value string) RequestOption {
Expand Down

0 comments on commit a9c9d7c

Please sign in to comment.