Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit - FR: realtime event listener for RTDB (#229) #245

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
115 changes: 115 additions & 0 deletions cmd/listen/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"log"
"os"

"golang.org/x/net/context"
"google.golang.org/api/option"

firebase "firebase.google.com/go"
"firebase.google.com/go/db"
)

func main() {

// opt := option.WithCredentialsFile("c:/users/username/.firebase/firebase.json") // Windows
//
opt := option.WithCredentialsFile("/home/username/.firebase/firebase.json") // Linux, edit 1.

config := &firebase.Config{
DatabaseURL: "https://mydb.firebaseio.com", // edit 2.
}

app, err := firebase.NewApp(context.Background(), config, opt)
if err != nil {
log.Fatal(err)
}

ctx := context.Background()

// DatabaseWithURL
client, err := app.Database(ctx)

if err != nil {
log.Fatal(err)
}

testpath := "user1/path1"
ref := client.NewRef(testpath)

args := os.Args
if len(args) > 1 {
triggerEvent(ctx, client, testpath, args[1])
return // exit app
}

// SnapshotIterator
iter, err := ref.Listen(ctx)
if err != nil {
fmt.Printf(" Error: failed to create Listener %v\n", err)
return
}

fmt.Printf("client app | Ref Path: %s | iter.Snapshot = %v\n", ref.Path, iter.Snapshot)
fmt.Printf(" | Ref Key: %s \n", ref.Key)

defer iter.Stop()

go func() {
for {

if iter.Done() {
break
}

event, err := iter.Next()

if err != nil {
// Handle error here based on specific usecase
// We can continue Listening
log.Printf("%v\n", err)
continue // go back to beginning of for loop
}

fmt.Printf("client app | Ref Path: %s | event.Path %s | event.Snapshot() = %v\n", ref.Path, event.Path, event.Snapshot())
fmt.Printf("\n")
}
}()

fmt.Printf("\n >>> edit value of any key from %s in firebase console to trigger event\n\n", testpath)
fmt.Printf("\n >>> press <enter> to close http connection\n\n")
fmt.Printf("Waiting for events...\n\n")

fmt.Scanln()
iter.Stop()

fmt.Printf("\n >>> press <enter> to exit app\n\n\n")
fmt.Scanln()
}

func triggerEvent(ctx context.Context, client *db.Client, testpath string, val string) {

ref := client.NewRef(testpath + "/key1")

if err := ref.Set(ctx, val); err != nil {
log.Fatal(err)
} else {
fmt.Printf("OK - Set %s to %v\n", testpath+"/key1", val)
}
}
182 changes: 182 additions & 0 deletions cmd/listen2/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"log"
"os"

"golang.org/x/net/context"
"google.golang.org/api/option"

firebase "firebase.google.com/go"
"firebase.google.com/go/db"
)

// Key is a json-serializable type.
type Key struct {
Key1 string `json:"key1"`
}

