diff --git a/ingest/offers/main.go b/ingest/offers/main.go new file mode 100644 index 0000000000..fe3cbaf254 --- /dev/null +++ b/ingest/offers/main.go @@ -0,0 +1,108 @@ +package offers + +import ( + "github.com/guregu/null" + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/utils" + "github.com/stellar/go/xdr" +) + +// Constants for event types +const ( + EventTypeOfferCreated = "OfferCreated" + EventTypeOfferFill = "OfferUpdated" + EventTypeOfferClosed = "OfferClosed" +) + +// Base struct with common fields for all offer events. +type OfferEventData struct { + SellerId string + OfferID int64 + BuyingAsset xdr.Asset + SellingAsset xdr.Asset + RemainingAmount int64 // Remaining amount that still needs to be filled for this offer + PriceN int32 + PriceD int32 + Flags int32 + IsPassive bool + LastModifiedLedger uint32 + Sponsor null.String +} + +type OfferEvent interface { + OfferEventType() string +} + +type OfferCreatedEvent struct { + OfferEventData +} + +// Method to get common event data +func (e OfferEventData) GetOfferData() OfferEventData { + return e +} + +func (e OfferCreatedEvent) OfferEventType() string { return EventTypeOfferCreated } + +type OfferFillEvent struct { + OfferEventData + FillAmount int64 // How much amount of the order was filled from last time +} + +func (e OfferFillEvent) OfferEventType() string { return EventTypeOfferFill } + +type OfferClosedEvent struct { + OfferEventData + CloseReason string +} + +func (e OfferClosedEvent) OfferEventType() string { return EventTypeOfferClosed } + +func populateOfferData(e *xdr.LedgerEntry) OfferEventData { + offer := e.Data.MustOffer() + return OfferEventData{ + SellerId: offer.SellerId.Address(), + OfferID: int64(offer.OfferId), + + BuyingAsset: offer.Buying, + SellingAsset: offer.Selling, + RemainingAmount: int64(offer.Amount), + PriceN: int32(offer.Price.N), + PriceD: int32(offer.Price.D), + Flags: int32(offer.Flags), + IsPassive: int32(offer.Flags) == int32(xdr.OfferEntryFlagsPassiveFlag), + LastModifiedLedger: uint32(e.LastModifiedLedgerSeq), + Sponsor: utils.LedgerEntrySponsorToNullString(*e), + } +} + +func ProcessOffer(change ingest.Change) OfferEvent { + if change.Type != xdr.LedgerEntryTypeOffer { + return nil + } + var o OfferEventData + var event OfferEvent + + switch { + case change.Pre == nil && change.Post != nil: + // New offer + o = populateOfferData(change.Post) + event = OfferCreatedEvent{OfferEventData: o} + + case change.Pre != nil && change.Post != nil: + // Order Fill + o = populateOfferData(change.Post) + fillAmt := int64(change.Pre.Data.MustOffer().Amount - change.Post.Data.MustOffer().Amount) + event = OfferFillEvent{OfferEventData: o, FillAmount: fillAmt} + //TODO: populate MatchingOrders field in OfferFillEvent + + // Offer Fill + case change.Pre != nil && change.Post == nil: + // Offer Removed + o = populateOfferData(change.Pre) + event = OfferClosedEvent{OfferEventData: o} + //TODO: populate CloseReason field in OfferClosedEvent + } + + return event +} diff --git a/ingest/utils/utils.go b/ingest/utils/utils.go new file mode 100644 index 0000000000..82d218c9fe --- /dev/null +++ b/ingest/utils/utils.go @@ -0,0 +1,17 @@ +package utils + +import ( + "github.com/guregu/null" + "github.com/stellar/go/xdr" +) + +func LedgerEntrySponsorToNullString(entry xdr.LedgerEntry) null.String { + sponsoringID := entry.SponsoringID() + + var sponsor null.String + if sponsoringID != nil { + sponsor.SetValid((*sponsoringID).Address()) + } + + return sponsor +} diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index bf1e2648c0..30344473bd 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -2,11 +2,10 @@ package processors import ( "context" - "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/offers" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" ) // The offers processor can be configured to trim the offers table @@ -37,31 +36,30 @@ func (p *OffersProcessor) reset() { } func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { - if change.Type != xdr.LedgerEntryTypeOffer { + event := offers.ProcessOffer(change) + if event == nil { return nil } - switch { - case change.Pre == nil && change.Post != nil: - // Created - err := p.insertBatchBuilder.Add(p.ledgerEntryToRow(change.Post)) + switch ev := event.(type) { + case offers.OfferCreatedEvent: + row := p.offerEventToRow(ev.OfferEventData) + err := p.insertBatchBuilder.Add(row) if err != nil { return errors.New("Error adding to OffersBatchInsertBuilder") } - case change.Pre != nil && change.Post != nil: - // Updated - row := p.ledgerEntryToRow(change.Post) + case offers.OfferFillEvent: + row := p.offerEventToRow(ev.OfferEventData) p.batchUpdateOffers = append(p.batchUpdateOffers, row) - case change.Pre != nil && change.Post == nil: - // Removed - row := p.ledgerEntryToRow(change.Pre) + case offers.OfferClosedEvent: + row := p.offerEventToRow(ev.OfferEventData) row.Deleted = true row.LastModifiedLedger = p.sequence p.batchUpdateOffers = append(p.batchUpdateOffers, row) default: - return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") - } + return errors.New("Unknown offer event") + } if p.insertBatchBuilder.Len()+len(p.batchUpdateOffers) > maxBatchSize { if err := p.flushCache(ctx); err != nil { return errors.Wrap(err, "error in Commit") @@ -69,22 +67,22 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang } return nil + } -func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer { - offer := entry.Data.MustOffer() +func (p *OffersProcessor) offerEventToRow(e offers.OfferEventData) history.Offer { return history.Offer{ - SellerID: offer.SellerId.Address(), - OfferID: int64(offer.OfferId), - SellingAsset: offer.Selling, - BuyingAsset: offer.Buying, - Amount: int64(offer.Amount), - Pricen: int32(offer.Price.N), - Priced: int32(offer.Price.D), - Price: float64(offer.Price.N) / float64(offer.Price.D), - Flags: int32(offer.Flags), - LastModifiedLedger: uint32(entry.LastModifiedLedgerSeq), - Sponsor: ledgerEntrySponsorToNullString(*entry), + SellerID: e.SellerId, + OfferID: e.OfferID, + SellingAsset: e.SellingAsset, + BuyingAsset: e.BuyingAsset, + Amount: e.RemainingAmount, + Pricen: e.PriceN, + Priced: e.PriceD, + Price: float64(e.PriceN) / float64(e.PriceD), + Flags: e.Flags, + LastModifiedLedger: e.LastModifiedLedger, + Sponsor: e.Sponsor, } }