Skip to content

Commit

Permalink
dev
Browse files Browse the repository at this point in the history
  • Loading branch information
lvphps authored and kevwan committed Sep 16, 2023
1 parent f82b652 commit 6f101b1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
20 changes: 18 additions & 2 deletions gateway/internal/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,42 @@ type EventHandler struct {
Status *status.Status
writer io.Writer
marshaler jsonpb.Marshaler

Message proto.Message
RespHandler func(writer io.Writer, status *status.Status, message proto.Message)
}

func NewEventHandler(writer io.Writer, resolver jsonpb.AnyResolver) *EventHandler {
return &EventHandler{
type HandlerOption func(handler *EventHandler)

func NewEventHandler(writer io.Writer, resolver jsonpb.AnyResolver, opts ...HandlerOption) *EventHandler {
handler := &EventHandler{
writer: writer,
marshaler: jsonpb.Marshaler{
EmitDefaults: true,
AnyResolver: resolver,
},
}
for _, opt := range opts {
opt(handler)
}
return handler
}

func (h *EventHandler) OnReceiveResponse(message proto.Message) {
if h.RespHandler != nil {
h.Message = message
return
}
if err := h.marshaler.Marshal(h.writer, message); err != nil {
logx.Error(err)
}
}

func (h *EventHandler) OnReceiveTrailers(status *status.Status, _ metadata.MD) {
h.Status = status
if h.RespHandler != nil {
h.RespHandler(h.writer, h.Status, h.Message)
}
}

func (h *EventHandler) OnResolveMethod(_ *desc.MethodDescriptor) {
Expand Down
12 changes: 11 additions & 1 deletion gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package gateway
import (
"context"
"fmt"
"io"
"net/http"
"strings"

"github.com/fullstorydev/grpcurl"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mr"
Expand All @@ -16,6 +18,7 @@ import (
"github.com/zeromicro/go-zero/rest/httpx"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type (
Expand All @@ -25,6 +28,7 @@ type (
upstreams []Upstream
processHeader func(http.Header) []string
dialer func(conf zrpc.RpcClientConf) zrpc.Client
respHandler func(writer io.Writer, status *status.Status, message proto.Message)
}

// Option defines the method to customize Server.
Expand Down Expand Up @@ -55,6 +59,10 @@ func (s *Server) Stop() {
s.Server.Stop()
}

func (s *Server) SetRespHandler(handler func(writer io.Writer, status *status.Status, message proto.Message)) {
s.respHandler = handler
}

func (s *Server) build() error {
if err := s.ensureUpstreamNames(); err != nil {
return err
Expand Down Expand Up @@ -128,7 +136,9 @@ func (s *Server) buildHandler(source grpcurl.DescriptorSource, resolver jsonpb.A
}

w.Header().Set(httpx.ContentType, httpx.JsonContentType)
handler := internal.NewEventHandler(w, resolver)
handler := internal.NewEventHandler(w, resolver, func(eventHandler *internal.EventHandler) {
eventHandler.RespHandler = s.respHandler
})
if err := grpcurl.InvokeRPC(r.Context(), source, cli.Conn(), rpcPath, s.prepareMetadata(r.Header),
handler, parser.Next); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
Expand Down

0 comments on commit 6f101b1

Please sign in to comment.