-
Notifications
You must be signed in to change notification settings - Fork 48
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for postgres as store engine
- Loading branch information
1 parent
f9aeb57
commit 6ae736b
Showing
7 changed files
with
476 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
FROM postgres:16 | ||
|
||
RUN apt-get update && apt-get install -y \ | ||
postgresql-server-dev-16 \ | ||
build-essential \ | ||
libpq-dev \ | ||
wget | ||
|
||
RUN wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.6.5.tar.gz \ | ||
&& tar -xvzf v1.6.5.tar.gz \ | ||
&& cd pg_cron-1.6.5 \ | ||
&& make && make install \ | ||
&& cd .. && rm -rf v1.6.5.tar.gz pg_cron-1.6.5 | ||
|
||
RUN echo "shared_preload_libraries = 'pg_cron'" >> /usr/share/postgresql/postgresql.conf.sample \ | ||
&& echo "cron.database_name = 'testdb'" >> /usr/share/postgresql/postgresql.conf.sample |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
CREATE TABLE locks ( | ||
name TEXT PRIMARY KEY, | ||
leaderID TEXT NOT NULL | ||
); | ||
|
||
CREATE TABLE kv ( | ||
key TEXT PRIMARY KEY, | ||
value BYTEA | ||
); | ||
|
||
CREATE OR REPLACE FUNCTION notify_changes() | ||
RETURNS TRIGGER AS $$ | ||
BEGIN | ||
IF TG_OP = 'INSERT' THEN | ||
PERFORM cron.schedule('delete_' || NEW.name, '6 seconds', FORMAT('DELETE FROM locks WHERE name = %L', NEW.name)); | ||
PERFORM pg_notify('lock_change', 'INSERT:' || NEW.leaderID::text); | ||
END IF; | ||
|
||
IF TG_OP = 'DELETE' THEN | ||
PERFORM cron.unschedule('delete_' || OLD.name); | ||
PERFORM pg_notify('lock_change', 'DELETE:' || OLD.leaderID::text); | ||
END IF; | ||
|
||
RETURN NULL; | ||
END; | ||
$$ LANGUAGE plpgsql; | ||
|
||
CREATE TRIGGER lock_change_trigger | ||
AFTER INSERT OR DELETE ON locks | ||
FOR EACH ROW EXECUTE FUNCTION notify_changes(); | ||
|
||
CREATE EXTENSION IF NOT EXISTS pg_cron; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,291 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
* | ||
*/ | ||
package postgresql | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"errors" | ||
"fmt" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/apache/kvrocks-controller/consts" | ||
"github.com/apache/kvrocks-controller/logger" | ||
"github.com/apache/kvrocks-controller/store/engine" | ||
"github.com/lib/pq" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
lockTTL = 6 * time.Second | ||
listenerMinReconnectInterval = 10 * time.Second | ||
listenerMaxReconnectInterval = 1 * time.Minute | ||
defaultElectPath = "/kvrocks/controller/leader" | ||
) | ||
|
||
type Config struct { | ||
Addrs []string `yaml:"addrs"` | ||
Username string `yaml:"username"` | ||
Password string `yaml:"password"` | ||
DBName string `yaml:"db_name"` | ||
NotifyChannel string `yaml:"notify_channel"` | ||
ElectPath string `yaml:"elect_path"` | ||
} | ||
|
||
type Postgresql struct { | ||
db *sql.DB | ||
listener *pq.Listener | ||
|
||
leaderMu sync.Mutex | ||
leaderID string | ||
myID string | ||
electPath string | ||
isReady atomic.Bool | ||
|
||
quitCh chan struct{} | ||
wg sync.WaitGroup | ||
lockReleaseCh chan bool | ||
leaderChangeCh chan bool | ||
} | ||
|
||
func New(id string, cfg *Config) (*Postgresql, error) { | ||
if len(id) == 0 { | ||
return nil, errors.New("id must NOT be a empty string") | ||
} | ||
|
||
connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", cfg.Username, cfg.Password, cfg.Addrs[0], cfg.DBName) | ||
db, err := sql.Open("postgres", connStr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
listener := pq.NewListener(connStr, listenerMinReconnectInterval, listenerMaxReconnectInterval, nil) | ||
err = listener.Listen(cfg.NotifyChannel) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
electPath := defaultElectPath | ||
if cfg.ElectPath != "" { | ||
electPath = defaultElectPath | ||
} | ||
|
||
p := &Postgresql{ | ||
myID: id, | ||
electPath: electPath, | ||
db: db, | ||
listener: listener, | ||
quitCh: make(chan struct{}), | ||
lockReleaseCh: make(chan bool), | ||
leaderChangeCh: make(chan bool), | ||
} | ||
p.isReady.Store(false) | ||
p.wg.Add(2) | ||
p.initLeaderId() | ||
go p.electLoop() | ||
go p.observeLeaderEvent() | ||
return p, nil | ||
} | ||
|
||
func (p *Postgresql) ID() string { | ||
return p.myID | ||
} | ||
|
||
func (p *Postgresql) Leader() string { | ||
p.leaderMu.Lock() | ||
defer p.leaderMu.Unlock() | ||
return p.leaderID | ||
} | ||
|
||
func (p *Postgresql) LeaderChange() <-chan bool { | ||
return p.leaderChangeCh | ||
} | ||
|
||
func (p *Postgresql) IsReady(ctx context.Context) bool { | ||
for { | ||
select { | ||
case <-p.quitCh: | ||
return false | ||
case <-time.After(100 * time.Millisecond): | ||
if p.isReady.Load() { | ||
return true | ||
} | ||
case <-ctx.Done(): | ||
return p.isReady.Load() | ||
} | ||
} | ||
} | ||
|
||
func (p *Postgresql) Get(ctx context.Context, key string) ([]byte, error) { | ||
var value []byte | ||
query := "SELECT value FROM kv WHERE key = $1" | ||
|
||
row := p.db.QueryRow(query, key) | ||
err := row.Scan(&value) | ||
if err == sql.ErrNoRows { | ||
return nil, consts.ErrNotFound | ||
} | ||
if err != nil { | ||
return nil, err | ||
} | ||
return value, nil | ||
} | ||
|
||
func (p *Postgresql) Exists(ctx context.Context, key string) (bool, error) { | ||
_, err := p.Get(ctx, key) | ||
if err != nil { | ||
if errors.Is(err, consts.ErrNotFound) { | ||
return false, nil | ||
} | ||
return false, err | ||
} | ||
return true, nil | ||
} | ||
|
||
func (p *Postgresql) Set(ctx context.Context, key string, value []byte) error { | ||
query := "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value" | ||
_, err := p.db.Exec(query, key, value) | ||
return err | ||
} | ||
|
||
func (p *Postgresql) Delete(ctx context.Context, key string) error { | ||
query := "DELETE FROM kv WHERE key = $1" | ||
_, err := p.db.Exec(query, key) | ||
return err | ||
} | ||
|
||
func (p *Postgresql) List(ctx context.Context, prefix string) ([]engine.Entry, error) { | ||
prefixWithWildcard := prefix + "%" | ||
query := "SELECT * from kv WHERE key LIKE $1" | ||
rows, err := p.db.Query(query, prefixWithWildcard) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer rows.Close() | ||
|
||
prefixLen := len(prefix) | ||
entries := make([]engine.Entry, 0) | ||
for rows.Next() { | ||
var key string | ||
var value []byte | ||
|
||
err := rows.Scan(&key, &value) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if key == prefix { | ||
continue | ||
} | ||
|
||
key = strings.TrimLeft(key[prefixLen+1:], "/") | ||
if strings.ContainsRune(key, '/') { | ||
continue | ||
} | ||
entries = append(entries, engine.Entry{ | ||
Key: key, | ||
Value: value, | ||
}) | ||
} | ||
return entries, nil | ||
} | ||
|
||
func (p *Postgresql) electLoop() { | ||
defer p.wg.Done() | ||
for { | ||
select { | ||
case <-p.quitCh: | ||
return | ||
default: | ||
} | ||
|
||
query := "INSERT INTO locks (name, leaderID) VALUES ($1, $2) ON CONFLICT DO NOTHING" | ||
_, err := p.db.Exec(query, p.electPath, p.myID) | ||
if err != nil { | ||
time.Sleep(lockTTL / 3) | ||
continue | ||
} | ||
|
||
select { | ||
case <-p.lockReleaseCh: | ||
continue | ||
case <-p.quitCh: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (p *Postgresql) observeLeaderEvent() { | ||
defer p.wg.Done() | ||
|
||
for { | ||
select { | ||
case <-p.quitCh: | ||
return | ||
case notification := <-p.listener.Notify: | ||
p.isReady.Store(true) | ||
|
||
data := strings.SplitN(notification.Extra, ":", 2) | ||
if len(data) != 2 { | ||
logger.Get().With( | ||
zap.Error(fmt.Errorf("failed to parse notification data: expected two parts separated by a colon")), | ||
).Error("Failed to parse notification data") | ||
} | ||
|
||
operation := data[0] | ||
leaderID := data[1] | ||
|
||
if operation == "INSERT" { | ||
p.leaderMu.Lock() | ||
p.leaderID = leaderID | ||
p.leaderMu.Unlock() | ||
p.leaderChangeCh <- true | ||
} else { | ||
p.lockReleaseCh <- true | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (p *Postgresql) initLeaderId() error { | ||
var leaderId string | ||
query := "SELECT leaderID FROM locks WHERE name = $1" | ||
row := p.db.QueryRow(query, p.electPath) | ||
err := row.Scan(&leaderId) | ||
if err == sql.ErrNoRows { | ||
p.leaderID = "" | ||
return nil | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
p.leaderID = leaderId | ||
return nil | ||
} | ||
|
||
func (p *Postgresql) Close() error { | ||
close(p.quitCh) | ||
p.wg.Wait() | ||
p.listener.Close() | ||
return p.db.Close() | ||
} |
Oops, something went wrong.