From a9cf70467e4ee39de558b3b8cd0c92aeed922b7c Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Sun, 12 Mar 2017 02:29:44 +0000 Subject: [PATCH] performance op on filters --- mongofluxd.go | 54 ++++++++++++--------------------------------------- 1 file changed, 12 insertions(+), 42 deletions(-) diff --git a/mongofluxd.go b/mongofluxd.go index 102b8c8..49740b0 100644 --- a/mongofluxd.go +++ b/mongofluxd.go @@ -23,12 +23,10 @@ import ( ) var infoLog *log.Logger = log.New(os.Stdout, "INFO ", log.Flags()) -var chunksRegex = regexp.MustCompile("\\.chunks$") -var systemsRegex = regexp.MustCompile("system\\..+$") const ( Name = "mongofluxd" - Version = "0.1.0" + Version = "0.2.0" mongoUrlDefault = "localhost" influxUrlDefault = "http://localhost:8086" influxClientsDefault = 10 @@ -74,8 +72,6 @@ type configOptions struct { MongoSessionSettings mongoSessionSettings `toml:"mongo-session-settings"` GtmSettings gtmSettings `toml:"gtm-settings"` ResumeName string `toml:"resume-name"` - NsRegex string `toml:"namespace-regex"` - NsExcludeRegex string `toml:"namespace-exclude-regex"` Version bool Verbose bool Resume bool @@ -354,28 +350,6 @@ func NotMongoFlux(op *gtm.Op) bool { return op.GetDatabase() != Name } -func NotChunks(op *gtm.Op) bool { - return !chunksRegex.MatchString(op.GetCollection()) -} - -func NotSystem(op *gtm.Op) bool { - return !systemsRegex.MatchString(op.GetCollection()) -} - -func FilterWithRegex(regex string) gtm.OpFilter { - var validNameSpace = regexp.MustCompile(regex) - return func(op *gtm.Op) bool { - return validNameSpace.MatchString(op.Namespace) - } -} - -func FilterInverseWithRegex(regex string) gtm.OpFilter { - var invalidNameSpace = regexp.MustCompile(regex) - return func(op *gtm.Op) bool { - return !invalidNameSpace.MatchString(op.Namespace) - } -} - func ResumeWork(ctx *gtm.OpCtx, session *mgo.Session, config *configOptions) { col := session.DB(Name).C("resume") doc := make(map[string]interface{}) @@ -395,6 +369,16 @@ func SaveTimestamp(session *mgo.Session, ts bson.MongoTimestamp, resumeName stri return err } +func (config *configOptions) onlyMeasured() gtm.OpFilter { + measured := make(map[string]bool) + for _, m := range config.Measurement { + measured[m.Namespace] = true + } + return func(op *gtm.Op) bool { + return measured[op.Namespace] + } +} + func (config *configOptions) ParseCommandLineFlags() *configOptions { flag.StringVar(&config.InfluxUrl, "influx-url", "", "InfluxDB connection URL") flag.StringVar(&config.InfluxUser, "influx-user", "", "InfluxDB user name") @@ -418,8 +402,6 @@ func (config *configOptions) ParseCommandLineFlags() *configOptions { flag.BoolVar(&config.ResumeWriteUnsafe, "resume-write-unsafe", false, "True to speedup writes of the last timestamp synched for resuming at the cost of error checking") flag.BoolVar(&config.Replay, "replay", false, "True to replay all events from the oplog and index them in elasticsearch") flag.StringVar(&config.ResumeName, "resume-name", "", "Name under which to load/store the resume state. Defaults to 'default'") - flag.StringVar(&config.NsRegex, "namespace-regex", "", "A regex which is matched against an operation's namespace (.). Only operations which match are synched to elasticsearch") - flag.StringVar(&config.NsExcludeRegex, "namespace-exclude-regex", "", "A regex which is matched against an operation's namespace (.). Only operations which do not match are synched to elasticsearch") flag.BoolVar(&config.DirectReads, "direct-reads", false, "Set to true to read directly from MongoDB collections") flag.BoolVar(&config.ExitAfterDirectReads, "exit-after-direct-reads", false, "Set to true to exit after direct reads are complete") flag.Parse() @@ -505,12 +487,6 @@ func (config *configOptions) LoadConfigFile() *configOptions { if config.Resume && config.ResumeName == "" { config.ResumeName = tomlConfig.ResumeName } - if config.NsRegex == "" { - config.NsRegex = tomlConfig.NsRegex - } - if config.NsExcludeRegex == "" { - config.NsExcludeRegex = tomlConfig.NsExcludeRegex - } config.MongoDialSettings = tomlConfig.MongoDialSettings config.MongoSessionSettings = tomlConfig.MongoSessionSettings config.GtmSettings = tomlConfig.GtmSettings @@ -694,13 +670,7 @@ func main() { } var filter gtm.OpFilter = nil - filterChain := []gtm.OpFilter{NotMongoFlux, IsInsertOrUpdate, NotSystem, NotChunks} - if config.NsRegex != "" { - filterChain = append(filterChain, FilterWithRegex(config.NsRegex)) - } - if config.NsExcludeRegex != "" { - filterChain = append(filterChain, FilterInverseWithRegex(config.NsExcludeRegex)) - } + filterChain := []gtm.OpFilter{NotMongoFlux, config.onlyMeasured(), IsInsertOrUpdate} filter = gtm.ChainOpFilters(filterChain...) var oplogDatabaseName, oplogCollectionName, cursorTimeout *string if config.MongoOpLogDatabaseName != "" {