From 0ac6e83073a6c9e618ad7e6cf3dd01b7849bb437 Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Wed, 15 Mar 2017 21:18:55 +0000 Subject: [PATCH] add plugin support --- README.md | 129 +++++++++++++++++- mongofluxd.go | 298 +++++++++++++++++++++++++++-------------- mongofluxdplug/plug.go | 29 ++++ 3 files changed, 351 insertions(+), 105 deletions(-) create mode 100644 mongofluxdplug/plug.go diff --git a/README.md b/README.md index c144a93..09e3920 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # mongofluxd Real time sync from MongoDB into InfluxDB + ### Installation Download the latest [release](https://github.com/rwynn/mongofluxd/releases) or install with go get @@ -83,16 +84,16 @@ measure = "sales" Load 100K documents of time series data into MongoDB. - // sleep for 2ms to ensure t is 2ms apart - for (var i=0; i<100000; ++i) { sleep(2); var t = new Date(); db.test.insert({c: 1, d: 5.5, t: t}); } + // sleep for 1ms to ensure t is 1ms apart + for (var i=0; i<100000; ++i) { sleep(1); var t = new Date(); db.test.insert({c: 1, d: 5.5, t: t}); } Run monfluxd with direct reads on test.test (config contents above) time ./mongofluxd -f flux.toml - real 0m2.167s - user 0m2.028s - sys 0m0.444s + real 0m1.982s + user 0m1.936s + sys 0m0.396s Verify it all got into InfluxDB @@ -108,5 +109,121 @@ Verify it all got into InfluxDB On a VirtualBox VM with 4 virtual cores and 4096 mb of memory, syncing 100K documents from MongoDB to InfluxDB -took only 2.167 seconds for a throughput of 46,146 points per second. +took only 1.936 seconds for a throughput of 50,454 points per second. + +### Advanced + +mongofluxd supports golang 1.8 plugins for advanced use cases. For example, you have a one-to-many relationship +between MongoDB documents and InfluxDB points. mongofluxd supports consuming 1 go plugin .so. This .so may expose +many public functions. mongofluxd supports mapping one plugin symbol (a function) with each measurement. + +The mapping function must be of the form: + + func (*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error) + +The following example plugin maps a single MongoDB document to multiple Points in InfluxDB: + +``` +package main + +import ( + "fmt" + "github.com/rwynn/gtm" + "github.com/rwynn/mongofluxd/mongofluxdplug" + "time" +) + +// Plugin to map a single MongoDB document to multiple InfluxDB points +// +// e.g. +// db.testplug.insert({ts: new Date(), pts: [{o: 0, d: 1.5}, {o: 2, d: 3.2}]}) +// where ts is the base time, o is the second offset for each point, and d is field data for each point + +func MyPointMapper(input *mongofluxdplug.MongoDocument) (output []*mongofluxdplug.InfluxPoint, err error) { + doc := input.Data + + // reference base time + var t time.Time + switch doc["ts"].(type) { + case time.Time: + t = doc["ts"].(time.Time) + default: + return nil, fmt.Errorf("expected ts field with type %T but got %T", t, doc["ts"]) + } + + // reference list of points with time offset and data + var pts []interface{} + switch doc["pts"].(type) { + case []interface{}: + pts = doc["pts"].([]interface{}) + default: + return nil, fmt.Errorf("expected pts field with type %T but got %T", pts, doc["pts"]) + } + + // for each pt in this single document, add an InfluxPoint to the output + for _, p := range pts { + + // assert type of each point p + var pt map[string]interface{} + switch ptt := p.(type) { + case map[string]interface{}: + pt = ptt + case gtm.OpLogEntry: + pt = map[string]interface{}(ptt) + default: + return nil, fmt.Errorf("expected point of type %T but got %T", pt, p) + } + + // read offset and point data + var offset, pointData float64 + switch pt["o"].(type) { + case float64: + offset = pt["o"].(float64) + default: + return nil, fmt.Errorf("expected offset of type %T but got %T", offset, pt["o"]) + } + switch pt["d"].(type) { + case float64: + pointData = pt["d"].(float64) + default: + return nil, fmt.Errorf("expected point data of type %T but got %T", pointData, pt["d"]) + } + + // create a new InfluxPoint + point := &mongofluxdplug.InfluxPoint{ + Tags: make(map[string]string), + Fields: make(map[string]interface{}), + } + + // set time, fields, and tags on the Point + point.Timestamp = t.Add(time.Duration(int64(offset)) * time.Second) + point.Fields["d"] = pointData + + // append the Point to the output + output = append(output, point) + } + return output, nil +} +``` +The plugin can be built for go 1.8 and above using the go build command + + go build -buildmode=plugin -o myplugin.so myplugin.go + +The public plugin function, or symbol, can then be assigned to a measurement in the config file + +```toml + +plugin-path = "/path/to/myplugin.so" + +[[measurement]] +namespace = "test.testplug" +# for this measurement use a go plugin to map a single MongoDB document to multiple InfluxDB points +# in this case the function name to use is MyPointMapper +# the time, fields, and tags will be generated by the plugin +symbol = "MyPointMapper" +precision = "ms" +``` + +When a MongoDB document is inserted into the `test.testplug` namespace, the `MyPointMapper` function will +be invoked to determine a slice of Points to write to InfluxDB. diff --git a/mongofluxd.go b/mongofluxd.go index 49740b0..0cd17ed 100644 --- a/mongofluxd.go +++ b/mongofluxd.go @@ -8,6 +8,7 @@ import ( "github.com/BurntSushi/toml" "github.com/influxdata/influxdb/client/v2" "github.com/rwynn/gtm" + "github.com/rwynn/mongofluxd/mongofluxdplug" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "io/ioutil" @@ -15,6 +16,7 @@ import ( "net" "os" "os/signal" + "plugin" "regexp" "strings" "sync" @@ -26,7 +28,7 @@ var infoLog *log.Logger = log.New(os.Stdout, "INFO ", log.Flags()) const ( Name = "mongofluxd" - Version = "0.2.0" + Version = "0.3.0" mongoUrlDefault = "localhost" influxUrlDefault = "http://localhost:8086" influxClientsDefault = 10 @@ -57,8 +59,10 @@ type measureSettings struct { Retention string Precision string Measure string + Symbol string Tags []string Fields []string + plug func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error) } type configOptions struct { @@ -79,7 +83,7 @@ type configOptions struct { ResumeFromTimestamp int64 `toml:"resume-from-timestamp"` Replay bool ConfigFile string - Measurement []measureSettings + Measurement []*measureSettings InfluxUrl string `toml:"influx-url"` InfluxUser string `toml:"influx-user"` InfluxPassword string `toml:"influx-password"` @@ -90,6 +94,7 @@ type configOptions struct { InfluxBufferSize int `toml:"influx-buffer-size"` DirectReads bool `toml:"direct-reads"` ExitAfterDirectReads bool `toml:"exit-after-direct-reads"` + PluginPath string `toml:"plugin-path"` } type InfluxMeasure struct { @@ -100,6 +105,7 @@ type InfluxMeasure struct { measure string tags map[string]bool fields map[string]bool + plug func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error) } type InfluxCtx struct { @@ -112,6 +118,16 @@ type InfluxCtx struct { mongo *mgo.Session } +type InfluxDataMap struct { + op *gtm.Op + tags map[string]string + fields map[string]interface{} + timefield bool + measure *InfluxMeasure + t time.Time + name string +} + func TimestampTime(ts bson.MongoTimestamp) time.Time { return time.Unix(int64(ts>>32), 0).UTC() } @@ -134,6 +150,7 @@ func (ctx *InfluxCtx) setupMeasurements() error { retention: ms.Retention, precision: ms.Precision, measure: ms.Measure, + plug: ms.plug, tags: make(map[string]bool), fields: make(map[string]bool), } @@ -146,8 +163,10 @@ func (ctx *InfluxCtx) setupMeasurements() error { for _, field := range ms.Fields { im.fields[field] = true } - if len(im.fields) == 0 { - return fmt.Errorf("at least one field is required per measurement") + if im.plug == nil { + if len(im.fields) == 0 { + return fmt.Errorf("at least one field is required per measurement") + } } ctx.measures[ms.Namespace] = im } @@ -204,7 +223,7 @@ func (ctx *InfluxCtx) writeBatch() (err error) { } if ctx.config.Verbose { if points > 0 { - infoLog.Printf("%d points flushed", points) + infoLog.Printf("%d points flushed\n", points) } } ctx.m = make(map[string]client.BatchPoints) @@ -214,93 +233,7 @@ func (ctx *InfluxCtx) writeBatch() (err error) { return } -func (ctx *InfluxCtx) addPoint(op *gtm.Op) error { - measure := ctx.measures[op.Namespace] - if measure != nil { - if err := ctx.setupDatabase(op); err != nil { - return err - } - var t time.Time - timefield := measure.timefield != "" - tags := make(map[string]string) - fields := make(map[string]interface{}) - name := op.GetCollection() - if timefield == false { - t = TimestampTime(op.Timestamp) - } - if measure.measure != "" { - name = measure.measure - } - for k, v := range op.Data { - if k == "_id" { - continue - } - switch v.(type) { - case time.Time: - if measure.timefield == k { - t = v.(time.Time).UTC() - timefield = false - } - case bson.MongoTimestamp: - if measure.timefield == k { - ts := v.(bson.MongoTimestamp) - t = TimestampTime(ts) - timefield = false - } - case gtm.OpLogEntry: - flat := flatmap(k+".", v.(gtm.OpLogEntry)) - for fk, fv := range flat { - if measure.tags[fk] { - if istagtype(fv) { - tags[fk] = fv.(string) - } else { - log.Printf("Unsupported type %T for tag %s in namespace %s", fv, fk, op.Namespace) - } - } else if measure.fields[fk] { - if isfieldtype(fv) { - fields[fk] = fv - } else { - log.Printf("Unsupported type %T for field %s in namespace %s", fv, fk, op.Namespace) - } - - } - } - default: - if measure.tags[k] { - if istagtype(v) { - tags[k] = v.(string) - } else { - log.Printf("Unsupported type %T for tag %s in namespace %s", v, k, op.Namespace) - } - } else if measure.fields[k] { - if isfieldtype(v) { - fields[k] = v - } else { - log.Printf("Unsupported type %T for field %s in namespace %s", v, k, op.Namespace) - } - } - } - } - if timefield { - return fmt.Errorf("time field %s not found in document", measure.timefield) - } - pt, err := client.NewPoint(name, tags, fields, t) - if err != nil { - return err - } - bp := ctx.m[op.Namespace] - bp.AddPoint(pt) - ctx.lastTs = op.Timestamp - if len(bp.Points()) >= ctx.config.InfluxBufferSize { - if err := ctx.writeBatch(); err != nil { - return err - } - } - } - return nil -} - -func istagtype(v interface{}) bool { +func (m *InfluxDataMap) istagtype(v interface{}) bool { switch v.(type) { case string: return true @@ -309,7 +242,7 @@ func istagtype(v interface{}) bool { } } -func isfieldtype(v interface{}) bool { +func (m *InfluxDataMap) isfieldtype(v interface{}) bool { switch v.(type) { case string: return true @@ -324,17 +257,22 @@ func isfieldtype(v interface{}) bool { } } -func flatmap(prefix string, m gtm.OpLogEntry) gtm.OpLogEntry { +func (m *InfluxDataMap) flatmap(prefix string, e map[string]interface{}) map[string]interface{} { o := make(map[string]interface{}) - for k, v := range m { + for k, v := range e { switch child := v.(type) { + case map[string]interface{}: + nm := m.flatmap("", child) + for nk, nv := range nm { + o[prefix+k+"."+nk] = nv + } case gtm.OpLogEntry: - nm := flatmap("", child) + nm := m.flatmap("", child) for nk, nv := range nm { o[prefix+k+"."+nk] = nv } default: - if isfieldtype(v) { + if m.isfieldtype(v) { o[prefix+k] = v } } @@ -342,6 +280,132 @@ func flatmap(prefix string, m gtm.OpLogEntry) gtm.OpLogEntry { return o } +func (m *InfluxDataMap) unsupportedType(op *gtm.Op, k string, v interface{}, kind string) { + log.Printf("Unsupported type %T for %s %s in namespace %s\n", v, kind, k, op.Namespace) +} + +func (m *InfluxDataMap) loadName() { + if m.measure.measure != "" { + m.name = m.measure.measure + } else { + m.name = m.op.GetCollection() + } +} + +func (m *InfluxDataMap) loadKV(k string, v interface{}) { + if m.measure.tags[k] { + if m.istagtype(v) { + m.tags[k] = v.(string) + } else { + m.unsupportedType(m.op, k, v, "tag") + } + } else if m.measure.fields[k] { + if m.isfieldtype(v) { + m.fields[k] = v + } else { + m.unsupportedType(m.op, k, v, "field") + } + } +} + +func (m *InfluxDataMap) loadData() error { + m.tags = make(map[string]string) + m.fields = make(map[string]interface{}) + if m.measure.timefield == "" { + m.t = TimestampTime(m.op.Timestamp) + m.timefield = true + } + for k, v := range m.op.Data { + if k == "_id" { + continue + } + switch vt := v.(type) { + case time.Time: + if m.measure.timefield == k { + m.t = vt.UTC() + m.timefield = true + } + case bson.MongoTimestamp: + if m.measure.timefield == k { + m.t = TimestampTime(vt) + m.timefield = true + } + case map[string]interface{}: + flat := m.flatmap(k+".", vt) + for fk, fv := range flat { + m.loadKV(fk, fv) + } + case gtm.OpLogEntry: + flat := m.flatmap(k+".", vt) + for fk, fv := range flat { + m.loadKV(fk, fv) + } + default: + m.loadKV(k, v) + } + } + if m.timefield == false { + if tf, ok := m.op.Data[m.measure.timefield]; ok { + return fmt.Errorf("time field %s had type %T, but expected %T", m.measure.timefield, tf, m.t) + } else { + return fmt.Errorf("time field %s not found in document", m.measure.timefield) + } + } else { + return nil + } + +} + +func (ctx *InfluxCtx) addPoint(op *gtm.Op) error { + measure := ctx.measures[op.Namespace] + if measure != nil { + if err := ctx.setupDatabase(op); err != nil { + return err + } + bp := ctx.m[op.Namespace] + mapper := &InfluxDataMap{ + op: op, + measure: measure, + } + mapper.loadName() + if measure.plug != nil { + points, err := measure.plug(&mongofluxdplug.MongoDocument{ + Data: op.Data, + Namespace: op.Namespace, + Database: op.GetDatabase(), + Collection: op.GetCollection(), + Operation: op.Operation, + }) + if err != nil { + return err + } + for _, pt := range points { + pt, err := client.NewPoint(mapper.name, pt.Tags, pt.Fields, pt.Timestamp) + if err != nil { + return err + } + bp.AddPoint(pt) + } + } else { + if err := mapper.loadData(); err != nil { + return err + } + pt, err := client.NewPoint(mapper.name, mapper.tags, mapper.fields, mapper.t) + if err != nil { + return err + } + bp.AddPoint(pt) + } + ctx.lastTs = op.Timestamp + if len(bp.Points()) >= ctx.config.InfluxBufferSize { + if err := ctx.writeBatch(); err != nil { + return err + } + } + } + return nil +} + func IsInsertOrUpdate(op *gtm.Op) bool { return op.IsInsert() || op.IsUpdate() } @@ -402,12 +466,44 @@ func (config *configOptions) ParseCommandLineFlags() *configOptions { flag.BoolVar(&config.ResumeWriteUnsafe, "resume-write-unsafe", false, "True to speedup writes of the last timestamp synched for resuming at the cost of error checking") flag.BoolVar(&config.Replay, "replay", false, "True to replay all events from the oplog and index them in elasticsearch") flag.StringVar(&config.ResumeName, "resume-name", "", "Name under which to load/store the resume state. Defaults to 'default'") + flag.StringVar(&config.PluginPath, "plugin-path", "", "The file path to a .so file plugin") flag.BoolVar(&config.DirectReads, "direct-reads", false, "Set to true to read directly from MongoDB collections") flag.BoolVar(&config.ExitAfterDirectReads, "exit-after-direct-reads", false, "Set to true to exit after direct reads are complete") flag.Parse() return config } +func (config *configOptions) LoadPlugin() *configOptions { + if config.PluginPath == "" { + if config.Verbose { + infoLog.Println("no plugins detected") + } + return config + } + p, err := plugin.Open(config.PluginPath) + if err != nil { + log.Panicf("Unable to load plugin <%s>: %s", config.PluginPath, err) + } + for _, m := range config.Measurement { + if m.Symbol != "" { + f, err := p.Lookup(m.Symbol) + if err != nil { + log.Panicf("Unable to lookup symbol <%s> for plugin <%s>: %s", m.Symbol, config.PluginPath, err) + } + switch f.(type) { + case func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error): + m.plug = f.(func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error)) + default: + log.Panicf("Plugin symbol <%s> must be typed %T", m.Symbol, m.plug) + } + } + } + if config.Verbose { + infoLog.Printf("plugin <%s> loaded succesfully\n", config.PluginPath) + } + return config +} + func (config *configOptions) LoadConfigFile() *configOptions { if config.ConfigFile != "" { var tomlConfig configOptions = configOptions{ @@ -487,6 +583,9 @@ func (config *configOptions) LoadConfigFile() *configOptions { if config.Resume && config.ResumeName == "" { config.ResumeName = tomlConfig.ResumeName } + if config.PluginPath == "" { + config.PluginPath = tomlConfig.PluginPath + } config.MongoDialSettings = tomlConfig.MongoDialSettings config.MongoSessionSettings = tomlConfig.MongoSessionSettings config.GtmSettings = tomlConfig.GtmSettings @@ -576,7 +675,7 @@ func (config *configOptions) DialMongo() (*mgo.Session, error) { dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) { conn, err := tls.Dial("tcp", addr.String(), tlsConfig) if err != nil { - log.Printf("Unable to dial mongodb: %s", err) + log.Printf("Unable to dial mongodb: %s\n", err) } return conn, err } @@ -607,6 +706,7 @@ func GtmDefaultSettings() gtmSettings { func main() { log.SetPrefix("ERROR ") + config := &configOptions{ MongoDialSettings: mongoDialSettings{Timeout: -1}, MongoSessionSettings: mongoSessionSettings{SocketTimeout: -1, SyncTimeout: -1}, @@ -617,7 +717,7 @@ func main() { fmt.Println(Version) os.Exit(0) } - config.LoadConfigFile().SetDefaults() + config.LoadConfigFile().SetDefaults().LoadPlugin() sigs := make(chan os.Signal, 1) stopC := make(chan bool, 1) diff --git a/mongofluxdplug/plug.go b/mongofluxdplug/plug.go new file mode 100644 index 0000000..3d96ab5 --- /dev/null +++ b/mongofluxdplug/plug.go @@ -0,0 +1,29 @@ +package mongofluxdplug + +import "time" + +// plugins must import this package +// import "github.com/rwynn/mongofluxd/mongofluxdplug + +// plugins must implement a function per measurement with the following signature +// e.g. func MyPointMapper(input *mongofluxdplug.MongoDocument) (output []*mongofluxdplug.InfluxPoint, err error) +// the function name must then be associated with the measurement in the toml config +// [[measurement]] +// symbol = "MyPointMapper" + +// plugins can be compiled using go build -buildmode=plugin -o myplugin.so myplugin.go +// to enable the plugin start with mongofluxd -plugin-path /path/to/myplugin.so + +type MongoDocument struct { + Data map[string]interface{} // the original document data from MongoDB + Database string // the origin database in MongoDB + Collection string // the origin collection in MongoDB + Namespace string // the entire namespace for the original document + Operation string // "i" for a insert or "u" for update +} + +type InfluxPoint struct { + Tags map[string]string // optional tags to set on the Point + Fields map[string]interface{} // fields to set on the Point + Timestamp time.Time // the time of the Point +}