From dd36b86dde138c1609f0e708c907f623849a1cd4 Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Mon, 29 Jan 2018 23:31:58 -0500 Subject: [PATCH 1/3] switch mongodb driver --- mongofluxd.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mongofluxd.go b/mongofluxd.go index 0cd17ed..8af2a99 100644 --- a/mongofluxd.go +++ b/mongofluxd.go @@ -6,11 +6,11 @@ import ( "flag" "fmt" "github.com/BurntSushi/toml" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" "github.com/influxdata/influxdb/client/v2" "github.com/rwynn/gtm" "github.com/rwynn/mongofluxd/mongofluxdplug" - "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" "io/ioutil" "log" "net" @@ -28,7 +28,7 @@ var infoLog *log.Logger = log.New(os.Stdout, "INFO ", log.Flags()) const ( Name = "mongofluxd" - Version = "0.3.0" + Version = "0.4.0" mongoUrlDefault = "localhost" influxUrlDefault = "http://localhost:8086" influxClientsDefault = 10 From 2c108812f31bef1bc7127aad6cc3a4abccc081ce Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Wed, 21 Feb 2018 16:26:26 -0500 Subject: [PATCH 2/3] fix build --- mongofluxd.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/mongofluxd.go b/mongofluxd.go index 8af2a99..f6773db 100644 --- a/mongofluxd.go +++ b/mongofluxd.go @@ -28,7 +28,7 @@ var infoLog *log.Logger = log.New(os.Stdout, "INFO ", log.Flags()) const ( Name = "mongofluxd" - Version = "0.4.0" + Version = "0.4.1" mongoUrlDefault = "localhost" influxUrlDefault = "http://localhost:8086" influxClientsDefault = 10 @@ -812,7 +812,7 @@ func main() { } gtmCtx := gtm.Start(mongo, >m.Options{ After: after, - Filter: filter, + NamespaceFilter: filter, OpLogDatabaseName: oplogDatabaseName, OpLogCollectionName: oplogCollectionName, CursorTimeout: cursorTimeout, @@ -822,8 +822,6 @@ func main() { BufferDuration: gtmBufferDuration, BufferSize: config.GtmSettings.BufferSize, DirectReadNs: directReadNs, - DirectReadLimit: 1000, - DirectReadersPerCol: 1, }) if config.DirectReads && config.ExitAfterDirectReads { go func() { @@ -835,8 +833,8 @@ func main() { shutdownC := make(chan bool, config.InfluxClients) var wg sync.WaitGroup for i := 1; i <= config.InfluxClients; i++ { + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() flusher := time.NewTicker(1 * time.Second) influx := &InfluxCtx{ From bc623ea6d88710b45f42805957bc5d17e8746d5e Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Wed, 21 Feb 2018 17:05:16 -0500 Subject: [PATCH 3/3] update docs --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 09e3920..2172534 100644 --- a/README.md +++ b/README.md @@ -168,8 +168,6 @@ func MyPointMapper(input *mongofluxdplug.MongoDocument) (output []*mongofluxdplu switch ptt := p.(type) { case map[string]interface{}: pt = ptt - case gtm.OpLogEntry: - pt = map[string]interface{}(ptt) default: return nil, fmt.Errorf("expected point of type %T but got %T", pt, p) }