Skip to content

Commit

Permalink
performance op on filters
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Mar 12, 2017
1 parent cb7e7b7 commit a9cf704
Showing 1 changed file with 12 additions and 42 deletions.
54 changes: 12 additions & 42 deletions mongofluxd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ import (
)

var infoLog *log.Logger = log.New(os.Stdout, "INFO ", log.Flags())
var chunksRegex = regexp.MustCompile("\\.chunks$")
var systemsRegex = regexp.MustCompile("system\\..+$")

const (
Name = "mongofluxd"
Version = "0.1.0"
Version = "0.2.0"
mongoUrlDefault = "localhost"
influxUrlDefault = "http://localhost:8086"
influxClientsDefault = 10
Expand Down Expand Up @@ -74,8 +72,6 @@ type configOptions struct {
MongoSessionSettings mongoSessionSettings `toml:"mongo-session-settings"`
GtmSettings gtmSettings `toml:"gtm-settings"`
ResumeName string `toml:"resume-name"`
NsRegex string `toml:"namespace-regex"`
NsExcludeRegex string `toml:"namespace-exclude-regex"`
Version bool
Verbose bool
Resume bool
Expand Down Expand Up @@ -354,28 +350,6 @@ func NotMongoFlux(op *gtm.Op) bool {
return op.GetDatabase() != Name
}

func NotChunks(op *gtm.Op) bool {
return !chunksRegex.MatchString(op.GetCollection())
}

func NotSystem(op *gtm.Op) bool {
return !systemsRegex.MatchString(op.GetCollection())
}

func FilterWithRegex(regex string) gtm.OpFilter {
var validNameSpace = regexp.MustCompile(regex)
return func(op *gtm.Op) bool {
return validNameSpace.MatchString(op.Namespace)
}
}

func FilterInverseWithRegex(regex string) gtm.OpFilter {
var invalidNameSpace = regexp.MustCompile(regex)
return func(op *gtm.Op) bool {
return !invalidNameSpace.MatchString(op.Namespace)
}
}

func ResumeWork(ctx *gtm.OpCtx, session *mgo.Session, config *configOptions) {
col := session.DB(Name).C("resume")
doc := make(map[string]interface{})
Expand All @@ -395,6 +369,16 @@ func SaveTimestamp(session *mgo.Session, ts bson.MongoTimestamp, resumeName stri
return err
}

func (config *configOptions) onlyMeasured() gtm.OpFilter {
measured := make(map[string]bool)
for _, m := range config.Measurement {
measured[m.Namespace] = true
}
return func(op *gtm.Op) bool {
return measured[op.Namespace]
}
}

func (config *configOptions) ParseCommandLineFlags() *configOptions {
flag.StringVar(&config.InfluxUrl, "influx-url", "", "InfluxDB connection URL")
flag.StringVar(&config.InfluxUser, "influx-user", "", "InfluxDB user name")
Expand All @@ -418,8 +402,6 @@ func (config *configOptions) ParseCommandLineFlags() *configOptions {
flag.BoolVar(&config.ResumeWriteUnsafe, "resume-write-unsafe", false, "True to speedup writes of the last timestamp synched for resuming at the cost of error checking")
flag.BoolVar(&config.Replay, "replay", false, "True to replay all events from the oplog and index them in elasticsearch")
flag.StringVar(&config.ResumeName, "resume-name", "", "Name under which to load/store the resume state. Defaults to 'default'")
flag.StringVar(&config.NsRegex, "namespace-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which match are synched to elasticsearch")
flag.StringVar(&config.NsExcludeRegex, "namespace-exclude-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which do not match are synched to elasticsearch")
flag.BoolVar(&config.DirectReads, "direct-reads", false, "Set to true to read directly from MongoDB collections")
flag.BoolVar(&config.ExitAfterDirectReads, "exit-after-direct-reads", false, "Set to true to exit after direct reads are complete")
flag.Parse()
Expand Down Expand Up @@ -505,12 +487,6 @@ func (config *configOptions) LoadConfigFile() *configOptions {
if config.Resume && config.ResumeName == "" {
config.ResumeName = tomlConfig.ResumeName
}
if config.NsRegex == "" {
config.NsRegex = tomlConfig.NsRegex
}
if config.NsExcludeRegex == "" {
config.NsExcludeRegex = tomlConfig.NsExcludeRegex
}
config.MongoDialSettings = tomlConfig.MongoDialSettings
config.MongoSessionSettings = tomlConfig.MongoSessionSettings
config.GtmSettings = tomlConfig.GtmSettings
Expand Down Expand Up @@ -694,13 +670,7 @@ func main() {
}

var filter gtm.OpFilter = nil
filterChain := []gtm.OpFilter{NotMongoFlux, IsInsertOrUpdate, NotSystem, NotChunks}
if config.NsRegex != "" {
filterChain = append(filterChain, FilterWithRegex(config.NsRegex))
}
if config.NsExcludeRegex != "" {
filterChain = append(filterChain, FilterInverseWithRegex(config.NsExcludeRegex))
}
filterChain := []gtm.OpFilter{NotMongoFlux, config.onlyMeasured(), IsInsertOrUpdate}
filter = gtm.ChainOpFilters(filterChain...)
var oplogDatabaseName, oplogCollectionName, cursorTimeout *string
if config.MongoOpLogDatabaseName != "" {
Expand Down

0 comments on commit a9cf704

Please sign in to comment.