diff --git a/core/routing.go b/core/routing.go index 89d3db8..1b6f700 100644 --- a/core/routing.go +++ b/core/routing.go @@ -45,6 +45,11 @@ var pmPeerErrors = prometheus.NewCounter(prometheus.CounterOpts{ Name: "peer_errors", Help: "Number of peer connection errors", }) +var pmActiveUpstreamConnections = prometheus.NewGauge(prometheus.GaugeOpts{ + Subsystem: "route", + Name: "active_upstream_connections", + Help: "Number of active upstream peer connections", +}) var pmSubscriptions = prometheus.NewGauge(prometheus.GaugeOpts{ Subsystem: "route", Name: "active_subscriptions", @@ -128,6 +133,8 @@ type Terminus struct { uplinkConns map[string]*PeerConnection uplinkConnMu sync.RWMutex + + activeUplink int64 } type PeerConnection struct { @@ -598,8 +605,6 @@ func (t *Terminus) downstreamPeer(ctx context.Context, q *Queue) (err error) { } } func (t *Terminus) beginUpstreamPeering(q *Queue, dr *DesignatedRouter) { - //TODO add transport credentials using auth manager - //Do TLS handshake with signature on the self-signed cert, ala BW2 for { ctx, cancel := context.WithCancel(context.Background()) err := t.upstreamPeer(ctx, q, dr) @@ -623,6 +628,10 @@ func (t *Terminus) GetDesignatedRouterConnection(namespace string) *PeerConnecti return conn } +func (t *Terminus) ConnectionStatus() (int64, int64) { + return t.activeUplink, int64(len(t.drnamespaces)) +} + func (t *Terminus) upstreamPeer(ctx context.Context, q *Queue, dr *DesignatedRouter) (err error) { t.uplinkConnMu.Lock() delete(t.uplinkConns, dr.Namespace) @@ -645,7 +654,11 @@ func (t *Terminus) upstreamPeer(ctx context.Context, q *Queue, dr *DesignatedRou err = e.(error) } conn.Close() + v := atomic.AddInt64(&t.activeUplink, -1) + pmActiveUpstreamConnections.Set(float64(v)) }() + v := atomic.AddInt64(&t.activeUplink, 1) + pmActiveUpstreamConnections.Set(float64(v)) peerConn := &PeerConnection{ Conn: conn, Ctx: ctx, diff --git a/mqpb/wavemq.pb.go b/mqpb/wavemq.pb.go index cc02c40..ac4c1ef 100644 --- a/mqpb/wavemq.pb.go +++ b/mqpb/wavemq.pb.go @@ -27,6 +27,82 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +type ConnectionStatusParams struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ConnectionStatusParams) Reset() { *m = ConnectionStatusParams{} } +func (m *ConnectionStatusParams) String() string { return proto.CompactTextString(m) } +func (*ConnectionStatusParams) ProtoMessage() {} +func (*ConnectionStatusParams) Descriptor() ([]byte, []int) { + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{0} +} +func (m *ConnectionStatusParams) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ConnectionStatusParams.Unmarshal(m, b) +} +func (m *ConnectionStatusParams) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ConnectionStatusParams.Marshal(b, m, deterministic) +} +func (dst *ConnectionStatusParams) XXX_Merge(src proto.Message) { + xxx_messageInfo_ConnectionStatusParams.Merge(dst, src) +} +func (m *ConnectionStatusParams) XXX_Size() int { + return xxx_messageInfo_ConnectionStatusParams.Size(m) +} +func (m *ConnectionStatusParams) XXX_DiscardUnknown() { + xxx_messageInfo_ConnectionStatusParams.DiscardUnknown(m) +} + +var xxx_messageInfo_ConnectionStatusParams proto.InternalMessageInfo + +type ConnectionStatusResponse struct { + TotalPeers int32 `protobuf:"varint,1,opt,name=totalPeers,proto3" json:"totalPeers,omitempty"` + ConnectedPeers int32 `protobuf:"varint,2,opt,name=connectedPeers,proto3" json:"connectedPeers,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ConnectionStatusResponse) Reset() { *m = ConnectionStatusResponse{} } +func (m *ConnectionStatusResponse) String() string { return proto.CompactTextString(m) } +func (*ConnectionStatusResponse) ProtoMessage() {} +func (*ConnectionStatusResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{1} +} +func (m *ConnectionStatusResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ConnectionStatusResponse.Unmarshal(m, b) +} +func (m *ConnectionStatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ConnectionStatusResponse.Marshal(b, m, deterministic) +} +func (dst *ConnectionStatusResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ConnectionStatusResponse.Merge(dst, src) +} +func (m *ConnectionStatusResponse) XXX_Size() int { + return xxx_messageInfo_ConnectionStatusResponse.Size(m) +} +func (m *ConnectionStatusResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ConnectionStatusResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ConnectionStatusResponse proto.InternalMessageInfo + +func (m *ConnectionStatusResponse) GetTotalPeers() int32 { + if m != nil { + return m.TotalPeers + } + return 0 +} + +func (m *ConnectionStatusResponse) GetConnectedPeers() int32 { + if m != nil { + return m.ConnectedPeers + } + return 0 +} + type QueryParams struct { Perspective *Perspective `protobuf:"bytes,1,opt,name=perspective,proto3" json:"perspective,omitempty"` Namespace []byte `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` @@ -42,7 +118,7 @@ func (m *QueryParams) Reset() { *m = QueryParams{} } func (m *QueryParams) String() string { return proto.CompactTextString(m) } func (*QueryParams) ProtoMessage() {} func (*QueryParams) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{0} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{2} } func (m *QueryParams) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_QueryParams.Unmarshal(m, b) @@ -102,7 +178,7 @@ func (m *QueryMessage) Reset() { *m = QueryMessage{} } func (m *QueryMessage) String() string { return proto.CompactTextString(m) } func (*QueryMessage) ProtoMessage() {} func (*QueryMessage) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{1} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{3} } func (m *QueryMessage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_QueryMessage.Unmarshal(m, b) @@ -151,7 +227,7 @@ func (m *PeerQueryParams) Reset() { *m = PeerQueryParams{} } func (m *PeerQueryParams) String() string { return proto.CompactTextString(m) } func (*PeerQueryParams) ProtoMessage() {} func (*PeerQueryParams) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{2} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{4} } func (m *PeerQueryParams) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PeerQueryParams.Unmarshal(m, b) @@ -218,7 +294,7 @@ func (m *PeerUnsubscribeParams) Reset() { *m = PeerUnsubscribeParams{} } func (m *PeerUnsubscribeParams) String() string { return proto.CompactTextString(m) } func (*PeerUnsubscribeParams) ProtoMessage() {} func (*PeerUnsubscribeParams) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{3} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{5} } func (m *PeerUnsubscribeParams) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PeerUnsubscribeParams.Unmarshal(m, b) @@ -263,7 +339,7 @@ func (m *PeerUnsubscribeResponse) Reset() { *m = PeerUnsubscribeResponse func (m *PeerUnsubscribeResponse) String() string { return proto.CompactTextString(m) } func (*PeerUnsubscribeResponse) ProtoMessage() {} func (*PeerUnsubscribeResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{4} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{6} } func (m *PeerUnsubscribeResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PeerUnsubscribeResponse.Unmarshal(m, b) @@ -301,7 +377,7 @@ func (m *PeerPublishParams) Reset() { *m = PeerPublishParams{} } func (m *PeerPublishParams) String() string { return proto.CompactTextString(m) } func (*PeerPublishParams) ProtoMessage() {} func (*PeerPublishParams) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{5} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{7} } func (m *PeerPublishParams) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PeerPublishParams.Unmarshal(m, b) @@ -329,11 +405,7 @@ func (m *PeerPublishParams) GetMsg() *Message { } type PeerPublishResponse struct { - Error *Error `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` - // The number of bytes free in the queue (after putting this message in) - Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"` - // The number of items free in the queue (after putting this message in) - Length int64 `protobuf:"varint,3,opt,name=length,proto3" json:"length,omitempty"` + Error *Error `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -343,7 +415,7 @@ func (m *PeerPublishResponse) Reset() { *m = PeerPublishResponse{} } func (m *PeerPublishResponse) String() string { return proto.CompactTextString(m) } func (*PeerPublishResponse) ProtoMessage() {} func (*PeerPublishResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{6} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{8} } func (m *PeerPublishResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PeerPublishResponse.Unmarshal(m, b) @@ -370,20 +442,6 @@ func (m *PeerPublishResponse) GetError() *Error { return nil } -func (m *PeerPublishResponse) GetSize() int64 { - if m != nil { - return m.Size - } - return 0 -} - -func (m *PeerPublishResponse) GetLength() int64 { - if m != nil { - return m.Length - } - return 0 -} - type PeerSubscriptionTBS struct { SourceEntity []byte `protobuf:"bytes,1,opt,name=sourceEntity,proto3" json:"sourceEntity,omitempty"` Namespace []byte `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` @@ -404,7 +462,7 @@ func (m *PeerSubscriptionTBS) Reset() { *m = PeerSubscriptionTBS{} } func (m *PeerSubscriptionTBS) String() string { return proto.CompactTextString(m) } func (*PeerSubscriptionTBS) ProtoMessage() {} func (*PeerSubscriptionTBS) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{7} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{9} } func (m *PeerSubscriptionTBS) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PeerSubscriptionTBS.Unmarshal(m, b) @@ -482,7 +540,7 @@ func (m *PeerSubscribeParams) Reset() { *m = PeerSubscribeParams{} } func (m *PeerSubscribeParams) String() string { return proto.CompactTextString(m) } func (*PeerSubscribeParams) ProtoMessage() {} func (*PeerSubscribeParams) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{8} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{10} } func (m *PeerSubscribeParams) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PeerSubscribeParams.Unmarshal(m, b) @@ -550,7 +608,7 @@ func (m *PublishParams) Reset() { *m = PublishParams{} } func (m *PublishParams) String() string { return proto.CompactTextString(m) } func (*PublishParams) ProtoMessage() {} func (*PublishParams) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{9} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{11} } func (m *PublishParams) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PublishParams.Unmarshal(m, b) @@ -630,7 +688,7 @@ func (m *PublishResponse) Reset() { *m = PublishResponse{} } func (m *PublishResponse) String() string { return proto.CompactTextString(m) } func (*PublishResponse) ProtoMessage() {} func (*PublishResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{10} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{12} } func (m *PublishResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PublishResponse.Unmarshal(m, b) @@ -674,7 +732,7 @@ func (m *MessageTBS) Reset() { *m = MessageTBS{} } func (m *MessageTBS) String() string { return proto.CompactTextString(m) } func (*MessageTBS) ProtoMessage() {} func (*MessageTBS) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{11} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{13} } func (m *MessageTBS) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MessageTBS.Unmarshal(m, b) @@ -754,7 +812,7 @@ func (m *Message) Reset() { *m = Message{} } func (m *Message) String() string { return proto.CompactTextString(m) } func (*Message) ProtoMessage() {} func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{12} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{14} } func (m *Message) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Message.Unmarshal(m, b) @@ -842,7 +900,7 @@ func (m *PayloadObject) Reset() { *m = PayloadObject{} } func (m *PayloadObject) String() string { return proto.CompactTextString(m) } func (*PayloadObject) ProtoMessage() {} func (*PayloadObject) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{13} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{15} } func (m *PayloadObject) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PayloadObject.Unmarshal(m, b) @@ -902,7 +960,7 @@ func (m *SubscribeParams) Reset() { *m = SubscribeParams{} } func (m *SubscribeParams) String() string { return proto.CompactTextString(m) } func (*SubscribeParams) ProtoMessage() {} func (*SubscribeParams) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{14} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{16} } func (m *SubscribeParams) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SubscribeParams.Unmarshal(m, b) @@ -983,7 +1041,7 @@ func (m *SubscriptionMessage) Reset() { *m = SubscriptionMessage{} } func (m *SubscriptionMessage) String() string { return proto.CompactTextString(m) } func (*SubscriptionMessage) ProtoMessage() {} func (*SubscriptionMessage) Descriptor() ([]byte, []int) { - return fileDescriptor_wavemq_172e7fa8c7eb1b25, []int{15} + return fileDescriptor_wavemq_39f8c2d85be4999d, []int{17} } func (m *SubscriptionMessage) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SubscriptionMessage.Unmarshal(m, b) @@ -1018,6 +1076,8 @@ func (m *SubscriptionMessage) GetMessage() *Message { } func init() { + proto.RegisterType((*ConnectionStatusParams)(nil), "mqpb.ConnectionStatusParams") + proto.RegisterType((*ConnectionStatusResponse)(nil), "mqpb.ConnectionStatusResponse") proto.RegisterType((*QueryParams)(nil), "mqpb.QueryParams") proto.RegisterType((*QueryMessage)(nil), "mqpb.QueryMessage") proto.RegisterType((*PeerQueryParams)(nil), "mqpb.PeerQueryParams") @@ -1051,6 +1111,7 @@ type WAVEMQClient interface { Publish(ctx context.Context, in *PublishParams, opts ...grpc.CallOption) (*PublishResponse, error) Subscribe(ctx context.Context, in *SubscribeParams, opts ...grpc.CallOption) (WAVEMQ_SubscribeClient, error) Query(ctx context.Context, in *QueryParams, opts ...grpc.CallOption) (WAVEMQ_QueryClient, error) + ConnectionStatus(ctx context.Context, in *ConnectionStatusParams, opts ...grpc.CallOption) (*ConnectionStatusResponse, error) } type wAVEMQClient struct { @@ -1134,11 +1195,21 @@ func (x *wAVEMQQueryClient) Recv() (*QueryMessage, error) { return m, nil } +func (c *wAVEMQClient) ConnectionStatus(ctx context.Context, in *ConnectionStatusParams, opts ...grpc.CallOption) (*ConnectionStatusResponse, error) { + out := new(ConnectionStatusResponse) + err := c.cc.Invoke(ctx, "/mqpb.WAVEMQ/ConnectionStatus", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // WAVEMQServer is the server API for WAVEMQ service. type WAVEMQServer interface { Publish(context.Context, *PublishParams) (*PublishResponse, error) Subscribe(*SubscribeParams, WAVEMQ_SubscribeServer) error Query(*QueryParams, WAVEMQ_QueryServer) error + ConnectionStatus(context.Context, *ConnectionStatusParams) (*ConnectionStatusResponse, error) } func RegisterWAVEMQServer(s *grpc.Server, srv WAVEMQServer) { @@ -1205,6 +1276,24 @@ func (x *wAVEMQQueryServer) Send(m *QueryMessage) error { return x.ServerStream.SendMsg(m) } +func _WAVEMQ_ConnectionStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ConnectionStatusParams) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WAVEMQServer).ConnectionStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/mqpb.WAVEMQ/ConnectionStatus", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WAVEMQServer).ConnectionStatus(ctx, req.(*ConnectionStatusParams)) + } + return interceptor(ctx, in, info, handler) +} + var _WAVEMQ_serviceDesc = grpc.ServiceDesc{ ServiceName: "mqpb.WAVEMQ", HandlerType: (*WAVEMQServer)(nil), @@ -1213,6 +1302,10 @@ var _WAVEMQ_serviceDesc = grpc.ServiceDesc{ MethodName: "Publish", Handler: _WAVEMQ_Publish_Handler, }, + { + MethodName: "ConnectionStatus", + Handler: _WAVEMQ_ConnectionStatus_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -1447,65 +1540,68 @@ var _WAVEMQPeering_serviceDesc = grpc.ServiceDesc{ Metadata: "wavemq.proto", } -func init() { proto.RegisterFile("wavemq.proto", fileDescriptor_wavemq_172e7fa8c7eb1b25) } - -var fileDescriptor_wavemq_172e7fa8c7eb1b25 = []byte{ - // 904 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0x4f, 0x8f, 0xe3, 0x34, - 0x14, 0x57, 0x92, 0xfe, 0x99, 0xbe, 0xb6, 0x33, 0xb3, 0x1e, 0x96, 0xcd, 0x94, 0x05, 0x4a, 0x0e, - 0x50, 0x09, 0x31, 0x1a, 0x75, 0x97, 0x03, 0x12, 0x12, 0x1a, 0xb4, 0x15, 0x5a, 0xc1, 0x88, 0xae, - 0x97, 0x05, 0x89, 0x5b, 0x9a, 0x7a, 0x3b, 0x46, 0x4d, 0x9c, 0xb1, 0x9d, 0x85, 0xf2, 0x19, 0xb8, - 0x23, 0x71, 0xe1, 0xc2, 0x0d, 0x71, 0xe1, 0x83, 0xf1, 0x11, 0x10, 0xb2, 0x9d, 0xb4, 0x76, 0xda, - 0x41, 0xd3, 0xc3, 0x20, 0x6e, 0x79, 0x7f, 0x1c, 0xff, 0xde, 0xef, 0xfd, 0xf1, 0x83, 0xde, 0xf7, - 0xf1, 0x2b, 0x92, 0x5e, 0x9f, 0xe5, 0x9c, 0x49, 0x86, 0x1a, 0xe9, 0x75, 0x3e, 0x1b, 0x00, 0x89, - 0x73, 0x6a, 0x34, 0xd1, 0x2f, 0x1e, 0x74, 0x9f, 0x15, 0x84, 0xaf, 0xa6, 0x31, 0x8f, 0x53, 0x81, - 0x1e, 0x41, 0x37, 0x27, 0x5c, 0xe4, 0x24, 0x91, 0xf4, 0x15, 0x09, 0xbd, 0xa1, 0x37, 0xea, 0x8e, - 0xef, 0x9d, 0xa9, 0x73, 0x67, 0xd3, 0x8d, 0x01, 0xdb, 0x5e, 0xe8, 0x21, 0x74, 0xb2, 0x38, 0x25, - 0x22, 0x8f, 0x13, 0x12, 0xfa, 0x43, 0x6f, 0xd4, 0xc3, 0x1b, 0x05, 0x3a, 0x86, 0xa0, 0xe0, 0x34, - 0x0c, 0x86, 0xde, 0xa8, 0x83, 0xd5, 0x27, 0x7a, 0x17, 0x0e, 0x93, 0x42, 0x48, 0x96, 0x4e, 0x39, - 0x63, 0x2f, 0x9f, 0x4c, 0x70, 0xd8, 0xd0, 0x87, 0x6a, 0xda, 0xe8, 0x5b, 0xe8, 0x69, 0x6c, 0x97, - 0x44, 0x88, 0x78, 0x41, 0xd0, 0x3b, 0xd0, 0x24, 0x9c, 0x33, 0x5e, 0xc2, 0xea, 0x1a, 0x58, 0x13, - 0xa5, 0xc2, 0xc6, 0x82, 0xde, 0x83, 0x76, 0x6a, 0xbc, 0x35, 0x90, 0xee, 0xb8, 0x6f, 0x9c, 0xca, - 0x5f, 0xe0, 0xca, 0x1a, 0xfd, 0xea, 0xc1, 0xd1, 0x94, 0x10, 0x6e, 0x07, 0x1f, 0x41, 0x4f, 0xb0, - 0x82, 0x27, 0x64, 0x92, 0x49, 0x2a, 0x57, 0xfa, 0x9a, 0x1e, 0x76, 0x74, 0x7b, 0xc7, 0xfa, 0x10, - 0x3a, 0x82, 0x2e, 0xb2, 0x58, 0x16, 0x9c, 0x94, 0x61, 0x6e, 0x14, 0x68, 0x00, 0x07, 0x79, 0xc5, - 0x41, 0x53, 0x1b, 0xd7, 0x72, 0xf4, 0x39, 0xdc, 0x57, 0x00, 0x5f, 0x64, 0xa2, 0x98, 0x89, 0x84, - 0xd3, 0x19, 0xd9, 0x03, 0xe6, 0x21, 0xf8, 0x74, 0xae, 0xf1, 0x75, 0xb0, 0x4f, 0xe7, 0xd1, 0xc7, - 0xf0, 0xa0, 0xf6, 0x33, 0x4c, 0x44, 0xce, 0x32, 0x71, 0x1b, 0x56, 0xa3, 0xc7, 0x70, 0x4f, 0x9d, - 0x9e, 0x16, 0xb3, 0x25, 0x15, 0x57, 0x25, 0x8c, 0xb7, 0x21, 0x48, 0xc5, 0xa2, 0x3c, 0x55, 0xa3, - 0x59, 0x59, 0xa2, 0x39, 0x9c, 0x58, 0xa7, 0xf6, 0xb8, 0x0f, 0x21, 0x68, 0x08, 0xfa, 0xa3, 0xe1, - 0x37, 0xc0, 0xfa, 0x1b, 0xbd, 0x0e, 0xad, 0x25, 0xc9, 0x16, 0xf2, 0x4a, 0xb3, 0x1b, 0xe0, 0x52, - 0x8a, 0xfe, 0xf0, 0xcc, 0x35, 0xcf, 0x4d, 0x60, 0xb9, 0xa4, 0x2c, 0xfb, 0xea, 0xd3, 0xe7, 0x77, - 0x92, 0x4c, 0xc3, 0x6a, 0xa3, 0x62, 0x55, 0xa5, 0x8f, 0xb3, 0x42, 0x12, 0xfe, 0xf4, 0x89, 0x4e, - 0x5f, 0x07, 0xaf, 0x65, 0x85, 0x97, 0xfc, 0x90, 0x53, 0xbe, 0x0a, 0x5b, 0x06, 0xaf, 0x91, 0xa2, - 0xdf, 0x5c, 0xbc, 0xeb, 0xac, 0xbe, 0x0f, 0x81, 0x9c, 0x89, 0x92, 0x94, 0xd3, 0xaa, 0xe3, 0xb6, - 0xe2, 0xc2, 0xca, 0xcb, 0xad, 0x2a, 0xff, 0xdf, 0xaa, 0x2a, 0x70, 0xab, 0x4a, 0xf5, 0x5e, 0x3c, - 0x13, 0x6c, 0x59, 0x48, 0x32, 0x31, 0xf0, 0x1a, 0x1a, 0x5e, 0x4d, 0x1b, 0xfd, 0xec, 0x43, 0xdf, - 0xcd, 0xf7, 0x7f, 0x32, 0x1a, 0x3e, 0x80, 0x76, 0xc2, 0x32, 0x49, 0x32, 0x19, 0x36, 0x86, 0xc1, - 0xa8, 0x3b, 0x3e, 0x29, 0x2f, 0x88, 0x57, 0x4b, 0x16, 0xcf, 0xbf, 0x9c, 0x7d, 0x47, 0x12, 0x89, - 0x2b, 0x1f, 0x74, 0x0e, 0x27, 0x24, 0x4b, 0xf8, 0x4a, 0xb3, 0x33, 0x8d, 0xb9, 0xa4, 0xea, 0x23, - 0x6c, 0x0e, 0x83, 0x51, 0x0f, 0xef, 0x32, 0xa1, 0x10, 0xda, 0x0a, 0x1f, 0x15, 0x52, 0xe7, 0xe5, - 0x00, 0x57, 0xe2, 0x8e, 0xa9, 0xd4, 0xde, 0x39, 0x95, 0x1e, 0xc3, 0xd1, 0xfe, 0x25, 0x1d, 0xfd, - 0xe5, 0x01, 0x94, 0xdd, 0x71, 0xdb, 0xea, 0xfc, 0x08, 0x0e, 0x8d, 0xfc, 0x05, 0x4b, 0x62, 0x1d, - 0x97, 0x6f, 0x73, 0x5e, 0x69, 0x5f, 0xe0, 0xa7, 0xb8, 0xe6, 0xe8, 0xd2, 0x1e, 0xdc, 0x40, 0x7b, - 0xc3, 0xa1, 0x3d, 0x37, 0x0c, 0x6b, 0xee, 0x6e, 0xa2, 0xbd, 0xf4, 0x51, 0xe8, 0x19, 0xa7, 0x0b, - 0x9a, 0x61, 0x5d, 0xed, 0x9a, 0xc9, 0x0e, 0x76, 0x74, 0xd1, 0x4f, 0x3e, 0xb4, 0xad, 0xc1, 0xad, - 0x0b, 0xd0, 0xe5, 0x47, 0x33, 0x8a, 0x8d, 0xc5, 0xa9, 0x59, 0xbf, 0x56, 0xb3, 0x91, 0x69, 0x8d, - 0x40, 0x1f, 0x3e, 0x76, 0x26, 0xcd, 0xee, 0x8e, 0xd8, 0x9a, 0xb3, 0xfb, 0xd7, 0xc9, 0x5b, 0x00, - 0x92, 0xa6, 0x44, 0xc8, 0x38, 0xcd, 0x45, 0xd8, 0x1a, 0x06, 0xa3, 0x00, 0x5b, 0x1a, 0xf4, 0x1a, - 0x34, 0xe7, 0x9c, 0xe5, 0x22, 0x6c, 0x6b, 0x93, 0x11, 0xec, 0xea, 0x3a, 0x70, 0xaa, 0x2b, 0xba, - 0x80, 0xbe, 0x43, 0xa6, 0x9a, 0x0f, 0x22, 0xb9, 0x22, 0x69, 0xac, 0x49, 0xe9, 0xe0, 0x52, 0x52, - 0xbf, 0xa8, 0x3a, 0xc0, 0xf0, 0x50, 0x89, 0xd1, 0xdf, 0x1e, 0x1c, 0xd5, 0xa7, 0xc6, 0xff, 0xe9, - 0xbd, 0x56, 0x9c, 0xd1, 0x39, 0xc9, 0x24, 0x7d, 0x49, 0x09, 0x2f, 0x07, 0xa2, 0xa5, 0xb9, 0x69, - 0x24, 0xee, 0x98, 0x49, 0xed, 0x9d, 0x33, 0x29, 0x86, 0x13, 0x7b, 0x1a, 0xde, 0xc1, 0x5a, 0x30, - 0xfe, 0xd3, 0x83, 0xd6, 0x37, 0x17, 0x5f, 0x4f, 0x2e, 0x9f, 0xa1, 0x0f, 0xa1, 0x5d, 0xf6, 0x39, - 0xaa, 0xba, 0xc1, 0x9e, 0x87, 0x83, 0xfb, 0x8e, 0x72, 0x3d, 0x0b, 0x3e, 0x81, 0xce, 0x3a, 0x49, - 0xa8, 0xf4, 0xa9, 0x65, 0x6d, 0x70, 0xea, 0xa8, 0xed, 0x60, 0xce, 0x3d, 0x74, 0x0e, 0x4d, 0xbd, - 0x94, 0xa0, 0x32, 0x8d, 0xd6, 0x86, 0x32, 0x40, 0x96, 0x6a, 0x7d, 0x62, 0xfc, 0xbb, 0x0f, 0x7d, - 0x03, 0x5a, 0x3d, 0x18, 0x34, 0x5b, 0xa0, 0x0b, 0xe8, 0x5a, 0x4f, 0x2f, 0x7a, 0xb0, 0x79, 0x4e, - 0xdc, 0x18, 0x4e, 0xb7, 0x0c, 0xeb, 0x38, 0x3e, 0x83, 0xbe, 0xf3, 0x4c, 0xa1, 0xed, 0x37, 0xe9, - 0x76, 0xf1, 0x5c, 0x9a, 0x45, 0xcb, 0x5a, 0x3d, 0xd0, 0x1b, 0x9b, 0x5f, 0x6d, 0xad, 0x37, 0x83, - 0x37, 0x77, 0x1a, 0x2d, 0x7e, 0x8f, 0xd7, 0x7b, 0x1b, 0x26, 0xd7, 0x05, 0x11, 0xb2, 0xa2, 0xb9, - 0xb6, 0xcf, 0xed, 0x66, 0x6b, 0xd6, 0xd2, 0x9b, 0xef, 0xa3, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, - 0xd4, 0x58, 0xc6, 0x2c, 0x1b, 0x0b, 0x00, 0x00, +func init() { proto.RegisterFile("wavemq.proto", fileDescriptor_wavemq_39f8c2d85be4999d) } + +var fileDescriptor_wavemq_39f8c2d85be4999d = []byte{ + // 947 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xcf, 0x8f, 0xdb, 0x44, + 0x14, 0x96, 0xed, 0xfc, 0xd8, 0xbc, 0x24, 0xbb, 0xdb, 0x59, 0xda, 0x7a, 0xc3, 0x52, 0x82, 0x0f, + 0x10, 0x09, 0xb1, 0x5a, 0xa5, 0x45, 0x02, 0x09, 0x09, 0x2d, 0x34, 0x42, 0x15, 0xac, 0x48, 0x67, + 0x29, 0x48, 0xdc, 0xc6, 0xce, 0x34, 0x1d, 0xb4, 0xf6, 0x78, 0x67, 0xc6, 0x85, 0xdc, 0x39, 0x72, + 0x47, 0xe2, 0xc2, 0x85, 0x1b, 0xe2, 0x5f, 0xe3, 0x4f, 0x40, 0x68, 0x66, 0xec, 0xc4, 0x76, 0xbc, + 0xd0, 0x20, 0x15, 0xf5, 0xe6, 0xf9, 0xde, 0x1b, 0xcf, 0x37, 0xdf, 0xfb, 0x31, 0x0f, 0x06, 0xdf, + 0x93, 0xe7, 0x34, 0xbe, 0x3e, 0x4d, 0x05, 0x57, 0x1c, 0xb5, 0xe2, 0xeb, 0x34, 0x1c, 0x01, 0x25, + 0x29, 0xb3, 0x48, 0xe0, 0xc3, 0x9d, 0x4f, 0x79, 0x92, 0xd0, 0x48, 0x31, 0x9e, 0x5c, 0x2a, 0xa2, + 0x32, 0x39, 0x27, 0x82, 0xc4, 0x32, 0x08, 0xc1, 0xaf, 0x5b, 0x30, 0x95, 0x29, 0x4f, 0x24, 0x45, + 0xf7, 0x00, 0x14, 0x57, 0xe4, 0x6a, 0x4e, 0xa9, 0x90, 0xbe, 0x33, 0x76, 0x26, 0x6d, 0x5c, 0x42, + 0xd0, 0xdb, 0xb0, 0x1f, 0xd9, 0xbd, 0x74, 0x61, 0x7d, 0x5c, 0xe3, 0x53, 0x43, 0x83, 0x5f, 0x1c, + 0xe8, 0x3f, 0xce, 0xa8, 0x58, 0xd9, 0x33, 0xd1, 0x7d, 0xe8, 0xa7, 0x54, 0xc8, 0x54, 0x1f, 0xfa, + 0x9c, 0x9a, 0x1f, 0xf7, 0xa7, 0xb7, 0x4e, 0x35, 0xeb, 0xd3, 0xf9, 0xc6, 0x80, 0xcb, 0x5e, 0xe8, + 0x04, 0x7a, 0x09, 0x89, 0xa9, 0x4c, 0x49, 0x44, 0xcd, 0x39, 0x03, 0xbc, 0x01, 0xd0, 0x21, 0x78, + 0x99, 0x60, 0xbe, 0x37, 0x76, 0x26, 0x3d, 0xac, 0x3f, 0x0d, 0xb9, 0x4c, 0x2a, 0x1e, 0xcf, 0x05, + 0xe7, 0x4f, 0x1f, 0xce, 0xb0, 0xdf, 0x32, 0x9b, 0x6a, 0x68, 0xf0, 0x2d, 0x0c, 0x0c, 0xb7, 0x0b, + 0x2a, 0x25, 0x59, 0x52, 0xf4, 0x16, 0xb4, 0xa9, 0x10, 0x5c, 0xe4, 0xb4, 0xfa, 0x96, 0xd6, 0x4c, + 0x43, 0xd8, 0x5a, 0xd0, 0x3b, 0xd0, 0x8d, 0xad, 0xb7, 0x21, 0xd2, 0x9f, 0x0e, 0xad, 0x53, 0xfe, + 0x0b, 0x5c, 0x58, 0x83, 0x5f, 0x1d, 0x38, 0xd0, 0x12, 0x94, 0x2f, 0x1f, 0xc0, 0x40, 0xf2, 0x4c, + 0x44, 0x74, 0x96, 0x28, 0xa6, 0x56, 0xe6, 0x98, 0x01, 0xae, 0x60, 0x3b, 0xdf, 0xf5, 0x04, 0x7a, + 0x92, 0x2d, 0x13, 0xa2, 0x32, 0x41, 0xf3, 0x6b, 0x6e, 0x00, 0x34, 0x82, 0xbd, 0xb4, 0xd0, 0xa0, + 0x6d, 0x8c, 0xeb, 0x75, 0xf0, 0x39, 0xdc, 0xd6, 0x04, 0x9f, 0x24, 0x32, 0x0b, 0x65, 0x24, 0x58, + 0x48, 0x77, 0xa0, 0xb9, 0x0f, 0x2e, 0x5b, 0x18, 0x7e, 0x3d, 0xec, 0xb2, 0x45, 0xf0, 0x11, 0xdc, + 0xad, 0xfd, 0x6c, 0x9d, 0x4a, 0xff, 0xae, 0x6a, 0xf0, 0x00, 0x6e, 0xe9, 0xdd, 0xf3, 0x2c, 0xbc, + 0x62, 0xf2, 0x59, 0x4e, 0xe3, 0x4d, 0xf0, 0x62, 0xb9, 0xcc, 0x77, 0xd5, 0x64, 0xd6, 0x96, 0xe0, + 0x03, 0x38, 0x2a, 0xed, 0xda, 0xe5, 0xbc, 0x3f, 0x1c, 0xbb, 0xf5, 0xd2, 0x92, 0x4d, 0x75, 0x01, + 0x7c, 0xf5, 0xc9, 0xe5, 0x4b, 0x09, 0x90, 0x55, 0xaa, 0x55, 0x28, 0xa5, 0x43, 0x22, 0x78, 0xa6, + 0xa8, 0x78, 0xf4, 0xd0, 0x84, 0xa4, 0x87, 0xd7, 0x6b, 0x74, 0x07, 0x3a, 0xf4, 0x87, 0x94, 0x89, + 0x95, 0xdf, 0x19, 0x3b, 0x13, 0x0f, 0xe7, 0xab, 0xe0, 0xb7, 0x2a, 0xdf, 0x75, 0xa4, 0xde, 0x05, + 0x4f, 0x85, 0x32, 0xbf, 0xe8, 0x71, 0x51, 0x45, 0x5b, 0xf7, 0xc2, 0xda, 0xab, 0x9a, 0x29, 0xee, + 0x3f, 0x65, 0x8a, 0x57, 0xcd, 0x14, 0x5d, 0x4f, 0x24, 0x94, 0xfc, 0x2a, 0x53, 0x74, 0x66, 0xe9, + 0xb5, 0x0c, 0xbd, 0x1a, 0x1a, 0xfc, 0xec, 0xc2, 0xb0, 0x1a, 0xc3, 0xff, 0xa5, 0xdc, 0xdf, 0x83, + 0x6e, 0xc4, 0x13, 0x45, 0x13, 0xe5, 0xb7, 0xc6, 0xde, 0xa4, 0x3f, 0x3d, 0xca, 0x0f, 0x20, 0xab, + 0x2b, 0x4e, 0x16, 0x5f, 0x86, 0xdf, 0xd1, 0x48, 0xe1, 0xc2, 0x07, 0x9d, 0xc1, 0x11, 0x4d, 0x22, + 0xb1, 0x32, 0xea, 0xcc, 0x89, 0x50, 0x4c, 0x7f, 0xf8, 0xed, 0xb1, 0x37, 0x19, 0xe0, 0x26, 0x13, + 0xf2, 0xa1, 0xab, 0xf9, 0x31, 0xa9, 0x4c, 0x5c, 0xf6, 0x70, 0xb1, 0x6c, 0xe8, 0x34, 0xdd, 0xc6, + 0x4e, 0xf3, 0x00, 0x0e, 0xfe, 0x43, 0x9a, 0xfe, 0xe9, 0x00, 0xe4, 0x19, 0xff, 0xa2, 0xd9, 0xf9, + 0x21, 0xec, 0xdb, 0xf5, 0x17, 0x3c, 0x22, 0xe6, 0x5e, 0x6e, 0x59, 0xf3, 0x02, 0x7d, 0x82, 0x1f, + 0xe1, 0x9a, 0x63, 0x55, 0x76, 0xef, 0x06, 0xd9, 0x5b, 0x15, 0xd9, 0x53, 0xab, 0xb0, 0xd1, 0xee, + 0x26, 0xd9, 0x73, 0x1f, 0xcd, 0x9e, 0x0b, 0xb6, 0x64, 0x09, 0x36, 0xd9, 0x6e, 0x94, 0xec, 0xe1, + 0x0a, 0x16, 0xfc, 0xe4, 0x42, 0xb7, 0xd4, 0x8c, 0x4d, 0x02, 0x56, 0xf5, 0x31, 0x8a, 0x62, 0x6b, + 0xa9, 0xe4, 0xac, 0x5b, 0xcb, 0xd9, 0xc0, 0x96, 0x86, 0x67, 0x36, 0x1f, 0x56, 0xba, 0x47, 0x73, + 0x45, 0x6c, 0xf5, 0xce, 0xdd, 0xf3, 0x44, 0x3f, 0x9a, 0x2c, 0xa6, 0x52, 0x91, 0x38, 0x95, 0x7e, + 0x67, 0xec, 0x4d, 0x3c, 0x5c, 0x42, 0xd0, 0x6b, 0xd0, 0x5e, 0x08, 0x9e, 0x4a, 0xbf, 0x6b, 0x4c, + 0x76, 0x51, 0xce, 0xae, 0xbd, 0x4a, 0x76, 0x05, 0xe7, 0x30, 0xac, 0x88, 0xa9, 0xfb, 0x83, 0x8c, + 0x9e, 0xd1, 0x98, 0x18, 0x51, 0x7a, 0x38, 0x5f, 0xe9, 0x5f, 0x14, 0x15, 0x60, 0x75, 0x28, 0x96, + 0xc1, 0x5f, 0x0e, 0x1c, 0xd4, 0xbb, 0xc6, 0xab, 0xf4, 0x06, 0x6b, 0xcd, 0xd8, 0x82, 0x26, 0x8a, + 0x3d, 0x65, 0x54, 0xe4, 0x0d, 0xb1, 0x84, 0xdc, 0xd4, 0x12, 0x1b, 0x7a, 0x52, 0xb7, 0xb1, 0x27, + 0x11, 0x38, 0x2a, 0x77, 0xc3, 0x97, 0xf0, 0xd4, 0x4f, 0x7f, 0x74, 0xa1, 0xf3, 0xcd, 0xf9, 0xd7, + 0xb3, 0x8b, 0xc7, 0xe8, 0x7d, 0xe8, 0xe6, 0x75, 0x8e, 0x8a, 0x6a, 0x28, 0xf7, 0xc3, 0xd1, 0xed, + 0x0a, 0xb8, 0xee, 0x05, 0x1f, 0x43, 0x6f, 0x1d, 0x24, 0x94, 0xfb, 0xd4, 0xa2, 0x36, 0x3a, 0xae, + 0xc0, 0xe5, 0xcb, 0x9c, 0x39, 0xe8, 0x0c, 0xda, 0x66, 0xd0, 0x40, 0x79, 0x18, 0x4b, 0x53, 0xc7, + 0x08, 0x95, 0xa0, 0xcd, 0x8e, 0x39, 0x1c, 0xd6, 0x87, 0x3f, 0x74, 0x62, 0x3d, 0x9b, 0xc7, 0xc5, + 0xd1, 0xbd, 0x66, 0x6b, 0x71, 0x89, 0xe9, 0xef, 0x2e, 0x0c, 0xad, 0x0c, 0xfa, 0x09, 0x62, 0xc9, + 0x12, 0x9d, 0x43, 0xbf, 0xf4, 0x40, 0xa3, 0xbb, 0x9b, 0x07, 0xaa, 0xaa, 0xca, 0xf1, 0x96, 0x61, + 0xad, 0xcc, 0x67, 0x30, 0xac, 0x3c, 0x7c, 0x68, 0xfb, 0x95, 0x7b, 0x31, 0x85, 0x2e, 0xec, 0x38, + 0x56, 0x1a, 0x50, 0xd0, 0xeb, 0x9b, 0x5f, 0x6d, 0x0d, 0x41, 0xa3, 0x37, 0x1a, 0x8d, 0xa5, 0x88, + 0x1d, 0xae, 0xa7, 0x3b, 0x4c, 0xaf, 0x33, 0x2a, 0x55, 0x11, 0xb8, 0xda, 0xd4, 0xd7, 0xac, 0x7f, + 0xd8, 0x31, 0xd3, 0xf9, 0xfd, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x41, 0xcc, 0x4d, 0x87, 0xbf, + 0x0b, 0x00, 0x00, } diff --git a/mqpb/wavemq.proto b/mqpb/wavemq.proto index 536fb56..88a6784 100644 --- a/mqpb/wavemq.proto +++ b/mqpb/wavemq.proto @@ -8,7 +8,9 @@ service WAVEMQ { rpc Publish(PublishParams) returns (PublishResponse); rpc Subscribe(SubscribeParams) returns (stream SubscriptionMessage); rpc Query(QueryParams) returns (stream QueryMessage); + rpc ConnectionStatus(ConnectionStatusParams) returns (ConnectionStatusResponse); } + service WAVEMQPeering { rpc PeerPublish(PeerPublishParams) returns (PeerPublishResponse); rpc PeerSubscribe(PeerSubscribeParams) returns (stream SubscriptionMessage); @@ -16,6 +18,15 @@ service WAVEMQPeering { rpc PeerQueryRequest(PeerQueryParams) returns (stream QueryMessage); } +message ConnectionStatusParams { + +} + +message ConnectionStatusResponse { + int32 totalPeers = 1; + int32 connectedPeers = 2; +} + message QueryParams { Perspective perspective = 1; bytes namespace = 2; diff --git a/server/local.go b/server/local.go index 922ccd5..e506c5a 100644 --- a/server/local.go +++ b/server/local.go @@ -101,6 +101,14 @@ func NewLocalServer(tm *core.Terminus, am *core.AuthModule, cfg *LocalServerConf return s } +func (s *srv) ConnectionStatus(ctx context.Context, p *pb.ConnectionStatusParams) (*pb.ConnectionStatusResponse, error) { + a, b := s.tm.ConnectionStatus() + return &pb.ConnectionStatusResponse{ + TotalPeers: int32(b), + ConnectedPeers: int32(a), + }, nil +} + func (s *srv) Publish(ctx context.Context, p *pb.PublishParams) (*pb.PublishResponse, error) { m, err := s.am.FormMessage(p, s.tm.RouterID()) if err != nil {