-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Migrate files from github.com/sargassum-world/pslive/pkg/godest
- Loading branch information
Showing
17 changed files
with
2,884 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package database | ||
|
||
import ( | ||
"github.com/pkg/errors" | ||
"zombiezen.com/go/sqlite" | ||
|
||
"github.com/sargassum-world/godest/env" | ||
) | ||
|
||
// User-configured settings | ||
|
||
const envPrefix = "DATABASE_" | ||
|
||
type Config struct { | ||
URI string | ||
Flags sqlite.OpenFlags | ||
WritePoolSize int | ||
ReadPoolSize int | ||
} | ||
|
||
func GetConfig() (c Config, err error) { | ||
c.URI = env.GetString(envPrefix+"URI", "file:db.sqlite3") | ||
|
||
c.Flags = sqlite.OpenURI | sqlite.OpenNoMutex | sqlite.OpenSharedCache | sqlite.OpenWAL | ||
memory, err := env.GetBool(envPrefix + "MEMORY") | ||
if err != nil { | ||
return Config{}, errors.Wrap(err, "couldn't make SQLite in-memory config") | ||
} | ||
if memory { | ||
c.Flags |= sqlite.OpenMemory | ||
} | ||
|
||
const defaultWritePoolSize = 1 | ||
rawWritePoolSize, err := env.GetInt64(envPrefix+"WRITEPOOL", defaultWritePoolSize) | ||
if err != nil { | ||
return Config{}, errors.Wrap(err, "couldn't make SQLite write pool size config") | ||
} | ||
c.WritePoolSize = int(rawWritePoolSize) | ||
const defaultReadPoolSize = 16 | ||
rawReadPoolSize, err := env.GetInt64(envPrefix+"READPOOL", defaultReadPoolSize) | ||
if err != nil { | ||
return Config{}, errors.Wrap(err, "couldn't make SQLite read pool size config") | ||
} | ||
c.ReadPoolSize = int(rawReadPoolSize) | ||
return c, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package database | ||
|
||
import ( | ||
_ "embed" | ||
"strings" | ||
|
||
"github.com/pkg/errors" | ||
"zombiezen.com/go/sqlite" | ||
"zombiezen.com/go/sqlite/sqlitex" | ||
) | ||
|
||
//go:embed select-last-insert-rowid.sql | ||
var rawSelectLastInsertRowIDQuery string | ||
var selectLastInsertRowIDQuery string = strings.TrimSpace(rawSelectLastInsertRowIDQuery) | ||
|
||
func ExecuteInsertion( | ||
conn *sqlite.Conn, query string, namedParams map[string]interface{}, | ||
) (rowID int64, err error) { | ||
defer sqlitex.Save(conn)(&err) | ||
|
||
if err = sqlitex.Execute(conn, query, &sqlitex.ExecOptions{ | ||
Named: namedParams, | ||
}); err != nil { | ||
return 0, errors.Wrapf(err, "couldn't execute insertion statement") | ||
} | ||
|
||
if err = sqlitex.Execute(conn, selectLastInsertRowIDQuery, &sqlitex.ExecOptions{ | ||
ResultFunc: func(s *sqlite.Stmt) error { | ||
// TODO: instead | ||
rowID = s.GetInt64("row_id") | ||
return nil | ||
}, | ||
}); err != nil { | ||
return 0, errors.Wrapf(err, "couldn't look up id of inserted row") | ||
} | ||
return rowID, err | ||
} | ||
|
||
func ExecuteUpdate(conn *sqlite.Conn, query string, namedParams map[string]interface{}) error { | ||
return errors.Wrap( | ||
sqlitex.Execute(conn, query, &sqlitex.ExecOptions{ | ||
Named: namedParams, | ||
}), | ||
"couldn't execute update statement", | ||
) | ||
} | ||
|
||
func ExecuteDelete(conn *sqlite.Conn, query string, namedParams map[string]interface{}) error { | ||
return errors.Wrap( | ||
sqlitex.Execute(conn, query, &sqlitex.ExecOptions{ | ||
Named: namedParams, | ||
}), | ||
"couldn't execute delete statement", | ||
) | ||
} | ||
|
||
func ExecuteSelection( | ||
conn *sqlite.Conn, query string, namedParams map[string]interface{}, | ||
resultFunc func(s *sqlite.Stmt) error, | ||
) error { | ||
return errors.Wrap( | ||
sqlitex.Execute(conn, query, &sqlitex.ExecOptions{ | ||
Named: namedParams, | ||
ResultFunc: resultFunc, | ||
}), | ||
"couldn't execute selection statement", | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
// Package database provides a SQLite-backed store with migration support | ||
package database | ||
|
||
import ( | ||
"context" | ||
"io/fs" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/pkg/errors" | ||
"zombiezen.com/go/sqlite" | ||
"zombiezen.com/go/sqlite/sqlitemigration" | ||
"zombiezen.com/go/sqlite/sqlitex" | ||
) | ||
|
||
// Options | ||
|
||
type DBOption func(*DB) | ||
|
||
func WithPrepareConnQueries(queries fs.FS) DBOption { | ||
return func(db *DB) { | ||
db.prepareConnQueries = queries | ||
} | ||
} | ||
|
||
// DB | ||
|
||
type DB struct { | ||
Config Config | ||
prepareConnQueries fs.FS | ||
|
||
// Connection management | ||
connPools map[*sqlite.Conn]*sqlitex.Pool | ||
connPoolsL *sync.RWMutex | ||
writePool *sqlitex.Pool | ||
readPool *sqlitex.Pool | ||
} | ||
|
||
func NewDB(c Config, opts ...DBOption) (db *DB) { | ||
db = &DB{ | ||
Config: c, | ||
connPools: make(map[*sqlite.Conn]*sqlitex.Pool), | ||
connPoolsL: &sync.RWMutex{}, | ||
} | ||
|
||
for _, storeOption := range opts { | ||
storeOption(db) | ||
} | ||
return db | ||
} | ||
|
||
func (db *DB) Open() (err error) { | ||
if db.writePool, err = sqlitex.Open( | ||
db.Config.URI, db.Config.Flags|sqlite.OpenReadWrite|sqlite.OpenCreate, db.Config.WritePoolSize, | ||
); err != nil { | ||
return errors.Wrap(err, "couldn't open writer pool") | ||
} | ||
if db.readPool, err = sqlitex.Open( | ||
db.Config.URI, db.Config.Flags|sqlite.OpenReadOnly, db.Config.ReadPoolSize, | ||
); err != nil { | ||
return errors.Wrap(err, "couldn't open reader pool") | ||
} | ||
return nil | ||
} | ||
|
||
func (db *DB) Close() error { | ||
if err := db.writePool.Close(); err != nil { | ||
return errors.Wrap(err, "couldn't close writer pool") | ||
} | ||
if err := db.readPool.Close(); err != nil { | ||
return errors.Wrap(err, "couldn't close reader pool") | ||
} | ||
return nil | ||
} | ||
|
||
// Connection Acquisition | ||
|
||
func (db *DB) prepare(conn *sqlite.Conn, pool *sqlitex.Pool) error { | ||
if db.prepareConnQueries == nil { | ||
return nil | ||
} | ||
|
||
db.connPoolsL.RLock() | ||
_, initialized := db.connPools[conn] | ||
db.connPoolsL.RUnlock() | ||
if initialized { | ||
return nil | ||
} | ||
|
||
// TODO: embed the pragma queries for foreign keys, synchronous, and auto-checkpoint so that we | ||
// can parameterize them from environment variables | ||
queries, err := readQueries(db.prepareConnQueries, filterQuery) | ||
if err != nil { | ||
return errors.Wrap(err, "couldn't read connection preparation queries") | ||
} | ||
for _, query := range queries { | ||
// We run these as transient queries because non-transient query caching is per-connection, so | ||
// query caching provides no benefit for queries which are only run once per connection. | ||
if err := sqlitex.ExecuteTransient(conn, strings.TrimSpace(query), nil); err != nil { | ||
return errors.Wrap(err, "couldn't run connection preparation query") | ||
} | ||
} | ||
|
||
db.connPoolsL.Lock() | ||
defer db.connPoolsL.Unlock() | ||
|
||
db.connPools[conn] = pool | ||
return nil | ||
} | ||
|
||
func (db *DB) acquire(ctx context.Context, writable bool) (*sqlite.Conn, error) { | ||
pool := db.readPool | ||
if writable { | ||
pool = db.writePool | ||
} | ||
|
||
conn := pool.Get(ctx) | ||
if conn == nil { | ||
if err := ctx.Err(); err != nil { | ||
return nil, errors.Wrap(err, "couldn't get connection from pool") | ||
} | ||
return nil, errors.New("couldn't get connection from a closed pool") | ||
} | ||
if err := db.prepare(conn, pool); err != nil { | ||
pool.Put(conn) | ||
return nil, errors.Wrap(err, "couldn't prepare connection") | ||
} | ||
return conn, nil | ||
} | ||
|
||
func (db *DB) AcquireReader(ctx context.Context) (*sqlite.Conn, error) { | ||
conn, err := db.acquire(ctx, false) | ||
return conn, errors.Wrap(err, "couldn't acquire reader") | ||
} | ||
|
||
func (db *DB) AcquireWriter(ctx context.Context) (*sqlite.Conn, error) { | ||
conn, err := db.acquire(ctx, true) | ||
return conn, errors.Wrap(err, "couldn't acquire writer") | ||
} | ||
|
||
func (db *DB) ReleaseReader(conn *sqlite.Conn) { | ||
go db.readPool.Put(conn) | ||
} | ||
|
||
func (db *DB) ReleaseWriter(conn *sqlite.Conn) { | ||
// TODO: for writer connections, run the sqlite PRAGMA optimize command in the goroutine, with a | ||
// PRAGMA analysis_limit=1000 on the connection, if it hasn't been run on that connection for a | ||
// while. Log an error if it fails. | ||
go db.writePool.Put(conn) | ||
} | ||
|
||
func (db *DB) Migrate(ctx context.Context, schema sqlitemigration.Schema) error { | ||
// TODO: also implement down-migrations | ||
conn, err := db.AcquireWriter(ctx) | ||
if err != nil { | ||
return errors.Wrap(err, "couldn't acquire connection to migrate schemas") | ||
} | ||
defer db.ReleaseWriter(conn) | ||
|
||
err = sqlitemigration.Migrate(ctx, conn, schema) | ||
return errors.Wrap(err, "couldn't migrate schemas") | ||
} | ||
|
||
// Statement Execution | ||
|
||
func (db *DB) ExecuteInsertion( | ||
ctx context.Context, query string, namedParams map[string]interface{}, | ||
) (rowID int64, err error) { | ||
conn, err := db.AcquireWriter(ctx) | ||
if err != nil { | ||
return 0, errors.Wrap(err, "couldn't acquire writer to perform insertion") | ||
} | ||
defer db.ReleaseWriter(conn) | ||
|
||
return ExecuteInsertion(conn, query, namedParams) | ||
} | ||
|
||
func (db *DB) ExecuteUpdate( | ||
ctx context.Context, query string, namedParams map[string]interface{}, | ||
) error { | ||
conn, err := db.AcquireWriter(ctx) | ||
if err != nil { | ||
return errors.Wrap(err, "couldn't acquire writer to perform update") | ||
} | ||
defer db.ReleaseWriter(conn) | ||
|
||
return ExecuteUpdate(conn, query, namedParams) | ||
} | ||
|
||
func (db *DB) ExecuteDelete( | ||
ctx context.Context, query string, namedParams map[string]interface{}, | ||
) error { | ||
conn, err := db.AcquireWriter(ctx) | ||
if err != nil { | ||
return errors.Wrap(err, "couldn't acquire writer to perform delete") | ||
} | ||
defer db.ReleaseWriter(conn) | ||
|
||
return ExecuteDelete(conn, query, namedParams) | ||
} | ||
|
||
func (db *DB) ExecuteSelection( | ||
ctx context.Context, query string, namedParams map[string]interface{}, | ||
resultFunc func(s *sqlite.Stmt) error, | ||
) error { | ||
conn, err := db.AcquireReader(ctx) | ||
if err != nil { | ||
return errors.Wrap(err, "couldn't acquire reader to perform selection") | ||
} | ||
defer db.ReleaseReader(conn) | ||
|
||
return ExecuteSelection(conn, query, namedParams, resultFunc) | ||
} |
Oops, something went wrong.