From 3fc1ed105bc89221167525c65f9db5f8e00b10c9 Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Thu, 12 Oct 2023 21:01:26 -0700 Subject: [PATCH] checkpoint every 100 resources - for more frequent progress updates. --- clients/internal/base/fhir401_client.go | 47 +++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/clients/internal/base/fhir401_client.go b/clients/internal/base/fhir401_client.go index 9916bda1e..64f554c7a 100644 --- a/clients/internal/base/fhir401_client.go +++ b/clients/internal/base/fhir401_client.go @@ -64,12 +64,23 @@ func (c *SourceClientFHIR401) SyncAllByPatientEverythingBundle(db models.Databas //lookup table for every resource ID found by Fasten lookupResourceReferences := map[string]bool{} - for _, apiModel := range rawResourceModels { + for ndx, apiModel := range rawResourceModels { err = c.ProcessResource(db, apiModel, lookupResourceReferences, internalFragmentReferenceLookup, &summary) if err != nil { syncErrors[apiModel.SourceResourceType] = err continue } + + if ndx%100 == 0 { + db.BackgroundJobCheckpoint(c.Context, + map[string]interface{}{ + "stage": "EverythingBundle", + "stage_progress": ndx, + "summary": summary, + }, + map[string]interface{}{"errors": syncErrors}, + ) + } } //process any pending resources @@ -149,12 +160,27 @@ func (c *SourceClientFHIR401) SyncAllByResourceName(db models.DatabaseRepository } summary.TotalResources += len(rawResourceModels) - for _, apiModel := range rawResourceModels { + for ndx, apiModel := range rawResourceModels { err = c.ProcessResource(db, apiModel, lookupResourceReferences, internalFragmentReferenceLookup, &summary) if err != nil { syncErrors[resourceType] = err continue } + + if ndx%100 == 0 { + stageErrorData := map[string]interface{}{} + if len(syncErrors) > 0 { + stageErrorData["errors"] = syncErrors + } + db.BackgroundJobCheckpoint(c.Context, + map[string]interface{}{ + "stage": resourceType, + "stage_progress": ndx, + "summary": summary, + }, + stageErrorData, + ) + } } checkpointErrorData := map[string]interface{}{} @@ -222,7 +248,7 @@ func (c *SourceClientFHIR401) ProcessPendingResources(db models.DatabaseReposito //process pending resources summary.TotalResources += len(pendingResourceReferences) - for _, pendingResourceIdOrUri := range pendingResourceReferences { + for ndx, pendingResourceIdOrUri := range pendingResourceReferences { var resourceRaw map[string]interface{} resourceSourceUri, err := c.GetRequest(pendingResourceIdOrUri, &resourceRaw) @@ -268,6 +294,21 @@ func (c *SourceClientFHIR401) ProcessPendingResources(db models.DatabaseReposito syncErrors[pendingResourceIdOrUri] = err continue } + + if ndx%100 == 0 { + stageErrorData := map[string]interface{}{} + if len(syncErrors) > 0 { + stageErrorData["errors"] = syncErrors + } + db.BackgroundJobCheckpoint(c.Context, + map[string]interface{}{ + "stage": "PendingResources", + "stage_progress": ndx, + "summary": summary, + }, + stageErrorData, + ) + } } extractionLoopCount += 1 }