Skip to content

Commit

Permalink
Improve performance with no internet connectivity
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Andersen committed Oct 3, 2018
1 parent f3c26cd commit fdbed51
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 77 deletions.
17 changes: 10 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions core/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ type QueryElement struct {
}

func NewTerminus(qm *QManager, am *AuthModule, cfg *RoutingConfig) (*Terminus, error) {

//First validate the config
err := am.SetRouterEntityFile(cfg.RouterEntityFile)
if err != nil {
Expand All @@ -183,7 +182,6 @@ func NewTerminus(qm *QManager, am *AuthModule, cfg *RoutingConfig) (*Terminus, e
uplinkConns: make(map[string]*PeerConnection),
drnamespaces: weRoute,
}

rv.namespaces = make(map[string]*DesignatedRouter)
for _, r := range cfg.Router {
dr := r
Expand Down
176 changes: 113 additions & 63 deletions core/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (am *AuthModule) SetRouterEntityFile(filename string) error {
DER: contents,
},
}

//Check perspective is okay by doing a resync
resp, err := am.wave.ResyncPerspectiveGraph(context.Background(), &eapipb.ResyncPerspectiveGraphParams{
Perspective: am.ourPerspective,
Expand All @@ -210,15 +209,13 @@ func (am *AuthModule) SetRouterEntityFile(filename string) error {
if resp.Error != nil {
return fmt.Errorf("could not sync router entity file: %v", resp.Error.Message)
}

//Wait for sync, for the fun of it
err = am.wave.WaitForSyncCompleteHack(&eapipb.SyncParams{
Perspective: am.ourPerspective,
})
if err != nil {
return fmt.Errorf("could not sync router entity file: %v", err)
}

//also inspect so we can learn our hash
iresp, err := am.wave.Inspect(context.Background(), &eapipb.InspectParams{
Content: contents,
Expand All @@ -230,7 +227,6 @@ func (am *AuthModule) SetRouterEntityFile(filename string) error {
return fmt.Errorf("could not inspect router entity file: %v", resp.Error.Message)
}
am.perspectiveHash = iresp.Entity.Hash

return nil
}

Expand Down Expand Up @@ -666,6 +662,27 @@ func (am *AuthModule) FormSubRequest(p *pb.SubscribeParams, routerID string) (*p
return nil, wve.Err(wve.InvalidParameter, "missing perspective")
}

perspectiveHash := murmur.Murmur3(p.Perspective.EntitySecret.DER)
am.phashcachemu.RLock()
realhash, ok := am.phashcache[perspectiveHash]
am.phashcachemu.RUnlock()
if !ok {
//We need our entity hash
iresp, err := am.wave.Inspect(context.Background(), &eapipb.InspectParams{
Content: p.Perspective.EntitySecret.DER,
})
if err != nil {
return nil, wve.ErrW(wve.NoProofFound, "failed validate perspective", err)
}
if iresp.Error != nil {
return nil, wve.Err(wve.NoProofFound, "failed validate perspective: "+iresp.Error.Message)
}
am.phashcachemu.Lock()
am.phashcache[perspectiveHash] = iresp.Entity.Hash
am.phashcachemu.Unlock()
realhash = iresp.Entity.Hash
}

hash := sha3.New256()
hash.Write(p.Namespace)
hash.Write([]byte(p.Uri))
Expand All @@ -680,16 +697,6 @@ func (am *AuthModule) FormSubRequest(p *pb.SubscribeParams, routerID string) (*p
},
}

iresp, err := am.wave.Inspect(context.Background(), &eapipb.InspectParams{
Content: p.Perspective.EntitySecret.DER,
})
if err != nil {
return nil, wve.ErrW(wve.NoProofFound, "failed validate perspective", err)
}
if iresp.Error != nil {
return nil, wve.Err(wve.NoProofFound, "failed validate perspective: "+iresp.Error.Message)
}

signresp, err := am.wave.Sign(context.Background(), &eapipb.SignParams{
Perspective: perspective,
Content: digest,
Expand All @@ -701,60 +708,103 @@ func (am *AuthModule) FormSubRequest(p *pb.SubscribeParams, routerID string) (*p
return nil, wve.Err(wve.InvalidSignature, signresp.Error.Message)
}

if p.CustomProofDER == nil {
//Build a proof
proofresp, err := am.wave.BuildRTreeProof(context.Background(), &eapipb.BuildRTreeProofParams{
Perspective: perspective,
Namespace: p.Namespace,
Statements: []*eapipb.RTreePolicyStatement{
{
PermissionSet: []byte(WAVEMQPermissionSet),
Permissions: []string{WAVEMQSubscribe},
Resource: p.Uri,
},
},
ResyncFirst: true,
})
if err != nil {
return nil, wve.ErrW(wve.NoProofFound, "failed to build", err)
}
if proofresp.Error != nil {
return nil, wve.Err(wve.NoProofFound, proofresp.Error.Message)
bk := bcacheKey{}
copy(bk.Namespace[:], p.Namespace)
copy(bk.Target[:], realhash)

policyhash := sha3.New256()
policyhash.Write([]byte(WAVEMQSubscribe))
policyhash.Write([]byte("onuri="))
policyhash.Write([]byte(p.Uri))
poldigest := policyhash.Sum(nil)
copy(bk.PolicyHash[:], poldigest)

am.bcachemu.RLock()
cachedproof, ok := am.bcache[bk]
am.bcachemu.RUnlock()

var proofder []byte
var expiry time.Time

if p.CustomProofDER != nil {
proofder = p.CustomProofDER
} else {
rebuildproof := true
if ok {
if cachedproof.CacheExpiry.After(time.Now()) {
rebuildproof = false
}
}

expiry := time.Unix(0, proofresp.Result.Expiry*1e6)
if p.AbsoluteExpiry != 0 && expiry.After(time.Unix(0, p.AbsoluteExpiry)) {
expiry = time.Unix(0, p.AbsoluteExpiry)
if rebuildproof {
//Build a proof
proofresp, err := am.wave.BuildRTreeProof(context.Background(), &eapipb.BuildRTreeProofParams{
Perspective: perspective,
Namespace: p.Namespace,
Statements: []*eapipb.RTreePolicyStatement{
{
PermissionSet: []byte(WAVEMQPermissionSet),
Permissions: []string{WAVEMQSubscribe},
Resource: p.Uri,
},
},
ResyncFirst: true,
})
if err != nil {
return nil, wve.ErrW(wve.NoProofFound, "failed to build", err)
}
if proofresp.Error != nil {
ci := &bcacheItem{
CacheExpiry: time.Now().Add(FailedProofCacheTime),
Valid: false,
}
am.bcachemu.Lock()
am.bcache[bk] = ci
am.bcachemu.Unlock()
return nil, wve.Err(wve.NoProofFound, proofresp.Error.Message)
}

proofder = proofresp.ProofDER
ci := &bcacheItem{
CacheExpiry: time.Now().Add(SuccessfulProofCacheTime),
Valid: true,
DER: proofresp.ProofDER,
ProofExpiry: time.Unix(0, proofresp.Result.Expiry*1e6),
}
if ci.ProofExpiry.Before(ci.CacheExpiry) {
ci.CacheExpiry = ci.ProofExpiry
}
am.bcachemu.Lock()
am.bcache[bk] = ci
am.bcachemu.Unlock()

expiry = time.Unix(0, proofresp.Result.Expiry*1e6)
if p.AbsoluteExpiry != 0 && expiry.After(time.Unix(0, p.AbsoluteExpiry)) {
expiry = time.Unix(0, p.AbsoluteExpiry)
}
} else {
proofder = cachedproof.DER
expiry = cachedproof.ProofExpiry
if p.AbsoluteExpiry != 0 && expiry.After(time.Unix(0, p.AbsoluteExpiry)) {
expiry = time.Unix(0, p.AbsoluteExpiry)
}
}
return &pb.PeerSubscribeParams{
Tbs: &pb.PeerSubscriptionTBS{
Expiry: p.Expiry,
SourceEntity: iresp.Entity.Hash,
Namespace: p.Namespace,
Uri: p.Uri,
Id: p.Identifier,
RouterID: routerID,
},
Signature: signresp.Signature,
ProofDER: proofresp.ProofDER,
AbsoluteExpiry: expiry.UnixNano(),
}, nil
} else {
return &pb.PeerSubscribeParams{
Tbs: &pb.PeerSubscriptionTBS{
Expiry: p.Expiry,
SourceEntity: iresp.Entity.Hash,
Namespace: p.Namespace,
Uri: p.Uri,
Id: p.Identifier,
RouterID: routerID,
},
Signature: signresp.Signature,
ProofDER: p.CustomProofDER,
AbsoluteExpiry: p.AbsoluteExpiry,
}, nil
}

return &pb.PeerSubscribeParams{
Tbs: &pb.PeerSubscriptionTBS{
Expiry: p.Expiry,
SourceEntity: realhash,
Namespace: p.Namespace,
Uri: p.Uri,
Id: p.Identifier,
RouterID: routerID,
},
Signature: signresp.Signature,
ProofDER: proofder,
AbsoluteExpiry: expiry.UnixNano(),
}, nil

}

func (am *AuthModule) FormQueryRequest(p *pb.QueryParams, routerID string) (*pb.PeerQueryParams, wve.WVE) {
Expand Down
3 changes: 0 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func main() {
fmt.Printf("configuration loaded\n")

consts.DefaultToUnrevoked = conf.WaveConfig.DefaultToUnrevoked

qm, err := core.NewQManager(&conf.QueueConfig)
if err != nil {
fmt.Printf("failed to initialize queues: %v\n", err)
Expand All @@ -78,10 +77,8 @@ func main() {
fmt.Printf("failed to initialize routing: %v\n", err)
os.Exit(1)
}

server.NewLocalServer(tm, am, &conf.LocalConfig)
server.NewPeerServer(tm, am, &conf.PeerConfig)

sigchan := make(chan os.Signal, 30)
signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
<-sigchan
Expand Down
2 changes: 1 addition & 1 deletion server/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type ShutdownAble interface {
}

func NewLocalServer(tm *core.Terminus, am *core.AuthModule, cfg *LocalServerConfig) ShutdownAble {
fmt.Printf("Listening on %s\n", cfg.ListenAddr)
fmt.Printf("Listening for local connections on %s\n", cfg.ListenAddr)
l, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion server/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type PeerServerConfig struct {

func NewPeerServer(tm *core.Terminus, am *core.AuthModule, cfg *PeerServerConfig) ShutdownAble {
//TODO add the code for verifying key exchange
fmt.Printf("Listening on %s\n", cfg.ListenAddr)
fmt.Printf("Listening for peering connections on %s\n", cfg.ListenAddr)
l, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
panic(err)
Expand Down

0 comments on commit fdbed51

Please sign in to comment.