Skip to content

Commit

Permalink
[core] Emit environment events from CreateAutoEnvironment
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Sep 3, 2024
1 parent d0fc313 commit 157f3b3
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
44 changes: 44 additions & 0 deletions core/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,21 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
}
}

lastRequestUser := &evpb.User{}
lastRequestUserJ, ok := userVars["last_request_user"]
if ok {
_ = json.Unmarshal([]byte(lastRequestUserJ), lastRequestUser)
}

the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
EnvironmentId: newId.String(),
State: "PENDING",
Transition: "CREATE",
TransitionStep: "before_CREATE",
Message: "instantiating",
LastRequestUser: lastRequestUser,
})

env, err := newEnvironment(envUserVars, newId)
newEnvId := uid.NilID()
if err == nil && env != nil {
Expand All @@ -1173,6 +1188,15 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
"partition": newEnvId.String(),
}).Info("creating new automatic environment")

the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
EnvironmentId: newId.String(),
State: "PENDING",
Transition: "CREATE",
TransitionStep: "before_CREATE",
Message: "running hooks",
LastRequestUser: lastRequestUser,
})

env.addSubscription(sub)
defer env.closeStream()

Expand All @@ -1193,6 +1217,15 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str
Warn("parse workflow public info failed.")
}

the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
EnvironmentId: newId.String(),
State: "PENDING",
Transition: "CREATE",
TransitionStep: "CREATE",
Message: "loading workflow",
LastRequestUser: lastRequestUser,
})

env.workflow, err = envs.loadWorkflow(workflowPath, env.wfAdapter, workflowUserVars, env.BaseConfigStack)
if err != nil {
err = fmt.Errorf("cannot load workflow template: %w", err)
Expand All @@ -1202,6 +1235,17 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str

env.Public, env.Description, _ = parseWorkflowPublicInfo(workflowPath)

cvs, _ := env.Workflow().ConsolidatedVarStack()
the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
EnvironmentId: newId.String(),
State: env.CurrentState(),
Transition: "CREATE",
TransitionStep: "after_CREATE",
Message: "workflow loaded",
Vars: cvs, // we push the full var stack of the root role in the workflow loaded event
LastRequestUser: lastRequestUser,
})

log.WithField("method", "CreateAutoEnvironment").
WithField("level", infologger.IL_Devel).
Debug("envman write lock")
Expand Down
11 changes: 10 additions & 1 deletion core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,16 @@ func (m *RpcServer) NewAutoEnvironment(cxt context.Context, request *pb.NewAutoE
m.envStreams.add(request.GetId(), ch)
sub := environment.SubscribeToStream(ch)
id := uid.New()
go m.state.environments.CreateAutoEnvironment(request.GetWorkflowTemplate(), request.GetVars(), id, sub)

inputUserVars := request.GetVars()
if len(inputUserVars) == 0 {
inputUserVars = make(map[string]string)
}
// we store the last known request user in the environment
lastRequestUserJ, _ := json.Marshal(request.RequestUser)
inputUserVars["last_request_user"] = string(lastRequestUserJ[:])

go m.state.environments.CreateAutoEnvironment(request.GetWorkflowTemplate(), inputUserVars, id, sub)
r := &pb.NewAutoEnvironmentReply{}
return r, nil
}

0 comments on commit 157f3b3

Please sign in to comment.