From 0dddd64dae731c6e24a32b207163edd33b0afb54 Mon Sep 17 00:00:00 2001 From: Teo Mrnjavac Date: Fri, 11 Aug 2023 11:15:46 +0200 Subject: [PATCH] [core] Add support for odc_extract_topology_resources flag --- core/integration/odc/handlers.go | 37 ++++++++++++++---- core/integration/odc/plugin.go | 64 +++++++++++++++++++++----------- 2 files changed, 73 insertions(+), 28 deletions(-) diff --git a/core/integration/odc/handlers.go b/core/integration/odc/handlers.go index fe480673e..980a5ea00 100644 --- a/core/integration/odc/handlers.go +++ b/core/integration/odc/handlers.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "sync" "time" @@ -570,7 +571,10 @@ func handleRun(ctx context.Context, odcClient *RpcClient, isManualXml bool, argu defer log.Trace("END handleRun") // RUN request, includes INITIALIZE+SUBMIT+ACTIVATE - var topology, script, plugin, resources string + var ( + topology, script, plugin, resources, extractTopoResourcesS string + extractTopoResources bool + ) exists := false topology, exists = arguments["topology"] @@ -581,20 +585,39 @@ func handleRun(ctx context.Context, odcClient *RpcClient, isManualXml bool, argu if !isManualXml && (!exists || len(script) == 0) { return errors.New("empty script received") } + extractTopoResourcesS, exists = arguments["extractTopoResources"] + if exists && len(extractTopoResourcesS) > 0 { + var err error + extractTopoResources, err = strconv.ParseBool(extractTopoResourcesS) + if err != nil { + return errors.New("invalid extractTopoResources value received") + } + } + + // absence of plugin and resources is only a problem if we don't extract resources from topology plugin, exists = arguments["plugin"] - if !exists || len(plugin) == 0 { + if !extractTopoResources && (!exists || len(plugin) == 0) { return errors.New("empty plugin received") } resources, exists = arguments["resources"] - if !exists || len(resources) == 0 { + if !extractTopoResources && (!exists || len(resources) == 0) { return errors.New("empty resources received") } - runRequest := &odcpb.RunRequest{ - Partitionid: envId, - Plugin: plugin, - Resources: resources, + var runRequest *odcpb.RunRequest + if extractTopoResources { + runRequest = &odcpb.RunRequest{ + Partitionid: envId, + ExtractTopoResources: extractTopoResources, + } + } else { + runRequest = &odcpb.RunRequest{ + Partitionid: envId, + Plugin: plugin, + Resources: resources, + } } + // We ask this ODC call to complete within our own DEADLINE, minus 1 second ctxDeadline, ok := ctx.Deadline() if ok { diff --git a/core/integration/odc/plugin.go b/core/integration/odc/plugin.go index fc781698e..980b723a1 100644 --- a/core/integration/odc/plugin.go +++ b/core/integration/odc/plugin.go @@ -911,11 +911,26 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { var ( pdpConfigOption, script, topology, plugin, resources string + extractTopoResources bool ) ok := false isManualXml := false callFailedStr := "EPN PartitionInitialize call failed" + extractTopoResourcesS, extractTopoResourcesSok := varStack["odc_extract_topology_resources"] + if extractTopoResourcesSok && extractTopoResourcesS != "" { // if set and true, plugin and resources are not to be included in requests + extractTopoResources, err = strconv.ParseBool(extractTopoResourcesS) + if err != nil { + msg := "cannot parse odc_extract_topology_resources" + log.WithField("partition", envId). + WithField("call", "PartitionInitialize"). + Error(msg) + call.VarStack["__call_error_reason"] = msg + call.VarStack["__call_error"] = callFailedStr + return + } + } + pdpConfigOption, ok = varStack["pdp_config_option"] if !ok { msg := "cannot acquire PDP workflow configuration mode" @@ -966,26 +981,32 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } - plugin, ok = varStack["odc_plugin"] - if !ok { - msg := "cannot acquire ODC RMS plugin declaration" - log.WithField("partition", envId). - WithField("call", "PartitionInitialize"). - Error(msg) - call.VarStack["__call_error_reason"] = msg - call.VarStack["__call_error"] = callFailedStr - return - } + if !extractTopoResources { + plugin, ok = varStack["odc_plugin"] + if !ok { + msg := "cannot acquire ODC RMS plugin declaration" + log.WithField("partition", envId). + WithField("call", "PartitionInitialize"). + Error(msg) + call.VarStack["__call_error_reason"] = msg + call.VarStack["__call_error"] = callFailedStr + return + } - resources, ok = varStack["odc_resources"] - if !ok { - msg := "cannot acquire ODC resources declaration" + resources, ok = varStack["odc_resources"] + if !ok { + msg := "cannot acquire ODC resources declaration" + log.WithField("partition", envId). + WithField("call", "PartitionInitialize"). + Error(msg) + call.VarStack["__call_error_reason"] = msg + call.VarStack["__call_error"] = callFailedStr + return + } + } else { log.WithField("partition", envId). WithField("call", "PartitionInitialize"). - Error(msg) - call.VarStack["__call_error_reason"] = msg - call.VarStack["__call_error"] = callFailedStr - return + Info("odc_extract_topology_resources is set to true, plugin and resources will not be included in the ODC Run request") } timeout := callable.AcquireTimeout(ODC_PARTITIONINITIALIZE_TIMEOUT, varStack, "PartitionInitialize", envId) @@ -994,10 +1015,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { defer cancel() err = handleRun(ctx, p.odcClient, isManualXml, map[string]string{ - "topology": topology, - "script": script, - "plugin": plugin, - "resources": resources, + "topology": topology, + "script": script, + "plugin": plugin, + "resources": resources, + "extractTopoResources": strconv.FormatBool(extractTopoResources), }, paddingTimeout, envId) if err != nil {