Skip to content

Commit

Permalink
upgrade gtm
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Mar 17, 2018
2 parents 54e5b74 + bc623ea commit 21e571e
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 9 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ func MyPointMapper(input *mongofluxdplug.MongoDocument) (output []*mongofluxdplu
switch ptt := p.(type) {
case map[string]interface{}:
pt = ptt
case gtm.OpLogEntry:
pt = map[string]interface{}(ptt)
default:
return nil, fmt.Errorf("expected point of type %T but got %T", pt, p)
}
Expand Down
12 changes: 5 additions & 7 deletions mongofluxd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"flag"
"fmt"
"github.com/BurntSushi/toml"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/influxdata/influxdb/client/v2"
"github.com/rwynn/gtm"
"github.com/rwynn/mongofluxd/mongofluxdplug"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"io/ioutil"
"log"
"net"
Expand All @@ -28,7 +28,7 @@ var infoLog *log.Logger = log.New(os.Stdout, "INFO ", log.Flags())

const (
Name = "mongofluxd"
Version = "0.4.0"
Version = "0.5.0"
mongoUrlDefault = "localhost"
influxUrlDefault = "http://localhost:8086"
influxClientsDefault = 10
Expand Down Expand Up @@ -812,7 +812,7 @@ func main() {
}
gtmCtx := gtm.Start(mongo, &gtm.Options{
After: after,
Filter: filter,
NamespaceFilter: filter,
OpLogDatabaseName: oplogDatabaseName,
OpLogCollectionName: oplogCollectionName,
CursorTimeout: cursorTimeout,
Expand All @@ -822,8 +822,6 @@ func main() {
BufferDuration: gtmBufferDuration,
BufferSize: config.GtmSettings.BufferSize,
DirectReadNs: directReadNs,
DirectReadLimit: 1000,
DirectReadersPerCol: 1,
})
if config.DirectReads && config.ExitAfterDirectReads {
go func() {
Expand All @@ -835,8 +833,8 @@ func main() {
shutdownC := make(chan bool, config.InfluxClients)
var wg sync.WaitGroup
for i := 1; i <= config.InfluxClients; i++ {
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
flusher := time.NewTicker(1 * time.Second)
influx := &InfluxCtx{
Expand Down

0 comments on commit 21e571e

Please sign in to comment.