Skip to content

Commit

Permalink
[core] Add support for odc_extract_topology_resources flag
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Aug 11, 2023
1 parent a03c5c9 commit 0dddd64
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 28 deletions.
37 changes: 30 additions & 7 deletions core/integration/odc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -570,7 +571,10 @@ func handleRun(ctx context.Context, odcClient *RpcClient, isManualXml bool, argu
defer log.Trace("END handleRun")

// RUN request, includes INITIALIZE+SUBMIT+ACTIVATE
var topology, script, plugin, resources string
var (
topology, script, plugin, resources, extractTopoResourcesS string
extractTopoResources bool
)
exists := false

topology, exists = arguments["topology"]
Expand All @@ -581,20 +585,39 @@ func handleRun(ctx context.Context, odcClient *RpcClient, isManualXml bool, argu
if !isManualXml && (!exists || len(script) == 0) {
return errors.New("empty script received")
}
extractTopoResourcesS, exists = arguments["extractTopoResources"]
if exists && len(extractTopoResourcesS) > 0 {
var err error
extractTopoResources, err = strconv.ParseBool(extractTopoResourcesS)
if err != nil {
return errors.New("invalid extractTopoResources value received")
}
}

// absence of plugin and resources is only a problem if we don't extract resources from topology
plugin, exists = arguments["plugin"]
if !exists || len(plugin) == 0 {
if !extractTopoResources && (!exists || len(plugin) == 0) {
return errors.New("empty plugin received")
}
resources, exists = arguments["resources"]
if !exists || len(resources) == 0 {
if !extractTopoResources && (!exists || len(resources) == 0) {
return errors.New("empty resources received")
}

runRequest := &odcpb.RunRequest{
Partitionid: envId,
Plugin: plugin,
Resources: resources,
var runRequest *odcpb.RunRequest
if extractTopoResources {
runRequest = &odcpb.RunRequest{
Partitionid: envId,
ExtractTopoResources: extractTopoResources,
}
} else {
runRequest = &odcpb.RunRequest{
Partitionid: envId,
Plugin: plugin,
Resources: resources,
}
}

// We ask this ODC call to complete within our own DEADLINE, minus 1 second
ctxDeadline, ok := ctx.Deadline()
if ok {
Expand Down
64 changes: 43 additions & 21 deletions core/integration/odc/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,11 +911,26 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

var (
pdpConfigOption, script, topology, plugin, resources string
extractTopoResources bool
)
ok := false
isManualXml := false
callFailedStr := "EPN PartitionInitialize call failed"

extractTopoResourcesS, extractTopoResourcesSok := varStack["odc_extract_topology_resources"]
if extractTopoResourcesSok && extractTopoResourcesS != "" { // if set and true, plugin and resources are not to be included in requests
extractTopoResources, err = strconv.ParseBool(extractTopoResourcesS)
if err != nil {
msg := "cannot parse odc_extract_topology_resources"
log.WithField("partition", envId).
WithField("call", "PartitionInitialize").
Error(msg)
call.VarStack["__call_error_reason"] = msg
call.VarStack["__call_error"] = callFailedStr
return
}
}

pdpConfigOption, ok = varStack["pdp_config_option"]
if !ok {
msg := "cannot acquire PDP workflow configuration mode"
Expand Down Expand Up @@ -966,26 +981,32 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

plugin, ok = varStack["odc_plugin"]
if !ok {
msg := "cannot acquire ODC RMS plugin declaration"
log.WithField("partition", envId).
WithField("call", "PartitionInitialize").
Error(msg)
call.VarStack["__call_error_reason"] = msg
call.VarStack["__call_error"] = callFailedStr
return
}
if !extractTopoResources {
plugin, ok = varStack["odc_plugin"]
if !ok {
msg := "cannot acquire ODC RMS plugin declaration"
log.WithField("partition", envId).
WithField("call", "PartitionInitialize").
Error(msg)
call.VarStack["__call_error_reason"] = msg
call.VarStack["__call_error"] = callFailedStr
return
}

resources, ok = varStack["odc_resources"]
if !ok {
msg := "cannot acquire ODC resources declaration"
resources, ok = varStack["odc_resources"]
if !ok {
msg := "cannot acquire ODC resources declaration"
log.WithField("partition", envId).
WithField("call", "PartitionInitialize").
Error(msg)
call.VarStack["__call_error_reason"] = msg
call.VarStack["__call_error"] = callFailedStr
return
}
} else {
log.WithField("partition", envId).
WithField("call", "PartitionInitialize").
Error(msg)
call.VarStack["__call_error_reason"] = msg
call.VarStack["__call_error"] = callFailedStr
return
Info("odc_extract_topology_resources is set to true, plugin and resources will not be included in the ODC Run request")
}

timeout := callable.AcquireTimeout(ODC_PARTITIONINITIALIZE_TIMEOUT, varStack, "PartitionInitialize", envId)
Expand All @@ -994,10 +1015,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
defer cancel()

err = handleRun(ctx, p.odcClient, isManualXml, map[string]string{
"topology": topology,
"script": script,
"plugin": plugin,
"resources": resources,
"topology": topology,
"script": script,
"plugin": plugin,
"resources": resources,
"extractTopoResources": strconv.FormatBool(extractTopoResources),
},
paddingTimeout, envId)
if err != nil {
Expand Down

0 comments on commit 0dddd64

Please sign in to comment.