From 68a529c97ac75f941a622be6dd92a3113f659ff1 Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Sun, 10 Feb 2019 22:53:35 +0000 Subject: [PATCH] add ability for a measurement to target a specific influxdb database --- README.md | 2 ++ go.sum | 2 ++ mongofluxd.go | 32 +++++++++++++++++--------------- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 3bcd522..f3d8605 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,8 @@ fields = ["sales", "price"] retention = "RP1" # override the measurement name which defaults to the name of the MongoDB collection measure = "sales" +# override the influx database name which default to the name of the MongoDB database +database = "salesdb" [[measurement]] namespace = "db.col" diff --git a/go.sum b/go.sum index 7e3061a..a91fbcc 100644 --- a/go.sum +++ b/go.sum @@ -203,6 +203,8 @@ github.com/rwynn/gtm v0.0.0-20190131014030-4d96eedfb073 h1:OgMl+/XpaQzW5c38u59HA github.com/rwynn/gtm v0.0.0-20190131014030-4d96eedfb073/go.mod h1:LYXeTMjbA7l9k9oEM+NUBuu0BgvNrD5nQuo8seLsar0= github.com/rwynn/mgo v0.0.0-20190130173337-9b1257fb3190 h1:trPv91MYUdV6w+TaBJsudBfJOBRozGvneuDZVmEc/QI= github.com/rwynn/mgo v0.0.0-20190130173337-9b1257fb3190/go.mod h1:OQBK0ebL25cW31topLSUPIWIrSesue7+zTa/haAXccQ= +github.com/rwynn/mgo v0.0.0-20190203195949-55e8fd85e7e2 h1:+QMMf2dZav3oviMURjgWnNyppc14490oMi43Hd/PffA= +github.com/rwynn/mgo v0.0.0-20190203195949-55e8fd85e7e2/go.mod h1:OQBK0ebL25cW31topLSUPIWIrSesue7+zTa/haAXccQ= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= diff --git a/mongofluxd.go b/mongofluxd.go index 452eb62..5fd73c9 100644 --- a/mongofluxd.go +++ b/mongofluxd.go @@ -29,7 +29,7 @@ var errorLog *log.Logger = log.New(os.Stdout, "ERROR ", log.Flags()) const ( Name = "mongofluxd" - Version = "0.7.1" + Version = "0.8.0" mongoUrlDefault = "localhost" influxUrlDefault = "http://localhost:8086" influxClientsDefault = 10 @@ -56,6 +56,7 @@ type measureSettings struct { Retention string Precision string Measure string + Database string Symbol string Tags []string Fields []string @@ -105,6 +106,7 @@ type InfluxMeasure struct { retention string precision string measure string + database string tags map[string]bool fields map[string]bool plug func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error) @@ -164,6 +166,7 @@ func (ctx *InfluxCtx) setupMeasurements() error { retention: ms.Retention, precision: ms.Precision, measure: ms.Measure, + database: ms.Database, plug: ms.plug, tags: make(map[string]bool), fields: make(map[string]bool), @@ -174,6 +177,12 @@ func (ctx *InfluxCtx) setupMeasurements() error { return err } } + if im.database == "" { + im.database = strings.SplitN(im.ns, ".", 2)[0] + } + if im.measure == "" { + im.measure = strings.SplitN(im.ns, ".", 2)[1] + } if im.precision == "" { im.precision = "s" } @@ -218,18 +227,19 @@ func (ctx *InfluxCtx) createDatabase(db string) error { } func (ctx *InfluxCtx) setupDatabase(op *gtm.Op) error { - db, ns := op.GetDatabase(), op.Namespace + ns := op.Namespace if _, found := ctx.m[ns]; found == false { + measure := ctx.measures[ns] bp, err := client.NewBatchPoints(client.BatchPointsConfig{ - Database: db, - RetentionPolicy: ctx.measures[ns].retention, - Precision: ctx.measures[ns].precision, + Database: measure.database, + RetentionPolicy: measure.retention, + Precision: measure.precision, }) if err != nil { return err } ctx.m[ns] = bp - if err := ctx.createDatabase(db); err != nil { + if err := ctx.createDatabase(measure.database); err != nil { return err } } @@ -313,14 +323,6 @@ func (m *InfluxDataMap) unsupportedType(op *gtm.Op, k string, v interface{}, kin errorLog.Printf("Unsupported type %T for %s %s in namespace %s\n", v, kind, k, op.Namespace) } -func (m *InfluxDataMap) loadName() { - if m.measure.measure != "" { - m.name = m.measure.measure - } else { - m.name = m.op.GetCollection() - } -} - func (m *InfluxDataMap) loadKV(k string, v interface{}) { if m.measure.tags[k] { if m.istagtype(v) { @@ -419,8 +421,8 @@ func (ctx *InfluxCtx) addPoint(op *gtm.Op) error { mapper := &InfluxDataMap{ op: op, measure: measure, + name: measure.measure, } - mapper.loadName() if measure.plug != nil { points, err := measure.plug(&mongofluxdplug.MongoDocument{ Data: op.Data,