Skip to content

Commit

Permalink
Merge pull request #46
Browse files Browse the repository at this point in the history
Using SQLite database for sessions
  • Loading branch information
bsrinivas8687 authored Jun 30, 2021
2 parents 0b2cab1 + dfe179f commit 789b474
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 294 deletions.
30 changes: 25 additions & 5 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"

"github.com/sentinel-official/dvpn-node/context"
"github.com/sentinel-official/dvpn-node/lite"
Expand All @@ -34,8 +37,9 @@ func StartCmd() *cobra.Command {
Short: "Start VPN node",
RunE: func(cmd *cobra.Command, _ []string) error {
var (
home = viper.GetString(flags.FlagHome)
path = filepath.Join(home, types.ConfigFileName)
home = viper.GetString(flags.FlagHome)
configPath = filepath.Join(home, types.ConfigFileName)
databasePath = filepath.Join(home, types.DatabaseFileName)
)

log, err := utils.PrepareLogger()
Expand All @@ -44,9 +48,9 @@ func StartCmd() *cobra.Command {
}

v := viper.New()
v.SetConfigFile(path)
v.SetConfigFile(configPath)

log.Info("Reading the configuration file", "path", path)
log.Info("Reading the configuration file", "path", configPath)
cfg, err := types.ReadInConfig(v)
if err != nil {
return err
Expand Down Expand Up @@ -158,6 +162,22 @@ func StartCmd() *cobra.Command {
return err
}

log.Info("Opening the database", "path", databasePath)
database, err := gorm.Open(
sqlite.Open(databasePath),
&gorm.Config{
Logger: logger.Discard,
},
)
if err != nil {
return err
}

log.Info("Migrating database models...")
if err := database.AutoMigrate(&types.Session{}); err != nil {
return err
}

var (
ctx = context.NewContext()
router = mux.NewRouter()
Expand All @@ -172,7 +192,7 @@ func StartCmd() *cobra.Command {
WithConfig(cfg).
WithClient(client).
WithLocation(location).
WithSessions(types.NewSessions()).
WithDatabase(database).
WithBandwidth(bandwidth)

n := node.NewNode(ctx)
Expand Down
7 changes: 4 additions & 3 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/gorilla/mux"
hubtypes "github.com/sentinel-official/hub/types"
tmlog "github.com/tendermint/tendermint/libs/log"
"gorm.io/gorm"

"github.com/sentinel-official/dvpn-node/lite"
"github.com/sentinel-official/dvpn-node/types"
Expand All @@ -18,9 +19,9 @@ type Context struct {
bandwidth *hubtypes.Bandwidth
client *lite.Client
config *types.Config
database *gorm.DB
location *types.GeoIPLocation
router *mux.Router
sessions *types.Sessions
}

func NewContext() *Context {
Expand All @@ -34,7 +35,7 @@ func (c *Context) WithLocation(v *types.GeoIPLocation) *Context { c.location = v
func (c *Context) WithLogger(v tmlog.Logger) *Context { c.logger = v; return c }
func (c *Context) WithRouter(v *mux.Router) *Context { c.router = v; return c }
func (c *Context) WithService(v types.Service) *Context { c.service = v; return c }
func (c *Context) WithSessions(v *types.Sessions) *Context { c.sessions = v; return c }
func (c *Context) WithDatabase(v *gorm.DB) *Context { c.database = v; return c }

func (c *Context) Address() hubtypes.NodeAddress { return c.Operator().Bytes() }
func (c *Context) Bandwidth() *hubtypes.Bandwidth { return c.bandwidth }
Expand All @@ -50,7 +51,7 @@ func (c *Context) Operator() sdk.AccAddress { return c.client.FromAdd
func (c *Context) RemoteURL() string { return c.Config().Node.RemoteURL }
func (c *Context) Router() *mux.Router { return c.router }
func (c *Context) Service() types.Service { return c.service }
func (c *Context) Sessions() *types.Sessions { return c.sessions }
func (c *Context) Database() *gorm.DB { return c.database }

func (c *Context) IntervalUpdateSessions() time.Duration {
return c.Config().Node.IntervalUpdateSessions
Expand Down
22 changes: 0 additions & 22 deletions context/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package context

import (
"encoding/base64"

sdk "github.com/cosmos/cosmos-sdk/types"
)

func (c *Context) RemovePeer(key string) error {
Expand All @@ -22,23 +20,3 @@ func (c *Context) RemovePeer(key string) error {

return nil
}

func (c *Context) RemoveSession(key string, address sdk.AccAddress) error {
c.Log().Info("Removing session from list", "key", key, "address", address)

c.Sessions().DeleteByKey(key)
c.Sessions().DeleteByAddress(address)

return nil
}

func (c *Context) RemovePeerAndSession(key string, address sdk.AccAddress) error {
if err := c.RemovePeer(key); err != nil {
return err
}
if err := c.RemoveSession(key, address); err != nil {
return err
}

return nil
}
6 changes: 2 additions & 4 deletions context/tx.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package context

import (
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
hubtypes "github.com/sentinel-official/hub/types"
nodetypes "github.com/sentinel-official/hub/x/node/types"
Expand Down Expand Up @@ -69,14 +67,14 @@ func (c *Context) UpdateNodeStatus() error {
func (c *Context) UpdateSessions(items ...types.Session) error {
c.Log().Info("Updating sessions...")

var messages []sdk.Msg
messages := make([]sdk.Msg, 0, len(items))
for _, item := range items {
messages = append(messages,
sessiontypes.NewMsgUpdateRequest(
c.Address(),
sessiontypes.Proof{
Id: item.ID,
Duration: time.Since(item.ConnectedAt),
Duration: item.UpdatedAt.Sub(item.CreatedAt),
Bandwidth: hubtypes.NewBandwidthFromInt64(item.Download, item.Upload),
},
nil,
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ require (
github.com/tendermint/tendermint v0.34.11
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
google.golang.org/grpc v1.38.0
gorm.io/driver/sqlite v1.1.4
gorm.io/gorm v1.21.11
)

replace (
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,11 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI=
github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U=
github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ=
Expand Down Expand Up @@ -413,6 +418,8 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.14.5 h1:1IdxlwTNazvbKJQSxoJ5/9ECbEeaTTyeU7sEAZ5KKTQ=
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down Expand Up @@ -1085,6 +1092,11 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/sqlite v1.1.4 h1:PDzwYE+sI6De2+mxAneV9Xs11+ZyKV6oxD3wDGkaNvM=
gorm.io/driver/sqlite v1.1.4/go.mod h1:mJCeTFr7+crvS+TRnWc5Z3UvwxUN1BGBLMrf5LA9DYw=
gorm.io/gorm v1.20.7/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
gorm.io/gorm v1.21.11 h1:CxkXW6Cc+VIBlL8yJEHq+Co4RYXdSLiMKNvgoZPjLK4=
gorm.io/gorm v1.21.11/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
125 changes: 87 additions & 38 deletions node/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,48 @@ func (n *Node) jobSetSessions() error {
for ; ; <-t.C {
peers, err := n.Service().Peers()
if err != nil {
n.Log().Error("Failed to get connected peers", "error", err)
return err
}
n.Log().Info("Connected peers", "count", len(peers))

for i := 0; i < len(peers); i++ {
item := n.Sessions().GetByKey(peers[i].Key)
if item.Empty() {
n.Log().Error("Unknown connected peer", "peer", peers[i])
var item types.Session
n.Database().Model(
&types.Session{},
).Where(
&types.Session{
Key: peers[i].Key,
},
).First(&item)

if item.ID == 0 {
n.Log().Info("Unknown connected peer", "key", peers[i].Key)
if err := n.RemovePeer(peers[i].Key); err != nil {
return err
}

continue
}

item.Upload = peers[i].Upload
item.Download = peers[i].Download
n.Sessions().Update(item)

consumed := sdk.NewInt(item.Upload + item.Download)
if consumed.GT(item.Available) {
n.Log().Info("Peer quota exceeded", "id", item.ID,
"available", item.Available, "consumed", consumed)
n.Database().Model(
&types.Session{},
).Where(
&types.Session{
ID: item.ID,
},
).Updates(
&types.Session{
Upload: peers[i].Upload,
Download: peers[i].Download,
},
)

var (
available = sdk.NewInt(item.Available)
consumed = sdk.NewInt(peers[i].Upload + peers[i].Download)
)

if consumed.GT(available) {
n.Log().Info("Peer quota exceeded", "key", peers[i].Key)
if err := n.RemovePeer(item.Key); err != nil {
return err
}
Expand All @@ -65,11 +83,9 @@ func (n *Node) jobUpdateSessions() error {
t := time.NewTicker(n.IntervalUpdateSessions())
for ; ; <-t.C {
var items []types.Session
n.Sessions().Iterate(func(v types.Session) bool {
items = append(items, v)
return false
})
n.Log().Info("Iterated sessions", "count", len(items))
n.Database().Model(
&types.Session{},
).Find(&items)

for i := len(items) - 1; i >= 0; i-- {
session, err := n.Client().QuerySession(items[i].ID)
Expand All @@ -82,39 +98,72 @@ func (n *Node) jobUpdateSessions() error {
return err
}

remove, skip := func() (bool, bool) {
var (
nochange = items[i].Download == session.Bandwidth.Upload.Int64()
)
var (
removePeer = false
removeSession = false
skipUpdate = false
)

if items[i].Download == session.Bandwidth.Upload.Int64() {
skipUpdate = true
if items[i].CreatedAt.Before(session.StatusAt) {
removePeer = true
}

switch {
case nochange && items[i].ConnectedAt.Before(session.StatusAt):
n.Log().Info("Stale peer connection", "id", items[i].ID)
return true, true
case !subscription.Status.Equal(hubtypes.StatusActive):
n.Log().Info("Invalid subscription status", "id", items[i].ID, "nochange", nochange)
return true, nochange || subscription.Status.Equal(hubtypes.StatusInactive)
case !session.Status.Equal(hubtypes.StatusActive):
n.Log().Info("Invalid session status", "id", items[i].ID, "nochange", nochange)
return true, nochange || session.Status.Equal(hubtypes.StatusInactive)
default:
return false, false
n.Log().Info("Stale peer connection", "id", items[i].ID)
}
if !subscription.Status.Equal(hubtypes.StatusActive) {
removePeer = true
if subscription.Status.Equal(hubtypes.StatusInactive) {
removeSession, skipUpdate = true, true
}

n.Log().Info("Invalid subscription status", "id", items[i].ID)
}
if !session.Status.Equal(hubtypes.StatusActive) {
removePeer = true
if session.Status.Equal(hubtypes.StatusInactive) {
removeSession, skipUpdate = true, true
}
}()

if remove {
if err := n.RemovePeerAndSession(items[i].Key, items[i].Address); err != nil {
n.Log().Info("Invalid session status", "id", items[i].ID)
}

if removePeer {
if err := n.RemovePeer(items[i].Key); err != nil {
return err
}
}
if skip {

if removeSession {
n.Database().Model(
&types.Session{},
).Where(
&types.Session{
ID: items[i].ID,
},
).Update(
"address", "",
)
}

if skipUpdate {
items = append(items[:i], items[i+1:]...)
}
}

n.Database().Model(
&types.Session{},
).Where(
"address = ?", "",
).Delete(
&types.Session{},
)

if len(items) == 0 {
continue
}

if err := n.UpdateSessions(items...); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 789b474

Please sign in to comment.