From 5599d6b455d231f17290efadbc259dfbd0a99263 Mon Sep 17 00:00:00 2001 From: Julian Date: Fri, 29 Sep 2023 14:44:28 -0400 Subject: [PATCH 01/21] Add CreatedTime to Variant struct --- src/api/models/indexes/main.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/api/models/indexes/main.go b/src/api/models/indexes/main.go index a53d101..0e84fe9 100644 --- a/src/api/models/indexes/main.go +++ b/src/api/models/indexes/main.go @@ -2,6 +2,7 @@ package indexes import ( c "gohan/api/models/constants" + "time" ) type Variant struct { @@ -17,9 +18,10 @@ type Variant struct { Sample Sample `json:"sample"` - FileId string `json:"fileId"` - Dataset string `json:"dataset"` - AssemblyId c.AssemblyId `json:"assemblyId"` + FileId string `json:"fileId"` + Dataset string `json:"dataset"` + AssemblyId c.AssemblyId `json:"assemblyId"` + CreatedTime time.Time `json:"createdTime"` } type Info struct { From b6c515128a0f26c1d35f2c72f8f9e59f4ff66276 Mon Sep 17 00:00:00 2001 From: Julian Date: Fri, 29 Sep 2023 14:58:37 -0400 Subject: [PATCH 02/21] Implement fetch the most recent variant timestamp. --- src/api/mvc/variants/main.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/api/mvc/variants/main.go b/src/api/mvc/variants/main.go index f8f6831..5b1ef9e 100644 --- a/src/api/mvc/variants/main.go +++ b/src/api/mvc/variants/main.go @@ -491,6 +491,40 @@ func GetDatasetVariantsCount(c echo.Context) int { return int(totalVariantsCount) } +func GetLastCreatedVariantForDataset(c echo.Context) string { + gc := c.(*contexts.GohanContext) + cfg := gc.Config + es := gc.Es7Client + + dataset := gc.Dataset + fmt.Printf("Fetching the last 'created' timestamp for dataset: %s\n", dataset) + + var ( + lastCreatedTimestamp string + g = new(errgroup.Group) + ) + + g.Go(func() error { + timestamp, timestampError := esRepo.GetMostRecentVariantTimestamp(cfg, es, dataset.String()) + if timestampError != nil { + fmt.Printf("Failed to fetch the most recent 'created' timestamp for dataset %s. Error: %v\n", dataset, timestampError) + return timestampError + } + + lastCreatedTimestamp = timestamp.Format(time.RFC3339) + fmt.Printf("Fetched timestamp for dataset %s is: %s\n", dataset, lastCreatedTimestamp) + return nil + }) + + // wait for the HTTP fetch to complete. + if err := g.Wait(); err != nil { + fmt.Printf("Encountered an error while fetching data: %v\n", err) + } else { + fmt.Printf("Successfully Obtained Dataset '%s' most recent 'created' timestamp: '%s' \n", dataset, lastCreatedTimestamp) + } + return lastCreatedTimestamp +} + func GetDatasetSummary(c echo.Context) error { gc := c.(*contexts.GohanContext) From 868445e4b570d69a7c8dd1844ca8a8104db5402e Mon Sep 17 00:00:00 2001 From: Julian Date: Fri, 29 Sep 2023 14:59:27 -0400 Subject: [PATCH 03/21] Update DataTypeSummary struct with LastCreated field. --- src/api/mvc/variants/main.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/api/mvc/variants/main.go b/src/api/mvc/variants/main.go index 5b1ef9e..08b9596 100644 --- a/src/api/mvc/variants/main.go +++ b/src/api/mvc/variants/main.go @@ -634,24 +634,27 @@ func ClearDataset(c echo.Context) error { } type DataTypeSummary struct { - Id string `json:"id"` - Label string `json:"label"` - Queryable bool `json:"queryable"` - Schema map[string]interface{} `json:"schema"` - Count int `json:"count"` + Id string `json:"id"` + Label string `json:"label"` + Queryable bool `json:"queryable"` + Schema map[string]interface{} `json:"schema"` + Count int `json:"count"` + LastCreated string `json:"last_created"` } type DataTypeResponseDto = []DataTypeSummary func GetDatasetDataTypes(c echo.Context) error { count := GetDatasetVariantsCount(c) + last_created := GetLastCreatedVariantForDataset(c) return c.JSON(http.StatusOK, &DataTypeResponseDto{ DataTypeSummary{ - Id: "variant", - Label: "Variants", - Queryable: true, - Schema: schemas.VARIANT_SCHEMA, - Count: count, + Id: "variant", + Label: "Variants", + Queryable: true, + Schema: schemas.VARIANT_SCHEMA, + Count: count, + LastCreated: last_created, }, }) } From bdc61e6a43b3ba7f441530599ec476c6d492d719 Mon Sep 17 00:00:00 2001 From: Julian Date: Fri, 29 Sep 2023 15:02:55 -0400 Subject: [PATCH 04/21] Add function to retrieve the latest 'createdTime' for a given dataset. --- .../repositories/elasticsearch/variants.go | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/src/api/repositories/elasticsearch/variants.go b/src/api/repositories/elasticsearch/variants.go index 32b1f96..0cb65e8 100644 --- a/src/api/repositories/elasticsearch/variants.go +++ b/src/api/repositories/elasticsearch/variants.go @@ -322,6 +322,83 @@ func GetDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, e return result, nil } +func GetMostRecentVariantTimestamp(cfg *models.Config, es *elasticsearch.Client, dataset string) (time.Time, error) { + // Initialize a zero-value timestamp + var mostRecentTimestamp time.Time + + // Setup the Elasticsearch query to fetch the most recent 'created' timestamp + var buf bytes.Buffer + query := map[string]interface{}{ + "query": map[string]interface{}{ + "term": map[string]string{ + "dataset.keyword": dataset, + }, + }, + "size": 1, + "sort": []map[string]interface{}{ + { + "createdTime": map[string]string{ + "order": "desc", + }, + }, + }, + } + + // Encode the query to JSON + if err := json.NewEncoder(&buf).Encode(query); err != nil { + log.Fatalf("Error encoding query: %s\n", err) + return mostRecentTimestamp, err + } + + // Print the constructed query for debugging + fmt.Println("Constructed Elasticsearch Query:", string(buf.Bytes())) + + // Execute the query against Elasticsearch + res, searchErr := es.Search( + es.Search.WithContext(context.Background()), + es.Search.WithIndex(wildcardVariantsIndex), + es.Search.WithBody(&buf), + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + ) + + if searchErr != nil { + fmt.Printf("Error executing search request: %s\n", searchErr) + return mostRecentTimestamp, searchErr + } + defer res.Body.Close() + + // Parse the response + var result map[string]interface{} + decoder := json.NewDecoder(res.Body) + if err := decoder.Decode(&result); err != nil { + fmt.Printf("Error unmarshalling Elasticsearch response: %s\n", err) + return mostRecentTimestamp, err + } + + // Extract the 'created' timestamp from the first hit (if available) + if hits, found := result["hits"].(map[string]interface{}); found { + if hitSlice, hitFound := hits["hits"].([]interface{}); hitFound && len(hitSlice) > 0 { + if firstHit, firstHitFound := hitSlice[0].(map[string]interface{}); firstHitFound { + if source, sourceFound := firstHit["_source"].(map[string]interface{}); sourceFound { + if created, createdFound := source["createdTime"].(string); createdFound { + parsedTime, err := time.Parse(time.RFC3339, created) + if err == nil { + mostRecentTimestamp = parsedTime + } else { + fmt.Printf("Error parsing 'createdTime' timestamp: %s\n", err) + } + } + } + } + } else { + fmt.Println("No hits found for dataset:", dataset) + } + } + + return mostRecentTimestamp, nil +} + func CountDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, es *elasticsearch.Client, chromosome string, lowerBound int, upperBound int, variantId string, sampleId string, datasetString string, From 5340a7fc69ce1c8b343815d04fb2734816a1e338 Mon Sep 17 00:00:00 2001 From: Julian Date: Fri, 29 Sep 2023 15:05:53 -0400 Subject: [PATCH 05/21] Add timestamp for variant processing in VCF ingestion --- src/api/services/ingestion.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 056a66e..239d883 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -827,6 +827,8 @@ func (i *IngestionService) ProcessVcf( var resultingVariant indexes.Variant mapstructure.Decode(tmpVariant, &resultingVariant) + resultingVariant.CreatedTime = time.Now() + // pass variant (along with a waitgroup) to the channel i.IngestionBulkIndexingQueue <- &structs.IngestionQueueStructure{ Variant: &resultingVariant, From 3a89736d78aee1ef2a5541e526ee261e1e79fd76 Mon Sep 17 00:00:00 2001 From: Julian Date: Tue, 10 Oct 2023 08:41:31 -0400 Subject: [PATCH 06/21] temporal changes --- src/api/mvc/data-types/main.go | 15 +++++++++-- src/api/mvc/variants/main.go | 4 +++ .../repositories/elasticsearch/variants.go | 7 +++++ src/api/services/variants/main.go | 26 +++++++++++++------ 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/src/api/mvc/data-types/main.go b/src/api/mvc/data-types/main.go index 9484056..dd8765a 100644 --- a/src/api/mvc/data-types/main.go +++ b/src/api/mvc/data-types/main.go @@ -1,6 +1,7 @@ package dataTypes import ( + "fmt" "net/http" "gohan/api/contexts" @@ -20,13 +21,16 @@ var variantDataTypeJson = map[string]interface{}{ } func GetDataTypes(c echo.Context) error { - es := c.(*contexts.GohanContext).Es7Client - cfg := c.(*contexts.GohanContext).Config + gc := c.(*contexts.GohanContext) + cfg := gc.Config + es := gc.Es7Client // accumulate number of variants associated with each // sampleId fetched from the variants overview resultsMap, err := variantService.GetVariantsOverview(es, cfg) + fmt.Printf("resultsMapDDD: %v\n", resultsMap) + if err != nil { // Could not talk to Elasticsearch, return an error return c.JSON(http.StatusInternalServerError, map[string]interface{}{ @@ -36,6 +40,13 @@ func GetDataTypes(c echo.Context) error { variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"]) + // Fetch the last_created value from resultsMap and add to variantDataTypeJson + if latestCreated, ok := resultsMap["last_created_time"].(string); ok { + variantDataTypeJson["last_created"] = latestCreated + } + + fmt.Printf("variantDataTypeJson: %v\n", variantDataTypeJson) + // Data types are basically stand-ins for schema blocks return c.JSON(http.StatusOK, []map[string]interface{}{ variantDataTypeJson, diff --git a/src/api/mvc/variants/main.go b/src/api/mvc/variants/main.go index 08b9596..738191c 100644 --- a/src/api/mvc/variants/main.go +++ b/src/api/mvc/variants/main.go @@ -506,6 +506,10 @@ func GetLastCreatedVariantForDataset(c echo.Context) string { g.Go(func() error { timestamp, timestampError := esRepo.GetMostRecentVariantTimestamp(cfg, es, dataset.String()) + fmt.Printf("timestamp: %v\n", timestamp) + fmt.Printf("timestampError: %v\n", timestampError) + fmt.Printf("timestampError == nil: %v\n", es) + fmt.Printf("timestampError == nil: %v\n", dataset.String()) if timestampError != nil { fmt.Printf("Failed to fetch the most recent 'created' timestamp for dataset %s. Error: %v\n", dataset, timestampError) return timestampError diff --git a/src/api/repositories/elasticsearch/variants.go b/src/api/repositories/elasticsearch/variants.go index 0cb65e8..661d7da 100644 --- a/src/api/repositories/elasticsearch/variants.go +++ b/src/api/repositories/elasticsearch/variants.go @@ -589,6 +589,7 @@ func CountDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, keyword string) (map[string]interface{}, error) { // begin building the request body. + fmt.Printf("Query StartKEYWORD: %s\n", keyword) var buf bytes.Buffer aggMap := map[string]interface{}{ "size": "0", @@ -602,6 +603,12 @@ func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, k }, }, }, + "latest_created": map[string]interface{}{ + "max": map[string]interface{}{ + "field": "createdTime", + "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", + }, + }, }, } diff --git a/src/api/services/variants/main.go b/src/api/services/variants/main.go index 68d0c92..f63f631 100644 --- a/src/api/services/variants/main.go +++ b/src/api/services/variants/main.go @@ -33,6 +33,7 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri defer _wg.Done() results, bucketsError := esRepo.GetVariantsBucketsByKeyword(cfg, es, keyword) + fmt.Printf("resultsCFCFCF: %v\n", results) if bucketsError != nil { resultsMux.Lock() defer resultsMux.Unlock() @@ -45,14 +46,17 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri // retrieve aggregations.items.buckets bucketsMapped := []interface{}{} - if aggs, aggsOk := results["aggregations"]; aggsOk { - aggsMapped := aggs.(map[string]interface{}) - - if items, itemsOk := aggsMapped["items"]; itemsOk { - itemsMapped := items.(map[string]interface{}) - - if buckets, bucketsOk := itemsMapped["buckets"]; bucketsOk { - bucketsMapped = buckets.([]interface{}) + if aggs, aggsOk := results["aggregations"].(map[string]interface{}); aggsOk { + if latest, latestOk := aggs["latest_created"].(map[string]interface{}); latestOk { + if valueAsString, valOk := latest["value_as_string"].(string); valOk { + resultsMux.Lock() + resultsMap["last_created_time"] = valueAsString + resultsMux.Unlock() + } + } + if items, itemsOk := aggs["items"].(map[string]interface{}); itemsOk { + if buckets, bucketsOk := items["buckets"].([]interface{}); bucketsOk { + bucketsMapped = buckets } } } @@ -77,6 +81,12 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri return nil, errors.New("could not contact Elasticsearch - make sure it's running") } + // Extract latest created time + if latest, exists := resultsMap["last_created"].(map[string]interface{}); exists { + latestCreatedTime := latest["value_as_string"].(string) + resultsMap["last_created_time"] = latestCreatedTime + } + // get distribution of chromosomes wg.Add(1) go callGetBucketsByKeyword("chromosomes", "chrom.keyword", &wg) From b1d3180e930b3cd6476c3e39cc50969961e6ca2e Mon Sep 17 00:00:00 2001 From: Julian Date: Tue, 10 Oct 2023 10:49:06 -0400 Subject: [PATCH 07/21] remove logs --- src/api/mvc/data-types/main.go | 2 -- src/api/mvc/variants/main.go | 4 ---- src/api/repositories/elasticsearch/variants.go | 1 - src/api/services/variants/main.go | 1 - 4 files changed, 8 deletions(-) diff --git a/src/api/mvc/data-types/main.go b/src/api/mvc/data-types/main.go index dd8765a..7654e84 100644 --- a/src/api/mvc/data-types/main.go +++ b/src/api/mvc/data-types/main.go @@ -29,8 +29,6 @@ func GetDataTypes(c echo.Context) error { // sampleId fetched from the variants overview resultsMap, err := variantService.GetVariantsOverview(es, cfg) - fmt.Printf("resultsMapDDD: %v\n", resultsMap) - if err != nil { // Could not talk to Elasticsearch, return an error return c.JSON(http.StatusInternalServerError, map[string]interface{}{ diff --git a/src/api/mvc/variants/main.go b/src/api/mvc/variants/main.go index 738191c..08b9596 100644 --- a/src/api/mvc/variants/main.go +++ b/src/api/mvc/variants/main.go @@ -506,10 +506,6 @@ func GetLastCreatedVariantForDataset(c echo.Context) string { g.Go(func() error { timestamp, timestampError := esRepo.GetMostRecentVariantTimestamp(cfg, es, dataset.String()) - fmt.Printf("timestamp: %v\n", timestamp) - fmt.Printf("timestampError: %v\n", timestampError) - fmt.Printf("timestampError == nil: %v\n", es) - fmt.Printf("timestampError == nil: %v\n", dataset.String()) if timestampError != nil { fmt.Printf("Failed to fetch the most recent 'created' timestamp for dataset %s. Error: %v\n", dataset, timestampError) return timestampError diff --git a/src/api/repositories/elasticsearch/variants.go b/src/api/repositories/elasticsearch/variants.go index 661d7da..a4b394a 100644 --- a/src/api/repositories/elasticsearch/variants.go +++ b/src/api/repositories/elasticsearch/variants.go @@ -589,7 +589,6 @@ func CountDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, keyword string) (map[string]interface{}, error) { // begin building the request body. - fmt.Printf("Query StartKEYWORD: %s\n", keyword) var buf bytes.Buffer aggMap := map[string]interface{}{ "size": "0", diff --git a/src/api/services/variants/main.go b/src/api/services/variants/main.go index f63f631..4f6d766 100644 --- a/src/api/services/variants/main.go +++ b/src/api/services/variants/main.go @@ -33,7 +33,6 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri defer _wg.Done() results, bucketsError := esRepo.GetVariantsBucketsByKeyword(cfg, es, keyword) - fmt.Printf("resultsCFCFCF: %v\n", results) if bucketsError != nil { resultsMux.Lock() defer resultsMux.Unlock() From 65b122d3cf258b4a9e11b231b02c438109a38f20 Mon Sep 17 00:00:00 2001 From: Julian Date: Tue, 10 Oct 2023 13:54:02 -0400 Subject: [PATCH 08/21] add reduced data-types endpoint for public --- src/api/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/api/main.go b/src/api/main.go index 87dae68..608a88a 100644 --- a/src/api/main.go +++ b/src/api/main.go @@ -134,6 +134,7 @@ func main() { // -- Data-Types e.GET("/data-types", dataTypesMvc.GetDataTypes) + e.GET("/public/data-types", dataTypesMvc.GetReducedDataTypes) e.GET("/data-types/variant", dataTypesMvc.GetVariantDataType) e.GET("/data-types/variant/schema", dataTypesMvc.GetVariantDataTypeSchema) e.GET("/data-types/variant/metadata_schema", dataTypesMvc.GetVariantDataTypeMetadataSchema) From 1898fbae7d855c9cd96b0ff95ad4d62096e22f89 Mon Sep 17 00:00:00 2001 From: Julian Date: Tue, 10 Oct 2023 13:56:05 -0400 Subject: [PATCH 09/21] Refactor GetDataTypes --- src/api/mvc/data-types/main.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/api/mvc/data-types/main.go b/src/api/mvc/data-types/main.go index 7654e84..cae807d 100644 --- a/src/api/mvc/data-types/main.go +++ b/src/api/mvc/data-types/main.go @@ -20,30 +20,36 @@ var variantDataTypeJson = map[string]interface{}{ "metadata_schema": schemas.OBJECT_SCHEMA, } -func GetDataTypes(c echo.Context) error { +func fetchVariantData(c echo.Context) (map[string]interface{}, error) { gc := c.(*contexts.GohanContext) cfg := gc.Config es := gc.Es7Client - // accumulate number of variants associated with each - // sampleId fetched from the variants overview resultsMap, err := variantService.GetVariantsOverview(es, cfg) - if err != nil { - // Could not talk to Elasticsearch, return an error - return c.JSON(http.StatusInternalServerError, map[string]interface{}{ - "error": err.Error(), - }) + return nil, err } variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"]) - - // Fetch the last_created value from resultsMap and add to variantDataTypeJson if latestCreated, ok := resultsMap["last_created_time"].(string); ok { - variantDataTypeJson["last_created"] = latestCreated + variantDataTypeJson["last_ingested"] = latestCreated + } + + return variantDataTypeJson, nil +} + +func GetDataTypes(c echo.Context) error { + variantData, err := fetchVariantData(c) + if err != nil { + return c.JSON(http.StatusInternalServerError, map[string]interface{}{ + "error": err.Error(), + }) } - fmt.Printf("variantDataTypeJson: %v\n", variantDataTypeJson) + return c.JSON(http.StatusOK, []map[string]interface{}{ + variantData, + }) +} // Data types are basically stand-ins for schema blocks return c.JSON(http.StatusOK, []map[string]interface{}{ From a9d701bf5bd8fee585ba108c895e169a1f8416af Mon Sep 17 00:00:00 2001 From: Julian Date: Tue, 10 Oct 2023 13:56:45 -0400 Subject: [PATCH 10/21] Add function to retrieve reduced dataTypes --- src/api/mvc/data-types/main.go | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/api/mvc/data-types/main.go b/src/api/mvc/data-types/main.go index cae807d..183d3be 100644 --- a/src/api/mvc/data-types/main.go +++ b/src/api/mvc/data-types/main.go @@ -51,9 +51,37 @@ func GetDataTypes(c echo.Context) error { }) } - // Data types are basically stand-ins for schema blocks +func GetReducedDataTypes(c echo.Context) error { + variantData, err := fetchVariantData(c) + if err != nil { + return c.JSON(http.StatusInternalServerError, map[string]interface{}{ + "error": err.Error(), + }) + } + + if variantData == nil { + return c.JSON(http.StatusInternalServerError, map[string]interface{}{ + "error": "Failed to retrieve variant data.", + }) + } + + count, _ := variantData["count"] + id, _ := variantData["id"].(string) + label, _ := variantData["label"].(string) + last_ingested, _ := variantData["last_ingested"].(string) + queryable, _ := variantData["queryable"].(bool) + + // Create a reduced response + reducedResponse := map[string]interface{}{ + "count": count, + "id": id, + "label": label, + "last_ingested": last_ingested, + "queryable": queryable, + } + return c.JSON(http.StatusOK, []map[string]interface{}{ - variantDataTypeJson, + reducedResponse, }) } From ff561edc197e179532de0a857ecba364d49b0d86 Mon Sep 17 00:00:00 2001 From: Julian Date: Tue, 10 Oct 2023 13:57:32 -0400 Subject: [PATCH 11/21] remove import fmt --- src/api/mvc/data-types/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/api/mvc/data-types/main.go b/src/api/mvc/data-types/main.go index 183d3be..7886db4 100644 --- a/src/api/mvc/data-types/main.go +++ b/src/api/mvc/data-types/main.go @@ -1,7 +1,6 @@ package dataTypes import ( - "fmt" "net/http" "gohan/api/contexts" From 2767c32df232bff3beea7d142186b059d6406d42 Mon Sep 17 00:00:00 2001 From: Julian Date: Tue, 10 Oct 2023 13:58:32 -0400 Subject: [PATCH 12/21] rename dataTypes key name --- src/api/mvc/variants/main.go | 6 +++--- src/api/services/variants/main.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/mvc/variants/main.go b/src/api/mvc/variants/main.go index 08b9596..ca2acd7 100644 --- a/src/api/mvc/variants/main.go +++ b/src/api/mvc/variants/main.go @@ -639,14 +639,14 @@ type DataTypeSummary struct { Queryable bool `json:"queryable"` Schema map[string]interface{} `json:"schema"` Count int `json:"count"` - LastCreated string `json:"last_created"` + LastCreated string `json:"last_ingested"` } type DataTypeResponseDto = []DataTypeSummary func GetDatasetDataTypes(c echo.Context) error { count := GetDatasetVariantsCount(c) - last_created := GetLastCreatedVariantForDataset(c) + last_ingested := GetLastCreatedVariantForDataset(c) return c.JSON(http.StatusOK, &DataTypeResponseDto{ DataTypeSummary{ Id: "variant", @@ -654,7 +654,7 @@ func GetDatasetDataTypes(c echo.Context) error { Queryable: true, Schema: schemas.VARIANT_SCHEMA, Count: count, - LastCreated: last_created, + LastCreated: last_ingested, }, }) } diff --git a/src/api/services/variants/main.go b/src/api/services/variants/main.go index 4f6d766..044685e 100644 --- a/src/api/services/variants/main.go +++ b/src/api/services/variants/main.go @@ -81,7 +81,7 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri } // Extract latest created time - if latest, exists := resultsMap["last_created"].(map[string]interface{}); exists { + if latest, exists := resultsMap["last_ingested"].(map[string]interface{}); exists { latestCreatedTime := latest["value_as_string"].(string) resultsMap["last_created_time"] = latestCreatedTime } From a1406d9a8947ed1d31cdf0c275c62b3cbf43e8cf Mon Sep 17 00:00:00 2001 From: Julian Date: Thu, 12 Oct 2023 00:32:12 -0400 Subject: [PATCH 13/21] remove reduced data-types response --- src/api/main.go | 1 - src/api/mvc/data-types/main.go | 34 ---------------------------------- 2 files changed, 35 deletions(-) diff --git a/src/api/main.go b/src/api/main.go index 608a88a..87dae68 100644 --- a/src/api/main.go +++ b/src/api/main.go @@ -134,7 +134,6 @@ func main() { // -- Data-Types e.GET("/data-types", dataTypesMvc.GetDataTypes) - e.GET("/public/data-types", dataTypesMvc.GetReducedDataTypes) e.GET("/data-types/variant", dataTypesMvc.GetVariantDataType) e.GET("/data-types/variant/schema", dataTypesMvc.GetVariantDataTypeSchema) e.GET("/data-types/variant/metadata_schema", dataTypesMvc.GetVariantDataTypeMetadataSchema) diff --git a/src/api/mvc/data-types/main.go b/src/api/mvc/data-types/main.go index 7886db4..010236e 100644 --- a/src/api/mvc/data-types/main.go +++ b/src/api/mvc/data-types/main.go @@ -50,40 +50,6 @@ func GetDataTypes(c echo.Context) error { }) } -func GetReducedDataTypes(c echo.Context) error { - variantData, err := fetchVariantData(c) - if err != nil { - return c.JSON(http.StatusInternalServerError, map[string]interface{}{ - "error": err.Error(), - }) - } - - if variantData == nil { - return c.JSON(http.StatusInternalServerError, map[string]interface{}{ - "error": "Failed to retrieve variant data.", - }) - } - - count, _ := variantData["count"] - id, _ := variantData["id"].(string) - label, _ := variantData["label"].(string) - last_ingested, _ := variantData["last_ingested"].(string) - queryable, _ := variantData["queryable"].(bool) - - // Create a reduced response - reducedResponse := map[string]interface{}{ - "count": count, - "id": id, - "label": label, - "last_ingested": last_ingested, - "queryable": queryable, - } - - return c.JSON(http.StatusOK, []map[string]interface{}{ - reducedResponse, - }) -} - func GetVariantDataType(c echo.Context) error { return c.JSON(http.StatusOK, variantDataTypeJson) } From cb110857eafa80b8935189c0572d80a971dd1b1a Mon Sep 17 00:00:00 2001 From: Julian Date: Wed, 1 Nov 2023 11:17:52 -0400 Subject: [PATCH 14/21] format bucket agregation last ingestion time --- src/api/repositories/elasticsearch/variants.go | 3 +-- src/api/services/variants/main.go | 8 ++++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/api/repositories/elasticsearch/variants.go b/src/api/repositories/elasticsearch/variants.go index a4b394a..c5868d5 100644 --- a/src/api/repositories/elasticsearch/variants.go +++ b/src/api/repositories/elasticsearch/variants.go @@ -604,8 +604,7 @@ func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, k }, "latest_created": map[string]interface{}{ "max": map[string]interface{}{ - "field": "createdTime", - "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", + "field": "createdTime", }, }, }, diff --git a/src/api/services/variants/main.go b/src/api/services/variants/main.go index 044685e..8069d96 100644 --- a/src/api/services/variants/main.go +++ b/src/api/services/variants/main.go @@ -6,6 +6,7 @@ import ( "gohan/api/models" esRepo "gohan/api/repositories/elasticsearch" "sync" + "time" "github.com/elastic/go-elasticsearch/v7" ) @@ -47,9 +48,12 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri bucketsMapped := []interface{}{} if aggs, aggsOk := results["aggregations"].(map[string]interface{}); aggsOk { if latest, latestOk := aggs["latest_created"].(map[string]interface{}); latestOk { - if valueAsString, valOk := latest["value_as_string"].(string); valOk { + if timestamp, timeStampOk := latest["value"].(float64); timeStampOk { + // convert the Unix timestamp time.Time object + formattedTime := time.UnixMilli(int64(timestamp)).UTC().Format(time.RFC3339) + resultsMux.Lock() - resultsMap["last_created_time"] = valueAsString + resultsMap["last_created_time"] = formattedTime resultsMux.Unlock() } } From 81ec9bcad4968aff221f0c723b7f311bbc54b1c7 Mon Sep 17 00:00:00 2001 From: Julian Date: Wed, 8 Nov 2023 10:18:03 -0500 Subject: [PATCH 15/21] refactor last ingested for variant data retrieval --- src/api/mvc/data-types/main.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/api/mvc/data-types/main.go b/src/api/mvc/data-types/main.go index 010236e..40c71af 100644 --- a/src/api/mvc/data-types/main.go +++ b/src/api/mvc/data-types/main.go @@ -28,11 +28,8 @@ func fetchVariantData(c echo.Context) (map[string]interface{}, error) { if err != nil { return nil, err } - variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"]) - if latestCreated, ok := resultsMap["last_created_time"].(string); ok { - variantDataTypeJson["last_ingested"] = latestCreated - } + variantDataTypeJson["last_ingested"] = resultsMap["last_ingested"] return variantDataTypeJson, nil } From dbcfccd62bb5bdd3fca5022f5e3e5075ff8d5d45 Mon Sep 17 00:00:00 2001 From: Julian Date: Wed, 8 Nov 2023 10:20:59 -0500 Subject: [PATCH 16/21] fix returns in las created timestap --- src/api/repositories/elasticsearch/variants.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/api/repositories/elasticsearch/variants.go b/src/api/repositories/elasticsearch/variants.go index c5868d5..ed4f8a0 100644 --- a/src/api/repositories/elasticsearch/variants.go +++ b/src/api/repositories/elasticsearch/variants.go @@ -387,12 +387,14 @@ func GetMostRecentVariantTimestamp(cfg *models.Config, es *elasticsearch.Client, mostRecentTimestamp = parsedTime } else { fmt.Printf("Error parsing 'createdTime' timestamp: %s\n", err) + return mostRecentTimestamp, err } } } } } else { - fmt.Println("No hits found for dataset:", dataset) + fmt.Printf("No hits found for dataset: %s\n", dataset) + return mostRecentTimestamp, nil } } From 30af96556aef7fe78f9e94858bea1d173bde87a7 Mon Sep 17 00:00:00 2001 From: Julian Date: Wed, 8 Nov 2023 10:22:10 -0500 Subject: [PATCH 17/21] refactor last ingested in variants bucket retrieval --- src/api/repositories/elasticsearch/variants.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/api/repositories/elasticsearch/variants.go b/src/api/repositories/elasticsearch/variants.go index ed4f8a0..bd82313 100644 --- a/src/api/repositories/elasticsearch/variants.go +++ b/src/api/repositories/elasticsearch/variants.go @@ -604,7 +604,7 @@ func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, k }, }, }, - "latest_created": map[string]interface{}{ + "last_ingested": map[string]interface{}{ "max": map[string]interface{}{ "field": "createdTime", }, @@ -753,7 +753,6 @@ func GetVariantsBucketsByKeywordAndDataset(cfg *models.Config, es *elasticsearch } func DeleteVariantsByDatasetId(cfg *models.Config, es *elasticsearch.Client, dataset string) (map[string]interface{}, error) { - var buf bytes.Buffer query := map[string]interface{}{ "query": map[string]interface{}{ From cf49d4388e3b9bd2d76c8a753ab4a3dc4a32a8d9 Mon Sep 17 00:00:00 2001 From: Julian Date: Wed, 8 Nov 2023 10:23:21 -0500 Subject: [PATCH 18/21] refactor add of last ingested field to variants overview --- src/api/services/variants/main.go | 58 ++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/src/api/services/variants/main.go b/src/api/services/variants/main.go index 8069d96..db9447a 100644 --- a/src/api/services/variants/main.go +++ b/src/api/services/variants/main.go @@ -30,6 +30,32 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri resultsMux := sync.RWMutex{} var wg sync.WaitGroup + + callProcessLatestCreated := func(key string, keyword string, _wg *sync.WaitGroup) { + defer _wg.Done() + + results, err := esRepo.GetVariantsBucketsByKeyword(cfg, es, keyword) + if err != nil { + resultsMux.Lock() + resultsMap[key+"_error"] = "Failed to get latest created time." + resultsMux.Unlock() + return + } + + var formattedTime string + if aggs, ok := results["aggregations"].(map[string]interface{}); ok { + if latest, ok := aggs["last_ingested"].(map[string]interface{}); ok { + if timestamp, ok := latest["value"].(float64); ok { + formattedTime = time.UnixMilli(int64(timestamp)).UTC().Format(time.RFC3339) + } + } + } + + resultsMux.Lock() + resultsMap[key] = formattedTime + resultsMux.Unlock() + } + callGetBucketsByKeyword := func(key string, keyword string, _wg *sync.WaitGroup) { defer _wg.Done() @@ -46,20 +72,14 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri // retrieve aggregations.items.buckets bucketsMapped := []interface{}{} - if aggs, aggsOk := results["aggregations"].(map[string]interface{}); aggsOk { - if latest, latestOk := aggs["latest_created"].(map[string]interface{}); latestOk { - if timestamp, timeStampOk := latest["value"].(float64); timeStampOk { - // convert the Unix timestamp time.Time object - formattedTime := time.UnixMilli(int64(timestamp)).UTC().Format(time.RFC3339) - - resultsMux.Lock() - resultsMap["last_created_time"] = formattedTime - resultsMux.Unlock() - } - } - if items, itemsOk := aggs["items"].(map[string]interface{}); itemsOk { - if buckets, bucketsOk := items["buckets"].([]interface{}); bucketsOk { - bucketsMapped = buckets + if aggs, aggsOk := results["aggregations"]; aggsOk { + aggsMapped := aggs.(map[string]interface{}) + + if items, itemsOk := aggsMapped["items"]; itemsOk { + itemsMapped := items.(map[string]interface{}) + + if buckets, bucketsOk := itemsMapped["buckets"]; bucketsOk { + bucketsMapped = buckets.([]interface{}) } } } @@ -84,12 +104,6 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri return nil, errors.New("could not contact Elasticsearch - make sure it's running") } - // Extract latest created time - if latest, exists := resultsMap["last_ingested"].(map[string]interface{}); exists { - latestCreatedTime := latest["value_as_string"].(string) - resultsMap["last_created_time"] = latestCreatedTime - } - // get distribution of chromosomes wg.Add(1) go callGetBucketsByKeyword("chromosomes", "chrom.keyword", &wg) @@ -110,6 +124,10 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri wg.Add(1) go callGetBucketsByKeyword("datasets", "dataset.keyword", &wg) + // get last ingested variant + wg.Add(1) + go callProcessLatestCreated("last_ingested", "last_ingested.keyword", &wg) + wg.Wait() return resultsMap, nil From 430d2a3ac4dec3b4f8f0fdbc50b52849065d0925 Mon Sep 17 00:00:00 2001 From: Julian Date: Mon, 13 Nov 2023 09:43:28 -0500 Subject: [PATCH 19/21] Add last_ingested to GetVariantsOverview test --- src/api/tests/common/common.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/api/tests/common/common.go b/src/api/tests/common/common.go index 0e14c11..2c33e68 100644 --- a/src/api/tests/common/common.go +++ b/src/api/tests/common/common.go @@ -114,6 +114,10 @@ func GetVariantsOverview(_t *testing.T, _cfg *models.Config) map[string]interfac assert.True(_t, sidkOk) assert.NotNil(_t, sampleIDsKey) + lastIngestionKey, likOk := overviewRespJson["last_ingested"] + assert.True(_t, likOk) + assert.NotNil(_t, lastIngestionKey) + return overviewRespJson } From 8f86f0890fc1eb5d7929eeb5a1ad1336e43c4315 Mon Sep 17 00:00:00 2001 From: Julian Date: Mon, 13 Nov 2023 09:54:05 -0500 Subject: [PATCH 20/21] Handle'last_ingested as a string in test --- src/api/tests/build/api/variants_test.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/api/tests/build/api/variants_test.go b/src/api/tests/build/api/variants_test.go index 6d10a3e..df4c625 100644 --- a/src/api/tests/build/api/variants_test.go +++ b/src/api/tests/build/api/variants_test.go @@ -169,13 +169,22 @@ func TestDemoVcfIngestion(t *testing.T) { for oK, oV := range overviewJson { assert.NotNil(t, oV) - assert.NotNil(t, overviewJson[oK]) - assert.NotNil(t, overviewJson[oK].(map[string]interface{})) + // handle 'last_ingested' as a string + if oK == "last_ingested" { + _, ok := oV.(string) + assert.True(t, ok) + continue + } + + // assert the value is a map for other keys + mapValue, ok := oV.(map[string]interface{}) + assert.True(t, ok) - for k, v := range oV.(map[string]interface{}) { + for k, v := range mapValue { key := k assert.NotNil(t, v) - value := v.(float64) + value, ok := v.(float64) + assert.True(t, ok) assert.NotNil(t, key) assert.NotEmpty(t, key) assert.NotEmpty(t, value) From 2d87ee508f617a9c7301023fbf34f2b345904a6d Mon Sep 17 00:00:00 2001 From: Julian Date: Wed, 15 Nov 2023 14:42:06 -0500 Subject: [PATCH 21/21] Add comment to callProcessLatestCreated --- src/api/services/variants/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/api/services/variants/main.go b/src/api/services/variants/main.go index db9447a..7a34076 100644 --- a/src/api/services/variants/main.go +++ b/src/api/services/variants/main.go @@ -31,6 +31,8 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri var wg sync.WaitGroup + /* callProcessLatestCreated performs a Elasticsearch query for the latest created + time of variant and updates resultsMap with the formatted time.*/ callProcessLatestCreated := func(key string, keyword string, _wg *sync.WaitGroup) { defer _wg.Done()