Skip to content

Commit

Permalink
LND safe map
Browse files Browse the repository at this point in the history
  • Loading branch information
Impa10r committed Dec 23, 2024
1 parent fd20619 commit f969af3
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 186 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"program": "${workspaceFolder}/cmd/psweb/",
"showLog": false,
//"envFile": "${workspaceFolder}/.env",
"args": ["-datadir", "/home/vlad/.peerswap_t4"]
//"args": ["-datadir", "/home/vlad/.peerswap_t4"]
//"args": ["-datadir", "/home/vlad/.peerswap2"]
},
// sudo bash -c 'echo 0 > /proc/sys/kernel/yama/ptrace_scope'
Expand Down
2 changes: 1 addition & 1 deletion cmd/psweb/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,7 @@ func afHandler(w http.ResponseWriter, r *http.Request) {
}

daysNoFlow := 999
ts, ok := ln.LastForwardTS.SafeRead(ch.ChannelId)
ts, ok := ln.LastForwardTS.Read(ch.ChannelId)
if ok {
daysNoFlow = int(currentTime.Sub(time.Unix(ts, 0)).Hours() / 24)
}
Expand Down
80 changes: 40 additions & 40 deletions cmd/psweb/ln/cln.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,22 @@ const (

var (
// arrays mapped per channel
forwardsIn = safemap.NewSafeMap[uint64, []Forwarding]()
forwardsOut = safemap.NewSafeMap[uint64, []Forwarding]()
forwardsIn = safemap.New[uint64, []Forwarding]()
forwardsOut = safemap.New[uint64, []Forwarding]()
forwardsLastIndex uint64
downloadComplete bool
// track timestamp of the last failed forward
failedForwardTS = safemap.NewSafeMap[uint64, int64]()
failedForwardTS = safemap.New[uint64, int64]()
lightning *glightning.Lightning
// cln database calls take too long, cache them
htlcsCache = safemap.NewSafeMap[string, []HTLC]() // by shortChannelId
invoicesCache = safemap.NewSafeMap[string, ListInvoicesResponse]() // by PaymentHash
sendpaysCache = safemap.NewSafeMap[string, ListSendPaysResponse]() // by PaymentHash
htlcsCache = safemap.New[string, []HTLC]() // by shortChannelId
invoicesCache = safemap.New[string, ListInvoicesResponse]() // by PaymentHash
sendpaysCache = safemap.New[string, ListSendPaysResponse]() // by PaymentHash
// sqlite3 channel Id mapped to short_channel_id
sqlShortChannelId = safemap.NewSafeMap[int, string]()
sqlShortChannelId = safemap.New[int, string]()
htlcStates = []string{"SENT_ADD_HTLC", "SENT_ADD_COMMIT", "RCVD_ADD_REVOCATION", "RCVD_ADD_ACK_COMMIT", "SENT_ADD_ACK_REVOCATION", "RCVD_REMOVE_HTLC", "RCVD_REMOVE_COMMIT", "SENT_REMOVE_REVOCATION", "SENT_REMOVE_ACK_COMMIT", "RCVD_REMOVE_ACK_REVOCATION", "RCVD_ADD_HTLC", "RCVD_ADD_COMMIT", "SENT_ADD_REVOCATION", "SENT_ADD_ACK_COMMIT", "RCVD_ADD_ACK_REVOCATION", "SENT_REMOVE_HTLC", "SENT_REMOVE_COMMIT", "RCVD_REMOVE_REVOCATION", "RCVD_REMOVE_ACK_COMMIT", "SENT_REMOVE_ACK_REVOCATION"}
// map full channel id to short channel id
fullToShortChannelId = safemap.NewSafeMap[string, string]()
fullToShortChannelId = safemap.New[string, string]()
)

type Forwarding struct {
Expand Down Expand Up @@ -478,11 +478,11 @@ func CacheHTLCs(where string) int {
channelMap := channel.(map[string]interface{})
if channelMap["channel_id"] != nil {
if channelMap["short_channel_id"] != nil {
fullToShortChannelId.SafeWrite(channelMap["channel_id"].(string), channelMap["short_channel_id"].(string))
fullToShortChannelId.Write(channelMap["channel_id"].(string), channelMap["short_channel_id"].(string))
} else {
// private zero-conf channel
alias := channelMap["alias"].(map[string]interface{})
fullToShortChannelId.SafeWrite(channelMap["channel_id"].(string), alias["local"].(string))
fullToShortChannelId.Write(channelMap["channel_id"].(string), alias["local"].(string))
}
}
}
Expand Down Expand Up @@ -519,12 +519,12 @@ func CacheHTLCs(where string) int {
}

// map sql channel id to short channel id
shortChannelId, found := fullToShortChannelId.SafeRead(hex.EncodeToString(data))
shortChannelId, found := fullToShortChannelId.Read(hex.EncodeToString(data))
if !found {
// full channel ID not found, ignore channel
continue
}
sqlShortChannelId.SafeWrite(id, shortChannelId)
sqlShortChannelId.Write(id, shortChannelId)
}

// Check for errors from iterating over rows
Expand Down Expand Up @@ -563,7 +563,7 @@ func CacheHTLCs(where string) int {
return 0
}

scid, ok := sqlShortChannelId.SafeRead(cid)
scid, ok := sqlShortChannelId.Read(cid)
if ok {
htlc.ShortChannelId = scid
htlc.State = htlcStates[hstate]
Expand Down Expand Up @@ -622,28 +622,28 @@ func cacheForwards(client *glightning.Lightning) int {
if f.Status == "settled" && f.OutMsat >= IGNORE_FORWARDS_MSAT {
chIn := ConvertClnToLndChannelId(f.InChannel)

fi, ok := forwardsIn.SafeRead(chIn)
fi, ok := forwardsIn.Read(chIn)
if !ok {
fi = []Forwarding{} // Initialize an empty slice if the key does not exist
}
fi = append(fi, f) // Append the new forwarding record
forwardsIn.SafeWrite(chIn, fi) // Write the updated slice
fi = append(fi, f) // Append the new forwarding record
forwardsIn.Write(chIn, fi) // Write the updated slice

fo, ok := forwardsOut.SafeRead(chOut)
fo, ok := forwardsOut.Read(chOut)
if !ok {
fo = []Forwarding{} // Initialize an empty slice if the key does not exist
}
fo = append(fo, f) // Append the new forwarding record
forwardsOut.SafeWrite(chOut, fo) // Write the updated slice
fo = append(fo, f) // Append the new forwarding record
forwardsOut.Write(chOut, fo) // Write the updated slice

// save for autofees
LastForwardTS.SafeWrite(chOut, int64(f.ResolvedTime))
LastForwardTS.Write(chOut, int64(f.ResolvedTime))
// forget last failed attempt
failedForwardTS.SafeWrite(chOut, 0)
failedForwardTS.Write(chOut, 0)
} else {
// catch not enough balance error
if f.FailCode == 4103 {
failedForwardTS.SafeWrite(chOut, int64(f.ReceivedTime))
failedForwardTS.Write(chOut, int64(f.ReceivedTime))
}
}
}
Expand Down Expand Up @@ -680,7 +680,7 @@ func GetForwardingStats(lndChannelId uint64) *ForwardingStats {
timestamp30d := float64(now.AddDate(0, 0, -30).Unix())
timestamp6m := float64(now.AddDate(0, -6, 0).Unix())

fo, ok := forwardsOut.SafeRead(lndChannelId)
fo, ok := forwardsOut.Read(lndChannelId)
if ok {
for _, e := range fo {
if e.ResolvedTime > timestamp6m && e.OutMsat >= IGNORE_FORWARDS_MSAT {
Expand All @@ -698,7 +698,7 @@ func GetForwardingStats(lndChannelId uint64) *ForwardingStats {
}
}

fi, ok := forwardsIn.SafeRead(lndChannelId)
fi, ok := forwardsIn.Read(lndChannelId)
if ok {
for _, e := range fi {
if e.ResolvedTime > timestamp6m && e.OutMsat >= IGNORE_FORWARDS_MSAT {
Expand Down Expand Up @@ -834,7 +834,7 @@ func GetChannelStats(lndChannelId uint64, timeStamp uint64) *ChannelStats {

timeStampF := float64(timeStamp)

fo, ok := forwardsOut.SafeRead(lndChannelId)
fo, ok := forwardsOut.Read(lndChannelId)
if ok {
for _, e := range fo {
if e.ResolvedTime > timeStampF && e.OutMsat >= IGNORE_FORWARDS_MSAT {
Expand All @@ -844,7 +844,7 @@ func GetChannelStats(lndChannelId uint64, timeStamp uint64) *ChannelStats {
}
}

fi, ok := forwardsIn.SafeRead(lndChannelId)
fi, ok := forwardsIn.Read(lndChannelId)
if ok {
for _, e := range fi {
if e.ResolvedTime > timeStampF && e.OutMsat >= IGNORE_FORWARDS_MSAT {
Expand Down Expand Up @@ -1141,23 +1141,23 @@ func appendHTLC(htlc HTLC) {
if htlc.State != "SENT_REMOVE_ACK_REVOCATION" && htlc.State != "RCVD_REMOVE_ACK_REVOCATION" {
return
}
htlcs, ok := htlcsCache.SafeRead(htlc.ShortChannelId)
htlcs, ok := htlcsCache.Read(htlc.ShortChannelId)
if !ok {
htlcs = []HTLC{} // Initialize an empty slice if the key does not exist
}
htlcs = append(htlcs, htlc) // Append the new HTLC
htlcsCache.SafeWrite(htlc.ShortChannelId, htlcs) // Write the updated slice
htlcs = append(htlcs, htlc) // Append the new HTLC
htlcsCache.Write(htlc.ShortChannelId, htlcs) // Write the updated slice
}

func GetInvoice(client *glightning.Lightning, request *ListInvoicesRequest) (ListInvoicesResponse, error) {
inv, ok := invoicesCache.SafeRead(request.PaymentHash)
inv, ok := invoicesCache.Read(request.PaymentHash)
if !ok { // fetch from cln
err := client.Request(request, &inv)
if err != nil {
return inv, err
}
// cache it
invoicesCache.SafeWrite(request.PaymentHash, inv)
invoicesCache.Write(request.PaymentHash, inv)
}
return inv, nil
}
Expand All @@ -1174,7 +1174,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI
rebalanceCostMsat uint64
)

htlcs, ok := htlcsCache.SafeRead(channelId)
htlcs, ok := htlcsCache.Read(channelId)

if ok {
for _, htlc := range htlcs {
Expand All @@ -1201,7 +1201,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI
} else {
// no invoices
// can be a rebalance in, check timestamp and record the stats
pmt, ok := sendpaysCache.SafeRead(htlc.PaymentHash)
pmt, ok := sendpaysCache.Read(htlc.PaymentHash)
if !ok { // fetch from cln
err := client.Request(&ListSendPaysRequest{
PaymentHash: htlc.PaymentHash,
Expand All @@ -1210,7 +1210,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI
continue
}
// cache it
sendpaysCache.SafeWrite(htlc.PaymentHash, pmt)
sendpaysCache.Write(htlc.PaymentHash, pmt)
}

for _, p := range pmt.Payments {
Expand All @@ -1223,7 +1223,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI

case "RCVD_REMOVE_ACK_REVOCATION":
// direction out, look for payments
pmt, ok := sendpaysCache.SafeRead(htlc.PaymentHash)
pmt, ok := sendpaysCache.Read(htlc.PaymentHash)
if !ok { // fetch from cln
err := client.Request(&ListSendPaysRequest{
PaymentHash: htlc.PaymentHash,
Expand All @@ -1232,7 +1232,7 @@ func fetchPaymentsStats(client *glightning.Lightning, timeStamp uint64, channelI
continue
}
// cache it
sendpaysCache.SafeWrite(htlc.PaymentHash, pmt)
sendpaysCache.Write(htlc.PaymentHash, pmt)
}

for _, p := range pmt.Payments {
Expand Down Expand Up @@ -1449,10 +1449,10 @@ func ApplyAutoFees() {
liqPct := int(channelMap["to_us_msat"].(float64) * 100 / channelMap["total_msat"].(float64))

// check 10 minutes back to be sure
ts, ok := LastForwardTS.SafeRead(channelId)
ts, ok := LastForwardTS.Read(channelId)
if ok && ts > time.Now().Add(-time.Duration(10*time.Minute)).Unix() {
// forget failed HTLC to prevent duplicate action
failedForwardTS.SafeWrite(channelId, 0)
failedForwardTS.Write(channelId, 0)

if liqPct <= params.LowLiqPct {
// bump fee
Expand Down Expand Up @@ -1489,7 +1489,7 @@ func ApplyAutoFees() {
func PlotPPM(lndChannelId uint64) *[]DataPoint {
var plot []DataPoint

fo, ok := forwardsOut.SafeRead(lndChannelId)
fo, ok := forwardsOut.Read(lndChannelId)
if ok {
for _, e := range fo {
// ignore small forwards
Expand Down Expand Up @@ -1531,7 +1531,7 @@ func ForwardsLog(channelId uint64, fromTS int64) *[]DataPoint {
})

if channelId > 0 {
fi, ok := forwardsIn.SafeRead(channelId)
fi, ok := forwardsIn.Read(channelId)
if ok {
for _, e := range fi {
// ignore small forwards
Expand Down
4 changes: 2 additions & 2 deletions cmd/psweb/ln/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (
}

// track timestamp of the last outbound forward per channel
LastForwardTS = safemap.NewSafeMap[uint64, int64]()
LastForwardTS = safemap.New[uint64, int64]()

// received via custom messages, per peer nodeId
LiquidBalances = make(map[string]*BalanceInfo)
Expand Down Expand Up @@ -439,7 +439,7 @@ func calculateAutoFee(channelId uint64, params *AutoFeeParams, liqPct int, oldFe
// must be definitely above threshold and cool-off period passed
if liqPct > params.LowLiqPct && lastUpdate < time.Now().Add(-time.Duration(params.CoolOffHours)*time.Hour).Unix() {
// check the inactivity period
if ts, ok := LastForwardTS.SafeRead(channelId); ok && ts < time.Now().AddDate(0, 0, -params.InactivityDays).Unix() {
if ts, ok := LastForwardTS.Read(channelId); ok && ts < time.Now().AddDate(0, 0, -params.InactivityDays).Unix() {
// decrease the fee
newFee -= params.InactivityDropPPM
newFee = newFee * (100 - params.InactivityDropPct) / 100
Expand Down
Loading

0 comments on commit f969af3

Please sign in to comment.