refactor datasetIngestor command further #103

Merged · 1 commit · Aug 23, 2024
219 changes: 119 additions & 100 deletions cmd/commands/datasetIngestor.go
@@ -162,7 +162,6 @@ For Windows you need instead to specify -user username:password on the command l
log.Printf("You are about to add a dataset to the === %s === data catalog environment...", env)
color.Unset()

// TODO: change pointer parameter types to values as they shouldn't be modified by the function
user, accessGroups := authenticate(RealAuthenticator{}, client, APIServer, userpass, token)

/* TODO add info about policy settings and whether autoarchive will take place */
Expand Down Expand Up @@ -208,12 +207,15 @@ For Windows you need instead to specify -user username:password on the command l
// log.Printf("Selected folders: %v\n", folders)

// test if a sourceFolder was already used in the past and give a warning
log.Println("Testing for existing source folders...")
foundList, err := datasetIngestor.TestForExistingSourceFolder(datasetPaths, client, APIServer, user["accessToken"])
if err != nil {
log.Fatal(err)
}
color.Set(color.FgYellow)
fmt.Println("Warning! The following datasets have been found with the same sourceFolder: ")
if len(foundList) > 0 {
fmt.Println("Warning! The following datasets have been found with the same sourceFolder: ")
}
for _, element := range foundList {
fmt.Printf(" - PID: \"%s\", sourceFolder: \"%s\"\n", element.Pid, element.SourceFolder)
}
@@ -235,8 +237,13 @@ For Windows you need instead to specify -user username:password on the command l
// a destination location is defined by the archive system
// for now, let the user decide if they need a copy

// now everything is prepared, start to loop over all folders
var skip = ""
// now everything is prepared, prepare to loop over all folders
if nocopyFlag {
copyFlag = false
}
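// central availability only needs to be checked when the user set neither
// --copy nor --nocopy, the account is not a beamline account, and no copy
// has been requested; in every other case the outcome is already decided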
checkCentralAvailability := !(cmd.Flags().Changed("copy") || cmd.Flags().Changed("nocopy") || beamlineAccount || copyFlag)
skip := ""

// check if skip flag is globally defined via flags:
if cmd.Flags().Changed("linkfiles") {
switch linkfiles {
@@ -249,8 +256,9 @@ For Windows you need instead to specify -user username:password on the command l
}
}

var datasetList []string
var archivableDatasetList []string
for _, datasetSourceFolder := range datasetPaths {
log.Printf("===== Ingesting: \"%s\" =====\n", datasetSourceFolder)
// ignore empty lines
if datasetSourceFolder == "" {
// NOTE if there are empty source folder(s), shouldn't we raise an error?
@@ -259,123 +267,133 @@ For Windows you need instead to specify -user username:password on the command l
metaDataMap["sourceFolder"] = datasetSourceFolder
log.Printf("Scanning files in dataset %s", datasetSourceFolder)

// get filelist of dataset
fullFileArray, startTime, endTime, owner, numFiles, totalSize, err :=
datasetIngestor.GetLocalFileList(datasetSourceFolder, datasetFileListTxt, &skip)
if err != nil {
log.Fatalf("Can't gather the filelist of \"%s\"", datasetSourceFolder)
}

//log.Printf("full fileListing: %v\n Start and end time: %s %s\n ", fullFileArray, startTime, endTime)
log.Printf("The dataset contains %v files with a total size of %v bytes.", numFiles, totalSize)

// filecount checks
if totalSize == 0 {
emptyDatasets++
color.Set(color.FgRed)
log.Println("This dataset contains no files and will therefore NOT be stored. ")
log.Printf("\"%s\" dataset cannot be ingested - contains no files\n", datasetSourceFolder)
color.Unset()
} else if numFiles > TOTAL_MAXFILES {
continue
}
if numFiles > TOTAL_MAXFILES {
tooLargeDatasets++
color.Set(color.FgRed)
log.Printf("This dataset exceeds the current filecount limit of the archive system of %v files and will therefore NOT be stored.\n", TOTAL_MAXFILES)
log.Printf("\"%s\" dataset cannot be ingested - too many files: has %d, max. %d\n", datasetSourceFolder, numFiles, TOTAL_MAXFILES)
color.Unset()
} else {
// TODO: change tapecopies param type of UpdateMetaData from pointer to regular int
// (it's not changed within the function)
datasetIngestor.UpdateMetaData(client, APIServer, user, originalMap, metaDataMap, startTime, endTime, owner, tapecopies)
pretty, _ := json.MarshalIndent(metaDataMap, "", " ")

log.Printf("Updated metadata object:\n%s\n", pretty)

// check if data is accessible at the archive server, unless beamline account (assumed to be always centrally available)
// and unless a copy flag was defined via the command line
if !copyFlag && !nocopyFlag { // NOTE this whole copyFlag, nocopyFlag ordeal makes no sense whatsoever
if !beamlineAccount {
sshErr, otherErr := datasetIngestor.CheckDataCentrallyAvailableSsh(user["username"], RSYNCServer, datasetSourceFolder, os.Stdout)
if otherErr != nil {
log.Fatalf("CheckDataCentrallyAvailableSsh returned an error: %v\n", otherErr)
}
if sshErr != nil {
color.Set(color.FgYellow)
log.Printf("The source folder %v is not centrally available (decentral use case).\nThe data must first be copied to a rsync cache server.\n ", datasetSourceFolder)
color.Unset()
copyFlag = true
// check if user account
if len(accessGroups) == 0 {
color.Set(color.FgRed)
log.Println("For the decentral case you must use a personal account. Beamline accounts are not supported.")
color.Unset()
os.Exit(1)
}
if !noninteractiveFlag {
log.Printf("Do you want to continue (Y/n)? ")
scanner.Scan()
continueFlag := scanner.Text()
if continueFlag == "n" {
log.Fatalln("Further ingests interrupted because decentral case detected, but no copy wanted.")
}
}
continue
}

// NOTE: only tapecopies=1 or 2 does something if set.
if tapecopies == 2 {
color.Set(color.FgYellow)
log.Printf("Note: this dataset, if archived, will be copied to two tape copies")
color.Unset()
}
datasetIngestor.UpdateMetaData(client, APIServer, user, originalMap, metaDataMap, startTime, endTime, owner, tapecopies)
pretty, _ := json.MarshalIndent(metaDataMap, "", " ")

log.Printf("Updated metadata object:\n%s\n", pretty)

// check if data is accessible at the archive server, unless beamline account (assumed to be always centrally available)
// and unless a (no)copy flag was defined via the command line
if checkCentralAvailability {
sshErr, otherErr := datasetIngestor.CheckDataCentrallyAvailableSsh(user["username"], RSYNCServer, datasetSourceFolder, os.Stdout)
if otherErr != nil {
log.Fatalln("Cannot check if data is centrally available:", otherErr)
}
// if the ssh command's error is not nil, the dataset is *likely* not centrally available (maybe we should inspect the returned error)
if sshErr != nil {
color.Set(color.FgYellow)
log.Printf("The source folder %v is not centrally available.\nThe data must first be copied.\n ", datasetSourceFolder)
color.Unset()
copyFlag = true
// check if user account
if len(accessGroups) == 0 {
color.Set(color.FgRed)
log.Println("For copying, you must use a personal account. Beamline accounts are not supported.")
color.Unset()
os.Exit(1)
}
if !noninteractiveFlag {
log.Printf("Do you want to continue (Y/n)? ")
scanner.Scan()
continueFlag := scanner.Text()
if continueFlag == "n" {
log.Fatalln("Further ingests interrupted because copying is needed, but no copy wanted.")
}
} else {
copyFlag = false // beamline accounts don't need copying then, but is beamline account checking needed outside PSI?
}
}
}

if ingestFlag {
// create ingest. For the decentral case, delay setting the status to archivable until the data is copied
archivable := false
if _, ok := metaDataMap["datasetlifecycle"]; !ok {
metaDataMap["datasetlifecycle"] = map[string]interface{}{}
}
if copyFlag { // IDEA: maybe add a flag to indicate that we want to copy later?
// do not override existing fields
metaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = false
metaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "filesNotYetAvailable"
metaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = false
} else {
if !copyFlag {
// NOTE *in this case* copyflag is ALWAYS false, nocopyFlag is ALWAYS true
// why is this not just an assignment to FALSE then?
copyFlag = !nocopyFlag
}
archivable = true
metaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = true
metaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "datasetCreated"
metaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = true
}
if ingestFlag {
// create ingest. For the decentral case, delay setting the status to archivable until the data is copied
archivable := false
if _, ok := metaDataMap["datasetlifecycle"]; !ok {
metaDataMap["datasetlifecycle"] = map[string]interface{}{}
}
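// while a copy is still pending, the dataset is recorded as not yet
// archivable; a successful sync further below marks the files ready again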
if copyFlag {
// do not override existing fields
metaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = false
metaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "filesNotYetAvailable"
metaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = false
} else {
archivable = true
metaDataMap["datasetlifecycle"].(map[string]interface{})["isOnCentralDisk"] = true
metaDataMap["datasetlifecycle"].(map[string]interface{})["archiveStatusMessage"] = "datasetCreated"
metaDataMap["datasetlifecycle"].(map[string]interface{})["archivable"] = true
}
datasetId, err := datasetIngestor.IngestDataset(client, APIServer, metaDataMap, fullFileArray, user)
log.Println("Ingesting dataset...")
datasetId, err := datasetIngestor.IngestDataset(client, APIServer, metaDataMap, fullFileArray, user)
if err != nil {
log.Fatal("Couldn't ingest dataset:", err)
}
log.Println("Dataset created:", datasetId)
// add attachment optionally
if addAttachment != "" {
log.Println("Adding attachment...")
err := datasetIngestor.AddAttachment(client, APIServer, datasetId, metaDataMap, user["accessToken"], addAttachment, addCaption)
if err != nil {
log.Fatalf("ingestion returned an error: %v\n", err)
log.Println("Couldn't add attachment:", err)
}
// add attachment optionally
if addAttachment != "" {
err := datasetIngestor.AddAttachment(client, APIServer, datasetId, metaDataMap, user["accessToken"], addAttachment, addCaption)
log.Printf("Attachment file %v added to dataset %v\n", addAttachment, datasetId)
}
if copyFlag {
// TODO rewrite SyncDataToFileserver
log.Println("Syncing files to cache server...")
err := datasetIngestor.SyncLocalDataToFileserver(datasetId, user, RSYNCServer, datasetSourceFolder, absFileListing, os.Stdout)
if err == nil {
// delayed enabling
archivable = true
err := datasetIngestor.MarkFilesReady(client, APIServer, datasetId, user)
if err != nil {
log.Println("Couldn't add attachment:", err)
}
log.Printf("Attachment file %v added to dataset %v\n", addAttachment, datasetId)
}
if copyFlag {
err := datasetIngestor.SyncLocalDataToFileserver(datasetId, user, RSYNCServer, datasetSourceFolder, absFileListing, os.Stdout)
if err == nil {
// delayed enabling
archivable = true
datasetIngestor.MarkFilesReady(client, APIServer, datasetId, user)
} else {
color.Set(color.FgRed)
log.Printf("The command to copy files exited with error %v \n", err)
log.Printf("The dataset %v is not yet in an archivable state\n", datasetId)
// TODO let user decide to delete dataset entry
// datasetIngestor.DeleteDatasetEntry(client, APIServer, datasetId, user["accessToken"])
color.Unset()
log.Fatal("Couldn't mark files ready:", err)
}
} else {
color.Set(color.FgRed)
log.Printf("The command to copy files exited with error %v \n", err)
log.Printf("The dataset %v is not yet in an archivable state\n", datasetId)
// TODO let user decide to delete dataset entry
// datasetIngestor.DeleteDatasetEntry(client, APIServer, datasetId, user["accessToken"])
color.Unset()
}

if archivable {
datasetList = append(datasetList, datasetId)
}
log.Println("Syncing files - DONE")
}
datasetIngestor.ResetUpdatedMetaData(originalMap, metaDataMap)

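// only datasets that reached an archivable state are collected; the list
// feeds the optional archival job below and is printed at the end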
if archivable {
archivableDatasetList = append(archivableDatasetList, datasetId)
}
}
datasetIngestor.ResetUpdatedMetaData(originalMap, metaDataMap) // I don't really get this...
}

if !ingestFlag {
@@ -392,7 +410,7 @@ For Windows you need instead to specify -user username:password on the command l
log.Printf("Number of datasets not stored because of too many files:%v\nPlease note that this will cancel any subsequent archive steps from this job !\n", tooLargeDatasets)
}
color.Unset()
datasetIngestor.PrintFileInfos()
datasetIngestor.PrintFileInfos() // TODO: move this into cmd portion

// stop here if empty datasets appeared
if emptyDatasets > 0 || tooLargeDatasets > 0 {
@@ -403,17 +421,18 @@ For Windows you need instead to specify -user username:password on the command l
log.Printf("Submitting Archive Job for the ingested datasets.\n")
// TODO: change param type from pointer to regular as it is unnecessary
// for it to be passed as pointer
_, err := datasetUtils.CreateArchivalJob(client, APIServer, user, datasetList, &tapecopies)
jobId, err := datasetUtils.CreateArchivalJob(client, APIServer, user, archivableDatasetList, &tapecopies)
if err != nil {
color.Set(color.FgRed)
log.Printf("Could not create the archival job for the ingested datasets: %s", err.Error())
color.Unset()
}
log.Println("Submitted job:", jobId)
}

// print out results to STDOUT, one line per dataset
for i := 0; i < len(datasetList); i++ {
fmt.Println(datasetList[i])
for i := 0; i < len(archivableDatasetList); i++ {
fmt.Println(archivableDatasetList[i])
}

},
@@ -438,5 +457,5 @@ func init() {
datasetIngestorCmd.Flags().String("addcaption", "", "Optional caption to be stored with attachment (single dataset case only)")

datasetIngestorCmd.MarkFlagsMutuallyExclusive("testenv", "devenv", "localenv", "tunnelenv")
//datasetIngestorCmd.MarkFlagsMutuallyExclusive("nocopy", "copy")
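// cobra will now reject any invocation that sets both --copy and --nocopy,
// which is why the flags test below no longer passes the two together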
datasetIngestorCmd.MarkFlagsMutuallyExclusive("nocopy", "copy")
}
3 changes: 1 addition & 2 deletions cmd/commands/flags_test.go
@@ -167,7 +167,7 @@ func TestMainFlags(t *testing.T) {
"tunnelenv": false,
"noninteractive": true,
"copy": true,
"nocopy": true,
"nocopy": false,
"autoarchive": true,
"allowexistingsource": true,
"version": true,
@@ -190,7 +190,6 @@
"--token",
"token",
"--copy",
"--nocopy",
"--tapecopies",
"6571579",
"--autoarchive",