Skip to content

Commit

Permalink
add plugin support
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Mar 15, 2017
1 parent a9cf704 commit 0ac6e83
Show file tree
Hide file tree
Showing 3 changed files with 351 additions and 105 deletions.
129 changes: 123 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# mongofluxd
Real time sync from MongoDB into InfluxDB


### Installation

Download the latest [release](https://github.com/rwynn/mongofluxd/releases) or install with go get
Expand Down Expand Up @@ -83,16 +84,16 @@ measure = "sales"

Load 100K documents of time series data into MongoDB.

// sleep for 2ms to ensure t is 2ms apart
for (var i=0; i<100000; ++i) { sleep(2); var t = new Date(); db.test.insert({c: 1, d: 5.5, t: t}); }
// sleep for 1ms to ensure t is 1ms apart
for (var i=0; i<100000; ++i) { sleep(1); var t = new Date(); db.test.insert({c: 1, d: 5.5, t: t}); }

Run monfluxd with direct reads on test.test (config contents above)

time ./mongofluxd -f flux.toml

real 0m2.167s
user 0m2.028s
sys 0m0.444s
real 0m1.982s
user 0m1.936s
sys 0m0.396s

Verify it all got into InfluxDB

Expand All @@ -108,5 +109,121 @@ Verify it all got into InfluxDB


On a VirtualBox VM with 4 virtual cores and 4096 mb of memory, syncing 100K documents from MongoDB to InfluxDB
took only 2.167 seconds for a throughput of 46,146 points per second.
took only 1.936 seconds for a throughput of 50,454 points per second.

### Advanced

mongofluxd supports golang 1.8 plugins for advanced use cases. For example, you have a one-to-many relationship
between MongoDB documents and InfluxDB points. mongofluxd supports consuming 1 go plugin .so. This .so may expose
many public functions. mongofluxd supports mapping one plugin symbol (a function) with each measurement.

The mapping function must be of the form:

func (*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error)

The following example plugin maps a single MongoDB document to multiple Points in InfluxDB:

```
package main
import (
"fmt"
"github.com/rwynn/gtm"
"github.com/rwynn/mongofluxd/mongofluxdplug"
"time"
)
// Plugin to map a single MongoDB document to multiple InfluxDB points
//
// e.g.
// db.testplug.insert({ts: new Date(), pts: [{o: 0, d: 1.5}, {o: 2, d: 3.2}]})
// where ts is the base time, o is the second offset for each point, and d is field data for each point
func MyPointMapper(input *mongofluxdplug.MongoDocument) (output []*mongofluxdplug.InfluxPoint, err error) {
doc := input.Data
// reference base time
var t time.Time
switch doc["ts"].(type) {
case time.Time:
t = doc["ts"].(time.Time)
default:
return nil, fmt.Errorf("expected ts field with type %T but got %T", t, doc["ts"])
}
// reference list of points with time offset and data
var pts []interface{}
switch doc["pts"].(type) {
case []interface{}:
pts = doc["pts"].([]interface{})
default:
return nil, fmt.Errorf("expected pts field with type %T but got %T", pts, doc["pts"])
}
// for each pt in this single document, add an InfluxPoint to the output
for _, p := range pts {
// assert type of each point p
var pt map[string]interface{}
switch ptt := p.(type) {
case map[string]interface{}:
pt = ptt
case gtm.OpLogEntry:
pt = map[string]interface{}(ptt)
default:
return nil, fmt.Errorf("expected point of type %T but got %T", pt, p)
}
// read offset and point data
var offset, pointData float64
switch pt["o"].(type) {
case float64:
offset = pt["o"].(float64)
default:
return nil, fmt.Errorf("expected offset of type %T but got %T", offset, pt["o"])
}
switch pt["d"].(type) {
case float64:
pointData = pt["d"].(float64)
default:
return nil, fmt.Errorf("expected point data of type %T but got %T", pointData, pt["d"])
}
// create a new InfluxPoint
point := &mongofluxdplug.InfluxPoint{
Tags: make(map[string]string),
Fields: make(map[string]interface{}),
}
// set time, fields, and tags on the Point
point.Timestamp = t.Add(time.Duration(int64(offset)) * time.Second)
point.Fields["d"] = pointData
// append the Point to the output
output = append(output, point)
}
return output, nil
}
```
The plugin can be built for go 1.8 and above using the go build command

go build -buildmode=plugin -o myplugin.so myplugin.go

The public plugin function, or symbol, can then be assigned to a measurement in the config file

```toml

plugin-path = "/path/to/myplugin.so"

[[measurement]]
namespace = "test.testplug"
# for this measurement use a go plugin to map a single MongoDB document to multiple InfluxDB points
# in this case the function name to use is MyPointMapper
# the time, fields, and tags will be generated by the plugin
symbol = "MyPointMapper"
precision = "ms"
```

When a MongoDB document is inserted into the `test.testplug` namespace, the `MyPointMapper` function will
be invoked to determine a slice of Points to write to InfluxDB.

Loading

0 comments on commit 0ac6e83

Please sign in to comment.