Skip to content

Commit

Permalink
added docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Navid2zp committed Apr 17, 2021
1 parent d610057 commit bc9904f
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 65 deletions.
3 changes: 3 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (

var logger *logging.Logger

// initAPI initiates the configuration for api service.
func initAPI() {
logger = logging.NewLogger("api")
if !config.Config.Settings.Debug {
gin.SetMode(gin.ReleaseMode)
}
}

// adminMiddleware checks the request headers for secret key provided in config file.
func adminMiddleware(c *gin.Context) {
secret := c.Request.Header.Get("Secret")
if secret == "" {
Expand All @@ -30,6 +32,7 @@ func adminMiddleware(c *gin.Context) {
c.Next()
}

// Serve serves the API.
func Serve() {
initAPI()

Expand Down
14 changes: 0 additions & 14 deletions api/helpers.go

This file was deleted.

3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
)

// ServiceConfig represents the config data for the application.
type ServiceConfig struct {
Monitor struct {
Host string
Expand Down Expand Up @@ -36,6 +37,7 @@ type ServiceConfig struct {

var Config ServiceConfig

// getConfigNameAndPath returns the path and name of the config file.
func getConfigNameAndPath(configPath string) (string, string) {
path, err := filepath.Abs(filepath.Dir(os.Args[0]))
name := "config"
Expand All @@ -50,6 +52,7 @@ func getConfigNameAndPath(configPath string) (string, string) {
return name, path
}

// InitConfig initializes the application config.
func InitConfig(configPath string) (*ServiceConfig, error) {
dir, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
Expand Down
16 changes: 15 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

var ErrDBNotFound = errors.New("no such database found")

// Node represents a node in monitoring service.
type Node struct {
FormationID string `db:"formationid" json:"formation_id"`
ID int `db:"nodeid" json:"id"`
Expand All @@ -32,6 +33,7 @@ type Node struct {
IsCoordinator bool `db:"-" json:"-"`
}

// Worker represents a worker in coordinators.
type Worker struct {
ID int `db:"nodeid" json:"id"`
GroupID int `db:"groupid" json:"group_id"`
Expand All @@ -46,19 +48,22 @@ type Worker struct {
ShouldHaveShards bool `db:"shouldhaveshards" json:"should_have_shards"`
}

// Coordinator represents a coordinator database
type Coordinator struct {
PrimaryNodeID int `db:"primary_node_id" json:"primary_node_id"`
Name string `db:"primary_name" json:"name"`
Host string `db:"primary_host" json:"host"`
Port int `db:"primary_port" json:"port"`
}

// GetNodes returns all the node monitored in the monitor.
func GetNodes() ([]*Node, error) {
var nodes []*Node
err := monitorDB.Select(&nodes, `select * from pgautofailover.node`)
return nodes, err
}

// GetPrimaryCoordinator returns the primary coordinator info given the database name.
func GetPrimaryCoordinator(dbname string) (*Coordinator, error) {
var db *database
if db = findDatabase(dbname); db == nil {
Expand All @@ -67,6 +72,7 @@ func GetPrimaryCoordinator(dbname string) (*Coordinator, error) {
return db.getCoordinator()
}

// GetPrimaryWorkers returns all the workers available in a database.
func GetPrimaryWorkers(dbname string) ([]*Worker, error) {
var db *database
var workers []*Worker
Expand All @@ -76,6 +82,7 @@ func GetPrimaryWorkers(dbname string) ([]*Worker, error) {
return db.getPrimaryWorkers()
}

// GetCoordinators returns a list of all the primary and non-primary coordinator nodes.
func GetCoordinators(dbname string) ([]*Node, error) {
var coordinators []*Node
var db *database
Expand All @@ -87,19 +94,22 @@ func GetCoordinators(dbname string) ([]*Node, error) {
return coordinators, err
}

// getPrimaryWorkers returns all the primary workers for a database.
func (d *database) getPrimaryWorkers() ([]*Worker, error) {
var workers []*Worker
err := d.db.Select(&workers, `SELECT * from pg_dist_node where noderole = 'primary';`)
return workers, err
}

// getCoordinator returns the coordinator for the database.
func (d *database) getCoordinator() (*Coordinator, error) {
var node Coordinator
err := monitorDB.Get(&node,
`select * from pgautofailover.get_primary($1);`, d.formation)
return &node, err
}

// isPrimary checks if the worker is a primary node in the monitor.
func (w *Worker) isPrimary() (bool, *Node, error) {
var newNode Node
err := monitorDB.Get(&newNode, `select * from pgautofailover.node
Expand All @@ -114,11 +124,15 @@ func (w *Worker) isPrimary() (bool, *Node, error) {
return false, &newNode, err
}

func (w *Worker) updateCoordinator(newHost string, newPort int, db *database) error {
// updateWorkerInCoordinator updates a worker node in the database.
func (w *Worker) updateWorkerInCoordinator(newHost string, newPort int, db *database) error {
_, err := db.db.Exec(`select * from citus_update_node($1, $2, $3);`, w.ID, newHost, newPort)
return err
}

// connect connects to the database
// checks the coordinator state and the previous connections
// establishes a new one if connection is lost primary node changed
func (d *database) connect(coordinatorNode *Coordinator) error {
var err error
if d.db == nil {
Expand Down
6 changes: 6 additions & 0 deletions core/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ type database struct {
db *sqlx.DB
}

// databases holds the list of all databases to be monitored
var databases []*database
// monitorDB is database instance for monitor
var monitorDB *sqlx.DB

// openMonitoringConnection opens a connection to monitor database
func openMonitoringConnection() {
var err error

Expand All @@ -38,6 +41,7 @@ func openMonitoringConnection() {
}
}

// setupDatabases adds all the listed databases in the config file to databases.
func setupDatabases() {
for _, db := range config.Config.Coordinators {
coordinator := database{
Expand All @@ -53,6 +57,7 @@ func setupDatabases() {
}
}

// openDBConnection opens a database connection.
func openDBConnection(host, username, dbname, password string, port int) (*sqlx.DB, error) {
return sqlx.Connect(
"postgres",
Expand All @@ -67,6 +72,7 @@ func openDBConnection(host, username, dbname, password string, port int) (*sqlx.
)
}

// findDatabase finds a database in monitoring databases list given the database name.
func findDatabase(dbname string) *database {
for _, db := range databases {
if db.dbname == dbname {
Expand Down
50 changes: 1 addition & 49 deletions core/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"github.com/Navid2zp/citus-failover/logging"
"github.com/lib/pq"
_ "github.com/lib/pq"
"time"
)


var logger *logging.Logger


// InitMonitor initializes the configurations for monitoring databases.
func InitMonitor() {
openMonitoringConnection()
setupDatabases()
Expand All @@ -36,30 +36,6 @@ func (ni *NullInt64) ToInt64() int64 {
return ni.Int64
}

func Int64ToNullInt64(i int64) NullInt64 {
if i == 0 {
return NullInt64{}
}
return NullInt64{
sql.NullInt64{
Int64: i,
Valid: true,
},
}
}

func TimeToNullTime(t time.Time, null bool) NullTime {
if null {
return NullTime{}
}
return NullTime{
pq.NullTime{
Time: t,
Valid: true,
},
}
}

// UnmarshalJSON for NullInt64
func (ni *NullInt64) UnmarshalJSON(b []byte) error {
err := json.Unmarshal(b, &ni.Int64)
Expand Down Expand Up @@ -131,30 +107,6 @@ func (ns *NullString) ToString() string {
return ns.String
}

func StringToNullString(s string) NullString {
if s == "" {
return NullString{}
}
return NullString{
sql.NullString{
String: s,
Valid: true,
},
}
}

func Float64ToNullFloat64(f float64) NullFloat64 {
if f == 0 {
return NullFloat64{}
}
return NullFloat64{
sql.NullFloat64{
Float64: f,
Valid: true,
},
}
}

type NullTime struct {
pq.NullTime
}
Expand Down
6 changes: 5 additions & 1 deletion core/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"
)

// validateWorker validates if the worker is primary
func (d *database) validateWorker(worker *Worker) {
isPrimary, newNode, err := worker.isPrimary()
if err != nil {
Expand All @@ -14,7 +15,7 @@ func (d *database) validateWorker(worker *Worker) {
}
if !isPrimary {
logger.WorkerStateChange(worker.Host, newNode.Host, d.dbname, worker.Port, newNode.Port)
err = worker.updateCoordinator(newNode.Host, newNode.Port, d)
err = worker.updateWorkerInCoordinator(newNode.Host, newNode.Port, d)
if err != nil {
logger.WorkerUpdateFailed(err, d.dbname)
return
Expand All @@ -23,6 +24,7 @@ func (d *database) validateWorker(worker *Worker) {
}
}

// validateWorkers iterates over all the workers for a database and validates them.
func (d *database) validateWorkers() {
workers, err := d.getPrimaryWorkers()
if err != nil {
Expand All @@ -34,6 +36,7 @@ func (d *database) validateWorkers() {
}
}

// monitor periodically monitors the database for non-primary nodes.
func (d *database) monitor() {
for {
time.Sleep(time.Duration(config.Config.Settings.CheckInterval) * time.Millisecond)
Expand All @@ -57,6 +60,7 @@ func (d *database) monitor() {
}


// Monitor runs the monitoring for all databases.
func Monitor() {

for _, db := range databases {
Expand Down
1 change: 1 addition & 0 deletions logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Logger struct {
l *zap.Logger
}

// NewLogger creates and returns a new Logger pointer
func NewLogger(service string) *Logger {
//zap.NewProductionConfig()
//cfg := zap.NewProductionConfig()
Expand Down

0 comments on commit bc9904f

Please sign in to comment.