diff --git a/.vscode/launch.json b/.vscode/launch.json index e9eb78d..11f003d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,7 +13,7 @@ "program": "${workspaceFolder}/cmd/psweb/", "showLog": false, //"envFile": "${workspaceFolder}/.env", - "args": ["-datadir", "/home/vlad/.peerswap_t4"] + //"args": ["-datadir", "/home/vlad/.peerswap_t4"] //"args": ["-datadir", "/home/vlad/.peerswap2"] }, // sudo bash -c 'echo 0 > /proc/sys/kernel/yama/ptrace_scope' diff --git a/cmd/psweb/handlers.go b/cmd/psweb/handlers.go index c3eebca..a6f1b57 100644 --- a/cmd/psweb/handlers.go +++ b/cmd/psweb/handlers.go @@ -1171,7 +1171,7 @@ func afHandler(w http.ResponseWriter, r *http.Request) { } daysNoFlow := 999 - ts, ok := ln.LastForwardTS.SafeRead(ch.ChannelId) + ts, ok := ln.LastForwardTS.Read(ch.ChannelId) if ok { daysNoFlow = int(currentTime.Sub(time.Unix(ts, 0)).Hours() / 24) } diff --git a/cmd/psweb/ln/cln.go b/cmd/psweb/ln/cln.go index e3c8e9b..630b6f8 100644 --- a/cmd/psweb/ln/cln.go +++ b/cmd/psweb/ln/cln.go @@ -34,22 +34,22 @@ const ( var ( // arrays mapped per channel - forwardsIn = safemap.NewSafeMap[uint64, []Forwarding]() - forwardsOut = safemap.NewSafeMap[uint64, []Forwarding]() + forwardsIn = safemap.New[uint64, []Forwarding]() + forwardsOut = safemap.New[uint64, []Forwarding]() forwardsLastIndex uint64 downloadComplete bool // track timestamp of the last failed forward - failedForwardTS = safemap.NewSafeMap[uint64, int64]() + failedForwardTS = safemap.New[uint64, int64]() lightning *glightning.Lightning // cln database calls take too long, cache them - htlcsCache = safemap.NewSafeMap[string, []HTLC]() // by shortChannelId - invoicesCache = safemap.NewSafeMap[string, ListInvoicesResponse]() // by PaymentHash - sendpaysCache = safemap.NewSafeMap[string, ListSendPaysResponse]() // by PaymentHash + htlcsCache = safemap.New[string, []HTLC]() // by shortChannelId + invoicesCache = safemap.New[string, ListInvoicesResponse]() // by PaymentHash + sendpaysCache = safemap.New[string, ListSendPaysResponse]() // by PaymentHash // sqlite3 channel Id mapped to short_channel_id - sqlShortChannelId = safemap.NewSafeMap[int, string]() + sqlShortChannelId = safemap.New[int, string]() htlcStates = []string{"SENT_ADD_HTLC", "SENT_ADD_COMMIT", "RCVD_ADD_REVOCATION", "RCVD_ADD_ACK_COMMIT", "SENT_ADD_ACK_REVOCATION", "RCVD_REMOVE_HTLC", "RCVD_REMOVE_COMMIT", "SENT_REMOVE_REVOCATION", "SENT_REMOVE_ACK_COMMIT", "RCVD_REMOVE_ACK_REVOCATION", "RCVD_ADD_HTLC", "RCVD_ADD_COMMIT", "SENT_ADD_REVOCATION", "SENT_ADD_ACK_COMMIT", "RCVD_ADD_ACK_REVOCATION", "SENT_REMOVE_HTLC", "SENT_REMOVE_COMMIT", "RCVD_REMOVE_REVOCATION", "RCVD_REMOVE_ACK_COMMIT", "SENT_REMOVE_ACK_REVOCATION"} // map full channel id to short channel id - fullToShortChannelId = safemap.NewSafeMap[string, string]() + fullToShortChannelId = safemap.New[string, string]() ) type Forwarding struct { @@ -478,11 +478,11 @@ func CacheHTLCs(where string) int { channelMap := channel.(map[string]interface{}) if channelMap["channel_id"] != nil { if channelMap["short_channel_id"] != nil { - fullToShortChannelId.SafeWrite(channelMap["channel_id"].(string), channelMap["short_channel_id"].(string)) + fullToShortChannelId.Write(channelMap["channel_id"].(string), channelMap["short_channel_id"].(string)) } else { // private zero-conf channel alias := channelMap["alias"].(map[string]interface{}) - fullToShortChannelId.SafeWrite(channelMap["channel_id"].(string), alias["local"].(string)) + fullToShortChannelId.Write(channelMap["channel_id"].(string), alias["local"].(string)) } } } @@ -519,12 +519,12 @@ func CacheHTLCs(where string) int { } // map sql channel id to short channel id - shortChannelId, found := fullToShortChannelId.SafeRead(hex.EncodeToString(data)) + shortChannelId, found := fullToShortChannelId.Read(hex.EncodeToString(data)) if !found { // full channel ID not found, ignore channel continue } - sqlShortChannelId.SafeWrite(id, shortChannelId) + sqlShortChannelId.Write(id, shortChannelId) } // Check for errors from iterating over rows @@ -563,7 +563,7 @@ func CacheHTLCs(where string) int { return 0 } - scid, ok := sqlShortChannelId.SafeRead(cid) + scid, ok := sqlShortChannelId.Read(cid) if ok { htlc.ShortChannelId = scid htlc.State = htlcStates[hstate] @@ -622,28 +622,28 @@ func cacheForwards(client *glightning.Lightning) int { if f.Status == "settled" && f.OutMsat >= IGNORE_FORWARDS_MSAT { chIn := ConvertClnToLndChannelId(f.InChannel) - fi, ok := forwardsIn.SafeRead(chIn) + fi, ok := forwardsIn.Read(chIn) if !ok { fi = []Forwarding{} // Initialize an empty slice if the key does not exist } - fi = append(fi, f) // Append the new forwarding record - forwardsIn.SafeWrite(chIn, fi) // Write the updated slice + fi = append(fi, f) // Append the new forwarding record + forwardsIn.Write(chIn, fi) // Write the updated slice - fo, ok := forwardsOut.SafeRead(chOut) + fo, ok := forwardsOut.Read(chOut) if !ok { fo = []Forwarding{} // Initialize an empty slice if the key does not exist } - fo = append(fo, f) // Append the new forwarding record - forwardsOut.SafeWrite(chOut, fo) // Write the updated slice + fo = append(fo, f) // Append the new forwarding record + forwardsOut.Write(chOut, fo) // Write the updated slice // save for autofees - LastForwardTS.SafeWrite(chOut, int64(f.ResolvedTime)) + LastForwardTS.Write(chOut, int64(f.ResolvedTime)) // forget last failed attempt - failedForwardTS.SafeWrite(chOut, 0) + failedForwardTS.Write(chOut, 0) } else { // catch not enough balance error if f.FailCode == 4103 { - failedForwardTS.SafeWrite(chOut, int64(f.ReceivedTime)) + failedForwardTS.Write(chOut, int64(f.ReceivedTime)) } } } @@ -680,7 +680,7 @@ func GetForwardingStats(lndChannelId uint64) *ForwardingStats { timestamp30d := float64(now.AddDate(0, 0, -30).Unix()) timestamp6m := float64(now.AddDate(0, -6, 0).Unix()) - fo, ok := forwardsOut.SafeRead(lndChannelId) + fo, ok := forwardsOut.Read(lndChannelId) if ok { for _, e := range fo { if e.ResolvedTime > timestamp6m && e.OutMsat >= IGNORE_FORWARDS_MSAT { @@ -698,7 +698,7 @@ func GetForwardingStats(lndChannelId uint64) *ForwardingStats { } } - fi, ok := forwardsIn.SafeRead(lndChannelId) + fi, ok := forwardsIn.Read(lndChannelId) if ok { for _, e := range fi { if e.ResolvedTime > timestamp6m && e.OutMsat >= IGNORE_FORWARDS_MSAT { @@ -834,7 +834,7 @@ func GetChannelStats(lndChannelId uint64, timeStamp uint64) *ChannelStats { timeStampF := float64(timeStamp) - fo, ok := forwardsOut.SafeRead(lndChannelId) + fo, ok := forwardsOut.Read(lndChannelId) if ok { for _, e := range fo { if e.ResolvedTime > timeStampF && e.OutMsat >= IGNORE_FORWARDS_MSAT { @@ -844,7 +844,7 @@ func GetChannelStats(lndChannelId uint64, timeStamp uint64) *ChannelStats { } } - fi, ok := forwardsIn.SafeRead(lndChannelId) + fi, ok := forwardsIn.Read(lndChannelId) if ok { for _, e := range fi { if e.ResolvedTime > timeStampF && e.OutMsat >= IGNORE_FORWARDS_MSAT { @@ -1141,23 +1141,23 @@ func appendHTLC(htlc HTLC) { if htlc.State != "SENT_REMOVE_ACK_REVOCATION" && htlc.State != "RCVD_REMOVE_ACK_REVOCATION" { return } - htlcs, ok := htlcsCache.SafeRead(htlc.ShortChannelId) + htlcs, ok := htlcsCache.Read(htlc.ShortChannelId) if !ok { htlcs = []HTLC{} // Initialize an empty slice if the key does not exist } - htlcs = append(htlcs, htlc) // Append the new HTLC - htlcsCache.SafeWrite(htlc.ShortChannelId, htlcs) // Write the updated slice + htlcs = append(htlcs, htlc) // Append the new HTLC + htlcsCache.Write(htlc.ShortChannelId, htlcs) // Write the updated slice } func GetInvoice(client *glightning.Lightning, request *ListInvoicesRequest) (ListInvoicesResponse, error) { - inv, ok := invoicesCache.SafeRead(request.PaymentHash) + inv, ok := invoicesCache.Read(request.PaymentHash) if !ok { // fetch from cln err := client.Request(request, &inv) if err != nil { return inv, err } // cache it - invoicesCache.SafeWrite(request.PaymentHash, inv) + invoicesCache.Write(request.PaymentHash, inv) } return inv, nil } @@ -1174,7 +1174,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI rebalanceCostMsat uint64 ) - htlcs, ok := htlcsCache.SafeRead(channelId) + htlcs, ok := htlcsCache.Read(channelId) if ok { for _, htlc := range htlcs { @@ -1201,7 +1201,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI } else { // no invoices // can be a rebalance in, check timestamp and record the stats - pmt, ok := sendpaysCache.SafeRead(htlc.PaymentHash) + pmt, ok := sendpaysCache.Read(htlc.PaymentHash) if !ok { // fetch from cln err := client.Request(&ListSendPaysRequest{ PaymentHash: htlc.PaymentHash, @@ -1210,7 +1210,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI continue } // cache it - sendpaysCache.SafeWrite(htlc.PaymentHash, pmt) + sendpaysCache.Write(htlc.PaymentHash, pmt) } for _, p := range pmt.Payments { @@ -1223,7 +1223,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI case "RCVD_REMOVE_ACK_REVOCATION": // direction out, look for payments - pmt, ok := sendpaysCache.SafeRead(htlc.PaymentHash) + pmt, ok := sendpaysCache.Read(htlc.PaymentHash) if !ok { // fetch from cln err := client.Request(&ListSendPaysRequest{ PaymentHash: htlc.PaymentHash, @@ -1232,7 +1232,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI continue } // cache it - sendpaysCache.SafeWrite(htlc.PaymentHash, pmt) + sendpaysCache.Write(htlc.PaymentHash, pmt) } for _, p := range pmt.Payments { @@ -1449,10 +1449,10 @@ func ApplyAutoFees() { liqPct := int(channelMap["to_us_msat"].(float64) * 100 / channelMap["total_msat"].(float64)) // check 10 minutes back to be sure - ts, ok := LastForwardTS.SafeRead(channelId) + ts, ok := LastForwardTS.Read(channelId) if ok && ts > time.Now().Add(-time.Duration(10*time.Minute)).Unix() { // forget failed HTLC to prevent duplicate action - failedForwardTS.SafeWrite(channelId, 0) + failedForwardTS.Write(channelId, 0) if liqPct <= params.LowLiqPct { // bump fee @@ -1489,7 +1489,7 @@ func ApplyAutoFees() { func PlotPPM(lndChannelId uint64) *[]DataPoint { var plot []DataPoint - fo, ok := forwardsOut.SafeRead(lndChannelId) + fo, ok := forwardsOut.Read(lndChannelId) if ok { for _, e := range fo { // ignore small forwards @@ -1531,7 +1531,7 @@ func ForwardsLog(channelId uint64, fromTS int64) *[]DataPoint { }) if channelId > 0 { - fi, ok := forwardsIn.SafeRead(channelId) + fi, ok := forwardsIn.Read(channelId) if ok { for _, e := range fi { // ignore small forwards diff --git a/cmd/psweb/ln/common.go b/cmd/psweb/ln/common.go index 431a6aa..2f5fe23 100644 --- a/cmd/psweb/ln/common.go +++ b/cmd/psweb/ln/common.go @@ -53,7 +53,7 @@ var ( } // track timestamp of the last outbound forward per channel - LastForwardTS = safemap.NewSafeMap[uint64, int64]() + LastForwardTS = safemap.New[uint64, int64]() // received via custom messages, per peer nodeId LiquidBalances = make(map[string]*BalanceInfo) @@ -439,7 +439,7 @@ func calculateAutoFee(channelId uint64, params *AutoFeeParams, liqPct int, oldFe // must be definitely above threshold and cool-off period passed if liqPct > params.LowLiqPct && lastUpdate < time.Now().Add(-time.Duration(params.CoolOffHours)*time.Hour).Unix() { // check the inactivity period - if ts, ok := LastForwardTS.SafeRead(channelId); ok && ts < time.Now().AddDate(0, 0, -params.InactivityDays).Unix() { + if ts, ok := LastForwardTS.Read(channelId); ok && ts < time.Now().AddDate(0, 0, -params.InactivityDays).Unix() { // decrease the fee newFee -= params.InactivityDropPPM newFee = newFee * (100 - params.InactivityDropPct) / 100 diff --git a/cmd/psweb/ln/lnd.go b/cmd/psweb/ln/lnd.go index e7bc93a..0442458 100644 --- a/cmd/psweb/ln/lnd.go +++ b/cmd/psweb/ln/lnd.go @@ -23,6 +23,7 @@ import ( "peerswap-web/cmd/psweb/bitcoin" "peerswap-web/cmd/psweb/config" "peerswap-web/cmd/psweb/db" + "peerswap-web/cmd/psweb/safemap" "github.com/elementsproject/peerswap/peerswaprpc" @@ -58,18 +59,18 @@ var ( LndVerson = float64(0) // must be 0.18+ for RBF ability // arrays mapped per channel - forwardsIn = make(map[uint64][]*lnrpc.ForwardingEvent) - forwardsOut = make(map[uint64][]*lnrpc.ForwardingEvent) - paymentHtlcs = make(map[uint64][]*lnrpc.HTLCAttempt) - rebalanceInHtlcs = make(map[uint64][]*lnrpc.HTLCAttempt) - rebalanceOutHtlcs = make(map[uint64][]*lnrpc.HTLCAttempt) - invoiceHtlcs = make(map[uint64][]*lnrpc.InvoiceHTLC) + forwardsIn = safemap.New[uint64, []*lnrpc.ForwardingEvent]() + forwardsOut = safemap.New[uint64, []*lnrpc.ForwardingEvent]() + paymentHtlcs = safemap.New[uint64, []*lnrpc.HTLCAttempt]() + rebalanceInHtlcs = safemap.New[uint64, []*lnrpc.HTLCAttempt]() + rebalanceOutHtlcs = safemap.New[uint64, []*lnrpc.HTLCAttempt]() + invoiceHtlcs = safemap.New[uint64, []*lnrpc.InvoiceHTLC]() // inflight HTLCs mapped per Incoming channel - inflightHTLCs = make(map[uint64][]*InflightHTLC) + inflightHTLCs = safemap.New[uint64, []*InflightHTLC]() // cache peer addresses for reconnects - peerAddresses = make(map[string][]*lnrpc.NodeAddress) + peerAddresses = safemap.New[string, []*lnrpc.NodeAddress]() // las index for invoice subscriptions lastInvoiceSettleIndex uint64 @@ -860,9 +861,21 @@ func downloadForwards(client lnrpc.LightningClient) bool { // sort by in and out channels for _, event := range res.ForwardingEvents { if event.AmtOutMsat >= IGNORE_FORWARDS_MSAT { - forwardsIn[event.ChanIdIn] = append(forwardsIn[event.ChanIdIn], event) - forwardsOut[event.ChanIdOut] = append(forwardsOut[event.ChanIdOut], event) - LastForwardTS.SafeWrite(event.ChanIdOut, int64(event.TimestampNs/1_000_000_000)) + fi, ok := forwardsIn.Read(event.ChanIdIn) + if !ok { + fi = []*lnrpc.ForwardingEvent{} // Initialize an empty slice if the key does not exist + } + fi = append(fi, event) // Append the new forwarding record + forwardsIn.Write(event.ChanIdIn, fi) // Write the updated slice + + fo, ok := forwardsOut.Read(event.ChanIdOut) + if !ok { + fo = []*lnrpc.ForwardingEvent{} // Initialize an empty slice if the key does not exist + } + fo = append(fo, event) // Append the new forwarding record + forwardsOut.Write(event.ChanIdOut, fo) // Write the updated slice + + LastForwardTS.Write(event.ChanIdOut, int64(event.TimestampNs/1_000_000_000)) } } @@ -967,10 +980,26 @@ func appendPayment(payment *lnrpc.Payment) { lastHop := htlc.Route.Hops[len(htlc.Route.Hops)-1] if lastHop.PubKey == MyNodeId { // this is a circular rebalancing - rebalanceOutHtlcs[chanId] = append(rebalanceOutHtlcs[chanId], htlc) - rebalanceInHtlcs[lastHop.ChanId] = append(rebalanceInHtlcs[lastHop.ChanId], htlc) + htlcs, ok := rebalanceOutHtlcs.Read(chanId) + if !ok { + htlcs = []*lnrpc.HTLCAttempt{} // Initialize an empty slice if the key does not exist + } + htlcs = append(htlcs, htlc) // Append the new forwarding record + rebalanceOutHtlcs.Write(chanId, htlcs) // Write the updated slice + + htlcs, ok = rebalanceInHtlcs.Read(lastHop.ChanId) + if !ok { + htlcs = []*lnrpc.HTLCAttempt{} // Initialize an empty slice if the key does not exist + } + htlcs = append(htlcs, htlc) // Append the new forwarding record + rebalanceInHtlcs.Write(lastHop.ChanId, htlcs) // Write the updated slice } else { - paymentHtlcs[chanId] = append(paymentHtlcs[chanId], htlc) + htlcs, ok := paymentHtlcs.Read(chanId) + if !ok { + htlcs = []*lnrpc.HTLCAttempt{} // Initialize an empty slice if the key does not exist + } + htlcs = append(htlcs, htlc) // Append the new forwarding record + paymentHtlcs.Write(chanId, htlcs) // Write the updated slice } } } @@ -1041,7 +1070,12 @@ func subscribeForwards(ctx context.Context, client routerrpc.RouterClient) error htlc.IncomingHtlcId = htlcEvent.IncomingHtlcId htlc.OutgoingHtlcId = htlcEvent.OutgoingHtlcId - inflightHTLCs[htlcEvent.IncomingChannelId] = append(inflightHTLCs[htlcEvent.IncomingChannelId], htlc) + htlcs, ok := inflightHTLCs.Read(htlcEvent.IncomingChannelId) + if !ok { + htlcs = []*InflightHTLC{} // Initialize an empty slice if the key does not exist + } + htlcs = append(htlcs, htlc) // Append the new forwarding record + inflightHTLCs.Write(htlcEvent.IncomingChannelId, htlcs) // Write the updated slice } } case *routerrpc.HtlcEvent_ForwardFailEvent: @@ -1066,32 +1100,47 @@ func subscribeForwards(ctx context.Context, client routerrpc.RouterClient) error case *routerrpc.HtlcEvent_SettleEvent: // find HTLC in queue - for _, htlc := range inflightHTLCs[htlcEvent.IncomingChannelId] { - if htlc.IncomingHtlcId == htlcEvent.IncomingHtlcId { - // store the last timestamp - lastForwardCreationTs = htlc.forwardingEvent.TimestampNs / 1_000_000_000 - // delete from queue - removeInflightHTLC(htlcEvent.IncomingChannelId, htlcEvent.IncomingHtlcId) - - // ignore dust - if htlc.forwardingEvent.AmtOutMsat >= IGNORE_FORWARDS_MSAT { - // add our stored forwards - forwardsIn[htlcEvent.IncomingChannelId] = append(forwardsIn[htlcEvent.IncomingChannelId], htlc.forwardingEvent) - // settled htlcEvent has no Outgoing info, take from queue - forwardsOut[htlc.OutgoingChannelId] = append(forwardsOut[htlc.OutgoingChannelId], htlc.forwardingEvent) - // TS for autofee - LastForwardTS.SafeWrite(htlc.OutgoingChannelId, int64(htlc.forwardingEvent.TimestampNs/1_000_000_000)) - - // execute autofee - client, cleanup, err := GetClient() - if err != nil { - return err + htlcs, ok := inflightHTLCs.Read(htlcEvent.IncomingChannelId) + if ok { + for _, htlc := range htlcs { + if htlc.IncomingHtlcId == htlcEvent.IncomingHtlcId { + // store the last timestamp + lastForwardCreationTs = htlc.forwardingEvent.TimestampNs / 1_000_000_000 + // delete from queue + removeInflightHTLC(htlcEvent.IncomingChannelId, htlcEvent.IncomingHtlcId) + + // ignore dust + if htlc.forwardingEvent.AmtOutMsat >= IGNORE_FORWARDS_MSAT { + // add our stored forwards + fi, ok := forwardsIn.Read(htlcEvent.IncomingChannelId) + if !ok { + fi = []*lnrpc.ForwardingEvent{} // Initialize an empty slice if the key does not exist + } + fi = append(fi, htlc.forwardingEvent) // Append the new forwarding record + forwardsIn.Write(htlcEvent.IncomingChannelId, fi) // Write the updated slice + + // settled htlcEvent has no Outgoing info, take from queue + fo, ok := forwardsOut.Read(htlc.OutgoingChannelId) + if !ok { + fo = []*lnrpc.ForwardingEvent{} // Initialize an empty slice if the key does not exist + } + fo = append(fo, htlc.forwardingEvent) // Append the new forwarding record + forwardsOut.Write(htlc.OutgoingChannelId, fo) // Write the updated slice + + // TS for autofee + LastForwardTS.Write(htlc.OutgoingChannelId, int64(htlc.forwardingEvent.TimestampNs/1_000_000_000)) + + // execute autofee + client, cleanup, err := GetClient() + if err != nil { + return err + } + defer cleanup() + + // calculate with new balance + applyAutoFee(client, htlc.forwardingEvent.ChanIdOut, false) + break } - defer cleanup() - - // calculate with new balance - applyAutoFee(client, htlc.forwardingEvent.ChanIdOut, false) - break } } } @@ -1102,7 +1151,7 @@ func subscribeForwards(ctx context.Context, client routerrpc.RouterClient) error // Function to remove an InflightHTLC object from a slice in the map by IncomingChannelId func removeInflightHTLC(incomingChannelId, incomingHtlcId uint64) { // Retrieve the slice from the map - htlcSlice, exists := inflightHTLCs[incomingChannelId] + htlcSlice, exists := inflightHTLCs.Read(incomingChannelId) if !exists { return } @@ -1118,12 +1167,12 @@ func removeInflightHTLC(incomingChannelId, incomingHtlcId uint64) { // If the object is found, remove it from the slice if index != -1 { - inflightHTLCs[incomingChannelId] = append(htlcSlice[:index], htlcSlice[index+1:]...) + inflightHTLCs.Write(incomingChannelId, append(htlcSlice[:index], htlcSlice[index+1:]...)) } // If the slice becomes empty after removal, delete the map entry - if len(inflightHTLCs[incomingChannelId]) == 0 { - delete(inflightHTLCs, incomingChannelId) + if htlcs, ok := inflightHTLCs.Read(incomingChannelId); ok && len(htlcs) == 0 { + inflightHTLCs.Delete(incomingChannelId) } } @@ -1273,7 +1322,12 @@ func appendInvoice(invoice *lnrpc.Invoice) { } for _, htlc := range invoice.Htlcs { if htlc.State == lnrpc.InvoiceHTLCState_SETTLED { - invoiceHtlcs[htlc.ChanId] = append(invoiceHtlcs[htlc.ChanId], htlc) + inv, ok := invoiceHtlcs.Read(htlc.ChanId) + if !ok { + inv = []*lnrpc.InvoiceHTLC{} // Initialize an empty slice if the key does not exist + } + inv = append(inv, htlc) // Append the new forwarding record + invoiceHtlcs.Write(htlc.ChanId, inv) // Write the updated slice } } } @@ -1319,11 +1373,11 @@ func subscribeMessages(ctx context.Context, client lnrpc.LightningClient) error OnMyCustomMessage(nodeId, data.Data) - if peerAddresses[nodeId] == nil { + if _, ok := peerAddresses.Read(nodeId); !ok { // cache peer addresses for reconnects info, err := client.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{PubKey: nodeId, IncludeChannels: false}) if err == nil { - peerAddresses[nodeId] = info.Node.Addresses + peerAddresses.Write(nodeId, info.Node.Addresses) } } } @@ -1394,31 +1448,37 @@ func GetForwardingStats(channelId uint64) *ForwardingStats { timestamp30d := uint64(now.AddDate(0, 0, -30).Unix()) * 1_000_000_000 timestamp6m := uint64(now.AddDate(0, -6, 0).Unix()) * 1_000_000_000 - for _, e := range forwardsOut[channelId] { - if e.TimestampNs > timestamp6m && e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { - result.AmountOut6m += e.AmtOut - feeMsat6m += e.FeeMsat - if e.TimestampNs > timestamp30d { - result.AmountOut30d += e.AmtOut - feeMsat30d += e.FeeMsat - if e.TimestampNs > timestamp7d { - result.AmountOut7d += e.AmtOut - feeMsat7d += e.FeeMsat + fo, ok := forwardsOut.Read(channelId) + if ok { + for _, e := range fo { + if e.TimestampNs > timestamp6m && e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { + result.AmountOut6m += e.AmtOut + feeMsat6m += e.FeeMsat + if e.TimestampNs > timestamp30d { + result.AmountOut30d += e.AmtOut + feeMsat30d += e.FeeMsat + if e.TimestampNs > timestamp7d { + result.AmountOut7d += e.AmtOut + feeMsat7d += e.FeeMsat + } } } } } - for _, e := range forwardsIn[channelId] { - if e.TimestampNs > timestamp6m && e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { - result.AmountIn6m += e.AmtIn - assistedMsat6m += e.FeeMsat - if e.TimestampNs > timestamp30d { - result.AmountIn30d += e.AmtIn - assistedMsat30d += e.FeeMsat - if e.TimestampNs > timestamp7d { - result.AmountIn7d += e.AmtIn - assistedMsat7d += e.FeeMsat + fi, ok := forwardsIn.Read(channelId) + if ok { + for _, e := range fi { + if e.TimestampNs > timestamp6m && e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { + result.AmountIn6m += e.AmtIn + assistedMsat6m += e.FeeMsat + if e.TimestampNs > timestamp30d { + result.AmountIn30d += e.AmtIn + assistedMsat30d += e.FeeMsat + if e.TimestampNs > timestamp7d { + result.AmountIn7d += e.AmtIn + assistedMsat7d += e.FeeMsat + } } } } @@ -1515,58 +1575,80 @@ func GetChannelStats(channelId uint64, timeStamp uint64) *ChannelStats { timestampNs := timeStamp * 1_000_000_000 - for _, e := range forwardsOut[channelId] { - if e.TimestampNs > timestampNs && e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { - routedOutMsat += e.AmtOutMsat - feeMsat += e.FeeMsat + fo, ok := forwardsOut.Read(channelId) + if ok { + for _, e := range fo { + if e.TimestampNs > timestampNs && e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { + routedOutMsat += e.AmtOutMsat + feeMsat += e.FeeMsat + } } } - for _, e := range forwardsIn[channelId] { - if e.TimestampNs > timestampNs && e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { - routedInMsat += e.AmtInMsat - assistedMsat += e.FeeMsat + fi, ok := forwardsIn.Read(channelId) + if ok { + for _, e := range fi { + if e.TimestampNs > timestampNs && e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { + routedInMsat += e.AmtInMsat + assistedMsat += e.FeeMsat + } } } - for i := 0; i < len(invoiceHtlcs[channelId]); i++ { - e := invoiceHtlcs[channelId][i] - if uint64(e.AcceptTime) > timeStamp { - // check if it is related to a circular rebalancing - found := false - for _, r := range rebalanceInHtlcs[channelId] { - if e.AmtMsat == uint64(r.Route.TotalAmtMsat-r.Route.TotalFeesMsat) { - found = true - break + inv, ok := invoiceHtlcs.Read(channelId) + if ok { + for i := 0; i < len(inv); i++ { + e := inv[i] + if uint64(e.AcceptTime) > timeStamp { + // check if it is related to a circular rebalancing + htcls, ok := rebalanceInHtlcs.Read(channelId) + if ok { + found := false + for _, r := range htcls { + if e.AmtMsat == uint64(r.Route.TotalAmtMsat-r.Route.TotalFeesMsat) { + found = true + break + } + } + if found { + // remove invoice to avoid double counting + inv = append(inv[:i], inv[i+1:]...) + invoiceHtlcs.Write(channelId, inv) + i-- + } else { + invoicedMsat += e.AmtMsat + } } } - if found { - // remove invoice to avoid double counting - invoiceHtlcs[channelId] = append(invoiceHtlcs[channelId][:i], invoiceHtlcs[channelId][i+1:]...) - i-- - } else { - invoicedMsat += e.AmtMsat - } } } - for _, e := range paymentHtlcs[channelId] { - if uint64(e.AttemptTimeNs) > timestampNs { - paidOutMsat += e.Route.TotalAmtMsat - costMsat += e.Route.TotalFeesMsat + htlcs, ok := paymentHtlcs.Read(channelId) + if ok { + for _, e := range htlcs { + if uint64(e.AttemptTimeNs) > timestampNs { + paidOutMsat += e.Route.TotalAmtMsat + costMsat += e.Route.TotalFeesMsat + } } } - for _, e := range rebalanceInHtlcs[channelId] { - if uint64(e.AttemptTimeNs) > timestampNs { - rebalanceInMsat += e.Route.TotalAmtMsat - e.Route.TotalFeesMsat - rebalanceCostMsat += e.Route.TotalFeesMsat + htcls, ok := rebalanceInHtlcs.Read(channelId) + if ok { + for _, e := range htcls { + if uint64(e.AttemptTimeNs) > timestampNs { + rebalanceInMsat += e.Route.TotalAmtMsat - e.Route.TotalFeesMsat + rebalanceCostMsat += e.Route.TotalFeesMsat + } } } - for _, e := range rebalanceOutHtlcs[channelId] { - if uint64(e.AttemptTimeNs) > timestampNs { - rebalanceOutMsat += e.Route.TotalAmtMsat + htcls, ok = rebalanceOutHtlcs.Read(channelId) + if ok { + for _, e := range htcls { + if uint64(e.AttemptTimeNs) > timestampNs { + rebalanceOutMsat += e.Route.TotalAmtMsat + } } } @@ -2147,15 +2229,18 @@ func ApplyAutoFees() { func PlotPPM(channelId uint64) *[]DataPoint { var plot []DataPoint - for _, e := range forwardsOut[channelId] { - // ignore small forwards - if e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { - plot = append(plot, DataPoint{ - TS: e.TimestampNs / 1_000_000_000, - Amount: e.AmtOut, - Fee: float64(e.FeeMsat) / 1000, - PPM: e.FeeMsat * 1_000_000 / e.AmtOutMsat, - }) + fo, ok := forwardsOut.Read(channelId) + if ok { + for _, e := range fo { + // ignore small forwards + if e.AmtOutMsat >= IGNORE_FORWARDS_MSAT { + plot = append(plot, DataPoint{ + TS: e.TimestampNs / 1_000_000_000, + Amount: e.AmtOut, + Fee: float64(e.FeeMsat) / 1000, + PPM: e.FeeMsat * 1_000_000 / e.AmtOutMsat, + }) + } } } @@ -2167,11 +2252,12 @@ func ForwardsLog(channelId uint64, fromTS int64) *[]DataPoint { var log []DataPoint fromTS_Ns := uint64(fromTS * 1_000_000_000) - for chId := range forwardsOut { + // Process forwards from forwardsOut + forwardsOut.Iterate(func(chId uint64, fo []*lnrpc.ForwardingEvent) { if channelId > 0 && channelId != chId { - continue + return } - for _, e := range forwardsOut[chId] { + for _, e := range fo { // ignore small forwards if e.AmtOutMsat >= IGNORE_FORWARDS_MSAT && e.TimestampNs >= fromTS_Ns { log = append(log, DataPoint{ @@ -2184,20 +2270,23 @@ func ForwardsLog(channelId uint64, fromTS int64) *[]DataPoint { }) } } - } + }) if channelId > 0 { - for _, e := range forwardsIn[channelId] { - // ignore small forwards - if e.AmtOutMsat >= IGNORE_FORWARDS_MSAT && e.TimestampNs >= fromTS_Ns { - log = append(log, DataPoint{ - TS: e.TimestampNs / 1_000_000_000, - Amount: e.AmtOut, - Fee: float64(e.FeeMsat) / 1000, - PPM: e.FeeMsat * 1_000_000 / e.AmtOutMsat, - ChanIdIn: e.ChanIdIn, - ChanIdOut: e.ChanIdOut, - }) + fi, ok := forwardsIn.Read(channelId) + if ok { + for _, e := range fi { + // ignore small forwards + if e.AmtOutMsat >= IGNORE_FORWARDS_MSAT && e.TimestampNs >= fromTS_Ns { + log = append(log, DataPoint{ + TS: e.TimestampNs / 1_000_000_000, + Amount: e.AmtOut, + Fee: float64(e.FeeMsat) / 1000, + PPM: e.FeeMsat * 1_000_000 / e.AmtOutMsat, + ChanIdIn: e.ChanIdIn, + ChanIdOut: e.ChanIdOut, + }) + } } } } @@ -2212,9 +2301,9 @@ func ForwardsLog(channelId uint64, fromTS int64) *[]DataPoint { func reconnectPeer(client lnrpc.LightningClient, nodeId string) bool { ctx := context.Background() - addresses := peerAddresses[nodeId] + addresses, ok := peerAddresses.Read(nodeId) - if addresses == nil { + if !ok { info, err := client.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{PubKey: nodeId, IncludeChannels: false}) if err == nil { addresses = info.Node.Addresses diff --git a/cmd/psweb/main-cln.go b/cmd/psweb/main-cln.go index 9bed588..557d034 100644 --- a/cmd/psweb/main-cln.go +++ b/cmd/psweb/main-cln.go @@ -46,7 +46,7 @@ func main() { var ( showHelp = flag.Bool("help", false, "Show help") showVersion = flag.Bool("version", false, "Show version") - developer = flag.Bool("developer", false, "Flag passed by clightningd, ignored") + developer = flag.Bool("developer", false, "Flag passed by clightningd") ) flag.Parse() diff --git a/cmd/psweb/main.go b/cmd/psweb/main.go index cbb8966..6a0dae7 100644 --- a/cmd/psweb/main.go +++ b/cmd/psweb/main.go @@ -25,6 +25,7 @@ import ( "peerswap-web/cmd/psweb/liquid" "peerswap-web/cmd/psweb/ln" "peerswap-web/cmd/psweb/ps" + "peerswap-web/cmd/psweb/safemap" "github.com/elementsproject/peerswap/peerswaprpc" "github.com/gorilla/mux" @@ -49,7 +50,7 @@ type SwapParams struct { } var ( - aliasCache = make(map[string]string) + aliasCache = safemap.New[string, string]() templates = template.New("") //go:embed static/* staticFiles embed.FS @@ -486,11 +487,12 @@ func convertPeersToHTMLTable( } // find last swap timestamps per channel - swapTimestamps := make(map[uint64]int64) + swapTimestamps := safemap.New[uint64, int64]() for _, swap := range swaps { - if simplifySwapState(swap.State) == "success" && swapTimestamps[swap.LndChanId] < swap.CreatedAt { - swapTimestamps[swap.LndChanId] = swap.CreatedAt + ts, ok := swapTimestamps.Read(swap.LndChanId) + if simplifySwapState(swap.State) == "success" && ok && ts < swap.CreatedAt { + swapTimestamps.Write(swap.LndChanId, swap.CreatedAt) } } @@ -544,8 +546,8 @@ func convertPeersToHTMLTable( // timestamp of the last swap or 6 months horizon lastSwapTimestamp := time.Now().AddDate(0, -6, 0).Unix() - if swapTimestamps[channel.ChannelId] > lastSwapTimestamp { - lastSwapTimestamp = swapTimestamps[channel.ChannelId] + if ts, ok := swapTimestamps.Read(channel.ChannelId); ok && ts > lastSwapTimestamp { + lastSwapTimestamp = ts tooltip = "Since the last swap " + timePassedAgo(time.Unix(lastSwapTimestamp, 0).UTC()) sinceLastSwap = "since the last swap" } @@ -1265,7 +1267,7 @@ func checkPegin() { func getNodeAlias(key string) string { // search in cache - alias, exists := aliasCache[key] + alias, exists := aliasCache.Read(key) if exists { return alias } @@ -1288,7 +1290,7 @@ func getNodeAlias(key string) string { } // save to cache if alias was found - aliasCache[key] = alias + aliasCache.Write(key, alias) return alias } diff --git a/cmd/psweb/safemap/safemap.go b/cmd/psweb/safemap/safemap.go index 84b3f17..3cc6fc0 100644 --- a/cmd/psweb/safemap/safemap.go +++ b/cmd/psweb/safemap/safemap.go @@ -8,15 +8,15 @@ type SafeMap[K comparable, V any] struct { m map[K]V } -// NewSafeMap creates and initializes a new SafeMap -func NewSafeMap[K comparable, V any]() *SafeMap[K, V] { +// New creates and initializes a new SafeMap +func New[K comparable, V any]() *SafeMap[K, V] { return &SafeMap[K, V]{ m: make(map[K]V), } } // SafeWrite safely writes a key-value pair to the map -func (sm *SafeMap[K, V]) SafeWrite(key K, value V) { +func (sm *SafeMap[K, V]) Write(key K, value V) { sm.mu.Lock() defer sm.mu.Unlock() sm.m[key] = value @@ -24,7 +24,7 @@ func (sm *SafeMap[K, V]) SafeWrite(key K, value V) { // SafeRead safely reads a value for a given key from the map // Returns the value and a boolean indicating if the key exists -func (sm *SafeMap[K, V]) SafeRead(key K) (V, bool) { +func (sm *SafeMap[K, V]) Read(key K) (V, bool) { sm.mu.Lock() defer sm.mu.Unlock() value, ok := sm.m[key] @@ -32,7 +32,7 @@ func (sm *SafeMap[K, V]) SafeRead(key K) (V, bool) { } // SafeDelete safely deletes a key from the map -func (sm *SafeMap[K, V]) SafeDelete(key K) { +func (sm *SafeMap[K, V]) Delete(key K) { sm.mu.Lock() defer sm.mu.Unlock() delete(sm.m, key)