Skip to content

Commit

Permalink
CP 974 and CP 896 Add index endpoint url to resource and Add timeouts (
Browse files Browse the repository at this point in the history
…#32)

Add `marqo_endpoint` url to provider `resource`
Add configurable timeouts to index creation (default 30 minutes)
  • Loading branch information
RaynorChavez authored Dec 16, 2024
1 parent 4e31c20 commit 611396d
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 11 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
name: Terraform Provider Acceptance Tests
needs: build
runs-on: ubuntu-latest
timeout-minutes: 60
timeout-minutes: 90
strategy:
fail-fast: false
matrix:
Expand All @@ -55,9 +55,9 @@ jobs:
terraform_wrapper: false
- run: go mod download
- name: Run acceptance tests
run: go test -v -cover -timeout 50m ./...
run: go test -v -cover -timeout 90m ./...
env:
TF_ACC: "1"
MARQO_HOST: ${{ secrets.MARQO_HOST }}
MARQO_API_KEY: ${{ secrets.MARQO_API_KEY }}
timeout-minutes: 50
timeout-minutes: 90
17 changes: 17 additions & 0 deletions docs/resources/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ description: |-
- `index_name` (String) The name of the index.
- `settings` (Attributes) The settings for the index. (see [below for nested schema](#nestedatt--settings))

### Optional

- `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts))

### Read-Only

- `marqo_endpoint` (String) The Marqo endpoint used by the index

<a id="nestedatt--settings"></a>
### Nested Schema for `settings`

Expand Down Expand Up @@ -155,3 +163,12 @@ Optional:

- `split_length` (Number)
- `split_overlap` (Number)



<a id="nestedatt--timeouts"></a>
### Nested Schema for `timeouts`

Optional:

- `create` (String) Time to wait for index to be ready (e.g., '30m', '1h'). Default is 30m.
186 changes: 178 additions & 8 deletions internal/provider/indices_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"strconv"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
Expand All @@ -31,8 +32,14 @@ type indicesResource struct {

// IndexResourceModel maps the resource schema data.
type IndexResourceModel struct {
IndexName types.String `tfsdk:"index_name"`
Settings IndexSettingsModel `tfsdk:"settings"`
IndexName types.String `tfsdk:"index_name"`
Settings IndexSettingsModel `tfsdk:"settings"`
MarqoEndpoint types.String `tfsdk:"marqo_endpoint"`
Timeouts *timeouts `tfsdk:"timeouts"`
}

type timeouts struct {
Create types.String `tfsdk:"create"`
}

type IndexSettingsModel struct {
Expand Down Expand Up @@ -138,6 +145,19 @@ func (r *indicesResource) Schema(_ context.Context, _ resource.SchemaRequest, re
Required: true,
Description: "The name of the index.",
},
"marqo_endpoint": schema.StringAttribute{
Computed: true,
Description: "The Marqo endpoint used by the index",
},
"timeouts": schema.SingleNestedAttribute{
Optional: true,
Attributes: map[string]schema.Attribute{
"create": schema.StringAttribute{
Optional: true,
Description: "Time to wait for index to be ready (e.g., '30m', '1h'). Default is 30m.",
},
},
},
"settings": schema.SingleNestedAttribute{
Required: true,
Description: "The settings for the index.",
Expand Down Expand Up @@ -173,7 +193,6 @@ func (r *indicesResource) Schema(_ context.Context, _ resource.SchemaRequest, re
Optional: true,
ElementType: types.StringType,
},

"inference_type": schema.StringAttribute{
Required: true,
},
Expand Down Expand Up @@ -446,12 +465,14 @@ func convertModelPropertiesToResource(props *go_marqo.ModelProperties) *ModelPro
return model
}

func (r *indicesResource) findAndCreateState(indices []go_marqo.IndexDetail, indexName string) (*IndexResourceModel, bool) {
func (r *indicesResource) findAndCreateState(indices []go_marqo.IndexDetail, indexName string, existingTimeouts *timeouts) (*IndexResourceModel, bool) {
for _, indexDetail := range indices {
if indexDetail.IndexName == indexName {
return &IndexResourceModel{
//ID: types.StringValue(indexDetail.IndexName),
IndexName: types.StringValue(indexDetail.IndexName),
IndexName: types.StringValue(indexDetail.IndexName),
MarqoEndpoint: types.StringValue(indexDetail.MarqoEndpoint),
Timeouts: existingTimeouts,
Settings: IndexSettingsModel{
Type: types.StringValue(indexDetail.Type),
VectorNumericType: types.StringValue(indexDetail.VectorNumericType),
Expand Down Expand Up @@ -514,7 +535,7 @@ func (r *indicesResource) Read(ctx context.Context, req resource.ReadRequest, re
return
}

newState, found := r.findAndCreateState(indices, state.IndexName.ValueString())
newState, found := r.findAndCreateState(indices, state.IndexName.ValueString(), state.Timeouts)

// Handle inference_type field
if newState != nil {
Expand Down Expand Up @@ -796,6 +817,21 @@ func (r *indicesResource) Create(ctx context.Context, req resource.CreateRequest
//}
}

// Parse timeout duration
timeoutDuration := 30 * time.Minute // default timeout
if model.Timeouts != nil && model.Timeouts.Create.ValueString() != "" {
parsedTimeout, err := time.ParseDuration(model.Timeouts.Create.ValueString())
if err != nil {
resp.Diagnostics.AddError(
"Invalid Timeout Duration",
fmt.Sprintf("Could not parse timeout duration: %s. Expected format: '30m', '1h', etc.", err),
)
return
}
timeoutDuration = parsedTimeout
tflog.Info(ctx, fmt.Sprintf("Using configured timeout of %v", timeoutDuration))
}

err := r.marqoClient.CreateIndex(model.IndexName.ValueString(), settings)
if err != nil {
if strings.Contains(err.Error(), "already exists") {
Expand All @@ -807,7 +843,7 @@ func (r *indicesResource) Create(ctx context.Context, req resource.CreateRequest
return
}

existingState, found := r.findAndCreateState(indices, model.IndexName.ValueString())
existingState, found := r.findAndCreateState(indices, model.IndexName.ValueString(), model.Timeouts)
if !found {
resp.Diagnostics.AddError("Failed to Find Index", fmt.Sprintf("Index %s not found after creation", model.IndexName.ValueString()))
return
Expand Down Expand Up @@ -839,9 +875,138 @@ func (r *indicesResource) Create(ctx context.Context, req resource.CreateRequest
return
}

// Set the index name as the ID in the Terraform state
// Set initial state
model.MarqoEndpoint = types.StringValue("pending")
diags = resp.State.Set(ctx, &model)
resp.Diagnostics.Append(diags...)

// Wait for index to be ready
timeout := time.After(timeoutDuration)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

tflog.Info(ctx, fmt.Sprintf("Waiting up to %v for index %s to be ready...",
timeoutDuration,
model.IndexName.ValueString()))

start := time.Now()
for {
select {
case <-timeout:
// Check the current status of the index
indices, err := r.marqoClient.ListIndices()
if err != nil {
resp.Diagnostics.AddError(
"Failed to Check Index Status",
fmt.Sprintf("Could not check index status after timeout: %s", err),
)
return
}

var indexStatus string
for _, index := range indices {
if index.IndexName == model.IndexName.ValueString() {
indexStatus = index.IndexStatus
break
}
}

if indexStatus == "CREATING" {
resp.Diagnostics.AddError(
"Timeout Waiting for Index",
fmt.Sprintf("Index %s did not become ready within the %v timeout period. "+
"The index is still being created in the cloud and cannot be deleted at this time. "+
"You may need to manually delete it later using the Marqo console or API.",
model.IndexName.ValueString(),
timeoutDuration),
)
return
}

// If the index is in any other state, try to delete it
deleteErr := r.marqoClient.DeleteIndex(model.IndexName.ValueString())
if deleteErr != nil {
resp.Diagnostics.AddError(
"Cleanup Failed",
fmt.Sprintf("Index %s creation timed out after %v and cleanup failed: %s. "+
"Manual cleanup may be required.",
model.IndexName.ValueString(),
timeoutDuration,
deleteErr),
)
return
}

resp.Diagnostics.AddError(
"Timeout Waiting for Index",
fmt.Sprintf("Index %s did not become ready within the %v timeout period and has been deleted.",
model.IndexName.ValueString(),
timeoutDuration),
)
return
case <-ticker.C:
indices, err := r.marqoClient.ListIndices()
if err != nil {
resp.Diagnostics.AddError(
"Failed to Check Index Status",
fmt.Sprintf("Could not check index status: %s", err),
)
continue
}
for _, index := range indices {
if index.IndexName == model.IndexName.ValueString() {
tflog.Info(ctx, fmt.Sprintf("Index %s status: %s (elapsed: %v)",
model.IndexName.ValueString(),
index.IndexStatus,
time.Since(start)))
if index.IndexStatus == "READY" {
tflog.Info(ctx, fmt.Sprintf("Index %s is now ready (total time: %v)",
model.IndexName.ValueString(),
time.Since(start)))

// Do final read to get the complete state
readResp := resource.ReadResponse{State: resp.State}
r.Read(ctx, resource.ReadRequest{State: resp.State}, &readResp)

if readResp.Diagnostics.HasError() {
resp.Diagnostics.Append(readResp.Diagnostics...)
return
}

// Update the response state with the read state
resp.State = readResp.State
return
} else if index.IndexStatus == "FAILED" {
// Attempt to delete the failed index
deleteErr := r.marqoClient.DeleteIndex(model.IndexName.ValueString())
if deleteErr != nil {
resp.Diagnostics.AddError(
"Cleanup Failed",
fmt.Sprintf("Index %s creation failed and cleanup attempt failed: %s. "+
"Manual cleanup may be required.",
model.IndexName.ValueString(),
deleteErr),
)
return
}

resp.Diagnostics.AddError(
"Index Creation Failed",
fmt.Sprintf("Index %s creation failed after %v and has been deleted. "+
"Please check the Marqo logs for more details.",
model.IndexName.ValueString(),
time.Since(start)),
)
return
}
break
}
}
tflog.Info(ctx, fmt.Sprintf("Index %s not ready yet, continuing to wait... (elapsed: %v)",
model.IndexName.ValueString(),
time.Since(start)))
}
}
}

func (r *indicesResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) {
Expand Down Expand Up @@ -887,6 +1052,11 @@ func (r *indicesResource) Update(ctx context.Context, req resource.UpdateRequest
return
}

// Preserve computed/meta fields from current state
var state IndexResourceModel
model.MarqoEndpoint = state.MarqoEndpoint
model.Timeouts = state.Timeouts

// Set the index name as the ID in the Terraform state
diags = resp.State.Set(ctx, &model)
resp.Diagnostics.Append(diags...)
Expand Down

0 comments on commit 611396d

Please sign in to comment.