func main() {

// opt := option.WithCredentialsFile("c:/users/username/.firebase/firebase.json") // Windows
//
opt := option.WithCredentialsFile("/home/username/.firebase/firebase.json") // Linux, edit 1.

config := &firebase.Config{
DatabaseURL: "https://databaseName.firebaseio.com", // edit 2.
}

app, err := firebase.NewApp(context.Background(), config, opt)
if err != nil {
log.Fatal(err)
}

ctx := context.Background()

// DatabaseWithURL
client, err := app.Database(ctx)

if err != nil {
log.Fatal(err)
}

// https://firebase.google.com/docs/reference/js/firebase.database.Reference.html#key
//
// key = The last part of the Reference's path.

testpath := "user1/path1"
ref := client.NewRef(testpath)

args := os.Args
if len(args) > 1 {
triggerEvent(ctx, client, testpath, args[1])
return // exit app
}

// SnapshotIterator
iter, err := ref.Listen(ctx)
if err != nil {
fmt.Printf(" Error: failed to create Listener %v\n", err)
return // exit app
}

fmt.Printf("Initial snapshots:\n")

fmt.Printf("1st Listener | Ref Path: %s | iter.Snapshot = %v\n", ref.Path, iter.Snapshot)
fmt.Printf(" | Ref Key: %s \n", ref.Key)

defer iter.Stop()

var key Key

go func() {
for {

if iter.Done() {
break
}

event, err := iter.Next()

if err != nil {
// Handle error here based on specific usecase
// We can continue Listening
log.Printf("%v\n", err)
continue // go back to beginning of for loop
}

err = event.Unmarshal(&key)

if err != nil {
fmt.Printf("1st Listener | Error: Unmarshal %v\n", err)
} else {
fmt.Printf("1st Listener | Ref Path: %s | event.Path %s | event.Unmarshal(&key) key.Key1 = %s\n", ref.Path, event.Path, key.Key1)
fmt.Printf("1st Listener | Ref Path: %s | event.Path %s | event.Unmarshal(&key) key = %v\n", ref.Path, event.Path, key)
}

fmt.Printf("1st Listener | Ref Path: %s | event.Path %s | event.Snapshot() = %v\n", ref.Path, event.Path, event.Snapshot())
fmt.Printf("\n")
}
}()

// 2nd listener
testpath2 := "user1/path1/path2"
ref2 := client.NewRef(testpath2)

iter2, err := ref2.Listen(ctx)
if err != nil {
fmt.Printf(" Error: failed to create Listener %v\n", err)
return
}

fmt.Printf("2nd Listener | Ref Path: %s | iter.Snapshot = %v\n", ref2.Path, iter2.Snapshot)
fmt.Printf(" | Ref Key: %s \n", ref2.Key)

defer iter2.Stop()

go func() {
for {

if iter2.Done() {
break
}

event, err := iter2.Next()

if err != nil {
// Handle error here based on specific usecase
// We can continue Listening
log.Printf("%v\n", err)
continue // go back to beginning of for loop
}

fmt.Printf("2nd Listener | Ref Path: %s | event.Path %s | event.Snapshot() = %v\n", ref2.Path, event.Path, event.Snapshot())
fmt.Printf("\n")
}
}()

fmt.Printf("\n >>> open a new separate command line terminal, to trigger events, run: go run . anyvalue\n")
fmt.Printf("\n >>> OR edit value of any key from %s in firebase console to trigger events\n\n", testpath)
fmt.Printf("\n >>> press <enter> to stop 1st Listener and close http connection\n\n")
fmt.Printf("Waiting for events...\n\n")

fmt.Scanln()
iter.Stop()

fmt.Printf("\n >>> press <enter> to stop 2nd Listener and close http connection\n\n")
fmt.Scanln()
iter2.Stop()

fmt.Printf("\n >>> press <enter> to exit app\n\n\n")
fmt.Scanln()
}

func triggerEvent(ctx context.Context, client *db.Client, testpath string, val string) {

var key Key

key.Key1 = val

ref := client.NewRef(testpath + "/path2/path3")

if err := ref.Set(ctx, key); err != nil {
log.Fatal(err)
} else {
fmt.Printf("OK - Set %s to key.Key1=%v\n", testpath+"/path2/path3", val)
}
}
98 changes: 98 additions & 0 deletions db/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package db

import (
"encoding/json"
"errors"
"net/http"
)

// SSE = Sever-Sent Events = ssevent

// EventType specific event type changes
type EventType uint

// EventType ...
const (
ChildChanged EventType = iota
ChildAdded // to be implemented
ChildRemoved // to be implemented
ValueChanged // to be implemented
)

// Event Sever-Sent Event object
type Event struct {
EventType EventType // ChildChanged, ValueChanged, ChildAdded, ChildRemoved

Data string // JSON-encoded snapshot
Path string // snapshot path
}

// SnapshotIterator iterator for continuous events
type SnapshotIterator struct {
Snapshot string // initial snapshot, JSON-encoded, returned from http Respoonse, server sent event, data part
SSEDataChan <-chan string // continuous event snapshot, channel receive only, directional channel
done *bool // Done listening to events, also used to prevent channel block
resp *http.Response // http connection keep alive
}

// Snapshot ssevent data, data part
func (e *Event) Snapshot() string {

return e.Data // ssevent data (snapshot), snapshot only, data part of ssevent data
}

// Unmarshal current snapshot Event.Data
func (e *Event) Unmarshal(v interface{}) error {

return json.Unmarshal([]byte(e.Data), v)
}

// Next realtime event
func (it *SnapshotIterator) Next() (*Event, error) {

// prevent channel block
if *it.done == true {
return nil, errors.New("SnapshotIterator is done or no longer active")
}

sseDataString := <-it.SSEDataChan

// todo: determine EventType

path, snapshot, err := splitSSEData(sseDataString)

return &Event{
EventType: ChildChanged,
Data: snapshot,
Path: path,
}, err

} // Next()

// Stop listening for realtime events
// close http connection
func (it *SnapshotIterator) Stop() {
*it.done = true
if it.resp != nil {
it.resp.Body.Close()
}
}

// Done can be used to check if Stop() have been called
func (it *SnapshotIterator) Done() bool {
return *it.done
}
Loading