From 2d8c94b9223d98620af5b50364a86c52f901d49a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Tue, 17 Oct 2023 14:25:24 +0200 Subject: [PATCH 1/3] Disallow increasing O price during the streaming session --- server/broadcast.go | 1 + server/rpc.go | 1 + server/rpc_test.go | 16 ++++++++++++++++ server/segment_rpc.go | 5 +++++ 4 files changed, 23 insertions(+) diff --git a/server/broadcast.go b/server/broadcast.go index f7ca6b64c4..6fc39b2d2d 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -835,6 +835,7 @@ func selectOrchestrator(ctx context.Context, n *core.LivepeerNode, params *core. Balance: balance, lock: &sync.RWMutex{}, OrchestratorScore: oScore, + InitialPrice: od.RemoteInfo.PriceInfo, } sessions = append(sessions, session) diff --git a/server/rpc.go b/server/rpc.go index 4815375006..91a54df7ad 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -118,6 +118,7 @@ type BroadcastSession struct { OrchestratorOS drivers.OSSession PMSessionID string Balance Balance + InitialPrice *net.PriceInfo } func (bs *BroadcastSession) Transcoder() string { diff --git a/server/rpc_test.go b/server/rpc_test.go index 227337e5ad..5e48080397 100644 --- a/server/rpc_test.go +++ b/server/rpc_test.go @@ -696,7 +696,23 @@ func TestValidatePrice(t *testing.T) { err = validatePrice(s) assert.Nil(err) + // O Initial Price == O Price + s.InitialPrice = oinfo.PriceInfo + err = validatePrice(s) + assert.Nil(err) + + // O Initial Price higher than O Price + s.InitialPrice = &net.PriceInfo{PricePerUnit: 10, PixelsPerUnit: 3} + err = validatePrice(s) + assert.Nil(err) + + // O Initial Price lower than O Price + s.InitialPrice = &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 10} + err = validatePrice(s) + assert.ErrorContains(err, "price has changed") + // B MaxPrice < O Price + s.InitialPrice = nil BroadcastCfg.SetMaxPrice(big.NewRat(1, 5)) err = validatePrice(s) assert.EqualError(err, fmt.Sprintf("Orchestrator price higher than the set maximum price of %v wei per %v pixels", int64(1), int64(5))) diff --git a/server/segment_rpc.go b/server/segment_rpc.go index 4b60320a29..d0f0f26838 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -864,6 +864,11 @@ func validatePrice(sess *BroadcastSession) error { if maxPrice != nil && oPrice.Cmp(maxPrice) == 1 { return fmt.Errorf("Orchestrator price higher than the set maximum price of %v wei per %v pixels", maxPrice.Num().Int64(), maxPrice.Denom().Int64()) } + iPrice, err := common.RatPriceInfo(sess.InitialPrice) + if err == nil && iPrice != nil && oPrice.Cmp(iPrice) == 1 { + return fmt.Errorf("Orchestrator price has changed, Orchestrator price: %v, Orchestrator initial price: %v", oPrice, iPrice) + } + return nil } From 2f08b091d5d0216ed5b8d51cda67b6f301ca52ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 18 Oct 2023 11:43:47 +0200 Subject: [PATCH 2/3] Fix O's price for the streaming session --- core/accounting.go | 22 ++++++++++++++++++++++ core/orchestrator.go | 29 ++++++++++++++++++++++++++--- server/rpc.go | 12 ++++++------ server/segment_rpc.go | 2 +- 4 files changed, 55 insertions(+), 10 deletions(-) diff --git a/core/accounting.go b/core/accounting.go index 48aa03bd22..d6d972b0ee 100644 --- a/core/accounting.go +++ b/core/accounting.go @@ -124,6 +124,7 @@ type Balances struct { type balance struct { lastUpdate time.Time // Unix time since last update amount *big.Rat // Balance represented as a big.Rat + fixedPrice *big.Rat // Fixed price for the session } // NewBalances creates a Balances instance with the given ttl @@ -181,6 +182,27 @@ func (b *Balances) Balance(id ManifestID) *big.Rat { return b.balances[id].amount } +// FixedPrice retrieves the price fixed the given session +func (b *Balances) FixedPrice(id ManifestID) *big.Rat { + b.mtx.RLock() + defer b.mtx.RUnlock() + if b.balances[id] == nil { + return nil + } + return b.balances[id].fixedPrice +} + +// SetFixedPrice sets fixed price for the given session +func (b *Balances) SetFixedPrice(id ManifestID, fixedPrice *big.Rat) { + b.mtx.Lock() + defer b.mtx.Unlock() + if b.balances[id] == nil { + b.balances[id] = &balance{amount: big.NewRat(0, 1)} + } + b.balances[id].fixedPrice = fixedPrice + b.balances[id].lastUpdate = time.Now() +} + func (b *Balances) cleanup() { for id, balance := range b.balances { b.mtx.Lock() diff --git a/core/orchestrator.go b/core/orchestrator.go index f8dd94a368..cc1f4ecf09 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -140,6 +140,14 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen return fmt.Errorf("invalid expected price sent with payment err=%q", "expected price is nil") } + // During the first payment, set the fixed price per session + if balances, ok := orch.node.Balances.balances[sender]; ok { + if balances.FixedPrice(manifestID) == nil { + balances.SetFixedPrice(manifestID, priceInfoRat) + glog.V(6).Infof("Setting fixed price=%v for session=%v", priceInfoRat, manifestID) + } + } + ticketParams := &pm.TicketParams{ Recipient: ethcommon.BytesToAddress(payment.TicketParams.Recipient), FaceValue: new(big.Int).SetBytes(payment.TicketParams.FaceValue), @@ -252,12 +260,12 @@ func (orch *orchestrator) TicketParams(sender ethcommon.Address, priceInfo *net. }, nil } -func (orch *orchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) { +func (orch *orchestrator) PriceInfo(sender ethcommon.Address, manifestID ManifestID) (*net.PriceInfo, error) { if orch.node == nil || orch.node.Recipient == nil { return nil, nil } - price, err := orch.priceInfo(sender) + price, err := orch.priceInfo(sender, manifestID) if err != nil { return nil, err } @@ -273,9 +281,24 @@ func (orch *orchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, e } // priceInfo returns price per pixel as a fixed point number wrapped in a big.Rat -func (orch *orchestrator) priceInfo(sender ethcommon.Address) (*big.Rat, error) { +func (orch *orchestrator) priceInfo(sender ethcommon.Address, manifestID ManifestID) (*big.Rat, error) { basePrice := orch.node.GetBasePrice(sender.String()) + glog.V(6).Infof("priceInfo") + + // If there is already a fixed price for the given session, use this price + if manifestID != "" { + glog.V(6).Infof("manifestID = %v", manifestID) + if balances, ok := orch.node.Balances.balances[sender]; ok { + glog.V(6).Infof("sender = %v", sender) + fixedPrice := balances.FixedPrice(manifestID) + if fixedPrice != nil { + glog.V(6).Infof("##### Using fixed price=%v", fixedPrice) + return fixedPrice, nil + } + } + } + if basePrice == nil { basePrice = orch.node.GetBasePrice("default") } diff --git a/server/rpc.go b/server/rpc.go index 91a54df7ad..3d02c06642 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -55,7 +55,7 @@ type Orchestrator interface { TranscoderResults(job int64, res *core.RemoteTranscoderResult) ProcessPayment(ctx context.Context, payment net.Payment, manifestID core.ManifestID) error TicketParams(sender ethcommon.Address, priceInfo *net.PriceInfo) (*net.TicketParams, error) - PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) + PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) SufficientBalance(addr ethcommon.Address, manifestID core.ManifestID) bool DebitFees(addr ethcommon.Address, manifestID core.ManifestID, price *net.PriceInfo, pixels int64) Capabilities() *net.Capabilities @@ -324,7 +324,7 @@ func getOrchestrator(orch Orchestrator, req *net.OrchestratorRequest) (*net.Orch } // currently, orchestrator == transcoder - return orchestratorInfo(orch, addr, orch.ServiceURI().String()) + return orchestratorInfo(orch, addr, orch.ServiceURI().String(), "") } func endTranscodingSession(node *core.LivepeerNode, orch Orchestrator, req *net.EndTranscodingSessionRequest) (*net.EndTranscodingSessionResponse, error) { @@ -336,18 +336,18 @@ func endTranscodingSession(node *core.LivepeerNode, orch Orchestrator, req *net. return &net.EndTranscodingSessionResponse{}, nil } -func getPriceInfo(orch Orchestrator, addr ethcommon.Address) (*net.PriceInfo, error) { +func getPriceInfo(orch Orchestrator, addr ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) { if AuthWebhookURL != nil { webhookRes := getFromDiscoveryAuthWebhookCache(addr.Hex()) if webhookRes != nil && webhookRes.PriceInfo != nil { return webhookRes.PriceInfo, nil } } - return orch.PriceInfo(addr) + return orch.PriceInfo(addr, manifestID) } -func orchestratorInfo(orch Orchestrator, addr ethcommon.Address, serviceURI string) (*net.OrchestratorInfo, error) { - priceInfo, err := getPriceInfo(orch, addr) +func orchestratorInfo(orch Orchestrator, addr ethcommon.Address, serviceURI string, manifestID core.ManifestID) (*net.OrchestratorInfo, error) { + priceInfo, err := getPriceInfo(orch, addr, manifestID) if err != nil { return nil, err } diff --git a/server/segment_rpc.go b/server/segment_rpc.go index d0f0f26838..a1c6c0431b 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -112,7 +112,7 @@ func (h *lphttp) ServeSegment(w http.ResponseWriter, r *http.Request) { return } - oInfo, err := orchestratorInfo(orch, sender, orch.ServiceURI().String()) + oInfo, err := orchestratorInfo(orch, sender, orch.ServiceURI().String(), core.ManifestID(segData.AuthToken.SessionId)) if err != nil { clog.Errorf(ctx, "Error updating orchestrator info - err=%q", err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) From 0f621acdb9e0aee3b257cd74936ebe5cb1933aa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 18 Oct 2023 12:18:08 +0200 Subject: [PATCH 3/3] Fix unit tests + remove excessive logging --- core/accounting_test.go | 17 +++++++++++++++++ core/orch_test.go | 26 +++++++++++++------------- core/orchestrator.go | 5 ----- server/rpc_test.go | 14 +++++++------- 4 files changed, 37 insertions(+), 25 deletions(-) diff --git a/core/accounting_test.go b/core/accounting_test.go index 0ce7b3c4c1..4d654f0314 100644 --- a/core/accounting_test.go +++ b/core/accounting_test.go @@ -211,6 +211,23 @@ func TestReserve(t *testing.T) { assert.Zero(big.NewRat(0, 1).Cmp(b.Balance(mid))) } +func TestFixedPrice(t *testing.T) { + b := NewBalances(5 * time.Second) + id1 := ManifestID("12345") + id2 := ManifestID("abcdef") + + // No fixed price set yet + assert.Nil(t, b.FixedPrice(id1)) + + // Set fixed price + p := big.NewRat(1, 5) + b.SetFixedPrice(id1, p) + assert.Equal(t, p, b.FixedPrice(id1)) + + // No fixed price for a different manifest ID + assert.Nil(t, b.FixedPrice(id2)) +} + func TestBalancesCleanup(t *testing.T) { b := NewBalances(5 * time.Second) assert := assert.New(t) diff --git a/core/orch_test.go b/core/orch_test.go index 93d826fe8b..981661433d 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -1395,7 +1395,7 @@ func TestPriceInfo(t *testing.T) { recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil) orch := NewOrchestrator(n, nil) - priceInfo, err := orch.PriceInfo(ethcommon.Address{}) + priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))) fixedPrice, err := common.PriceToFixed(expPricePerPixel) @@ -1410,7 +1410,7 @@ func TestPriceInfo(t *testing.T) { orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(1010, 100) - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))) fixedPrice, err = common.PriceToFixed(expPricePerPixel) @@ -1425,7 +1425,7 @@ func TestPriceInfo(t *testing.T) { orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(101, 1000) - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))) fixedPrice, err = common.PriceToFixed(expPricePerPixel) @@ -1439,7 +1439,7 @@ func TestPriceInfo(t *testing.T) { orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(2525, 1000) - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))) fixedPrice, err = common.PriceToFixed(expPricePerPixel) @@ -1458,7 +1458,7 @@ func TestPriceInfo(t *testing.T) { orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(11, 1) - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))) fixedPrice, err = common.PriceToFixed(expPricePerPixel) @@ -1477,7 +1477,7 @@ func TestPriceInfo(t *testing.T) { orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(1100, 10) - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))) fixedPrice, err = common.PriceToFixed(expPricePerPixel) @@ -1496,7 +1496,7 @@ func TestPriceInfo(t *testing.T) { orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(20, 1) - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))) fixedPrice, err = common.PriceToFixed(expPricePerPixel) @@ -1509,7 +1509,7 @@ func TestPriceInfo(t *testing.T) { n.SetBasePrice("default", big.NewRat(0, 1)) orch = NewOrchestrator(n, nil) - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Zero(priceInfo.PricePerUnit) assert.Equal(int64(1), priceInfo.PixelsPerUnit) @@ -1527,7 +1527,7 @@ func TestPriceInfo(t *testing.T) { overhead := new(big.Rat).Add(big.NewRat(1, 1), new(big.Rat).Inv(txMultiplier)) expPricePerPixel = new(big.Rat).Mul(basePrice, overhead) // 23953749205332825000/926899968213313 require.Equal(expPricePerPixel.Num().Cmp(big.NewInt(int64(math.MaxInt64))), 1) - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) // for this case price will be rounded when converting to fixed assert.NotEqual(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)), 0) @@ -1543,7 +1543,7 @@ func TestPriceInfo(t *testing.T) { // Now make sure when AutoAdjustPrice = false we are returning the base price n.AutoAdjustPrice = false - priceInfo, err = orch.PriceInfo(ethcommon.Address{}) + priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(err) assert.Equal(basePrice, big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)) } @@ -1553,7 +1553,7 @@ func TestPriceInfo_GivenNilNode_ReturnsNilError(t *testing.T) { orch := NewOrchestrator(n, nil) orch.node = nil - priceInfo, err := orch.PriceInfo(ethcommon.Address{}) + priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(t, err) assert.Nil(t, priceInfo) } @@ -1563,7 +1563,7 @@ func TestPriceInfo_GivenNilRecipient_ReturnsNilError(t *testing.T) { orch := NewOrchestrator(n, nil) n.Recipient = nil - priceInfo, err := orch.PriceInfo(ethcommon.Address{}) + priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(t, err) assert.Nil(t, priceInfo) } @@ -1578,7 +1578,7 @@ func TestPriceInfo_TxMultiplierError_ReturnsError(t *testing.T) { recipient.On("TxCostMultiplier", mock.Anything).Return(nil, expError) orch := NewOrchestrator(n, nil) - priceInfo, err := orch.PriceInfo(ethcommon.Address{}) + priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "") assert.Nil(t, priceInfo) assert.EqualError(t, err, expError.Error()) } diff --git a/core/orchestrator.go b/core/orchestrator.go index cc1f4ecf09..9b1a7c77e6 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -284,16 +284,11 @@ func (orch *orchestrator) PriceInfo(sender ethcommon.Address, manifestID Manifes func (orch *orchestrator) priceInfo(sender ethcommon.Address, manifestID ManifestID) (*big.Rat, error) { basePrice := orch.node.GetBasePrice(sender.String()) - glog.V(6).Infof("priceInfo") - // If there is already a fixed price for the given session, use this price if manifestID != "" { - glog.V(6).Infof("manifestID = %v", manifestID) if balances, ok := orch.node.Balances.balances[sender]; ok { - glog.V(6).Infof("sender = %v", sender) fixedPrice := balances.FixedPrice(manifestID) if fixedPrice != nil { - glog.V(6).Infof("##### Using fixed price=%v", fixedPrice) return fixedPrice, nil } } diff --git a/server/rpc_test.go b/server/rpc_test.go index 5e48080397..0250c27891 100644 --- a/server/rpc_test.go +++ b/server/rpc_test.go @@ -137,7 +137,7 @@ func (r *stubOrchestrator) TicketParams(sender ethcommon.Address, priceInfo *net return r.ticketParams, nil } -func (r *stubOrchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) { +func (r *stubOrchestrator) PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) { return r.priceInfo, nil } @@ -1050,7 +1050,7 @@ func TestGetPriceInfo_NoWebhook_DefaultPriceError_ReturnsError(t *testing.T) { orch.On("PriceInfo", mock.Anything).Return(nil, expErr) - p, err := getPriceInfo(orch, addr) + p, err := getPriceInfo(orch, addr, "") assert.Nil(p) assert.EqualError(err, expErr.Error()) } @@ -1068,7 +1068,7 @@ func TestGetPriceInfo_NoWebhook_ReturnsDefaultPrice(t *testing.T) { orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil) - p, err := getPriceInfo(orch, addr) + p, err := getPriceInfo(orch, addr, "") assert.Equal(p.PricePerUnit, int64(100)) assert.Equal(p.PixelsPerUnit, int64(30)) assert.Nil(err) @@ -1092,7 +1092,7 @@ func TestGetPriceInfo_Webhook_NoCache_ReturnsDefaultPrice(t *testing.T) { orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil) - p, err := getPriceInfo(orch, addr) + p, err := getPriceInfo(orch, addr, "") assert.Equal(p.PricePerUnit, int64(100)) assert.Equal(p.PixelsPerUnit, int64(30)) assert.Nil(err) @@ -1118,7 +1118,7 @@ func TestGetPriceInfo_Webhook_Cache_WrongType_ReturnsDefaultPrice(t *testing.T) orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil) - p, err := getPriceInfo(orch, addr) + p, err := getPriceInfo(orch, addr, "") assert.Equal(p.PricePerUnit, int64(100)) assert.Equal(p.PixelsPerUnit, int64(30)) assert.Nil(err) @@ -1149,7 +1149,7 @@ func TestGetPriceInfo_Webhook_Cache_ReturnsCachePrice(t *testing.T) { orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil) - p, err := getPriceInfo(orch, addr) + p, err := getPriceInfo(orch, addr, "") assert.Equal(p.PricePerUnit, int64(20)) assert.Equal(p.PixelsPerUnit, int64(19)) assert.Nil(err) @@ -1323,7 +1323,7 @@ func (o *mockOrchestrator) TicketParams(sender ethcommon.Address, priceInfo *net return nil, args.Error(1) } -func (o *mockOrchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) { +func (o *mockOrchestrator) PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) { args := o.Called(sender) if args.Get(0) != nil { return args.Get(0).(*net.PriceInfo), args.Error(1)