Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix price per session #2892

Merged
merged 3 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type Balances struct {
type balance struct {
lastUpdate time.Time // Unix time since last update
amount *big.Rat // Balance represented as a big.Rat
fixedPrice *big.Rat // Fixed price for the session
}

// NewBalances creates a Balances instance with the given ttl
Expand Down Expand Up @@ -181,6 +182,27 @@ func (b *Balances) Balance(id ManifestID) *big.Rat {
return b.balances[id].amount
}

// FixedPrice retrieves the price fixed the given session
func (b *Balances) FixedPrice(id ManifestID) *big.Rat {
b.mtx.RLock()
defer b.mtx.RUnlock()
if b.balances[id] == nil {
return nil
}
return b.balances[id].fixedPrice
}

// SetFixedPrice sets fixed price for the given session
func (b *Balances) SetFixedPrice(id ManifestID, fixedPrice *big.Rat) {
b.mtx.Lock()
defer b.mtx.Unlock()
if b.balances[id] == nil {
b.balances[id] = &balance{amount: big.NewRat(0, 1)}
}
b.balances[id].fixedPrice = fixedPrice
b.balances[id].lastUpdate = time.Now()
}

func (b *Balances) cleanup() {
for id, balance := range b.balances {
b.mtx.Lock()
Expand Down
17 changes: 17 additions & 0 deletions core/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,23 @@ func TestReserve(t *testing.T) {
assert.Zero(big.NewRat(0, 1).Cmp(b.Balance(mid)))
}

func TestFixedPrice(t *testing.T) {
b := NewBalances(5 * time.Second)
id1 := ManifestID("12345")
id2 := ManifestID("abcdef")

// No fixed price set yet
assert.Nil(t, b.FixedPrice(id1))

// Set fixed price
p := big.NewRat(1, 5)
b.SetFixedPrice(id1, p)
assert.Equal(t, p, b.FixedPrice(id1))

// No fixed price for a different manifest ID
assert.Nil(t, b.FixedPrice(id2))
}

func TestBalancesCleanup(t *testing.T) {
b := NewBalances(5 * time.Second)
assert := assert.New(t)
Expand Down
26 changes: 13 additions & 13 deletions core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ func TestPriceInfo(t *testing.T) {
recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil)
orch := NewOrchestrator(n, nil)

priceInfo, err := orch.PriceInfo(ethcommon.Address{})
priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err := common.PriceToFixed(expPricePerPixel)
Expand All @@ -1410,7 +1410,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(1010, 100)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1425,7 +1425,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(101, 1000)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1439,7 +1439,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(2525, 1000)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1458,7 +1458,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(11, 1)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1477,7 +1477,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(1100, 10)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1496,7 +1496,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(20, 1)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1509,7 +1509,7 @@ func TestPriceInfo(t *testing.T) {
n.SetBasePrice("default", big.NewRat(0, 1))
orch = NewOrchestrator(n, nil)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(priceInfo.PricePerUnit)
assert.Equal(int64(1), priceInfo.PixelsPerUnit)
Expand All @@ -1527,7 +1527,7 @@ func TestPriceInfo(t *testing.T) {
overhead := new(big.Rat).Add(big.NewRat(1, 1), new(big.Rat).Inv(txMultiplier))
expPricePerPixel = new(big.Rat).Mul(basePrice, overhead) // 23953749205332825000/926899968213313
require.Equal(expPricePerPixel.Num().Cmp(big.NewInt(int64(math.MaxInt64))), 1)
priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
// for this case price will be rounded when converting to fixed
assert.NotEqual(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)), 0)
Expand All @@ -1543,7 +1543,7 @@ func TestPriceInfo(t *testing.T) {

// Now make sure when AutoAdjustPrice = false we are returning the base price
n.AutoAdjustPrice = false
priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Equal(basePrice, big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))
}
Expand All @@ -1553,7 +1553,7 @@ func TestPriceInfo_GivenNilNode_ReturnsNilError(t *testing.T) {
orch := NewOrchestrator(n, nil)
orch.node = nil

priceInfo, err := orch.PriceInfo(ethcommon.Address{})
priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(t, err)
assert.Nil(t, priceInfo)
}
Expand All @@ -1563,7 +1563,7 @@ func TestPriceInfo_GivenNilRecipient_ReturnsNilError(t *testing.T) {
orch := NewOrchestrator(n, nil)
n.Recipient = nil

priceInfo, err := orch.PriceInfo(ethcommon.Address{})
priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(t, err)
assert.Nil(t, priceInfo)
}
Expand All @@ -1578,7 +1578,7 @@ func TestPriceInfo_TxMultiplierError_ReturnsError(t *testing.T) {
recipient.On("TxCostMultiplier", mock.Anything).Return(nil, expError)
orch := NewOrchestrator(n, nil)

priceInfo, err := orch.PriceInfo(ethcommon.Address{})
priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(t, priceInfo)
assert.EqualError(t, err, expError.Error())
}
Expand Down
24 changes: 21 additions & 3 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
return fmt.Errorf("invalid expected price sent with payment err=%q", "expected price is nil")
}

// During the first payment, set the fixed price per session
if balances, ok := orch.node.Balances.balances[sender]; ok {
if balances.FixedPrice(manifestID) == nil {
balances.SetFixedPrice(manifestID, priceInfoRat)
glog.V(6).Infof("Setting fixed price=%v for session=%v", priceInfoRat, manifestID)
}
}

ticketParams := &pm.TicketParams{
Recipient: ethcommon.BytesToAddress(payment.TicketParams.Recipient),
FaceValue: new(big.Int).SetBytes(payment.TicketParams.FaceValue),
Expand Down Expand Up @@ -252,12 +260,12 @@ func (orch *orchestrator) TicketParams(sender ethcommon.Address, priceInfo *net.
}, nil
}

func (orch *orchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) {
func (orch *orchestrator) PriceInfo(sender ethcommon.Address, manifestID ManifestID) (*net.PriceInfo, error) {
if orch.node == nil || orch.node.Recipient == nil {
return nil, nil
}

price, err := orch.priceInfo(sender)
price, err := orch.priceInfo(sender, manifestID)
if err != nil {
return nil, err
}
Expand All @@ -273,9 +281,19 @@ func (orch *orchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, e
}

// priceInfo returns price per pixel as a fixed point number wrapped in a big.Rat
func (orch *orchestrator) priceInfo(sender ethcommon.Address) (*big.Rat, error) {
func (orch *orchestrator) priceInfo(sender ethcommon.Address, manifestID ManifestID) (*big.Rat, error) {
basePrice := orch.node.GetBasePrice(sender.String())

// If there is already a fixed price for the given session, use this price
if manifestID != "" {
if balances, ok := orch.node.Balances.balances[sender]; ok {
fixedPrice := balances.FixedPrice(manifestID)
if fixedPrice != nil {
return fixedPrice, nil
}
}
}

if basePrice == nil {
basePrice = orch.node.GetBasePrice("default")
}
Expand Down
1 change: 1 addition & 0 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ func selectOrchestrator(ctx context.Context, n *core.LivepeerNode, params *core.
Balance: balance,
lock: &sync.RWMutex{},
OrchestratorScore: oScore,
InitialPrice: od.RemoteInfo.PriceInfo,
}

sessions = append(sessions, session)
Expand Down
13 changes: 7 additions & 6 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Orchestrator interface {
TranscoderResults(job int64, res *core.RemoteTranscoderResult)
ProcessPayment(ctx context.Context, payment net.Payment, manifestID core.ManifestID) error
TicketParams(sender ethcommon.Address, priceInfo *net.PriceInfo) (*net.TicketParams, error)
PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error)
PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error)
SufficientBalance(addr ethcommon.Address, manifestID core.ManifestID) bool
DebitFees(addr ethcommon.Address, manifestID core.ManifestID, price *net.PriceInfo, pixels int64)
Capabilities() *net.Capabilities
Expand Down Expand Up @@ -118,6 +118,7 @@ type BroadcastSession struct {
OrchestratorOS drivers.OSSession
PMSessionID string
Balance Balance
InitialPrice *net.PriceInfo
}

func (bs *BroadcastSession) Transcoder() string {
Expand Down Expand Up @@ -323,7 +324,7 @@ func getOrchestrator(orch Orchestrator, req *net.OrchestratorRequest) (*net.Orch
}

// currently, orchestrator == transcoder
return orchestratorInfo(orch, addr, orch.ServiceURI().String())
return orchestratorInfo(orch, addr, orch.ServiceURI().String(), "")
}

func endTranscodingSession(node *core.LivepeerNode, orch Orchestrator, req *net.EndTranscodingSessionRequest) (*net.EndTranscodingSessionResponse, error) {
Expand All @@ -335,18 +336,18 @@ func endTranscodingSession(node *core.LivepeerNode, orch Orchestrator, req *net.
return &net.EndTranscodingSessionResponse{}, nil
}

func getPriceInfo(orch Orchestrator, addr ethcommon.Address) (*net.PriceInfo, error) {
func getPriceInfo(orch Orchestrator, addr ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) {
if AuthWebhookURL != nil {
webhookRes := getFromDiscoveryAuthWebhookCache(addr.Hex())
if webhookRes != nil && webhookRes.PriceInfo != nil {
return webhookRes.PriceInfo, nil
}
}
return orch.PriceInfo(addr)
return orch.PriceInfo(addr, manifestID)
}

func orchestratorInfo(orch Orchestrator, addr ethcommon.Address, serviceURI string) (*net.OrchestratorInfo, error) {
priceInfo, err := getPriceInfo(orch, addr)
func orchestratorInfo(orch Orchestrator, addr ethcommon.Address, serviceURI string, manifestID core.ManifestID) (*net.OrchestratorInfo, error) {
priceInfo, err := getPriceInfo(orch, addr, manifestID)
if err != nil {
return nil, err
}
Expand Down
30 changes: 23 additions & 7 deletions server/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (r *stubOrchestrator) TicketParams(sender ethcommon.Address, priceInfo *net
return r.ticketParams, nil
}

func (r *stubOrchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) {
func (r *stubOrchestrator) PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) {
return r.priceInfo, nil
}

Expand Down Expand Up @@ -696,7 +696,23 @@ func TestValidatePrice(t *testing.T) {
err = validatePrice(s)
assert.Nil(err)

// O Initial Price == O Price
s.InitialPrice = oinfo.PriceInfo
err = validatePrice(s)
assert.Nil(err)

// O Initial Price higher than O Price
s.InitialPrice = &net.PriceInfo{PricePerUnit: 10, PixelsPerUnit: 3}
err = validatePrice(s)
assert.Nil(err)

// O Initial Price lower than O Price
s.InitialPrice = &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 10}
err = validatePrice(s)
assert.ErrorContains(err, "price has changed")

// B MaxPrice < O Price
s.InitialPrice = nil
BroadcastCfg.SetMaxPrice(big.NewRat(1, 5))
err = validatePrice(s)
assert.EqualError(err, fmt.Sprintf("Orchestrator price higher than the set maximum price of %v wei per %v pixels", int64(1), int64(5)))
Expand Down Expand Up @@ -1034,7 +1050,7 @@ func TestGetPriceInfo_NoWebhook_DefaultPriceError_ReturnsError(t *testing.T) {

orch.On("PriceInfo", mock.Anything).Return(nil, expErr)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Nil(p)
assert.EqualError(err, expErr.Error())
}
Expand All @@ -1052,7 +1068,7 @@ func TestGetPriceInfo_NoWebhook_ReturnsDefaultPrice(t *testing.T) {

orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Equal(p.PricePerUnit, int64(100))
assert.Equal(p.PixelsPerUnit, int64(30))
assert.Nil(err)
Expand All @@ -1076,7 +1092,7 @@ func TestGetPriceInfo_Webhook_NoCache_ReturnsDefaultPrice(t *testing.T) {

orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Equal(p.PricePerUnit, int64(100))
assert.Equal(p.PixelsPerUnit, int64(30))
assert.Nil(err)
Expand All @@ -1102,7 +1118,7 @@ func TestGetPriceInfo_Webhook_Cache_WrongType_ReturnsDefaultPrice(t *testing.T)

orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Equal(p.PricePerUnit, int64(100))
assert.Equal(p.PixelsPerUnit, int64(30))
assert.Nil(err)
Expand Down Expand Up @@ -1133,7 +1149,7 @@ func TestGetPriceInfo_Webhook_Cache_ReturnsCachePrice(t *testing.T) {

orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Equal(p.PricePerUnit, int64(20))
assert.Equal(p.PixelsPerUnit, int64(19))
assert.Nil(err)
Expand Down Expand Up @@ -1307,7 +1323,7 @@ func (o *mockOrchestrator) TicketParams(sender ethcommon.Address, priceInfo *net
return nil, args.Error(1)
}

func (o *mockOrchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) {
func (o *mockOrchestrator) PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) {
args := o.Called(sender)
if args.Get(0) != nil {
return args.Get(0).(*net.PriceInfo), args.Error(1)
Expand Down
7 changes: 6 additions & 1 deletion server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (h *lphttp) ServeSegment(w http.ResponseWriter, r *http.Request) {
return
}

oInfo, err := orchestratorInfo(orch, sender, orch.ServiceURI().String())
oInfo, err := orchestratorInfo(orch, sender, orch.ServiceURI().String(), core.ManifestID(segData.AuthToken.SessionId))
if err != nil {
clog.Errorf(ctx, "Error updating orchestrator info - err=%q", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
Expand Down Expand Up @@ -864,6 +864,11 @@ func validatePrice(sess *BroadcastSession) error {
if maxPrice != nil && oPrice.Cmp(maxPrice) == 1 {
return fmt.Errorf("Orchestrator price higher than the set maximum price of %v wei per %v pixels", maxPrice.Num().Int64(), maxPrice.Denom().Int64())
}
iPrice, err := common.RatPriceInfo(sess.InitialPrice)
if err == nil && iPrice != nil && oPrice.Cmp(iPrice) == 1 {
return fmt.Errorf("Orchestrator price has changed, Orchestrator price: %v, Orchestrator initial price: %v", oPrice, iPrice)
}

return nil
}

Expand Down
Loading