Reindexer
Reindexing is necessary, for example, when you want to apply mapping changes to existing data. However, reindexing is currently a client-side operation. To make this easier for application programmers, Elastic includes a helper that reindexes indices, even between clusters.
The Elasticsearch documentation has a section about reindexing if you want more information.
Here's an example of the Reindexer with various options:
client, err := elastic.NewClient()
if err != nil { ... }
ix := client.Reindex("source", "target")
ix = ix.Progress(func(current, total int64) {
	fmt.Printf("%d of %d\r", current, total)
})
result, err := ix.Do()
if err != nil { ... }
fmt.Printf("%d operations succeeded, %d failed", result.Success, result.Failed)
As you can see in the example above, you can provide a progress callback function with Progress(func(int64,int64)).
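To illustrate the callback shape, here is a minimal standalone sketch of a copy loop that reports progress after each item. It does not use the library: `ProgressFunc` and `copyAll` are hypothetical names chosen for this illustration, and the real Reindexer may invoke the callback at different points.

```go
package main

import "fmt"

// ProgressFunc has the same shape as the callback in the example above:
// it receives the number of items processed so far and the total.
// The type name is hypothetical.
type ProgressFunc func(current, total int64)

// copyAll is a hypothetical stand-in for a reindex loop. It pretends to
// copy each document and reports progress after every item.
func copyAll(docs []string, progress ProgressFunc) int64 {
	total := int64(len(docs))
	var current int64
	for range docs {
		current++ // pretend one document was copied
		if progress != nil {
			progress(current, total)
		}
	}
	return current
}

func main() {
	docs := []string{"a", "b", "c"}
	copied := copyAll(docs, func(current, total int64) {
		fmt.Printf("%d of %d\n", current, total)
	})
	fmt.Println("copied:", copied)
}
```

Making the callback optional (a nil check before the call) keeps the loop usable when no progress output is wanted.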
If you need more information about exactly which bulk item requests failed, use StatsOnly(false). The result will then contain all failed items in result.Errors ([]*elastic.BulkResponseItem).
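The difference between collecting only counters and also keeping the failed items can be sketched as follows. This is a standalone illustration, not the library's code: `itemResult`, `summary`, and `summarize` are hypothetical names, and the real result type carries `*elastic.BulkResponseItem` values instead.

```go
package main

import "fmt"

// itemResult is a hypothetical per-item outcome of a bulk request.
type itemResult struct {
	ID    string
	Error string // empty when the item succeeded
}

// summary holds the counters and, optionally, the failed items.
type summary struct {
	Success int64
	Failed  int64
	Errors  []itemResult
}

// summarize counts successes and failures. When statsOnly is false it
// also keeps every failed item so the caller can inspect it later.
func summarize(items []itemResult, statsOnly bool) summary {
	var s summary
	for _, it := range items {
		if it.Error == "" {
			s.Success++
			continue
		}
		s.Failed++
		if !statsOnly {
			s.Errors = append(s.Errors, it)
		}
	}
	return s
}

func main() {
	items := []itemResult{
		{ID: "1"},
		{ID: "2", Error: "mapper_parsing_exception"},
		{ID: "3"},
	}
	s := summarize(items, false)
	fmt.Printf("%d operations succeeded, %d failed\n", s.Success, s.Failed)
	for _, e := range s.Errors {
		fmt.Printf("item %s failed: %s\n", e.ID, e.Error)
	}
}
```

Keeping only counters avoids holding every failed item in memory, which matters for large indices; collecting the items is useful when failures need to be retried or logged individually.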
If you need to copy data from one cluster to another, create a second client for the new cluster and set it with TargetClient(*elastic.Client).
You can also specify the chunk size of bulk items sent to Elasticsearch with BulkSize(int). The default is 500.
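As a rough picture of what a chunk size means, the sketch below splits a list of document IDs into batches of a given size. It is a standalone illustration with a hypothetical `chunk` helper; the fallback to 500 mirrors the default stated above, but the library's own batching logic may differ.

```go
package main

import "fmt"

// chunk splits ids into consecutive batches of at most size elements.
// A non-positive size falls back to 500, the default mentioned above.
// The function name is hypothetical.
func chunk(ids []string, size int) [][]string {
	if size <= 0 {
		size = 500
	}
	var batches [][]string
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		batches = append(batches, ids[start:end])
	}
	return batches
}

func main() {
	ids := make([]string, 1200)
	for i := range ids {
		ids[i] = fmt.Sprintf("doc-%d", i)
	}
	// 1200 documents with a chunk size of 500 give batches of 500, 500, and 200.
	for i, b := range chunk(ids, 500) {
		fmt.Printf("batch %d: %d items\n", i, len(b))
	}
}
```

A larger chunk size means fewer round trips but bigger request bodies, so the right value depends on document size and cluster capacity.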
Also, a scroll timeout can be specified with Scroll(string), e.g. Scroll("15m"). The default is 5 minutes (5m).
The Reindexer has become more versatile with this PR (thanks to @nwolff). In fact, copying data from a source index to a target index is just a special case of the more general Reindexer.
If you use the Reindexer type directly (not via client.Reindex(...)), you can pass a handler that will be called for each hit. Here's an example that uses a handler to preserve the _ttl of each hit:
// Carries over the source item's ttl to the reindexed item
copyWithTTL := func(hit *elastic.SearchHit, bulkService *elastic.BulkService) error {
	// Decode the hit's _source into a generic map
	source := make(map[string]interface{})
	if err := json.Unmarshal(*hit.Source, &source); err != nil {
		return err
	}
	req := elastic.NewBulkIndexRequest().Index(testIndexName2).Type(hit.Type).Id(hit.Id).Doc(source)
	// Only set the TTL if the hit actually carries one
	if ttl, ok := hit.Fields["_ttl"].(float64); ok {
		req.Ttl(int64(ttl))
	}
	bulkService.Add(req)
	return nil
}
r := elastic.NewReindexer(client, testIndexName, copyWithTTL).ScanFields("_source", "_ttl")
ret, err := r.Do()
if err != nil {
	t.Fatal(err)
}
You can also use the Reindexer to copy data from one cluster to another. You do so by providing a different client for the target. This code will use the sourceClient to copy the sourceIndexName to the targetIndexName, using the targetClient.
sourceClient, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
...
targetClient, err := elastic.NewClient(elastic.SetURL("http://localhost:8200"))
...
r := elastic.NewReindexer(sourceClient, sourceIndexName, elastic.CopyToTargetIndex(targetIndexName))
r = r.TargetClient(targetClient)
ret, err := r.Do()
if err != nil {
	t.Fatal(err)
}
More examples are available in reindexer_test.go.