Skip to content

Commit

Permalink
[core] Enable RPC call logging in non-verbose operation
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Jul 19, 2023
1 parent 25aaafa commit 077f9ee
Showing 1 changed file with 48 additions and 3 deletions.
51 changes: 48 additions & 3 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,27 @@ func NewServer(state *globalState) *grpc.Server {
}

func (m *RpcServer) logMethod() {
if !viper.GetBool("verbose") {
//if !viper.GetBool("verbose") {
// return
//}
pc, _, _, ok := runtime.Caller(1)
if !ok {
return
}
fun := runtime.FuncForPC(pc)
if fun == nil {
return
}
log.WithPrefix("rpcserver").
WithField("method", fun.Name()).
WithField("level", infologger.IL_Support).
Info("handling RPC request")
}

func (m *RpcServer) logMethodHandled() {
//if !viper.GetBool("verbose") {
// return
//}
pc, _, _, ok := runtime.Caller(1)
if !ok {
return
Expand All @@ -83,8 +101,8 @@ func (m *RpcServer) logMethod() {
}
log.WithPrefix("rpcserver").
WithField("method", fun.Name()).
WithField("level", infologger.IL_Trace).
Debug("handling RPC request")
WithField("level", infologger.IL_Support).
Info("handling RPC request DONE")
}

// Implements interface pb.ControlServer
Expand All @@ -96,6 +114,7 @@ type RpcServer struct {
func (m *RpcServer) GetIntegratedServices(ctx context.Context, empty *pb.Empty) (*pb.ListIntegratedServicesReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

r := &pb.ListIntegratedServicesReply{Services: nil}

Expand Down Expand Up @@ -143,6 +162,7 @@ func (*RpcServer) TrackStatus(*pb.StatusRequest, pb.Control_TrackStatusServer) e
func (m *RpcServer) GetFrameworkInfo(context.Context, *pb.GetFrameworkInfoRequest) (*pb.GetFrameworkInfoReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

maj, _ := strconv.ParseInt(product.VERSION_MAJOR, 10, 32)
min, _ := strconv.ParseInt(product.VERSION_MINOR, 10, 32)
Expand Down Expand Up @@ -198,6 +218,7 @@ func (*RpcServer) Teardown(context.Context, *pb.TeardownRequest) (*pb.TeardownRe
func (m *RpcServer) GetEnvironments(cxt context.Context, request *pb.GetEnvironmentsRequest) (*pb.GetEnvironmentsReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

r := &pb.GetEnvironmentsReply{
FrameworkId: m.state.taskman.GetFrameworkID(),
Expand Down Expand Up @@ -264,6 +285,8 @@ func (m *RpcServer) GetEnvironments(cxt context.Context, request *pb.GetEnvironm
func (m *RpcServer) NewEnvironment(cxt context.Context, request *pb.NewEnvironmentRequest) (reply *pb.NewEnvironmentReply, err error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

// NEW_ENVIRONMENT transition
// The following should
// 1) Create a new value of type Environment struct
Expand Down Expand Up @@ -367,6 +390,7 @@ func (m *RpcServer) NewEnvironment(cxt context.Context, request *pb.NewEnvironme
func (m *RpcServer) GetEnvironment(cxt context.Context, req *pb.GetEnvironmentRequest) (reply *pb.GetEnvironmentReply, err error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil || len(req.Id) == 0 {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand Down Expand Up @@ -426,6 +450,7 @@ func (m *RpcServer) GetEnvironment(cxt context.Context, req *pb.GetEnvironmentRe
func (m *RpcServer) ControlEnvironment(cxt context.Context, req *pb.ControlEnvironmentRequest) (*pb.ControlEnvironmentReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil || len(req.Id) == 0 {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand Down Expand Up @@ -485,6 +510,7 @@ func (*RpcServer) ModifyEnvironment(context.Context, *pb.ModifyEnvironmentReques
func (m *RpcServer) DestroyEnvironment(cxt context.Context, req *pb.DestroyEnvironmentRequest) (*pb.DestroyEnvironmentReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil || len(req.Id) == 0 {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand Down Expand Up @@ -577,6 +603,8 @@ func (m *RpcServer) doTeardownAndCleanup(env *environment.Environment, force boo
func (m *RpcServer) GetActiveDetectors(_ context.Context, _ *pb.Empty) (*pb.GetActiveDetectorsReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

r := &pb.GetActiveDetectorsReply{
Detectors: make([]string, 0),
}
Expand All @@ -590,6 +618,7 @@ func (m *RpcServer) GetActiveDetectors(_ context.Context, _ *pb.Empty) (*pb.GetA
func (m *RpcServer) GetTasks(context.Context, *pb.GetTasksRequest) (*pb.GetTasksReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

tasks := m.state.taskman.GetTasks()
r := &pb.GetTasksReply{
Expand All @@ -602,6 +631,7 @@ func (m *RpcServer) GetTasks(context.Context, *pb.GetTasksRequest) (*pb.GetTasks
func (m *RpcServer) GetTask(cxt context.Context, req *pb.GetTaskRequest) (*pb.GetTaskReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

task := m.state.taskman.GetTask(req.TaskId)
if task == nil {
Expand Down Expand Up @@ -653,6 +683,8 @@ func (m *RpcServer) GetTask(cxt context.Context, req *pb.GetTaskRequest) (*pb.Ge
func (m *RpcServer) CleanupTasks(cxt context.Context, req *pb.CleanupTasksRequest) (*pb.CleanupTasksReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

idsToKill := req.GetTaskIds()

killed, running, err := m.doCleanupTasks(idsToKill)
Expand Down Expand Up @@ -683,6 +715,7 @@ func (m *RpcServer) doCleanupTasks(taskIds []string) (killedTaskInfos []*pb.Shor
func (m *RpcServer) GetRoles(cxt context.Context, req *pb.GetRolesRequest) (*pb.GetRolesReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil || len(req.EnvId) == 0 {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand Down Expand Up @@ -710,6 +743,7 @@ func (m *RpcServer) GetRoles(cxt context.Context, req *pb.GetRolesRequest) (*pb.
func (m *RpcServer) GetWorkflowTemplates(cxt context.Context, req *pb.GetWorkflowTemplatesRequest) (*pb.GetWorkflowTemplatesReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand Down Expand Up @@ -776,6 +810,7 @@ func (m *RpcServer) GetWorkflowTemplates(cxt context.Context, req *pb.GetWorkflo
func (m *RpcServer) ListRepos(cxt context.Context, req *pb.ListReposRequest) (*pb.ListReposReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand Down Expand Up @@ -804,6 +839,7 @@ func (m *RpcServer) ListRepos(cxt context.Context, req *pb.ListReposRequest) (*p
func (m *RpcServer) AddRepo(cxt context.Context, req *pb.AddRepoRequest) (*pb.AddRepoReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand All @@ -829,6 +865,7 @@ func (m *RpcServer) AddRepo(cxt context.Context, req *pb.AddRepoRequest) (*pb.Ad
func (m *RpcServer) RemoveRepo(cxt context.Context, req *pb.RemoveRepoRequest) (*pb.RemoveRepoReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand All @@ -846,6 +883,7 @@ func (m *RpcServer) RemoveRepo(cxt context.Context, req *pb.RemoveRepoRequest) (
func (m *RpcServer) RefreshRepos(cxt context.Context, req *pb.RefreshReposRequest) (*pb.Empty, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand All @@ -867,6 +905,7 @@ func (m *RpcServer) RefreshRepos(cxt context.Context, req *pb.RefreshReposReques
func (m *RpcServer) SetDefaultRepo(cxt context.Context, req *pb.SetDefaultRepoRequest) (*pb.Empty, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand All @@ -883,6 +922,7 @@ func (m *RpcServer) SetDefaultRepo(cxt context.Context, req *pb.SetDefaultRepoRe
func (m *RpcServer) SetGlobalDefaultRevision(cxt context.Context, req *pb.SetGlobalDefaultRevisionRequest) (*pb.Empty, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand All @@ -899,6 +939,7 @@ func (m *RpcServer) SetGlobalDefaultRevision(cxt context.Context, req *pb.SetGlo
func (m *RpcServer) SetRepoDefaultRevision(cxt context.Context, req *pb.SetRepoDefaultRevisionRequest) (*pb.SetRepoDefaultRevisionReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

if req == nil {
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
Expand All @@ -916,6 +957,8 @@ func (m *RpcServer) SetRepoDefaultRevision(cxt context.Context, req *pb.SetRepoD
func (m *RpcServer) Subscribe(req *pb.SubscribeRequest, srv pb.Control_SubscribeServer) error {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

for {
ch, ok := m.streams.GetChannel(req.GetId())
if !ok {
Expand All @@ -940,6 +983,8 @@ func (m *RpcServer) Subscribe(req *pb.SubscribeRequest, srv pb.Control_Subscribe
func (m *RpcServer) NewAutoEnvironment(cxt context.Context, request *pb.NewAutoEnvironmentRequest) (*pb.NewAutoEnvironmentReply, error) {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("rpcserver"))
m.logMethod()
defer m.logMethodHandled()

ch := make(chan *pb.Event)
m.streams.add(request.GetId(), ch)
sub := environment.SubscribeToStream(ch)
Expand Down

0 comments on commit 077f9ee

Please sign in to comment.