Skip to content

Commit

Permalink
Add conn status
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Andersen committed Oct 10, 2018
1 parent fdbed51 commit 0459c97
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 98 deletions.
17 changes: 15 additions & 2 deletions core/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ var pmPeerErrors = prometheus.NewCounter(prometheus.CounterOpts{
Name: "peer_errors",
Help: "Number of peer connection errors",
})
var pmActiveUpstreamConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: "route",
Name: "active_upstream_connections",
Help: "Number of active upstream peer connections",
})
var pmSubscriptions = prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: "route",
Name: "active_subscriptions",
Expand Down Expand Up @@ -128,6 +133,8 @@ type Terminus struct {

uplinkConns map[string]*PeerConnection
uplinkConnMu sync.RWMutex

activeUplink int64
}

type PeerConnection struct {
Expand Down Expand Up @@ -598,8 +605,6 @@ func (t *Terminus) downstreamPeer(ctx context.Context, q *Queue) (err error) {
}
}
func (t *Terminus) beginUpstreamPeering(q *Queue, dr *DesignatedRouter) {
//TODO add transport credentials using auth manager
//Do TLS handshake with signature on the self-signed cert, ala BW2
for {
ctx, cancel := context.WithCancel(context.Background())
err := t.upstreamPeer(ctx, q, dr)
Expand All @@ -623,6 +628,10 @@ func (t *Terminus) GetDesignatedRouterConnection(namespace string) *PeerConnecti
return conn
}

func (t *Terminus) ConnectionStatus() (int64, int64) {
return t.activeUplink, int64(len(t.drnamespaces))
}

func (t *Terminus) upstreamPeer(ctx context.Context, q *Queue, dr *DesignatedRouter) (err error) {
t.uplinkConnMu.Lock()
delete(t.uplinkConns, dr.Namespace)
Expand All @@ -645,7 +654,11 @@ func (t *Terminus) upstreamPeer(ctx context.Context, q *Queue, dr *DesignatedRou
err = e.(error)
}
conn.Close()
v := atomic.AddInt64(&t.activeUplink, -1)
pmActiveUpstreamConnections.Set(float64(v))
}()
v := atomic.AddInt64(&t.activeUplink, 1)
pmActiveUpstreamConnections.Set(float64(v))
peerConn := &PeerConnection{
Conn: conn,
Ctx: ctx,
Expand Down
Loading

0 comments on commit 0459c97

Please sign in to comment.