Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
Chart repo index fetcher is moved to a background job
Browse files Browse the repository at this point in the history
Due to a massive overhead that happened as a consequence of a heavy
chart repo index request pattern (introduced in 0.5.0), application and
installation controller latencies spiked up.

This change is aiming to get rid of ad-hoc repo index fetch approach and
move this heavy operation to a background job.

From now on, every repo instance start polling chart repo index every 10
seconds. Once the data is successfully fetched, it is preserved as a
repo attribute unmarshalled. The first fetch is blocking: assuming
Shipper starts cold, there is no previous cache we can rely upon (it
starts in a new container). On top of it, index data is never cached on
the disk as there is no use for it any longer: in-memory only.

If repo fails to fetch repo index, it behaves quite naively: simply
spins next iteration with the same delay.

Signed-off-by: Oleg Sidorov <oleg.sidorov@booking.com>
  • Loading branch information
Oleg Sidorov committed Aug 13, 2019
1 parent bf0eef9 commit ff75c31
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 70 deletions.
5 changes: 4 additions & 1 deletion pkg/chart/repo/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func (c *Catalog) CreateRepoIfNotExist(repoURL string) (*Repo, error) {
fmt.Errorf("failed to create cache: %v", err),
)
}
repo = NewRepo(repoURL, cache, c.fetcher)
repo, err = NewRepo(repoURL, cache, c.fetcher)
if err != nil {
return nil, err
}
c.repos[name] = repo
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/chart/repo/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func TestCreateRepoIfNotExist(t *testing.T) {

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
c := NewCatalog(testCase.factory, nil)
c := NewCatalog(testCase.factory, func(_ string) ([]byte, error) {
return []byte{}, nil
})
_, err := c.CreateRepoIfNotExist(testCase.url)
if (err == nil && testCase.err != nil) ||
(err != nil && testCase.err == nil) ||
Expand Down
103 changes: 46 additions & 57 deletions pkg/chart/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

// Importing this yaml package is a very crucial point:
Expand All @@ -20,6 +20,7 @@ import (
yaml "github.com/ghodss/yaml"
"github.com/golang/glog"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/helm/pkg/chartutil"
"k8s.io/helm/pkg/proto/hapi/chart"
"k8s.io/helm/pkg/repo"
Expand All @@ -29,7 +30,7 @@ import (
)

const (
RepoIndexTTL = 5 * time.Second
RepoIndexRefreshPeriod = 10 * time.Second
)

var (
Expand All @@ -38,28 +39,16 @@ var (
)

type Repo struct {
url string
cache Cache
fetcher RemoteFetcher
mutex sync.Mutex
indexFetched time.Time
repoURL string
indexURL string
cache Cache
fetcher RemoteFetcher
index atomic.Value
indexResolved chan struct{}
}

func NewRepo(repoURL string, cache Cache, fetcher RemoteFetcher) *Repo {
return &Repo{
url: repoURL,
cache: cache,
fetcher: fetcher,
}
}

func (r *Repo) isIndexExpired() bool {
return r.indexFetched.Add(RepoIndexTTL).Before(time.Now())
}

// This method is not thread-safe and requires concurrency control by the caller
func (r *Repo) refreshIndex() (*repo.IndexFile, error) {
parsed, err := url.ParseRequestURI(r.url)
func NewRepo(repoURL string, cache Cache, fetcher RemoteFetcher) (*Repo, error) {
parsed, err := url.ParseRequestURI(repoURL)
if err != nil {
return nil, shippererrors.NewChartRepoIndexError(
fmt.Errorf("failed to parse repo URL: %v", err),
Expand All @@ -68,29 +57,50 @@ func (r *Repo) refreshIndex() (*repo.IndexFile, error) {
parsed.Path = path.Join(parsed.Path, "index.yaml")
indexURL := parsed.String()

data, err := r.fetcher(indexURL)
repo := &Repo{
repoURL: repoURL,
indexURL: indexURL,
cache: cache,
fetcher: fetcher,
indexResolved: make(chan struct{}),
}

// runs repo.refreshIndex forever
go wait.Forever(func() {
if err := repo.refreshIndex(); err != nil {
glog.Errorf("failed to refresh repo %q index: %s", repo.repoURL, err)
}
}, RepoIndexRefreshPeriod)

return repo, nil
}

func (r *Repo) refreshIndex() error {
data, err := r.fetcher(r.indexURL)
if err != nil {
return nil, shippererrors.NewChartRepoIndexError(
fmt.Errorf("failed to fetch %q: %v", indexURL, err),
return shippererrors.NewChartRepoIndexError(
fmt.Errorf("failed to fetch %q: %v", r.indexURL, err),
)
}

index, err := loadIndexData(data)
if err != nil {
return nil, shippererrors.NewChartRepoIndexError(
return shippererrors.NewChartRepoIndexError(
fmt.Errorf("failed to load index file: %v", err),
)
}

if err := r.cache.Store("index.yaml", data); err != nil {
return nil, shippererrors.NewChartRepoIndexError(
fmt.Errorf("failed to cache index.yaml: %v", err),
)
}
r.index.Store(index)

r.indexFetched = time.Now()
// close indexResolved once
select {
default:
close(r.indexResolved)
case <-r.indexResolved:
// already closed
}

return index, nil
return nil
}

func (r *Repo) ResolveVersion(chartspec *shipper.Chart) (*repo.ChartVersion, error) {
Expand Down Expand Up @@ -130,31 +140,10 @@ func (r *Repo) ResolveVersion(chartspec *shipper.Chart) (*repo.ChartVersion, err
}

func (r *Repo) FetchChartVersions(chartspec *shipper.Chart) (repo.ChartVersions, error) {
r.mutex.Lock()
if r.isIndexExpired() {
if _, err := r.refreshIndex(); err != nil {
glog.Warningf("failed to refresh repo[%s] index: %s", chartspec.RepoURL, err)
}
}
r.mutex.Unlock()

data, err := r.cache.Fetch("index.yaml")
if err != nil {
return nil, shippererrors.NewChartFetchFailureError(
chartspec,
err,
)
}

index, err := loadIndexData(data)
if err != nil {
return nil, shippererrors.NewChartFetchFailureError(
chartspec,
err,
)
}
<-r.indexResolved

vs, ok := index.Entries[chartspec.Name]
vs, ok := r.index.Load().(*repo.IndexFile).Entries[chartspec.Name]
if !ok {
return nil, repo.ErrNoChartName
}
Expand Down Expand Up @@ -234,7 +223,7 @@ func (r *Repo) FetchRemote(cv *repo.ChartVersion) (*chart.Chart, error) {

// If the URL is relative (no scheme), prepend the chart repo's base URL
if !chartURL.IsAbs() {
repoURL, err := url.Parse(r.url)
repoURL, err := url.Parse(r.repoURL)
if err != nil {
return nil, err
}
Expand Down
36 changes: 26 additions & 10 deletions pkg/chart/repo/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,27 +127,34 @@ func TestRefreshIndex(t *testing.T) {
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
var fetchedURL string
var mutex sync.Mutex

cache := NewTestCache(testCase.name)

repo := NewRepo(
repo, err := NewRepo(
testCase.repoURL,
cache,
func(url string) ([]byte, error) {
mutex.Lock()
defer mutex.Unlock()
fetchedURL = url
return []byte(testCase.fetchBody), testCase.fetchErr
},
)

_, err := repo.refreshIndex()
if err != nil {
t.Fatalf("failed to initialize repo: %s", err)
}

if !equivalent(err, testCase.expectedErr) {
if err := repo.refreshIndex(); !equivalent(err, testCase.expectedErr) {
t.Fatalf("Unexpected error: %q, want: %q", err, testCase.expectedErr)
}

mutex.Lock()
if fetchedURL != testCase.expectedFetchURL {
t.Fatalf("Unexpected fetch URL: %q, want: %q", fetchedURL, testCase.expectedFetchURL)
}
mutex.Unlock()
})
}
}
Expand Down Expand Up @@ -264,18 +271,21 @@ func TestResolveVersion(t *testing.T) {

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
repo := NewRepo(
repo, err := NewRepo(
"https://charts.example.com",
cache,
localFetch(t),
)
if _, err := repo.refreshIndex(); err != nil {
if err != nil {
t.Fatalf("failed to initialize repo: %s", err)
}
if err := repo.refreshIndex(); err != nil {
t.Fatalf(err.Error())
}
chartspec := &shipper.Chart{
Name: testCase.chartname,
Version: testCase.verspec,
RepoURL: repo.url,
RepoURL: repo.repoURL,
}
gotcv, goterr := repo.ResolveVersion(chartspec)

Expand Down Expand Up @@ -356,19 +366,22 @@ func TestFetch(t *testing.T) {

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
repo := NewRepo(
repo, err := NewRepo(
"https://chart.example.com",
cache,
localFetch(t),
)
if _, err := repo.refreshIndex(); err != nil {
if err != nil {
t.Fatalf("failed to initialize repo: %s", err)
}
if err := repo.refreshIndex(); err != nil {
t.Fatalf(err.Error())
}

chartspec := &shipper.Chart{
Name: testCase.chartname,
Version: testCase.chartver,
RepoURL: repo.url,
RepoURL: repo.repoURL,
}

chart, err := repo.Fetch(chartspec)
Expand All @@ -394,7 +407,7 @@ func TestFetch(t *testing.T) {
func TestConcurrentFetchChartVersionsRefreshesIndexOnce(t *testing.T) {
var cnt int
cache := NewTestCache("test-cache")
repo := NewRepo(
repo, err := NewRepo(
"https://chart.example.com",
cache,
func(url string) ([]byte, error) {
Expand All @@ -405,6 +418,9 @@ func TestConcurrentFetchChartVersionsRefreshesIndexOnce(t *testing.T) {
return []byte(IndexYamlResp), nil
},
)
if err != nil {
t.Fatalf("failed to initialize repo: %s", err)
}

// Chart contents doesn't really matter
chartspec := &shipper.Chart{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/release/release_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2123,7 +2123,7 @@ func workingOnIncumbentCapacity(percent int, wg *sync.WaitGroup, t *testing.T) {
func TestShouldNotProducePatches(t *testing.T) {
var wg sync.WaitGroup

for i := 0; i < 50; i++ {
for i := 0; i < 25; i++ {
wg.Add(1)
go workingOnContenderCapacity(i, &wg, t)

Expand Down

0 comments on commit ff75c31

Please sign in to comment.