Skip to content

Commit

Permalink
Merge pull request #236 from dozyio/feat-globaldb-updates
Browse files Browse the repository at this point in the history
feat: update globaldb example
  • Loading branch information
hsanjuan authored Nov 8, 2024
2 parents cbaacc1 + 431e8f3 commit 0f14947
Show file tree
Hide file tree
Showing 5 changed files with 593 additions and 221 deletions.
1 change: 1 addition & 0 deletions examples/globaldb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
data/
86 changes: 86 additions & 0 deletions examples/globaldb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Example GlobalDB CLI

This repository contains an example command-line interface (CLI) tool for joining a global, permissionless, CRDT-based database using CRDTs, IPFS & libp2p.

## Features

- Join a global CRDT-based database with IPFS.
- Store and retrieve key-value pairs in a distributed datastore.
- Subscribe to a pubsub topic to receive updates in real-time.
- Bootstrap and connect to other peers in the network.
- Operate in daemon mode for continuous operation.
- Simple CLI commands to interact with the database.

## Building

To build the example GlobalDB CLI, clone this repository and build the binary:

```bash
git clone https://github.com/ipfs/go-ds-crdt
cd examples/globaldb
go build -o globaldb
```

Ensure that you have Go installed and set up in your environment.

## Usage

Run the CLI with:

```bash
./globaldb [options]
```

### Options

- `-daemon`: Run in daemon mode.
- `-datadir`: Specify a directory for storing the local database and keys.

### Commands

Once running, the CLI provides the following interactive commands:

- `list`: List all items in the store.
- `get <key>`: Retrieve the value for a specified key.
- `put <key> <value>`: Store a value with a specified key.
- `connect <multiaddr>`: Connect to a peer using its multiaddress.
- `debug <on/off/peers/subs>`: Enable or disable debug logging, list connected peers, show pubsub subscribers
- `exit`: Quit the CLI.

### Example

Starting the CLI:

```bash
./globaldb -datadir /path/to/data
```

Interacting with the database:

```plaintext
> put exampleKey exampleValue
> get exampleKey
[exampleKey] -> exampleValue
> list
[exampleKey] -> exampleValue
> connect /ip4/192.168.1.3/tcp/33123/p2p/12D3KooWEkgRTTXGsmFLBembMHxVPDcidJyqFcrqbm9iBE1xhdXq
```

### Daemon Mode

To run in daemon mode, use:

```bash
./globaldb -daemon -datadir /path/to/data
```

The CLI will keep running, periodically reporting the number of connected peers and those subscribed to the crdt topic.

## Technical Details

The GlobalDB CLI leverages the following components:

