Skip to content

Commit

Permalink
Merge pull request #55 from bento-platform/features/retrieve_last_ing…
Browse files Browse the repository at this point in the history
…ested

Features/retrieve_last_ingested
  • Loading branch information
noctillion authored Nov 20, 2023
2 parents 1a9760b + 2d87ee5 commit 3b29d39
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 28 deletions.
8 changes: 5 additions & 3 deletions src/api/models/indexes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexes

import (
c "gohan/api/models/constants"
"time"
)

type Variant struct {
Expand All @@ -17,9 +18,10 @@ type Variant struct {

Sample Sample `json:"sample"`

FileId string `json:"fileId"`
Dataset string `json:"dataset"`
AssemblyId c.AssemblyId `json:"assemblyId"`
FileId string `json:"fileId"`
Dataset string `json:"dataset"`
AssemblyId c.AssemblyId `json:"assemblyId"`
CreatedTime time.Time `json:"createdTime"`
}

type Info struct {
Expand Down
25 changes: 15 additions & 10 deletions src/api/mvc/data-types/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,31 @@ var variantDataTypeJson = map[string]interface{}{
"metadata_schema": schemas.OBJECT_SCHEMA,
}

func GetDataTypes(c echo.Context) error {
es := c.(*contexts.GohanContext).Es7Client
cfg := c.(*contexts.GohanContext).Config
func fetchVariantData(c echo.Context) (map[string]interface{}, error) {
gc := c.(*contexts.GohanContext)
cfg := gc.Config
es := gc.Es7Client

// accumulate number of variants associated with each
// sampleId fetched from the variants overview
resultsMap, err := variantService.GetVariantsOverview(es, cfg)
if err != nil {
return nil, err
}
variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"])
variantDataTypeJson["last_ingested"] = resultsMap["last_ingested"]

return variantDataTypeJson, nil
}

func GetDataTypes(c echo.Context) error {
variantData, err := fetchVariantData(c)
if err != nil {
// Could not talk to Elasticsearch, return an error
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": err.Error(),
})
}

variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"])

// Data types are basically stand-ins for schema blocks
return c.JSON(http.StatusOK, []map[string]interface{}{
variantDataTypeJson,
variantData,
})
}

Expand Down
57 changes: 47 additions & 10 deletions src/api/mvc/variants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,40 @@ func GetDatasetVariantsCount(c echo.Context) int {
return int(totalVariantsCount)
}

func GetLastCreatedVariantForDataset(c echo.Context) string {
gc := c.(*contexts.GohanContext)
cfg := gc.Config
es := gc.Es7Client

dataset := gc.Dataset
fmt.Printf("Fetching the last 'created' timestamp for dataset: %s\n", dataset)

var (
lastCreatedTimestamp string
g = new(errgroup.Group)
)

g.Go(func() error {
timestamp, timestampError := esRepo.GetMostRecentVariantTimestamp(cfg, es, dataset.String())
if timestampError != nil {
fmt.Printf("Failed to fetch the most recent 'created' timestamp for dataset %s. Error: %v\n", dataset, timestampError)
return timestampError
}

lastCreatedTimestamp = timestamp.Format(time.RFC3339)
fmt.Printf("Fetched timestamp for dataset %s is: %s\n", dataset, lastCreatedTimestamp)
return nil
})

// wait for the HTTP fetch to complete.
if err := g.Wait(); err != nil {
fmt.Printf("Encountered an error while fetching data: %v\n", err)
} else {
fmt.Printf("Successfully Obtained Dataset '%s' most recent 'created' timestamp: '%s' \n", dataset, lastCreatedTimestamp)
}
return lastCreatedTimestamp
}

func GetDatasetSummary(c echo.Context) error {

gc := c.(*contexts.GohanContext)
Expand Down Expand Up @@ -600,24 +634,27 @@ func ClearDataset(c echo.Context) error {
}

type DataTypeSummary struct {
Id string `json:"id"`
Label string `json:"label"`
Queryable bool `json:"queryable"`
Schema map[string]interface{} `json:"schema"`
Count int `json:"count"`
Id string `json:"id"`
Label string `json:"label"`
Queryable bool `json:"queryable"`
Schema map[string]interface{} `json:"schema"`
Count int `json:"count"`
LastCreated string `json:"last_ingested"`
}

type DataTypeResponseDto = []DataTypeSummary

func GetDatasetDataTypes(c echo.Context) error {
count := GetDatasetVariantsCount(c)
last_ingested := GetLastCreatedVariantForDataset(c)
return c.JSON(http.StatusOK, &DataTypeResponseDto{
DataTypeSummary{
Id: "variant",
Label: "Variants",
Queryable: true,
Schema: schemas.VARIANT_SCHEMA,
Count: count,
Id: "variant",
Label: "Variants",
Queryable: true,
Schema: schemas.VARIANT_SCHEMA,
Count: count,
LastCreated: last_ingested,
},
})
}
Expand Down
85 changes: 84 additions & 1 deletion src/api/repositories/elasticsearch/variants.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,85 @@ func GetDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, e
return result, nil
}

func GetMostRecentVariantTimestamp(cfg *models.Config, es *elasticsearch.Client, dataset string) (time.Time, error) {
// Initialize a zero-value timestamp
var mostRecentTimestamp time.Time

// Setup the Elasticsearch query to fetch the most recent 'created' timestamp
var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
"term": map[string]string{
"dataset.keyword": dataset,
},
},
"size": 1,
"sort": []map[string]interface{}{
{
"createdTime": map[string]string{
"order": "desc",
},
},
},
}

// Encode the query to JSON
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s\n", err)
return mostRecentTimestamp, err
}

// Print the constructed query for debugging
fmt.Println("Constructed Elasticsearch Query:", string(buf.Bytes()))

