Skip to content

Commit

Permalink
add ability for a measurement to target a specific influxdb database
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Feb 10, 2019
1 parent c1ca001 commit 68a529c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ fields = ["sales", "price"]
retention = "RP1"
# override the measurement name which defaults to the name of the MongoDB collection
measure = "sales"
# override the influx database name which default to the name of the MongoDB database
database = "salesdb"

[[measurement]]
namespace = "db.col"
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ github.com/rwynn/gtm v0.0.0-20190131014030-4d96eedfb073 h1:OgMl+/XpaQzW5c38u59HA
github.com/rwynn/gtm v0.0.0-20190131014030-4d96eedfb073/go.mod h1:LYXeTMjbA7l9k9oEM+NUBuu0BgvNrD5nQuo8seLsar0=
github.com/rwynn/mgo v0.0.0-20190130173337-9b1257fb3190 h1:trPv91MYUdV6w+TaBJsudBfJOBRozGvneuDZVmEc/QI=
github.com/rwynn/mgo v0.0.0-20190130173337-9b1257fb3190/go.mod h1:OQBK0ebL25cW31topLSUPIWIrSesue7+zTa/haAXccQ=
github.com/rwynn/mgo v0.0.0-20190203195949-55e8fd85e7e2 h1:+QMMf2dZav3oviMURjgWnNyppc14490oMi43Hd/PffA=
github.com/rwynn/mgo v0.0.0-20190203195949-55e8fd85e7e2/go.mod h1:OQBK0ebL25cW31topLSUPIWIrSesue7+zTa/haAXccQ=
github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
Expand Down
32 changes: 17 additions & 15 deletions mongofluxd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var errorLog *log.Logger = log.New(os.Stdout, "ERROR ", log.Flags())

const (
Name = "mongofluxd"
Version = "0.7.1"
Version = "0.8.0"
mongoUrlDefault = "localhost"
influxUrlDefault = "http://localhost:8086"
influxClientsDefault = 10
Expand All @@ -56,6 +56,7 @@ type measureSettings struct {
Retention string
Precision string
Measure string
Database string
Symbol string
Tags []string
Fields []string
Expand Down Expand Up @@ -105,6 +106,7 @@ type InfluxMeasure struct {
retention string
precision string
measure string
database string
tags map[string]bool
fields map[string]bool
plug func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error)
Expand Down Expand Up @@ -164,6 +166,7 @@ func (ctx *InfluxCtx) setupMeasurements() error {
retention: ms.Retention,
precision: ms.Precision,
measure: ms.Measure,
database: ms.Database,
plug: ms.plug,
tags: make(map[string]bool),
fields: make(map[string]bool),
Expand All @@ -174,6 +177,12 @@ func (ctx *InfluxCtx) setupMeasurements() error {
return err
}
}
if im.database == "" {
im.database = strings.SplitN(im.ns, ".", 2)[0]
}
if im.measure == "" {
im.measure = strings.SplitN(im.ns, ".", 2)[1]
}
if im.precision == "" {
im.precision = "s"
}
Expand Down Expand Up @@ -218,18 +227,19 @@ func (ctx *InfluxCtx) createDatabase(db string) error {
}

func (ctx *InfluxCtx) setupDatabase(op *gtm.Op) error {
db, ns := op.GetDatabase(), op.Namespace
ns := op.Namespace
if _, found := ctx.m[ns]; found == false {
measure := ctx.measures[ns]
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: db,
RetentionPolicy: ctx.measures[ns].retention,
Precision: ctx.measures[ns].precision,
Database: measure.database,
RetentionPolicy: measure.retention,
Precision: measure.precision,
})
if err != nil {
return err
}
ctx.m[ns] = bp
if err := ctx.createDatabase(db); err != nil {
if err := ctx.createDatabase(measure.database); err != nil {
return err
}
}
Expand Down Expand Up @@ -313,14 +323,6 @@ func (m *InfluxDataMap) unsupportedType(op *gtm.Op, k string, v interface{}, kin
errorLog.Printf("Unsupported type %T for %s %s in namespace %s\n", v, kind, k, op.Namespace)
}

func (m *InfluxDataMap) loadName() {
if m.measure.measure != "" {
m.name = m.measure.measure
} else {
m.name = m.op.GetCollection()
}
}

func (m *InfluxDataMap) loadKV(k string, v interface{}) {
if m.measure.tags[k] {
if m.istagtype(v) {
Expand Down Expand Up @@ -419,8 +421,8 @@ func (ctx *InfluxCtx) addPoint(op *gtm.Op) error {
mapper := &InfluxDataMap{
op: op,
measure: measure,
name: measure.measure,
}
mapper.loadName()
if measure.plug != nil {
points, err := measure.plug(&mongofluxdplug.MongoDocument{
Data: op.Data,
Expand Down

0 comments on commit 68a529c

Please sign in to comment.