Skip to content

Commit

Permalink
[payments] meterer structs and helpers (#789)
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen authored Oct 16, 2024
1 parent abadc9b commit aa7b798
Show file tree
Hide file tree
Showing 14 changed files with 1,160 additions and 3 deletions.
15 changes: 15 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamodb

import (
"context"
"errors"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -408,3 +409,17 @@ func (c *Client) readItems(ctx context.Context, tableName string, keys []Key) ([

return items, nil
}

// TableExists checks if a table exists and can be described
func (c *Client) TableExists(ctx context.Context, name string) error {
if name == "" {
return errors.New("table name is empty")
}
_, err := c.dynamoClient.DescribeTable(ctx, &dynamodb.DescribeTableInput{
TableName: aws.String(name),
})
if err != nil {
return err
}
return nil
}
4 changes: 2 additions & 2 deletions core/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (c *StdAssignmentCoordinator) ValidateChunkLength(state *OperatorState, blo
denom := new(big.Int).Mul(big.NewInt(int64(info.ConfirmationThreshold-info.AdversaryThreshold)), totalStake)
maxChunkLength := uint(roundUpDivideBig(num, denom).Uint64())

maxChunkLength2 := roundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold))
maxChunkLength2 := RoundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold))

if maxChunkLength < maxChunkLength2 {
maxChunkLength = maxChunkLength2
Expand Down Expand Up @@ -271,7 +271,7 @@ func roundUpDivideBig(a, b *big.Int) *big.Int {

}

func roundUpDivide(a, b uint) uint {
func RoundUpDivide(a, b uint) uint {
return (a + b - 1) / b

}
Expand Down
49 changes: 48 additions & 1 deletion core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"encoding/binary"
"errors"
"fmt"
"math/big"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/ethereum/go-ethereum/crypto"
)

type AccountID = string
Expand Down Expand Up @@ -291,7 +293,7 @@ func (b *BlobHeader) EncodedSizeAllQuorums() int64 {
size := int64(0)
for _, quorum := range b.QuorumInfos {

size += int64(roundUpDivide(b.Length*percentMultiplier*encoding.BYTES_PER_SYMBOL, uint(quorum.ConfirmationThreshold-quorum.AdversaryThreshold)))
size += int64(RoundUpDivide(b.Length*percentMultiplier*encoding.BYTES_PER_SYMBOL, uint(quorum.ConfirmationThreshold-quorum.AdversaryThreshold)))
}
return size
}
Expand Down Expand Up @@ -470,3 +472,48 @@ func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) {
}
return c, nil
}

// PaymentMetadata represents the header information for a blob
type PaymentMetadata struct {
// Existing fields
AccountID string

// New fields
BinIndex uint32
// TODO: we are thinking the contract can use uint128 for cumulative payment,
// but the definition on v2 uses uint64. Double check with team.
CumulativePayment uint64
}

// Hash returns the Keccak256 hash of the PaymentMetadata
func (pm *PaymentMetadata) Hash() []byte {
// Create a byte slice to hold the serialized data
data := make([]byte, 0, len(pm.AccountID)+12)

data = append(data, []byte(pm.AccountID)...)

binIndexBytes := make([]byte, 4)
binary.BigEndian.PutUint32(binIndexBytes, pm.BinIndex)
data = append(data, binIndexBytes...)

paymentBytes := make([]byte, 8)
binary.BigEndian.PutUint64(paymentBytes, pm.CumulativePayment)
data = append(data, paymentBytes...)

return crypto.Keccak256(data)
}

// OperatorInfo contains information about an operator which is stored on the blockchain state,
// corresponding to a particular quorum
type ActiveReservation struct {
DataRate uint64 // Bandwidth per reservation bin
StartTimestamp uint64 // Unix timestamp that's valid for basically eternity
EndTimestamp uint64

QuorumNumbers []uint8
QuorumSplit []byte // ordered mapping of quorum number to payment split; on-chain validation should ensure split <= 100
}

type OnDemandPayment struct {
CumulativePayment *big.Int // Total amount deposited by the user
}
10 changes: 10 additions & 0 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,16 @@ func (t *Transactor) GetRequiredQuorumNumbers(ctx context.Context, blockNumber u
return requiredQuorums, nil
}

func (t *Transactor) GetActiveReservations(ctx context.Context, blockNumber uint32) (map[string]core.ActiveReservation, error) {
// contract is not implemented yet
return map[string]core.ActiveReservation{}, nil
}

func (t *Transactor) GetOnDemandPayments(ctx context.Context, blockNumber uint32) (map[string]core.OnDemandPayment, error) {
// contract is not implemented yet
return map[string]core.OnDemandPayment{}, nil
}

