Skip to content

Commit

Permalink
Refactor pinot visibility and add migration config (cadence-workflow#…
Browse files Browse the repository at this point in the history
…6072)

* Refactor visibility dual manager to be more generic, add migration mode to pinot visibility

* Update to use bitnami kafka image

* add should start indexer help method

* Add messaging client mock and test for factory new visibility manager
  • Loading branch information
neil-xie authored Jun 21, 2024
1 parent 1e300fc commit 83ebf7a
Show file tree
Hide file tree
Showing 17 changed files with 716 additions and 151 deletions.
55 changes: 37 additions & 18 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,32 @@ func (s *server) startService() common.Daemon {
log.Fatalf("not able to find advanced visibility store in config: %v", advancedVisStoreKey)
}

params.ESConfig = advancedVisStore.ElasticSearch
// there are 3 circumstances:
// 1. advanced visibility store == elasticsearch, use ESClient and visibilityDualManager
// 2. advanced visibility store == pinot and in process of migration, use ESClient, PinotClient and and visibilityTripleManager
// 3. advanced visibility store == pinot and not migrating, use PinotClient and visibilityDualManager
if params.PersistenceConfig.AdvancedVisibilityStore == common.PinotVisibilityStoreName {
// components like ESAnalyzer is still using ElasticSearch
// The plan is to clean those after we switch to operate on Pinot
esVisibilityStore, ok := s.cfg.Persistence.DataStores[common.ESVisibilityStoreName]
if !ok {
log.Fatalf("Missing Elasticsearch config")
if advancedVisStore.Pinot.Migration.Enabled {
esVisibilityStore, ok := s.cfg.Persistence.DataStores[common.ESVisibilityStoreName]
if !ok {
log.Fatalf("not able to find elasticsearch visibility store in config")
}
params.ESConfig = esVisibilityStore.ElasticSearch
params.ESConfig.SetUsernamePassword()
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
log.Fatalf("error creating elastic search client: %v", err)
}
params.ESClient = esClient

// verify index name
indexName, ok := params.ESConfig.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
log.Fatalf("elastic search config missing visibility index")
}
} else {
params.ESClient = nil
}
params.ESConfig = esVisibilityStore.ElasticSearch
params.PinotConfig = advancedVisStore.Pinot
pinotBroker := params.PinotConfig.Broker
pinotRawClient, err := pinot.NewFromBrokerList([]string{pinotBroker})
Expand All @@ -239,18 +256,20 @@ func (s *server) startService() common.Daemon {
}
pinotClient := pnt.NewPinotClient(pinotRawClient, params.Logger, params.PinotConfig)
params.PinotClient = pinotClient
}
params.ESConfig.SetUsernamePassword()
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
log.Fatalf("error creating elastic search client: %v", err)
}
params.ESClient = esClient
} else {
params.ESConfig = advancedVisStore.ElasticSearch
params.ESConfig.SetUsernamePassword()
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
log.Fatalf("error creating elastic search client: %v", err)
}
params.ESClient = esClient

// verify index name
indexName, ok := params.ESConfig.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
log.Fatalf("elastic search config missing visibility index")
// verify index name
indexName, ok := params.ESConfig.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
log.Fatalf("elastic search config missing visibility index")
}
}
}

Expand Down
13 changes: 9 additions & 4 deletions common/config/pinot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ package config
// PinotVisibilityConfig for connecting to Pinot
type (
PinotVisibilityConfig struct {
Cluster string `yaml:"cluster"` //nolint:govet
Broker string `yaml:"broker"` //nolint:govet
Table string `yaml:"table"` //nolint:govet
ServiceName string `yaml:"serviceName"` //nolint:govet
Cluster string `yaml:"cluster"` //nolint:govet
Broker string `yaml:"broker"` //nolint:govet
Table string `yaml:"table"` //nolint:govet
ServiceName string `yaml:"serviceName"` //nolint:govet
Migration PinotMigration `yaml:"migration"` //nolint:govet
}

PinotMigration struct {
Enabled bool `yaml:"enabled"` //nolint:govet
}
)
Loading

0 comments on commit 83ebf7a

Please sign in to comment.