- **IPFS Lite**: Provides a lightweight IPFS node for peer-to-peer networking.
- **Libp2p PubSub**: Enables decentralized communication using the GossipSub protocol.
- **CRDTs**: Ensure conflict-free synchronization of data across distributed peers.
- **Badger Datastore**: A high-performance datastore for storing key-value pairs.
153 changes: 125 additions & 28 deletions examples/globaldb/globaldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package main
import (
"bufio"
"context"
"flag"
"fmt"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
Expand All @@ -27,20 +28,31 @@ import (
"github.com/libp2p/go-libp2p/core/peer"

ipfslite "github.com/hsanjuan/ipfs-lite"
"github.com/mitchellh/go-homedir"

multiaddr "github.com/multiformats/go-multiaddr"
)

var (
logger = logging.Logger("globaldb")
listen, _ = multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/33123")
topicName = "globaldb-example"
netTopic = "globaldb-example-net"
config = "globaldb-example"
)

func main() {
daemonMode := flag.Bool("daemon", false, "Run in daemon mode")
dataDir := flag.String("datadir", "", "Use a custom data directory")
port := flag.String("port", "0", "Specify the TCP port to listen on")

flag.Parse()

if *port != "" {
parsedPort, err := strconv.ParseUint(*port, 10, 32)
if err != nil || parsedPort > 65535 {
logger.Fatal("Specify a valid TCP port")
}
}

// Bootstrappers are using 1024 keys. See:
// https://github.com/ipfs/infra/issues/378
crypto.MinRsaKeyBits = 1024
Expand All @@ -49,11 +61,26 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, err := homedir.Dir()
if err != nil {
logger.Fatal(err)
data := ""

if dataDir == nil || *dataDir == "" {
dir, err := os.MkdirTemp("", "globaldb-example")
if err != nil {
logger.Fatal(err)
}
defer os.RemoveAll(dir)
data = dir + "/" + config
} else {
// check if the directory exists or create it
_, err := os.Stat(*dataDir)
if os.IsNotExist(err) {
err = os.Mkdir(*dataDir, 0755)
if err != nil {
logger.Fatal(err)
}
}
data = *dataDir + "/" + config
}
data := filepath.Join(dir, config)

store, err := badger.NewDatastore(data, &badger.DefaultOptions)
if err != nil {
Expand All @@ -73,14 +100,14 @@ func main() {
if err != nil {
logger.Fatal(err)
}
err = ioutil.WriteFile(keyPath, data, 0400)
err = os.WriteFile(keyPath, data, 0400)
if err != nil {
logger.Fatal(err)
}
} else if err != nil {
logger.Fatal(err)
} else {
key, err := ioutil.ReadFile(keyPath)
key, err := os.ReadFile(keyPath)
if err != nil {
logger.Fatal(err)
}
Expand All @@ -95,6 +122,8 @@ func main() {
logger.Fatal(err)
}

listen, _ := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/" + *port)

h, dht, err := ipfslite.SetupLibp2p(
ctx,
priv,
Expand Down Expand Up @@ -188,29 +217,26 @@ func main() {

fmt.Printf(`
Peer ID: %s
Listen address: %s
Topic: %s
Data Folder: %s
Listen addresses:
%s
Ready!
Commands:
> list -> list items in the store
> get <key> -> get value for a key
> put <key> <value> -> store value on a key
> exit -> quit
`,
pid, listen, topicName, data,
pid, topicName, data, listenAddrs(h),
)

if len(os.Args) > 1 && os.Args[1] == "daemon" {
if *daemonMode {
fmt.Println("Running in daemon mode")
go func() {
for {
fmt.Printf("%s - %d connected peers\n", time.Now().Format(time.Stamp), len(connectedPeers(h)))
fmt.Printf(
"%s - %d connected peers - %d peers in topic\n",
time.Now().Format(time.Stamp),
len(connectedPeers(h)),
len(topic.ListPeers()),
)
time.Sleep(10 * time.Second)
}
}()
Expand All @@ -225,6 +251,22 @@ Commands:
return
}

commands := `
> (l)ist -> list items in the store
> (g)get <key> -> get value for a key
> (p)ut <key> <value> -> store value on a key
> (d)elete <key> -> delete a key
> (c)onnect <multiaddr> -> connect a multiaddr
> print -> Print DAG
> debug <on/off/peers/subs> -> enable/disable debug logging
show connected peers
show pubsub subscribers
> exit -> quit
`
fmt.Printf("%s", commands)

fmt.Printf("> ")
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
Expand All @@ -240,9 +282,15 @@ Commands:
switch cmd {
case "exit", "quit":
return
case "?", "help", "h":
fmt.Printf("%s", commands)
fmt.Printf("> ")
continue
case "debug":
if len(fields) < 2 {
fmt.Println("debug <on/off/peers>")
fmt.Println("debug <on/off/peers/subs>")
fmt.Println("> ")
continue
}
st := fields[1]
switch st {
Expand All @@ -261,8 +309,12 @@ Commands:
fmt.Println(a)
}
}
case "subs":
for _, p := range topic.ListPeers() {
fmt.Println(p.String())
}
}
case "list":
case "l", "list":
q := query.Query{}
results, err := crdt.Query(ctx, q)
if err != nil {
Expand All @@ -275,10 +327,10 @@ Commands:
}
fmt.Printf("[%s] -> %s\n", r.Key, string(r.Value))
}
case "get":
case "g", "get":
if len(fields) < 2 {
fmt.Println("get <key>")
fmt.Println("> ")
fmt.Printf("> ")
continue
}
k := ds.NewKey(fields[1])
Expand All @@ -288,10 +340,10 @@ Commands:
continue
}
fmt.Printf("[%s] -> %s\n", k, string(v))
case "put":
case "p", "put":
if len(fields) < 3 {
fmt.Println("put <key> <value>")
fmt.Println("> ")
fmt.Printf("> ")
continue
}
k := ds.NewKey(fields[1])
Expand All @@ -301,6 +353,42 @@ Commands:
printErr(err)
continue
}
case "d", "delete":
if len(fields) < 2 {
fmt.Println("delete <key>")
fmt.Printf("> ")
continue
}
k := ds.NewKey(fields[1])
err := crdt.Delete(ctx, k)
if err != nil {
printErr(err)
continue
}
case "c", "connect":
if len(fields) < 2 {
fmt.Println("connect <mulitaddr>")
fmt.Printf("> ")
continue
}
ma, err := multiaddr.NewMultiaddr(fields[1])
if err != nil {
printErr(err)
continue
}
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
printErr(err)
continue
}
h.Peerstore().AddAddr(peerInfo.ID, peerInfo.Addrs[0], 300)
err = h.Connect(ctx, *peerInfo)
if err != nil {
printErr(err)
continue
}
case "print":
crdt.PrintDAG()
}
fmt.Printf("> ")
}
Expand All @@ -321,3 +409,12 @@ func connectedPeers(h host.Host) []*peer.AddrInfo {
}
return pinfos
}

func listenAddrs(h host.Host) string {
var addrs []string
for _, c := range h.Addrs() {
ma, _ := multiaddr.NewMultiaddr(c.String() + "/p2p/" + h.ID().String())
addrs = append(addrs, ma.String())
}
return strings.Join(addrs, "\n")
}
Loading

0 comments on commit 0f14947

Please sign in to comment.