Skip to content

Commit

Permalink
Merge pull request #15 from fastenhealth/ndjson_phr_support
Browse files Browse the repository at this point in the history
  • Loading branch information
AnalogJ authored Nov 9, 2023
2 parents dc761dd + 06e6f48 commit 08f4643
Show file tree
Hide file tree
Showing 16 changed files with 2,623 additions and 110 deletions.
174 changes: 74 additions & 100 deletions clients/internal/manual/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package manual

import (
"context"
"encoding/json"
"fmt"
"github.com/fastenhealth/fasten-sources/clients/internal/base"
"github.com/fastenhealth/fasten-sources/clients/models"
"github.com/fastenhealth/fasten-sources/pkg"
"github.com/fastenhealth/gofhir-models/fhir401"
fhir401utils "github.com/fastenhealth/gofhir-models/fhir401/utils"
"github.com/fastenhealth/gofhir-models/fhir430"
fhir430utils "github.com/fastenhealth/gofhir-models/fhir430/utils"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"io"
"net/http"
Expand Down Expand Up @@ -77,39 +75,27 @@ func (m ManualClient) SyncAllBundle(db models.DatabaseRepository, bundleFile *os
//lookup table for bundle internal references -> relative references (used heavily by file Bundles)
internalFragmentReferenceLookup := map[string]string{}

switch bundleType {
case pkg.FhirVersion430:
bundle430Data := fhir430.Bundle{}
err := base.UnmarshalJson(bundleFile, &bundle430Data)
if err != nil {
return summary, fmt.Errorf("an error occurred while parsing 4.3.0 bundle: %w", err)
}
client, err := base.GetSourceClientFHIR430(m.FastenEnv, m.Context, m.Logger, m.SourceCredential, http.DefaultClient)
if err != nil {
return summary, fmt.Errorf("an error occurred while creating 4.3.0 client: %w", err)
}
rawResourceList, err = client.ProcessBundle(bundle430Data)
if err != nil {
return summary, fmt.Errorf("an error occurred while processing 4.3.0 resources: %w", err)
}
//retrieve the FHIR client
client, err := base.GetSourceClientFHIR401(m.FastenEnv, m.Context, m.Logger, m.SourceCredential, http.DefaultClient)
if err != nil {
return summary, fmt.Errorf("an error occurred while creating 4.0.1 client: %w", err)
}

//parse the document
documentType, err := GetFileDocumentType(bundleFile)
if err != nil {
return summary, err
}
m.Logger.Infof("Begin processing document: %s", documentType)
switch documentType {
case pkg.DocumentTypeFhirBundle:

//for _, apiModel := range rawResourceList {
// err = client.ProcessResource(db, apiModel, lookupResourceReferences, &summary)
// if err != nil {
// syncErrors[apiModel.SourceResourceType] = err
// continue
// }
//}
case pkg.FhirVersion401:
bundle401Data := fhir401.Bundle{}
err := base.UnmarshalJson(bundleFile, &bundle401Data)
if err != nil {
return summary, fmt.Errorf("an error occurred while parsing 4.0.1 bundle: %w", err)
}
client, err := base.GetSourceClientFHIR401(m.FastenEnv, m.Context, m.Logger, m.SourceCredential, http.DefaultClient)
if err != nil {
return summary, fmt.Errorf("an error occurred while creating 4.0.1 client: %w", err)
}

rawResourceList, internalFragmentReferenceLookup, err = client.ProcessBundle(bundle401Data)
if err != nil {
return summary, fmt.Errorf("an error occurred while processing 4.0.1 resources: %w", err)
Expand All @@ -123,9 +109,54 @@ func (m ManualClient) SyncAllBundle(db models.DatabaseRepository, bundleFile *os
}
}

case pkg.DocumentTypeFhirNDJSON:
d := json.NewDecoder(bundleFile)
counter := 0
for {

var resource json.RawMessage
err := d.Decode(&resource)
if err != nil {
// io.EOF is expected at end of stream.
if err == io.EOF {
break //we're done
} else {
continue //skip this document, invalid json
}
}

resourceObj, err := fhir401utils.MapToResource(resource, false)
if err != nil {
syncErrors[fmt.Sprintf("index: %d", counter)] = err
continue
}

resourceObjTyped := resourceObj.(models.ResourceInterface)
resourceType, resourceId := resourceObjTyped.ResourceRef()
if resourceId == nil {
syncErrors[fmt.Sprintf("%s (index: %d)", resourceType, counter)] = fmt.Errorf("resource ID is nil, skipping")
continue
}

apiModel := models.RawResourceFhir{
SourceResourceID: *resourceId,
SourceResourceType: resourceType,
ResourceRaw: resource,
}
rawResourceList = append(rawResourceList, apiModel)
err = client.ProcessResource(db, apiModel, lookupResourceReferences, internalFragmentReferenceLookup, &summary)
if err != nil {
syncErrors[apiModel.SourceResourceType] = err
continue
}

counter += 1
}
}
summary.TotalResources = len(rawResourceList)

m.Logger.Infof("Completed document processing: %d resources", summary.TotalResources)

if len(syncErrors) > 0 {
//TODO: ignore errors.
m.Logger.Errorf("%d error(s) occurred during sync. \n %v", len(syncErrors), syncErrors)
Expand All @@ -134,29 +165,18 @@ func (m ManualClient) SyncAllBundle(db models.DatabaseRepository, bundleFile *os
}

func (m ManualClient) ExtractPatientId(bundleFile *os.File) (string, pkg.FhirVersion, error) {
// TODO: find a way to correctly detect bundle version

bundleType := pkg.FhirVersion401
patientIds, err := parse401Bundle(bundleFile)
bundleFile.Seek(0, io.SeekStart)

//fallback to 430 bundle
if err != nil || patientIds == nil || len(patientIds) == 0 {
bundleType = pkg.FhirVersion430

patientIds, err = parse430Bundle(bundleFile)
bundleFile.Seek(0, io.SeekStart)

}
documentType, err := GetFileDocumentType(bundleFile)
if err != nil {
//failed to parse the bundle as 401 and 430, return an error
return "", "", fmt.Errorf("could not determine bundle version: %v", err)
} else if patientIds == nil || len(patientIds) == 0 {
return "", "", fmt.Errorf("could not determine patient id")
} else {
//reset reader

return strings.TrimLeft(patientIds[0], "Patient/"), bundleType, nil
return "", pkg.FhirVersion401, err
}

switch documentType {
case pkg.DocumentTypeFhirBundle:
return extractPatientIdBundle(bundleFile)
case pkg.DocumentTypeFhirNDJSON:
return extractPatientIdNDJson(bundleFile)
default:
return "", pkg.FhirVersion401, fmt.Errorf("unsupported document type: %s", documentType)
}
}

Expand All @@ -169,52 +189,6 @@ func GetSourceClientManual(env pkg.FastenLighthouseEnvType, ctx context.Context,
}, nil
}

//TODO: find a better, more generic way to do this.

func parse401Bundle(bundleFile *os.File) ([]string, error) {
bundle401Data := fhir401.Bundle{}
//try parsing the bundle as a 401 bundle
if err := base.UnmarshalJson(bundleFile, &bundle401Data); err == nil {
patientIds := lo.FilterMap[fhir401.BundleEntry, string](bundle401Data.Entry, func(bundleEntry fhir401.BundleEntry, _ int) (string, bool) {
parsedResource, err := fhir401utils.MapToResource(bundleEntry.Resource, false)
if err != nil {
return "", false
}
typedResource := parsedResource.(models.ResourceInterface)
resourceType, resourceId := typedResource.ResourceRef()

if resourceId == nil || len(*resourceId) == 0 {
return "", false
}
return *resourceId, resourceType == fhir430.ResourceTypePatient.String()
})

return patientIds, nil
} else {
return nil, err
}

}

func parse430Bundle(bundleFile *os.File) ([]string, error) {
bundle430Data := fhir430.Bundle{}
//try parsing the bundle as a 430 bundle
if err := base.UnmarshalJson(bundleFile, &bundle430Data); err == nil {
patientIds := lo.FilterMap[fhir430.BundleEntry, string](bundle430Data.Entry, func(bundleEntry fhir430.BundleEntry, _ int) (string, bool) {
parsedResource, err := fhir430utils.MapToResource(bundleEntry.Resource, false)
if err != nil {
return "", false
}
typedResource := parsedResource.(models.ResourceInterface)
resourceType, resourceId := typedResource.ResourceRef()

if resourceId == nil || len(*resourceId) == 0 {
return "", false
}
return *resourceId, resourceType == fhir430.ResourceTypePatient.String()
})
return patientIds, nil
} else {
return nil, err
}
func cleanPatientIdPrefix(patientId string) string {
return strings.TrimLeft(patientId, "Patient/")
}
91 changes: 91 additions & 0 deletions clients/internal/manual/client_bundle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package manual

import (
"fmt"
"github.com/fastenhealth/fasten-sources/clients/internal/base"
"github.com/fastenhealth/fasten-sources/clients/models"
"github.com/fastenhealth/fasten-sources/pkg"
"github.com/fastenhealth/gofhir-models/fhir401"
fhir401utils "github.com/fastenhealth/gofhir-models/fhir401/utils"
"github.com/fastenhealth/gofhir-models/fhir430"
fhir430utils "github.com/fastenhealth/gofhir-models/fhir430/utils"
"github.com/samber/lo"
"io"
"os"
)

func extractPatientIdBundle(bundleFile *os.File) (string, pkg.FhirVersion, error) {
defer bundleFile.Seek(0, io.SeekStart)

// TODO: find a way to correctly detect bundle version

bundleType := pkg.FhirVersion401
patientIds, err := parse401Bundle(bundleFile)
bundleFile.Seek(0, io.SeekStart)

//fallback to 430 bundle
if err != nil || patientIds == nil || len(patientIds) == 0 {
bundleType = pkg.FhirVersion430

patientIds, err = parse430Bundle(bundleFile)

}
if err != nil {
//failed to parse the bundle as 401 and 430, return an error
return "", "", fmt.Errorf("could not determine bundle version: %v", err)
} else if patientIds == nil || len(patientIds) == 0 {
return "", "", fmt.Errorf("could not determine patient id")
} else {
return cleanPatientIdPrefix(patientIds[0]), bundleType, nil
}
}

//TODO: find a better, more generic way to do this.

func parse401Bundle(bundleFile *os.File) ([]string, error) {
bundle401Data := fhir401.Bundle{}
//try parsing the bundle as a 401 bundle
if err := base.UnmarshalJson(bundleFile, &bundle401Data); err == nil {
patientIds := lo.FilterMap[fhir401.BundleEntry, string](bundle401Data.Entry, func(bundleEntry fhir401.BundleEntry, _ int) (string, bool) {
parsedResource, err := fhir401utils.MapToResource(bundleEntry.Resource, false)
if err != nil {
return "", false
}
typedResource := parsedResource.(models.ResourceInterface)
resourceType, resourceId := typedResource.ResourceRef()

if resourceId == nil || len(*resourceId) == 0 {
return "", false
}
return *resourceId, resourceType == fhir430.ResourceTypePatient.String()
})

return patientIds, nil
} else {
return nil, err
}

}

func parse430Bundle(bundleFile *os.File) ([]string, error) {
bundle430Data := fhir430.Bundle{}
//try parsing the bundle as a 430 bundle
if err := base.UnmarshalJson(bundleFile, &bundle430Data); err == nil {
patientIds := lo.FilterMap[fhir430.BundleEntry, string](bundle430Data.Entry, func(bundleEntry fhir430.BundleEntry, _ int) (string, bool) {
parsedResource, err := fhir430utils.MapToResource(bundleEntry.Resource, false)
if err != nil {
return "", false
}
typedResource := parsedResource.(models.ResourceInterface)
resourceType, resourceId := typedResource.ResourceRef()

if resourceId == nil || len(*resourceId) == 0 {
return "", false
}
return *resourceId, resourceType == fhir430.ResourceTypePatient.String()
})
return patientIds, nil
} else {
return nil, err
}
}
50 changes: 50 additions & 0 deletions clients/internal/manual/client_ndjson.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package manual

import (
"encoding/json"
"fmt"
"github.com/fastenhealth/fasten-sources/clients/models"
"github.com/fastenhealth/fasten-sources/pkg"
fhir401utils "github.com/fastenhealth/gofhir-models/fhir401/utils"
"io"
"os"
)

func extractPatientIdNDJson(bundleFile *os.File) (string, pkg.FhirVersion, error) {
// TODO: find a way to correctly detect bundle version
defer bundleFile.Seek(0, io.SeekStart)

patientIds := []string{}
d := json.NewDecoder(bundleFile)
for {

var resource json.RawMessage
err := d.Decode(&resource)
if err != nil {
// io.EOF is expected at end of stream.
if err == io.EOF {
break //we're done
} else {
continue //skip this document, invalid json
}
}

resourceObj, err := fhir401utils.MapToResource(resource, false)
if err != nil {
continue
}

resourceObjTyped := resourceObj.(models.ResourceInterface)
currentResourceType, currentResourceId := resourceObjTyped.ResourceRef()

if currentResourceType == "Patient" {
patientIds = append(patientIds, *currentResourceId)
}
}

if len(patientIds) == 0 {
return "", "", fmt.Errorf("could not determine patient id")
} else {
return cleanPatientIdPrefix(patientIds[0]), pkg.FhirVersion401, nil
}
}
Loading

0 comments on commit 08f4643

Please sign in to comment.