From 7d2844b1e94e0b4afc256eaed5064b308215432e Mon Sep 17 00:00:00 2001 From: Karthik Iyer Date: Wed, 13 Nov 2024 22:38:58 -0800 Subject: [PATCH 1/6] First stab at unifying offers processor code at top level --- ingest/offers/main.go | 147 ++++++++++++++++++ ingest/utils/utils.go | 17 ++ .../ingest/processors/offers_processor.go | 62 ++++---- 3 files changed, 198 insertions(+), 28 deletions(-) create mode 100644 ingest/offers/main.go create mode 100644 ingest/utils/utils.go diff --git a/ingest/offers/main.go b/ingest/offers/main.go new file mode 100644 index 0000000000..aa87094434 --- /dev/null +++ b/ingest/offers/main.go @@ -0,0 +1,147 @@ +package offers + +import ( + "github.com/guregu/null" + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/utils" + "github.com/stellar/go/xdr" +) + +// Constants for event types +const ( + EventTypeOfferCreated = "OfferCreated" + EventTypeOfferFill = "OfferUpdated" + EventTypeOfferClosed = "OfferClosed" +) + +// Base struct with common fields for all offer events +type OfferEventData struct { + SellerId string + OfferID int64 + + BuyingAsset xdr.Asset + SellingAsset xdr.Asset + + RemainingAmount int64 + PriceN int32 + PriceD int32 + + IsPassive bool + LastModifiedLedger uint32 + Sponsor null.String +} + +type OfferEvent interface { + OfferEventType() string + GetOfferData() OfferEventData +} + +type OfferCreatedEvent struct { + OfferEventData +} + +// Method to get common event data +func (e OfferEventData) GetOfferData() OfferEventData { + return e +} + +func (e OfferCreatedEvent) OfferEventType() string { return EventTypeOfferCreated } + +type OfferFillEvent struct { + OfferEventData + FillAmount int64 + MatchingOrders []OfferFill +} + +func (e OfferFillEvent) OfferEventType() string { return EventTypeOfferFill } + +type OfferClosedEvent struct { + OfferEventData + CloseReason string +} + +func (e OfferClosedEvent) OfferEventType() string { return EventTypeOfferClosed } + +type OfferFillInfo struct { + AssetSold xdr.Asset + AmountSold int64 + AssetBought xdr.Asset + AmountBought int64 +} + +type OfferFill interface { + OfferClaimantType() string +} + +type LiquidityPoolClaimant struct { + PoolId xdr.PoolId + *OfferFillInfo +} + +type OrderBookClaimant struct { + SellerId xdr.AccountId + OfferId int64 + *OfferFillInfo +} + +const ( + OrderBookClaimantType = "OrderBookClaimant" + LiquidityPoolClaimantType = "LiquidityPoolClaimant" +) + +func (o *LiquidityPoolClaimant) OfferClaimantType() string { + return LiquidityPoolClaimantType +} + +func (o *OrderBookClaimant) OfferClaimantType() string { + return OrderBookClaimantType +} + +func populateOfferData(e *xdr.LedgerEntry) OfferEventData { + offer := e.Data.MustOffer() + return OfferEventData{ + SellerId: offer.SellerId.Address(), + OfferID: int64(offer.OfferId), + + BuyingAsset: offer.Buying, + SellingAsset: offer.Selling, + RemainingAmount: int64(offer.Amount), + PriceN: int32(offer.Price.N), + PriceD: int32(offer.Price.D), + IsPassive: (offer.Flags & 0x1) != 0, + LastModifiedLedger: uint32(e.LastModifiedLedgerSeq), + Sponsor: utils.LedgerEntrySponsorToNullString(*e), + } +} + +func ProcessOffer(change ingest.Change) OfferEvent { + if change.Type != xdr.LedgerEntryTypeOffer { + return nil + } + var o OfferEventData + var event OfferEvent + + switch { + case change.Pre == nil && change.Post != nil: + // New offer + o = populateOfferData(change.Post) + event = OfferCreatedEvent{OfferEventData: o} + + case change.Pre != nil && change.Post != nil: + // Order Fill + o = populateOfferData(change.Post) + fillAmt := int64(change.Pre.Data.MustOffer().Amount - change.Post.Data.MustOffer().Amount) + o.RemainingAmount = fillAmt + event = OfferFillEvent{OfferEventData: o, FillAmount: fillAmt} + //TODO: populate MatchingOrders field in OfferFillEvent + + // Offer Fill + case change.Pre != nil && change.Post == nil: + // Offer Removed + o = populateOfferData(change.Pre) + event = OfferClosedEvent{OfferEventData: o} + //TODO: populate CloseReason field in OfferClosedEvent + } + return event + +} diff --git a/ingest/utils/utils.go b/ingest/utils/utils.go new file mode 100644 index 0000000000..82d218c9fe --- /dev/null +++ b/ingest/utils/utils.go @@ -0,0 +1,17 @@ +package utils + +import ( + "github.com/guregu/null" + "github.com/stellar/go/xdr" +) + +func LedgerEntrySponsorToNullString(entry xdr.LedgerEntry) null.String { + sponsoringID := entry.SponsoringID() + + var sponsor null.String + if sponsoringID != nil { + sponsor.SetValid((*sponsoringID).Address()) + } + + return sponsor +} diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index bf1e2648c0..be505ec415 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -2,11 +2,11 @@ package processors import ( "context" - + "fmt" "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/offers" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" ) // The offers processor can be configured to trim the offers table @@ -37,31 +37,30 @@ func (p *OffersProcessor) reset() { } func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { - if change.Type != xdr.LedgerEntryTypeOffer { + event := offers.ProcessOffer(change) + if event == nil { return nil } - switch { - case change.Pre == nil && change.Post != nil: - // Created - err := p.insertBatchBuilder.Add(p.ledgerEntryToRow(change.Post)) + switch ev := event.(type) { + case offers.OfferCreatedEvent: + row := p.eventToRow(ev.OfferEventData) + err := p.insertBatchBuilder.Add(row) if err != nil { return errors.New("Error adding to OffersBatchInsertBuilder") } - case change.Pre != nil && change.Post != nil: - // Updated - row := p.ledgerEntryToRow(change.Post) + case offers.OfferFillEvent: + row := p.eventToRow(ev.OfferEventData) p.batchUpdateOffers = append(p.batchUpdateOffers, row) - case change.Pre != nil && change.Post == nil: - // Removed - row := p.ledgerEntryToRow(change.Pre) + case offers.OfferClosedEvent: + row := p.eventToRow(ev.OfferEventData) row.Deleted = true row.LastModifiedLedger = p.sequence p.batchUpdateOffers = append(p.batchUpdateOffers, row) default: - return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") - } + return errors.New("Unknown offer event") + } if p.insertBatchBuilder.Len()+len(p.batchUpdateOffers) > maxBatchSize { if err := p.flushCache(ctx); err != nil { return errors.Wrap(err, "error in Commit") @@ -69,22 +68,29 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang } return nil + } -func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer { - offer := entry.Data.MustOffer() +func (p *OffersProcessor) eventToRow(event offers.OfferEventData) history.Offer { + flags := int32(0) + if event.IsPassive { + flags = 1 + } + fmt.Println("************") + fmt.Printf("%v", event) + fmt.Println("************") + return history.Offer{ - SellerID: offer.SellerId.Address(), - OfferID: int64(offer.OfferId), - SellingAsset: offer.Selling, - BuyingAsset: offer.Buying, - Amount: int64(offer.Amount), - Pricen: int32(offer.Price.N), - Priced: int32(offer.Price.D), - Price: float64(offer.Price.N) / float64(offer.Price.D), - Flags: int32(offer.Flags), - LastModifiedLedger: uint32(entry.LastModifiedLedgerSeq), - Sponsor: ledgerEntrySponsorToNullString(*entry), + SellerID: event.SellerId, + OfferID: event.OfferID, + SellingAsset: event.SellingAsset, + BuyingAsset: event.BuyingAsset, + Pricen: event.PriceN, + Priced: event.PriceD, + Price: float64(event.PriceN / event.PriceD), + Flags: flags, + LastModifiedLedger: event.LastModifiedLedger, + Sponsor: event.Sponsor, } } From 62c5e4d2f1e0499380d533e97a67a78e43b822c4 Mon Sep 17 00:00:00 2001 From: Karthik Iyer Date: Wed, 13 Nov 2024 22:42:21 -0800 Subject: [PATCH 2/6] Fix price calculation --- .../horizon/internal/ingest/processors/offers_processor.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index be505ec415..3795b4c327 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -2,7 +2,6 @@ package processors import ( "context" - "fmt" "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/offers" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -76,9 +75,6 @@ func (p *OffersProcessor) eventToRow(event offers.OfferEventData) history.Offer if event.IsPassive { flags = 1 } - fmt.Println("************") - fmt.Printf("%v", event) - fmt.Println("************") return history.Offer{ SellerID: event.SellerId, @@ -87,7 +83,7 @@ func (p *OffersProcessor) eventToRow(event offers.OfferEventData) history.Offer BuyingAsset: event.BuyingAsset, Pricen: event.PriceN, Priced: event.PriceD, - Price: float64(event.PriceN / event.PriceD), + Price: float64(event.PriceN) / float64(event.PriceD), Flags: flags, LastModifiedLedger: event.LastModifiedLedger, Sponsor: event.Sponsor, From ce975a1856d5ffe44f227711fe8e9f9e3af79b40 Mon Sep 17 00:00:00 2001 From: Karthik Iyer Date: Thu, 14 Nov 2024 00:29:38 -0800 Subject: [PATCH 3/6] make pointers --- ingest/offers/main.go | 4 ++-- .../horizon/internal/ingest/processors/offers_processor.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ingest/offers/main.go b/ingest/offers/main.go index aa87094434..ba96df3a36 100644 --- a/ingest/offers/main.go +++ b/ingest/offers/main.go @@ -114,7 +114,7 @@ func populateOfferData(e *xdr.LedgerEntry) OfferEventData { } } -func ProcessOffer(change ingest.Change) OfferEvent { +func ProcessOffer(change ingest.Change) *OfferEvent { if change.Type != xdr.LedgerEntryTypeOffer { return nil } @@ -142,6 +142,6 @@ func ProcessOffer(change ingest.Change) OfferEvent { event = OfferClosedEvent{OfferEventData: o} //TODO: populate CloseReason field in OfferClosedEvent } - return event + return &event } diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index 3795b4c327..858a8c11fa 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -41,7 +41,7 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang return nil } - switch ev := event.(type) { + switch ev := (*event).(type) { case offers.OfferCreatedEvent: row := p.eventToRow(ev.OfferEventData) err := p.insertBatchBuilder.Add(row) From 410b29e9f90918913ff471fe4269b255a420c145 Mon Sep 17 00:00:00 2001 From: Karthik Iyer Date: Thu, 14 Nov 2024 01:07:34 -0800 Subject: [PATCH 4/6] Fix the amount --- ingest/offers/main.go | 9 ++++----- .../internal/ingest/processors/offers_processor.go | 11 ++++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ingest/offers/main.go b/ingest/offers/main.go index ba96df3a36..0eb4ab25ca 100644 --- a/ingest/offers/main.go +++ b/ingest/offers/main.go @@ -14,7 +14,7 @@ const ( EventTypeOfferClosed = "OfferClosed" ) -// Base struct with common fields for all offer events +// Base struct with common fields for all offer events. type OfferEventData struct { SellerId string OfferID int64 @@ -22,7 +22,7 @@ type OfferEventData struct { BuyingAsset xdr.Asset SellingAsset xdr.Asset - RemainingAmount int64 + RemainingAmount int64 // Remaining amount that still needs to be filled for this offer PriceN int32 PriceD int32 @@ -114,7 +114,7 @@ func populateOfferData(e *xdr.LedgerEntry) OfferEventData { } } -func ProcessOffer(change ingest.Change) *OfferEvent { +func ProcessOffer(change ingest.Change) OfferEvent { if change.Type != xdr.LedgerEntryTypeOffer { return nil } @@ -131,7 +131,6 @@ func ProcessOffer(change ingest.Change) *OfferEvent { // Order Fill o = populateOfferData(change.Post) fillAmt := int64(change.Pre.Data.MustOffer().Amount - change.Post.Data.MustOffer().Amount) - o.RemainingAmount = fillAmt event = OfferFillEvent{OfferEventData: o, FillAmount: fillAmt} //TODO: populate MatchingOrders field in OfferFillEvent @@ -142,6 +141,6 @@ func ProcessOffer(change ingest.Change) *OfferEvent { event = OfferClosedEvent{OfferEventData: o} //TODO: populate CloseReason field in OfferClosedEvent } - return &event + return event } diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index 858a8c11fa..2034f3be3a 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -41,18 +41,18 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang return nil } - switch ev := (*event).(type) { + switch ev := event.(type) { case offers.OfferCreatedEvent: - row := p.eventToRow(ev.OfferEventData) + row := p.offerEventToRow(ev.OfferEventData) err := p.insertBatchBuilder.Add(row) if err != nil { return errors.New("Error adding to OffersBatchInsertBuilder") } case offers.OfferFillEvent: - row := p.eventToRow(ev.OfferEventData) + row := p.offerEventToRow(ev.OfferEventData) p.batchUpdateOffers = append(p.batchUpdateOffers, row) case offers.OfferClosedEvent: - row := p.eventToRow(ev.OfferEventData) + row := p.offerEventToRow(ev.OfferEventData) row.Deleted = true row.LastModifiedLedger = p.sequence p.batchUpdateOffers = append(p.batchUpdateOffers, row) @@ -70,7 +70,7 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang } -func (p *OffersProcessor) eventToRow(event offers.OfferEventData) history.Offer { +func (p *OffersProcessor) offerEventToRow(event offers.OfferEventData) history.Offer { flags := int32(0) if event.IsPassive { flags = 1 @@ -81,6 +81,7 @@ func (p *OffersProcessor) eventToRow(event offers.OfferEventData) history.Offer OfferID: event.OfferID, SellingAsset: event.SellingAsset, BuyingAsset: event.BuyingAsset, + Amount: event.RemainingAmount, Pricen: event.PriceN, Priced: event.PriceD, Price: float64(event.PriceN) / float64(event.PriceD), From 9a54e6dab6b72907931bceeafb99a5a98395c9a4 Mon Sep 17 00:00:00 2001 From: Karthik Iyer Date: Thu, 14 Nov 2024 21:34:04 -0800 Subject: [PATCH 5/6] Fix flags for offerEvents --- ingest/offers/main.go | 21 +++++++------- .../ingest/processors/offers_processor.go | 29 ++++++++----------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/ingest/offers/main.go b/ingest/offers/main.go index 0eb4ab25ca..e47de6f003 100644 --- a/ingest/offers/main.go +++ b/ingest/offers/main.go @@ -16,16 +16,14 @@ const ( // Base struct with common fields for all offer events. type OfferEventData struct { - SellerId string - OfferID int64 - - BuyingAsset xdr.Asset - SellingAsset xdr.Asset - - RemainingAmount int64 // Remaining amount that still needs to be filled for this offer - PriceN int32 - PriceD int32 - + SellerId string + OfferID int64 + BuyingAsset xdr.Asset + SellingAsset xdr.Asset + RemainingAmount int64 // Remaining amount that still needs to be filled for this offer + PriceN int32 + PriceD int32 + Flags int32 IsPassive bool LastModifiedLedger uint32 Sponsor null.String @@ -108,7 +106,8 @@ func populateOfferData(e *xdr.LedgerEntry) OfferEventData { RemainingAmount: int64(offer.Amount), PriceN: int32(offer.Price.N), PriceD: int32(offer.Price.D), - IsPassive: (offer.Flags & 0x1) != 0, + Flags: int32(offer.Flags), + IsPassive: int32(offer.Flags) == int32(xdr.OfferEntryFlagsPassiveFlag), LastModifiedLedger: uint32(e.LastModifiedLedgerSeq), Sponsor: utils.LedgerEntrySponsorToNullString(*e), } diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index 2034f3be3a..30344473bd 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -70,24 +70,19 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang } -func (p *OffersProcessor) offerEventToRow(event offers.OfferEventData) history.Offer { - flags := int32(0) - if event.IsPassive { - flags = 1 - } - +func (p *OffersProcessor) offerEventToRow(e offers.OfferEventData) history.Offer { return history.Offer{ - SellerID: event.SellerId, - OfferID: event.OfferID, - SellingAsset: event.SellingAsset, - BuyingAsset: event.BuyingAsset, - Amount: event.RemainingAmount, - Pricen: event.PriceN, - Priced: event.PriceD, - Price: float64(event.PriceN) / float64(event.PriceD), - Flags: flags, - LastModifiedLedger: event.LastModifiedLedger, - Sponsor: event.Sponsor, + SellerID: e.SellerId, + OfferID: e.OfferID, + SellingAsset: e.SellingAsset, + BuyingAsset: e.BuyingAsset, + Amount: e.RemainingAmount, + Pricen: e.PriceN, + Priced: e.PriceD, + Price: float64(e.PriceN) / float64(e.PriceD), + Flags: e.Flags, + LastModifiedLedger: e.LastModifiedLedger, + Sponsor: e.Sponsor, } } From 2864e8da9b52025f6c620ba636d5306a5bfaed0d Mon Sep 17 00:00:00 2001 From: Karthik Iyer Date: Thu, 14 Nov 2024 21:59:25 -0800 Subject: [PATCH 6/6] Simplify the Offer data model as a part of cut-1 --- ingest/offers/main.go | 41 ++--------------------------------------- 1 file changed, 2 insertions(+), 39 deletions(-) diff --git a/ingest/offers/main.go b/ingest/offers/main.go index e47de6f003..fe3cbaf254 100644 --- a/ingest/offers/main.go +++ b/ingest/offers/main.go @@ -31,7 +31,6 @@ type OfferEventData struct { type OfferEvent interface { OfferEventType() string - GetOfferData() OfferEventData } type OfferCreatedEvent struct { @@ -47,8 +46,7 @@ func (e OfferCreatedEvent) OfferEventType() string { return EventTypeOfferCreate type OfferFillEvent struct { OfferEventData - FillAmount int64 - MatchingOrders []OfferFill + FillAmount int64 // How much amount of the order was filled from last time } func (e OfferFillEvent) OfferEventType() string { return EventTypeOfferFill } @@ -60,41 +58,6 @@ type OfferClosedEvent struct { func (e OfferClosedEvent) OfferEventType() string { return EventTypeOfferClosed } -type OfferFillInfo struct { - AssetSold xdr.Asset - AmountSold int64 - AssetBought xdr.Asset - AmountBought int64 -} - -type OfferFill interface { - OfferClaimantType() string -} - -type LiquidityPoolClaimant struct { - PoolId xdr.PoolId - *OfferFillInfo -} - -type OrderBookClaimant struct { - SellerId xdr.AccountId - OfferId int64 - *OfferFillInfo -} - -const ( - OrderBookClaimantType = "OrderBookClaimant" - LiquidityPoolClaimantType = "LiquidityPoolClaimant" -) - -func (o *LiquidityPoolClaimant) OfferClaimantType() string { - return LiquidityPoolClaimantType -} - -func (o *OrderBookClaimant) OfferClaimantType() string { - return OrderBookClaimantType -} - func populateOfferData(e *xdr.LedgerEntry) OfferEventData { offer := e.Data.MustOffer() return OfferEventData{ @@ -140,6 +103,6 @@ func ProcessOffer(change ingest.Change) OfferEvent { event = OfferClosedEvent{OfferEventData: o} //TODO: populate CloseReason field in OfferClosedEvent } - return event + return event }