Commit

Merge pull request #27 from kaspa-live/children-nav
Navigate the DAG through block children
tiram88 authored Apr 23, 2022
2 parents d7dd2e9 + 68e25ad commit 1603007
Showing 17 changed files with 527 additions and 137 deletions.
94 changes: 56 additions & 38 deletions processing/database/database.go
@@ -8,6 +8,7 @@ import (
"github.com/kaspa-live/kaspa-graph-inspector/processing/database/model"
"github.com/kaspa-live/kaspa-graph-inspector/processing/database/utils/lrucache"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/pkg/errors"
)

type Database struct {
@@ -25,6 +26,13 @@ type blockBase struct {
Height uint64
}

func (bb *blockBase) Clone() *blockBase {
return &blockBase{
ID: bb.ID,
Height: bb.Height,
}
}

func New(pgDatabase *pg.DB) *Database {
database := &Database{
database: pgDatabase,
@@ -42,25 +50,25 @@ func (db *Database) RunInTransaction(transactionFunction func(*pg.Tx) error) err

// Load block infos into the memory cache for all blocks having a height greater than or equal to minHeight
func (db *Database) LoadCache(databaseTransaction *pg.Tx, minHeight uint64) error {
-var infos []struct {
+var results []struct {
ID uint64
BlockHash string
Height uint64
}
-_, err := databaseTransaction.Query(&infos, "SELECT id, block_hash, height FROM blocks WHERE height >= ?", minHeight)
+_, err := databaseTransaction.Query(&results, "SELECT id, block_hash, height FROM blocks WHERE height >= ?", minHeight)
if err != nil {
return err
}
db.clearCache()
-for _, info := range infos {
-blockHash, err := externalapi.NewDomainHashFromString(info.BlockHash)
+for _, result := range results {
+blockHash, err := externalapi.NewDomainHashFromString(result.BlockHash)
if err != nil {
return err
}

bb := &blockBase{
-ID: info.ID,
-Height: info.Height,
+ID: result.ID,
+Height: result.Height,
}
db.blockBaseCache.Add(blockHash, bb)
}
@@ -78,23 +86,16 @@ func (db *Database) DoesBlockExist(databaseTransaction *pg.Tx, blockHash *extern
}

// Search database
-var infos []struct {
-ID uint64
-Height uint64
-}
-_, err := databaseTransaction.Query(&infos, "SELECT id, height FROM blocks WHERE block_hash = ?", blockHash.String())
+var results []blockBase
+
+_, err := databaseTransaction.Query(&results, "SELECT id, height FROM blocks WHERE block_hash = ?", blockHash.String())
if err != nil {
return false, err
}
-if len(infos) != 1 {
+if len(results) != 1 {
return false, nil
}
-
-bb := &blockBase{
-ID: infos[0].ID,
-Height: infos[0].Height,
-}
-db.blockBaseCache.Add(blockHash, bb)
+db.blockBaseCache.Add(blockHash, results[0].Clone())

return true, nil
}
@@ -114,10 +115,11 @@ func (db *Database) InsertBlock(databaseTransaction *pg.Tx, blockHash *externala
return nil
}

-// Get a block from the database by its ID
+// GetBlock returns a block identified by `id`.
+// Returns an error if the block `id` does not exist
func (db *Database) GetBlock(databaseTransaction *pg.Tx, id uint64) (*model.Block, error) {
result := new(model.Block)
-_, err := databaseTransaction.Query(result, "SELECT * FROM blocks WHERE id = ?", id)
+_, err := databaseTransaction.QueryOne(result, "SELECT * FROM blocks WHERE id = ?", id)
if err != nil {
return nil, err
}
@@ -160,7 +162,7 @@ func (db *Database) UpdateBlockColors(databaseTransaction *pg.Tx, blockIDsToColo
return nil
}

-// Update DAA Scores of blocks in the database
+// UpdateBlockDAAScores updates DAA Scores of block ids
func (db *Database) UpdateBlockDAAScores(databaseTransaction *pg.Tx, blockIDsToDAAScores map[uint64]uint64) error {
for blockID, daaScore := range blockIDsToDAAScores {
_, err := databaseTransaction.Exec("UPDATE blocks SET daa_score = ? WHERE id = ?", daaScore, blockID)
@@ -171,6 +173,8 @@ func (db *Database) UpdateBlockDAAScores(databaseTransaction *pg.Tx, blockIDsToD
return nil
}

// BlockIDByHash returns the id of a block identified by `blockHash`.
// Returns an error if `blockHash` does not exist in the database
func (db *Database) BlockIDByHash(databaseTransaction *pg.Tx, blockHash *externalapi.DomainHash) (uint64, error) {
bb, err := db.blockBaseByHash(databaseTransaction, blockHash)
if err != nil {
@@ -179,6 +183,8 @@ func (db *Database) BlockIDByHash(databaseTransaction *pg.Tx, blockHash *externa
return bb.ID, err
}

// BlockHeightByHash returns the height of a block identified by `blockHash`.
// Returns an error if `blockHash` does not exist in the database
func (db *Database) BlockHeightByHash(databaseTransaction *pg.Tx, blockHash *externalapi.DomainHash) (uint64, error) {
bb, err := db.blockBaseByHash(databaseTransaction, blockHash)
if err != nil {
@@ -187,31 +193,27 @@ func (db *Database) BlockHeightByHash(databaseTransaction *pg.Tx, blockHash *ext
return bb.Height, err
}

// blockBaseByHash returns a `blockBase` for a block identified by `blockHash`.
// Returns an error if `blockHash` does not exist in the database
func (db *Database) blockBaseByHash(databaseTransaction *pg.Tx, blockHash *externalapi.DomainHash) (*blockBase, error) {
// Search cache
if cachedBlockBase, ok := db.blockBaseCache.Get(blockHash); ok {
return cachedBlockBase, nil
}

// Search database
-var info struct {
-ID uint64
-Height uint64
-}
-_, err := databaseTransaction.Query(&info, "SELECT id, height FROM blocks WHERE block_hash = ?", blockHash.String())
+var result blockBase
+_, err := databaseTransaction.QueryOne(&result, "SELECT id, height FROM blocks WHERE block_hash = ?", blockHash.String())
if err != nil {
-return nil, err
-}
-
-bb := &blockBase{
-ID: info.ID,
-Height: info.Height,
+return nil, errors.Wrapf(err, "block hash %s not found in blocks table", blockHash.String())
}
-db.blockBaseCache.Add(blockHash, bb)
+db.blockBaseCache.Add(blockHash, &result)

-return bb, nil
+return &result, nil
}
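
A side effect of the switch from Query to QueryOne above: go-pg's QueryOne reports an error when zero rows match, which blockBaseByHash now wraps with github.com/pkg/errors. A caller that wants to treat "not found" as a distinct case could probe for that sentinel roughly as follows (a sketch; the go-pg v10 import path and the pg.ErrNoRows sentinel are assumptions about the project's dependency versions):

```go
package example

import (
	"errors"

	"github.com/go-pg/pg/v10"
)

// isNotFound reports whether err ultimately came from a QueryOne that matched
// zero rows, even after it has been wrapped by github.com/pkg/errors.
func isNotFound(err error) bool {
	return errors.Is(err, pg.ErrNoRows)
}
```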

// BlockIDsByHashes returns an array of ids for the given `blockHashes`.
// Returns an error if any hash in `blockHashes` does not exist in the database
func (db *Database) BlockIDsByHashes(databaseTransaction *pg.Tx, blockHashes []*externalapi.DomainHash) ([]uint64, error) {
blockIDs := make([]uint64, len(blockHashes))
for i, blockHash := range blockHashes {
@@ -224,6 +226,8 @@ func (db *Database) BlockIDsByHashes(databaseTransaction *pg.Tx, blockHashes []*
return blockIDs, nil
}

// BlockIDsAndHeightsByHashes returns two arrays, one of ids and one of heights,
// for the given `blockHashes`
func (db *Database) BlockIDsAndHeightsByHashes(databaseTransaction *pg.Tx, blockHashes []*externalapi.DomainHash) ([]uint64, []uint64, error) {
blockIDs := make([]uint64, len(blockHashes))
blockHeights := make([]uint64, len(blockHashes))
@@ -238,8 +242,9 @@ func (db *Database) BlockIDsAndHeightsByHashes(databaseTransaction *pg.Tx, block
return blockIDs, blockHeights, nil
}

-// Find the index in a DAG ordered block hash array of the latest block hash
-// that is stored in the database
+// FindLatestStoredBlockIndex returns the index in a DAG ordered block hash
+// array `blockHashes` of the latest block hash that is stored in the
+// database
func (db *Database) FindLatestStoredBlockIndex(databaseTransaction *pg.Tx, blockHashes []*externalapi.DomainHash) (int, error) {
// We use binary search since hash array is ordered from oldest to latest and
// this ordering is also applied when storing blocks in the database
@@ -260,18 +265,31 @@ func (db *Database) FindLatestStoredBlockIndex(databaseTransaction *pg.Tx, block
return low, nil
}
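
The comment above is the whole idea: blocks are stored in the same oldest-to-latest order as the hash array, so "already stored" is monotone over the array and the boundary can be bisected. A minimal sketch of that bisection (illustrative only; the probe callback and variable names are not taken from the commit, whose body is collapsed here):

```go
package sketch

// findLatestStoredIndexSketch bisects a DAG-ordered array of which some prefix
// is already stored and returns the index of the latest stored entry.
// isStored stands in for the database lookup used by the real method.
func findLatestStoredIndexSketch(n int, isStored func(i int) (bool, error)) (int, error) {
	low, high := 0, n-1
	latest := 0
	for low <= high {
		mid := (low + high) / 2
		stored, err := isStored(mid)
		if err != nil {
			return 0, err
		}
		if stored {
			// mid and everything before it is stored; look further right.
			latest = mid
			low = mid + 1
		} else {
			// mid is not stored yet; the boundary is to the left.
			high = mid - 1
		}
	}
	return latest, nil
}
```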

-// Find the block ID of the block having the closest DAA score to a given score.
+// BlockIDByDAAScore returns the block ID of one block having the closest DAA
+// score to `blockDAAScore`
func (db *Database) BlockIDByDAAScore(databaseTransaction *pg.Tx, blockDAAScore uint64) (uint64, error) {
var result struct {
ID uint64
}
-_, err := databaseTransaction.Query(&result, "SELECT id FROM blocks ORDER BY ABS(daa_score-(?)) LIMIT 1", pg.In(blockDAAScore))
+_, err := databaseTransaction.QueryOne(&result, "SELECT id FROM blocks ORDER BY ABS(daa_score-(?)) LIMIT 1", blockDAAScore)
if err != nil {
return 0, err
}
return result.ID, nil
}

// BlockCountAtDAAScore returns the number of blocks having a DAA Score of `blockDAAScore`
func (db *Database) BlockCountAtDAAScore(databaseTransaction *pg.Tx, blockDAAScore uint64) (uint32, error) {
var result struct {
N uint32
}
_, err := databaseTransaction.Query(&result, "SELECT COUNT(*) AS N FROM blocks WHERE daa_score = (?)", blockDAAScore)
if err != nil {
return 0, err
}
return result.N, nil
}

func (db *Database) HighestBlockHeight(databaseTransaction *pg.Tx, blockIDs []uint64) (uint64, error) {
var result struct {
Highest uint64
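
The methods in this file all take a *pg.Tx and are meant to be combined inside a single transaction via RunInTransaction. A hypothetical caller using the cache-backed helpers might look like the sketch below (the go-pg v10 import path and the overall wiring are assumptions, not part of this commit):

```go
package example

import (
	"github.com/go-pg/pg/v10"
	"github.com/kaspa-live/kaspa-graph-inspector/processing/database"
	"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
)

// lookUpBlockID resolves a block hash to its database id inside one transaction,
// returning found=false when the block has not been stored yet.
func lookUpBlockID(db *database.Database, blockHash *externalapi.DomainHash) (id uint64, found bool, err error) {
	err = db.RunInTransaction(func(tx *pg.Tx) error {
		exists, err := db.DoesBlockExist(tx, blockHash)
		if err != nil || !exists {
			return err
		}
		id, err = db.BlockIDByHash(tx, blockHash)
		found = err == nil
		return err
	})
	return id, found, err
}
```
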
2 changes: 1 addition & 1 deletion processing/database/utils/lrucache/lrucache.go
@@ -32,7 +32,7 @@ func (c *LRUCache[T]) Add(key *externalapi.DomainHash, value *T) {
if len(c.cache) > c.capacity {
c.evictRandom()
}
-}
+}

// Get returns the entry for the given key, or (nil, false) otherwise
func (c *LRUCache[T]) Get(key *externalapi.DomainHash) (*T, bool) {
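
For context, the cache touched by this one-line change bounds its size by evicting an arbitrary entry once capacity is exceeded. A generic, self-contained sketch of that pattern (not the project's actual lrucache code):

```go
package sketch

// boundedCache is an illustrative capacity-bounded map that mirrors the
// len/evict pattern visible in the hunk above.
type boundedCache[K comparable, V any] struct {
	capacity int
	entries  map[K]V
}

func (c *boundedCache[K, V]) add(key K, value V) {
	c.entries[key] = value
	if len(c.entries) > c.capacity {
		// Go map iteration order is unspecified, so removing the first key we
		// see approximates the cache's random eviction.
		for k := range c.entries {
			delete(c.entries, k)
			break
		}
	}
}
```
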
106 changes: 96 additions & 10 deletions processing/infrastructure/config/config.go
@@ -1,45 +1,131 @@
package config

import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/jessevdk/go-flags"
"github.com/kaspa-live/kaspa-graph-inspector/processing/infrastructure/logging"
kaspaConfigPackage "github.com/kaspanet/kaspad/infrastructure/config"
kaspaLogger "github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util"
"github.com/pkg/errors"
)

const (
appDataDirectory = "kaspa-graph-inspector-processing"
appDataDirectory = "kgi-processing"
defaultLogDirname = "logs"
defaultLogLevel = "info"
defaultLogFilename = "kgi-processing.log"
defaultErrLogFilename = "kgi-processing_err.log"
)

var (
-HomeDir = util.AppDir(appDataDirectory, false)
+// DefaultAppDir is the default home directory for kaspad.
+DefaultAppDir = util.AppDir(appDataDirectory, false)
+defaultDataDir = filepath.Join(DefaultAppDir)
)

-type Config struct {
+type Flags struct {
+AppDir string `short:"b" long:"appdir" description:"Directory to store data"`
+LogDir string `long:"logdir" description:"Directory to log output."`
DatabaseConnectionString string `long:"connection-string" description:"Connection string for PostgreSQL database to connect to. Should be of the form: postgres://<username>:<password>@<host>:<port>/<database name>"`
ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"`
DNSSeed string `long:"dnsseed" description:"Override DNS seeds with specified hostname (Only 1 hostname allowed)"`
GRPCSeed string `long:"grpcseed" description:"Hostname of gRPC server for seeding peers"`
-ClearDB bool `long:"clear-db" description:"Clear the postgres database and resync from scratch"`
+Resync bool `long:"resync" description:"Force to resync all available node blocks with the PostgreSQL database -- Use if some recently added blocks have missing parents"`
+ClearDB bool `long:"clear-db" description:"Clear the PostgreSQL database and sync from scratch"`
+LogLevel string `short:"d" long:"loglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"`
kaspaConfigPackage.NetworkFlags
}

-func Parse() (*Config, error) {
-config := &Config{}
-parser := flags.NewParser(config, flags.HelpFlag)
+type Config struct {
+*Flags
+}

// cleanAndExpandPath expands environment variables and leading ~ in the
// passed path, cleans the result, and returns it.
func cleanAndExpandPath(path string) string {
// Expand initial ~ to OS specific home directory.
if strings.HasPrefix(path, "~") {
homeDir := filepath.Dir(DefaultAppDir)
path = strings.Replace(path, "~", homeDir, 1)
}

// NOTE: The os.ExpandEnv doesn't work with Windows-style %VARIABLE%,
// but the variables can still be expanded via POSIX-style $VARIABLE.
return filepath.Clean(os.ExpandEnv(path))
}

func defaultFlags() *Flags {
return &Flags{
AppDir: defaultDataDir,
LogLevel: defaultLogLevel,
}
}

func LoadConfig() (*Config, error) {
funcName := "loadConfig"
appName := filepath.Base(os.Args[0])
appName = strings.TrimSuffix(appName, filepath.Ext(appName))
usageMessage := fmt.Sprintf("Use %s -h to show usage", appName)

cfgFlags := defaultFlags()
parser := flags.NewParser(cfgFlags, flags.HelpFlag)
_, err := parser.Parse()
if err != nil {
var flagsErr *flags.Error
if ok := errors.As(err, &flagsErr); !ok || flagsErr.Type != flags.ErrHelp {
return nil, errors.Wrapf(err, "Error parsing command line arguments: %s\n\n%s", err, usageMessage)
}
return nil, err
}
cfg := &Config{
Flags: cfgFlags,
}

-if config.DatabaseConnectionString == "" {
+if cfg.DatabaseConnectionString == "" {
return nil, errors.Errorf("--connection-string is required.")
}

-err = config.ResolveNetwork(parser)
+err = cfg.ResolveNetwork(parser)
if err != nil {
return nil, err
}

-return config, nil
+cfg.AppDir = cleanAndExpandPath(cfg.AppDir)
// Append the network type to the app directory so it is "namespaced"
// per network.
// All data is specific to a network, so namespacing the data directory
// means each individual piece of serialized data does not have to
// worry about changing names per network and such.
cfg.AppDir = filepath.Join(cfg.AppDir, cfg.NetParams().Name)

// Logs directory is usually under the home directory, unless otherwise specified
if cfg.LogDir == "" {
cfg.LogDir = filepath.Join(cfg.AppDir, defaultLogDirname)
}
cfg.LogDir = cleanAndExpandPath(cfg.LogDir)

// Special show command to list supported subsystems and exit.
if cfg.LogLevel == "show" {
fmt.Println("Supported subsystems", kaspaLogger.SupportedSubsystems())
os.Exit(0)
}

// Initialize log rotation. After log rotation has been initialized, the
// logger variables may be used.
logging.InitLog(filepath.Join(cfg.LogDir, defaultLogFilename), filepath.Join(cfg.LogDir, defaultErrLogFilename))

// Parse, validate, and set debug log level(s).
if err := kaspaLogger.ParseAndSetLogLevels(cfg.LogLevel); err != nil {
err := errors.Errorf("%s: %s", funcName, err.Error())
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, err
}

return cfg, nil
}
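
To make the path handling above concrete: cleanAndExpandPath expands a leading ~ and POSIX-style $VARIABLE references, and LoadConfig then appends the network name and, by default, a logs subdirectory. A small stand-alone illustration (a simplified stand-in that uses the user's home directory for ~, where the real code derives it from DefaultAppDir; the printed path is an example, not output from the commit):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// expandPath is a simplified stand-in for cleanAndExpandPath.
func expandPath(path string) string {
	if strings.HasPrefix(path, "~") {
		home, _ := os.UserHomeDir()
		path = strings.Replace(path, "~", home, 1)
	}
	// Windows-style %VARIABLE% is not expanded; POSIX-style $VARIABLE is.
	return filepath.Clean(os.ExpandEnv(path))
}

func main() {
	os.Setenv("KGI_SUBDIR", "kgi-processing")
	appDir := expandPath("~/$KGI_SUBDIR")
	// LoadConfig then namespaces per network and derives the log directory:
	appDir = filepath.Join(appDir, "kaspa-mainnet")
	fmt.Println(filepath.Join(appDir, "logs"))
	// e.g. /home/alice/kgi-processing/kaspa-mainnet/logs on Linux
}
```
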
5 changes: 2 additions & 3 deletions processing/infrastructure/database/database.go
@@ -15,11 +15,10 @@ const (

var (
log = logging.Logger()

-databaseDirectory = filepath.Join(config.HomeDir, databaseDirectoryName)
)

-func Open() (database.Database, error) {
+func Open(config *config.Config) (database.Database, error) {
+databaseDirectory := filepath.Join(config.AppDir, databaseDirectoryName)
log.Infof("Loading database from '%s'", databaseDirectory)
return ldb.NewLevelDB(databaseDirectory, levelDBCacheSizeMiB)
}
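
With this change the LevelDB directory follows the parsed configuration instead of the package-level HomeDir. A hypothetical wiring of the new signature (import paths and aliases are assumptions, not taken from the commit):

```go
package example

import (
	"github.com/kaspa-live/kaspa-graph-inspector/processing/infrastructure/config"
	kgidatabase "github.com/kaspa-live/kaspa-graph-inspector/processing/infrastructure/database"
	kaspadDatabase "github.com/kaspanet/kaspad/infrastructure/db/database"
)

// openDatabase parses the configuration and opens the LevelDB instance under
// cfg.AppDir, which LoadConfig has already namespaced per network.
func openDatabase() (kaspadDatabase.Database, error) {
	cfg, err := config.LoadConfig()
	if err != nil {
		return nil, err
	}
	return kgidatabase.Open(cfg)
}
```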