// Execute the query against Elasticsearch
res, searchErr := es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex(wildcardVariantsIndex),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
)

if searchErr != nil {
fmt.Printf("Error executing search request: %s\n", searchErr)
return mostRecentTimestamp, searchErr
}
defer res.Body.Close()

// Parse the response
var result map[string]interface{}
decoder := json.NewDecoder(res.Body)
if err := decoder.Decode(&result); err != nil {
fmt.Printf("Error unmarshalling Elasticsearch response: %s\n", err)
return mostRecentTimestamp, err
}

// Extract the 'created' timestamp from the first hit (if available)
if hits, found := result["hits"].(map[string]interface{}); found {
if hitSlice, hitFound := hits["hits"].([]interface{}); hitFound && len(hitSlice) > 0 {
if firstHit, firstHitFound := hitSlice[0].(map[string]interface{}); firstHitFound {
if source, sourceFound := firstHit["_source"].(map[string]interface{}); sourceFound {
if created, createdFound := source["createdTime"].(string); createdFound {
parsedTime, err := time.Parse(time.RFC3339, created)
if err == nil {
mostRecentTimestamp = parsedTime
} else {
fmt.Printf("Error parsing 'createdTime' timestamp: %s\n", err)
return mostRecentTimestamp, err
}
}
}
}
} else {
fmt.Printf("No hits found for dataset: %s\n", dataset)
return mostRecentTimestamp, nil
}
}

return mostRecentTimestamp, nil
}

func CountDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, es *elasticsearch.Client,
chromosome string, lowerBound int, upperBound int,
variantId string, sampleId string, datasetString string,
Expand Down Expand Up @@ -525,6 +604,11 @@ func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, k
},
},
},
"last_ingested": map[string]interface{}{
"max": map[string]interface{}{
"field": "createdTime",
},
},
},
}

Expand Down Expand Up @@ -669,7 +753,6 @@ func GetVariantsBucketsByKeywordAndDataset(cfg *models.Config, es *elasticsearch
}

func DeleteVariantsByDatasetId(cfg *models.Config, es *elasticsearch.Client, dataset string) (map[string]interface{}, error) {

var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
Expand Down
2 changes: 2 additions & 0 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ func (i *IngestionService) ProcessVcf(
var resultingVariant indexes.Variant
mapstructure.Decode(tmpVariant, &resultingVariant)

resultingVariant.CreatedTime = time.Now()

// pass variant (along with a waitgroup) to the channel
i.IngestionBulkIndexingQueue <- &structs.IngestionQueueStructure{
Variant: &resultingVariant,
Expand Down
33 changes: 33 additions & 0 deletions src/api/services/variants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"gohan/api/models"
esRepo "gohan/api/repositories/elasticsearch"
"sync"
"time"

"github.com/elastic/go-elasticsearch/v7"
)
Expand All @@ -29,6 +30,34 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri
resultsMux := sync.RWMutex{}

var wg sync.WaitGroup

/* callProcessLatestCreated performs a Elasticsearch query for the latest created
time of variant and updates resultsMap with the formatted time.*/
callProcessLatestCreated := func(key string, keyword string, _wg *sync.WaitGroup) {
defer _wg.Done()

results, err := esRepo.GetVariantsBucketsByKeyword(cfg, es, keyword)
if err != nil {
resultsMux.Lock()
resultsMap[key+"_error"] = "Failed to get latest created time."
resultsMux.Unlock()
return
}

var formattedTime string
if aggs, ok := results["aggregations"].(map[string]interface{}); ok {
if latest, ok := aggs["last_ingested"].(map[string]interface{}); ok {
if timestamp, ok := latest["value"].(float64); ok {
formattedTime = time.UnixMilli(int64(timestamp)).UTC().Format(time.RFC3339)
}
}
}

resultsMux.Lock()
resultsMap[key] = formattedTime
resultsMux.Unlock()
}

callGetBucketsByKeyword := func(key string, keyword string, _wg *sync.WaitGroup) {
defer _wg.Done()

Expand Down Expand Up @@ -97,6 +126,10 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri
wg.Add(1)
go callGetBucketsByKeyword("datasets", "dataset.keyword", &wg)

// get last ingested variant
wg.Add(1)
go callProcessLatestCreated("last_ingested", "last_ingested.keyword", &wg)

wg.Wait()

return resultsMap, nil
Expand Down
17 changes: 13 additions & 4 deletions src/api/tests/build/api/variants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,22 @@ func TestDemoVcfIngestion(t *testing.T) {
for oK, oV := range overviewJson {
assert.NotNil(t, oV)

assert.NotNil(t, overviewJson[oK])
assert.NotNil(t, overviewJson[oK].(map[string]interface{}))
// handle 'last_ingested' as a string
if oK == "last_ingested" {
_, ok := oV.(string)
assert.True(t, ok)
continue
}

// assert the value is a map for other keys
mapValue, ok := oV.(map[string]interface{})
assert.True(t, ok)

for k, v := range oV.(map[string]interface{}) {
for k, v := range mapValue {
key := k
assert.NotNil(t, v)
value := v.(float64)
value, ok := v.(float64)
assert.True(t, ok)
assert.NotNil(t, key)
assert.NotEmpty(t, key)
assert.NotEmpty(t, value)
Expand Down
4 changes: 4 additions & 0 deletions src/api/tests/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func GetVariantsOverview(_t *testing.T, _cfg *models.Config) map[string]interfac
assert.True(_t, sidkOk)
assert.NotNil(_t, sampleIDsKey)

lastIngestionKey, likOk := overviewRespJson["last_ingested"]
assert.True(_t, likOk)
assert.NotNil(_t, lastIngestionKey)

return overviewRespJson
}

Expand Down

0 comments on commit 3b29d39

Please sign in to comment.