diff --git a/async_transport.go b/async_transport.go index 61c055a..10fb109 100644 --- a/async_transport.go +++ b/async_transport.go @@ -2,6 +2,7 @@ package rollbar import ( "sync" + "time" ) // AsyncTransport is a concrete implementation of the Transport type which communicates with the @@ -30,6 +31,7 @@ func NewAsyncTransport(token string, endpoint string, buffer int) *AsyncTranspor Endpoint: endpoint, RetryAttempts: DefaultRetryAttempts, PrintPayloadOnError: true, + ItemsPerMinute: 0, }, bodyChannel: make(chan payload, buffer), Buffer: buffer, @@ -37,27 +39,37 @@ func NewAsyncTransport(token string, endpoint string, buffer int) *AsyncTranspor go func() { for p := range transport.bodyChannel { - canRetry, err := transport.post(p.body) - if err != nil { - if canRetry && p.retriesLeft > 0 { - p.retriesLeft -= 1 - select { - case transport.bodyChannel <- p: - default: - // This can happen if the bodyChannel had an item added to it from another - // thread while we are processing such that the channel is now full. If we try - // to send the payload back to the channel without this select statement we - // could deadlock. Instead we consider this a retry failure. + elapsedTime := time.Now().Sub(transport.startTime).Seconds() + if elapsedTime < 0 || elapsedTime >= 60 { + transport.startTime = time.Now() + transport.perMinCounter = 0 + } + if transport.shouldSend() { + canRetry, err := transport.post(p.body) + if err != nil { + if canRetry && p.retriesLeft > 0 { + p.retriesLeft -= 1 + select { + case transport.bodyChannel <- p: + default: + // This can happen if the bodyChannel had an item added to it from another + // thread while we are processing such that the channel is now full. If we try + // to send the payload back to the channel without this select statement we + // could deadlock. Instead we consider this a retry failure. + if transport.PrintPayloadOnError { + writePayloadToStderr(transport.Logger, p.body) + } + transport.waitGroup.Done() + } + } else { if transport.PrintPayloadOnError { writePayloadToStderr(transport.Logger, p.body) } transport.waitGroup.Done() } } else { - if transport.PrintPayloadOnError { - writePayloadToStderr(transport.Logger, p.body) - } transport.waitGroup.Done() + transport.perMinCounter++ } } else { transport.waitGroup.Done() diff --git a/async_transport_test.go b/async_transport_test.go index 470e485..192372a 100644 --- a/async_transport_test.go +++ b/async_transport_test.go @@ -1,13 +1,12 @@ -package rollbar_test +package rollbar import ( - "github.com/rollbar/rollbar-go" "testing" ) func TestAsyncTransportSend(t *testing.T) { - transport := rollbar.NewAsyncTransport("", "", 1) - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewAsyncTransport("", "", 1) + transport.SetLogger(&SilentClientLogger{}) body := map[string]interface{}{ "hello": "world", } @@ -19,8 +18,8 @@ func TestAsyncTransportSend(t *testing.T) { } func TestAsyncTransportSendFull(t *testing.T) { - transport := rollbar.NewAsyncTransport("", "", 1) - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewAsyncTransport("", "", 1) + transport.SetLogger(&SilentClientLogger{}) body := map[string]interface{}{ "hello": "world", } @@ -31,11 +30,14 @@ func TestAsyncTransportSendFull(t *testing.T) { t.Error("Expected to receive ErrBufferFull") } transport.Wait() + if transport.perMinCounter != 1 { + t.Error("shouldSend check failed") + } } func TestAsyncTransportClose(t *testing.T) { - transport := rollbar.NewAsyncTransport("", "", 1) - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewAsyncTransport("", "", 1) + transport.SetLogger(&SilentClientLogger{}) result := transport.Close() if result != nil { t.Error("Close returned an unexpected error:", result) @@ -43,8 +45,8 @@ func TestAsyncTransportClose(t *testing.T) { } func TestAsyncTransportSetToken(t *testing.T) { - transport := rollbar.NewAsyncTransport("", "", 1) - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewAsyncTransport("", "", 1) + transport.SetLogger(&SilentClientLogger{}) token := "abc" transport.SetToken(token) if transport.Token != token { @@ -53,11 +55,34 @@ func TestAsyncTransportSetToken(t *testing.T) { } func TestAsyncTransportSetEndpoint(t *testing.T) { - transport := rollbar.NewAsyncTransport("", "", 1) - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewAsyncTransport("", "", 1) + transport.SetLogger(&SilentClientLogger{}) endpoint := "https://fake.com" transport.SetEndpoint(endpoint) if transport.Endpoint != endpoint { t.Error("SetEndpoint failed") } } + +func TestAsyncTransportNotSend(t *testing.T) { + transport := NewAsyncTransport("", "", 2) + transport.SetLogger(&SilentClientLogger{}) + transport.SetItemsPerMinute(1) + if transport.ItemsPerMinute != 1 { + t.Error("SetItemsPerMinute failed") + } + + body := map[string]interface{}{ + "hello": "world", + } + + transport.Send(body) + result := transport.Send(body) + if result != nil { + t.Error("Send returned an unexpected error:", result) + } + transport.Wait() + if transport.perMinCounter != 1 { + t.Error("shouldSend check failed") + } +} diff --git a/base_transport.go b/base_transport.go index 0b16ef4..e205de6 100644 --- a/base_transport.go +++ b/base_transport.go @@ -3,9 +3,11 @@ package rollbar import ( "bytes" "encoding/json" + "fmt" "io" "io/ioutil" "net/http" + "time" ) type baseTransport struct { @@ -24,8 +26,13 @@ type baseTransport struct { // PrintPayloadOnError is whether or not to output the payload to the set logger or to stderr if // an error occurs during transport to the Rollbar API. PrintPayloadOnError bool + // ItemsPerMinute has the max number of items to send in a given minute + ItemsPerMinute int // custom http client (http.DefaultClient used by default) httpClient *http.Client + + perMinCounter int + startTime time.Time } // SetToken updates the token to use for future API requests. @@ -38,6 +45,11 @@ func (t *baseTransport) SetEndpoint(endpoint string) { t.Endpoint = endpoint } +// SetItemsPerMinute sets the max number of items to send in a given minute +func (t *baseTransport) SetItemsPerMinute(itemsPerMinute int) { + t.ItemsPerMinute = itemsPerMinute +} + // SetLogger updates the logger that this transport uses for reporting errors that occur while // processing items. func (t *baseTransport) SetLogger(logger ClientLogger) { @@ -106,3 +118,12 @@ func (t *baseTransport) post(body map[string]interface{}) (bool, error) { return false, nil } + +func (t *baseTransport) shouldSend() bool { + if t.ItemsPerMinute > 0 && t.perMinCounter >= t.ItemsPerMinute { + rollbarError(t.Logger, fmt.Sprintf("item per minute limit reached: %d occurences, "+ + "ignoring errors until timeout", t.perMinCounter)) + return false + } + return true +} diff --git a/client.go b/client.go index 6793c41..b4381e4 100644 --- a/client.go +++ b/client.go @@ -240,6 +240,12 @@ func (c *Client) SetRetryAttempts(retryAttempts int) { c.Transport.SetRetryAttempts(retryAttempts) } +// SetItemsPerMinute sets the max number of items to send in a given minute +func (c *Client) SetItemsPerMinute(itemsPerMinute int) { + c.configuration.itemsPerMinute = itemsPerMinute + c.Transport.SetItemsPerMinute(itemsPerMinute) +} + // SetPrintPayloadOnError sets whether or not to output the payload to the set logger or to // stderr if an error occurs during transport to the Rollbar API. For example, if you hit // your rate limit and we run out of retry attempts, then if this is true we will output the @@ -258,6 +264,11 @@ func (c *Client) Token() string { return c.configuration.token } +// ItemsPerMinute is the currently set Rollbar items per minute +func (c *Client) ItemsPerMinute() int { + return c.configuration.itemsPerMinute +} + // Environment is the currently set environment underwhich all errors and // messages will be submitted. func (c *Client) Environment() string { @@ -652,24 +663,25 @@ const ( ) type configuration struct { - enabled bool - token string - environment string - platform string - codeVersion string - serverHost string - serverRoot string - endpoint string - custom map[string]interface{} - fingerprint bool - scrubHeaders *regexp.Regexp - scrubFields *regexp.Regexp - checkIgnore func(string) bool - transform func(map[string]interface{}) - unwrapper UnwrapperFunc - stackTracer StackTracerFunc - person Person - captureIp captureIp + enabled bool + token string + environment string + platform string + codeVersion string + serverHost string + serverRoot string + endpoint string + custom map[string]interface{} + fingerprint bool + scrubHeaders *regexp.Regexp + scrubFields *regexp.Regexp + checkIgnore func(string) bool + transform func(map[string]interface{}) + unwrapper UnwrapperFunc + stackTracer StackTracerFunc + person Person + captureIp captureIp + itemsPerMinute int } func createConfiguration(token, environment, codeVersion, serverHost, serverRoot string) configuration { @@ -678,23 +690,24 @@ func createConfiguration(token, environment, codeVersion, serverHost, serverRoot hostname, _ = os.Hostname() } return configuration{ - enabled: true, - token: token, - environment: environment, - platform: runtime.GOOS, - endpoint: "https://api.rollbar.com/api/1/item/", - scrubHeaders: regexp.MustCompile("Authorization"), - scrubFields: regexp.MustCompile("password|secret|token"), - codeVersion: codeVersion, - serverHost: hostname, - serverRoot: serverRoot, - fingerprint: false, - checkIgnore: func(_s string) bool { return false }, - transform: func(_d map[string]interface{}) {}, - unwrapper: DefaultUnwrapper, - stackTracer: DefaultStackTracer, - person: Person{}, - captureIp: CaptureIpFull, + enabled: true, + token: token, + environment: environment, + platform: runtime.GOOS, + endpoint: "https://api.rollbar.com/api/1/item/", + scrubHeaders: regexp.MustCompile("Authorization"), + scrubFields: regexp.MustCompile("password|secret|token"), + codeVersion: codeVersion, + serverHost: hostname, + serverRoot: serverRoot, + fingerprint: false, + checkIgnore: func(_s string) bool { return false }, + transform: func(_d map[string]interface{}) {}, + unwrapper: DefaultUnwrapper, + stackTracer: DefaultStackTracer, + person: Person{}, + captureIp: CaptureIpFull, + itemsPerMinute: 0, } } diff --git a/client_test.go b/client_test.go index e213bdd..09c13b8 100644 --- a/client_test.go +++ b/client_test.go @@ -32,6 +32,7 @@ func (t *TestTransport) SetLogger(_l rollbar.ClientLogger) {} func (t *TestTransport) SetRetryAttempts(_r int) {} func (t *TestTransport) SetPrintPayloadOnError(_p bool) {} func (t *TestTransport) SetHTTPClient(_c *http.Client) {} +func (t *TestTransport) SetItemsPerMinute(_r int) {} func (t *TestTransport) Send(body map[string]interface{}) error { t.Body = body return nil @@ -425,6 +426,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) { scrubHeaders := regexp.MustCompile("Foo") scrubFields := regexp.MustCompile("squirrel|doggo") captureIP := rollbar.CaptureIpNone + itemsPerMinute := 10 errorIfEqual(token, client.Token(), t) errorIfEqual(environment, client.Environment(), t) @@ -438,6 +440,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) { errorIfEqual(scrubHeaders, client.ScrubHeaders(), t) errorIfEqual(scrubHeaders, client.Telemetry.Network.ScrubHeaders, t) errorIfEqual(scrubFields, client.ScrubFields(), t) + errorIfEqual(itemsPerMinute, client.ItemsPerMinute(), t) if client.Fingerprint() { t.Error("expected fingerprint to default to false") @@ -468,6 +471,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) { client.SetTelemetry() client.SetEnabled(true) + client.SetItemsPerMinute(itemsPerMinute) errorIfNotEqual(token, client.Token(), t) errorIfNotEqual(environment, client.Environment(), t) @@ -481,6 +485,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) { errorIfNotEqual(scrubHeaders, client.ScrubHeaders(), t) errorIfNotEqual(scrubHeaders, client.Telemetry.Network.ScrubHeaders, t) errorIfNotEqual(scrubFields, client.ScrubFields(), t) + errorIfNotEqual(itemsPerMinute, client.ItemsPerMinute(), t) if !client.Fingerprint() { t.Error("expected fingerprint to default to false") @@ -513,7 +518,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) { errorIfNotEqual(fingerprint, configuredOptions["fingerprint"].(bool), t) errorIfNotEqual(scrubHeaders, configuredOptions["scrubHeaders"].(*regexp.Regexp), t) errorIfNotEqual(scrubFields, configuredOptions["scrubFields"].(*regexp.Regexp), t) - + errorIfNotEqual(itemsPerMinute, configuredOptions["itemsPerMinute"].(int), t) } else { t.Fail() } diff --git a/rollbar.go b/rollbar.go index 33776be..f6bda08 100644 --- a/rollbar.go +++ b/rollbar.go @@ -128,6 +128,11 @@ func SetEndpoint(endpoint string) { std.SetEndpoint(endpoint) } +// SetItemsPerMinute sets the max number of items to send in a given minute +func SetItemsPerMinute(itemsPerMinute int) { + std.SetItemsPerMinute(itemsPerMinute) +} + // SetPlatform sets the platform on the managed Client instance. // The platform is reported for all Rollbar items. The default is // the running operating system (darwin, freebsd, linux, etc.) but it can diff --git a/sync_transport.go b/sync_transport.go index e56c2fe..db54ad8 100644 --- a/sync_transport.go +++ b/sync_transport.go @@ -1,5 +1,9 @@ package rollbar +import ( + "time" +) + // SyncTransport is a concrete implementation of the Transport type which communicates with the // Rollbar API synchronously. type SyncTransport struct { @@ -15,6 +19,9 @@ func NewSyncTransport(token, endpoint string) *SyncTransport { Endpoint: endpoint, RetryAttempts: DefaultRetryAttempts, PrintPayloadOnError: true, + ItemsPerMinute: 0, + perMinCounter: 0, + startTime: time.Now(), }, } } @@ -28,15 +35,24 @@ func (t *SyncTransport) Send(body map[string]interface{}) error { } func (t *SyncTransport) doSend(body map[string]interface{}, retriesLeft int) error { - canRetry, err := t.post(body) - if err != nil { - if !canRetry || retriesLeft <= 0 { - if t.PrintPayloadOnError { - writePayloadToStderr(t.Logger, body) + elapsedTime := time.Now().Sub(t.startTime).Seconds() + if elapsedTime < 0 || elapsedTime >= 60 { + t.startTime = time.Now() + t.perMinCounter = 0 + } + if t.shouldSend() { + canRetry, err := t.post(body) + if err != nil { + if !canRetry || retriesLeft <= 0 { + if t.PrintPayloadOnError { + writePayloadToStderr(t.Logger, body) + } + return err } - return err + return t.doSend(body, retriesLeft-1) + } else { + t.perMinCounter++ } - return t.doSend(body, retriesLeft-1) } return nil } diff --git a/sync_transport_test.go b/sync_transport_test.go index afc6bdb..4a8db37 100644 --- a/sync_transport_test.go +++ b/sync_transport_test.go @@ -1,13 +1,12 @@ -package rollbar_test +package rollbar import ( - "github.com/rollbar/rollbar-go" "testing" ) func TestSyncTransportSend(t *testing.T) { - transport := rollbar.NewSyncTransport("", "") - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewSyncTransport("", "") + transport.SetLogger(&SilentClientLogger{}) body := map[string]interface{}{ "hello": "world", } @@ -18,8 +17,8 @@ func TestSyncTransportSend(t *testing.T) { } func TestSyncTransportSendTwice(t *testing.T) { - transport := rollbar.NewSyncTransport("", "") - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewSyncTransport("", "") + transport.SetLogger(&SilentClientLogger{}) body := map[string]interface{}{ "hello": "world", } @@ -29,11 +28,15 @@ func TestSyncTransportSendTwice(t *testing.T) { if result != nil { t.Error("Send returned an unexpected error:", result) } + + if transport.perMinCounter != 2 { + t.Error("shouldSend check failed") + } } func TestSyncTransportClose(t *testing.T) { - transport := rollbar.NewSyncTransport("", "") - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewSyncTransport("", "") + transport.SetLogger(&SilentClientLogger{}) result := transport.Close() if result != nil { t.Error("Close returned an unexpected error:", result) @@ -41,8 +44,8 @@ func TestSyncTransportClose(t *testing.T) { } func TestSyncTransportSetToken(t *testing.T) { - transport := rollbar.NewSyncTransport("", "") - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewSyncTransport("", "") + transport.SetLogger(&SilentClientLogger{}) token := "abc" transport.SetToken(token) if transport.Token != token { @@ -51,11 +54,33 @@ func TestSyncTransportSetToken(t *testing.T) { } func TestSyncTransportSetEndpoint(t *testing.T) { - transport := rollbar.NewSyncTransport("", "") - transport.SetLogger(&rollbar.SilentClientLogger{}) + transport := NewSyncTransport("", "") + transport.SetLogger(&SilentClientLogger{}) endpoint := "https://fake.com" transport.SetEndpoint(endpoint) if transport.Endpoint != endpoint { t.Error("SetEndpoint failed") } } + +func TestSyncTransportNotSend(t *testing.T) { + transport := NewSyncTransport("", "") + transport.SetLogger(&SilentClientLogger{}) + transport.SetItemsPerMinute(1) + if transport.ItemsPerMinute != 1 { + t.Error("SetItemsPerMinute failed") + } + + body := map[string]interface{}{ + "hello": "world", + } + + transport.Send(body) + result := transport.Send(body) + if result != nil { + t.Error("Send returned an unexpected error:", result) + } + if transport.perMinCounter != 1 { + t.Error("shouldSend check failed") + } +} diff --git a/transforms.go b/transforms.go index 11278e5..8dff133 100644 --- a/transforms.go +++ b/transforms.go @@ -79,20 +79,21 @@ func buildCustom(custom map[string]interface{}, extras map[string]interface{}) m func buildConfiguredOptions(configuration configuration) map[string]interface{} { return map[string]interface{}{ - "environment": configuration.environment, - "endpoint": configuration.endpoint, - "platform": configuration.platform, - "codeVersion": configuration.codeVersion, - "serverHost": configuration.serverHost, - "serverRoot": configuration.serverRoot, - "fingerprint": configuration.fingerprint, - "scrubHeaders": configuration.scrubHeaders, - "scrubFields": configuration.scrubFields, - "transform": functionToString(configuration.transform), - "unwrapper": functionToString(configuration.unwrapper), - "stackTracer": functionToString(configuration.stackTracer), - "checkIgnore": functionToString(configuration.checkIgnore), - "captureIp": configuration.captureIp, + "environment": configuration.environment, + "endpoint": configuration.endpoint, + "platform": configuration.platform, + "codeVersion": configuration.codeVersion, + "serverHost": configuration.serverHost, + "serverRoot": configuration.serverRoot, + "fingerprint": configuration.fingerprint, + "scrubHeaders": configuration.scrubHeaders, + "scrubFields": configuration.scrubFields, + "transform": functionToString(configuration.transform), + "unwrapper": functionToString(configuration.unwrapper), + "stackTracer": functionToString(configuration.stackTracer), + "checkIgnore": functionToString(configuration.checkIgnore), + "captureIp": configuration.captureIp, + "itemsPerMinute": configuration.itemsPerMinute, "person": map[string]string{ "Id": configuration.person.Id, "Username": configuration.person.Username, diff --git a/transport.go b/transport.go index 12faf31..ccbc986 100644 --- a/transport.go +++ b/transport.go @@ -40,6 +40,8 @@ type Transport interface { SetPrintPayloadOnError(printPayloadOnError bool) // Sets custom http client. http.DefaultClient is used by default SetHTTPClient(httpClient *http.Client) + // SetItemsPerMinute sets the max number of items to send in a given minute + SetItemsPerMinute(itemsPerMinute int) } // ClientLogger is the interface used by the rollbar Client/Transport to report problems.