func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDAServiceManagerAddr gethcommon.Address) error {

contractEigenDAServiceManager, err := eigendasrvmg.NewContractEigenDAServiceManager(eigenDAServiceManagerAddr, t.EthClient)
Expand Down
53 changes: 53 additions & 0 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package meterer

import (
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
)

// Config contains network parameters that should be published on-chain. We currently configure these params through disperser env vars.
type Config struct {
// GlobalBytesPerSecond is the rate limit in bytes per second for on-demand payments
GlobalBytesPerSecond uint64
// MinChargeableSize is the minimum size of a chargeable unit in bytes, used as a floor for on-demand payments
MinChargeableSize uint32
// PricePerChargeable is the price per chargeable unit in gwei, used for on-demand payments
PricePerChargeable uint32
// ReservationWindow is the duration of all reservations in seconds, used to calculate bin indices
ReservationWindow uint32

// ChainReadTimeout is the timeout for reading payment state from chain
ChainReadTimeout time.Duration
}

// Meterer handles payment accounting across different accounts. Disperser API server receives requests from clients and each request contains a blob header
// with payments information (CumulativePayments, BinIndex, and Signature). Disperser will pass the blob header to the meterer, which will check if the
// payments information is valid.
type Meterer struct {
Config

// ChainState reads on-chain payment state periodically and cache it in memory
ChainState OnchainPayment
// OffchainStore uses DynamoDB to track metering and used to validate requests
OffchainStore OffchainStore

logger logging.Logger
}

func NewMeterer(
config Config,
paymentChainState OnchainPayment,
offchainStore OffchainStore,
logger logging.Logger,
) (*Meterer, error) {
// TODO: create a separate thread to pull from the chain and update chain state
return &Meterer{
Config: config,

ChainState: paymentChainState,
OffchainStore: offchainStore,

logger: logger.With("component", "Meterer"),
}, nil
}
147 changes: 147 additions & 0 deletions core/meterer/meterer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package meterer_test

import (
"crypto/ecdsa"
"fmt"
"os"
"testing"
"time"

"github.com/Layr-Labs/eigenda/common"
commonaws "github.com/Layr-Labs/eigenda/common/aws"
commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/core/meterer"
"github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ory/dockertest/v3"

"github.com/Layr-Labs/eigensdk-go/logging"
)

var (
dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
dynamoClient *commondynamodb.Client
clientConfig commonaws.ClientConfig
privateKey1 *ecdsa.PrivateKey
privateKey2 *ecdsa.PrivateKey
mt *meterer.Meterer

deployLocalStack bool
localStackPort = "4566"
paymentChainState = &mock.MockOnchainPaymentState{}
)

func TestMain(m *testing.M) {
setup(m)
code := m.Run()
teardown()
os.Exit(code)
}

func setup(_ *testing.M) {

deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false")
if !deployLocalStack {
localStackPort = os.Getenv("LOCALSTACK_PORT")
}

if deployLocalStack {
var err error
dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort)
if err != nil {
teardown()
panic("failed to start localstack container")
}
}

loggerConfig := common.DefaultLoggerConfig()
logger, err := common.NewLogger(loggerConfig)
if err != nil {
teardown()
panic("failed to create logger")
}

clientConfig = commonaws.ClientConfig{
Region: "us-east-1",
AccessKey: "localstack",
SecretAccessKey: "localstack",
EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort),
}

dynamoClient, err = commondynamodb.NewClient(clientConfig, logger)
if err != nil {
teardown()
panic("failed to create dynamodb client")
}

privateKey1, err = crypto.GenerateKey()
if err != nil {
teardown()
panic("failed to generate private key")
}
privateKey2, err = crypto.GenerateKey()
if err != nil {
teardown()
panic("failed to generate private key")
}

logger = logging.NewNoopLogger()
config := meterer.Config{
PricePerChargeable: 1,
MinChargeableSize: 1,
GlobalBytesPerSecond: 1000,
ReservationWindow: 60,
ChainReadTimeout: 3 * time.Second,
}

err = meterer.CreateReservationTable(clientConfig, "reservations")
if err != nil {
teardown()
panic("failed to create reservation table")
}
err = meterer.CreateOnDemandTable(clientConfig, "ondemand")
if err != nil {
teardown()
panic("failed to create ondemand table")
}
err = meterer.CreateGlobalReservationTable(clientConfig, "global")
if err != nil {
teardown()
panic("failed to create global reservation table")
}

store, err := meterer.NewOffchainStore(
clientConfig,
"reservations",
"ondemand",
"global",
logger,
)

if err != nil {
teardown()
panic("failed to create offchain store")
}

// add some default sensible configs
mt, err = meterer.NewMeterer(
config,
paymentChainState,
store,
logging.NewNoopLogger(),
// metrics.NewNoopMetrics(),
)

if err != nil {
teardown()
panic("failed to create meterer")
}
}

func teardown() {
if deployLocalStack {
deploy.PurgeDockertestResources(dockertestPool, dockertestResource)
}
}
Loading

0 comments on commit aa7b798

Please sign in to comment.