Skip to content

Commit

Permalink
Merge pull request #4 from caiodearaujo/feature/multidevice
Browse files Browse the repository at this point in the history
Feature/multidevice
  • Loading branch information
caiodearaujo authored Sep 27, 2024
2 parents 91f0a67 + 7c58440 commit f8e81ae
Show file tree
Hide file tree
Showing 20 changed files with 609 additions and 259 deletions.
28 changes: 21 additions & 7 deletions api/src/data/device.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
package data

import (
"github.com/uptrace/bun"
"strconv"
"time"

"github.com/uptrace/bun"
)

type Device struct {
bun.BaseModel `bun:"table:device,alias:dvc"`
ID int `json:"id" bun:"id,pk,autoincrement"`
DeviceID string `json:"device_id" bun:"device_id,notnull"`
JID string `json:"whatsapp_id" bun:"whatsapp_id,notnull,unique"`
PushName string `json:"push_name" bun:"push_name,notnull"`
BusinessName string `json:"business_name" bun:"business_name,"`
Timestamp time.Time `json:"timestamp" bun:"timestamp,notnull"`
Active bool `json:"active" bun:"active,notnull"`
CreatedAt time.Time `json:"created_at" bun:"created_at,notnull"`
}

type DeviceHandler struct {
bun.BaseModel `bun:"table:device_handler,alias:dev_hdl"`
Device
Active bool `json:"active" bun:"active,notnull"`
bun.BaseModel `bun:"table:device_handler,alias:dvc_hdl"`
ID int `json:"id" bun:"id,pk,autoincrement"`
DeviceID string `json:"device_id" bun:"device_id,notnull"`
Active bool `json:"active" bun:"active,notnull"`
ActiveAt time.Time `json:"timestamp" bun:"timestamp,notnull"`
InactiveAt time.Time `json:"inactive_at" bun:"inactive_at"`
Device *Device `bun:"rel:belongs-to,join:device_id=id"`
}

type DeviceWebhook struct {
bun.BaseModel `bun:"table:device_webhook,alias:dev_wh"`
bun.BaseModel `bun:"table:device_webhook,alias:dvc_wh"`
ID int `json:"id" bun:"id,pk,autoincrement"`
DeviceID string `json:"device_id" bun:"device_id,notnull"`
WebhookURL string `json:"webhook_url" bun:"webhook_url,notnull"`
Active bool `json:"active" bun:"active,notnull"`
Timestamp time.Time `json:"timestamp" bun:"timestamp,notnull"`
Device *Device `bun:"rel:belongs-to,join:device_id=id"`
}

func (m Device) DeviceID() string {
return strconv.Itoa(m.ID)
}
1 change: 1 addition & 0 deletions api/src/data/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package data

func TablesPostgres() []interface{} {
return []interface{}{
(*Device)(nil),
(*DeviceHandler)(nil),
(*DeviceWebhook)(nil),
(*WebhookMessage)(nil),
Expand Down
5 changes: 4 additions & 1 deletion api/src/data/webhook.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package data

import (
"github.com/uptrace/bun"
"time"

"github.com/uptrace/bun"
)

type WebhookMessage struct {
bun.BaseModel `bun:"table:webhook_message,alias:wh_msg"`
ID int `json:"id" bun:"id,pk,autoincrement"`
DeviceID string `json:"device_id" bun:"device_id,notnull"`
Message string `json:"message" bun:"message,notnull"`
Response string `json:"response" bun:"response,notnull"`
WebhookURL string `json:"webhook_url" bun:"webhook_url,notnull"`
CodeResponse int `json:"code_response" bun:"code_response,notnull"`
Timestamp time.Time `json:"timestamp" bun:"timestamp,notnull"`
}
137 changes: 89 additions & 48 deletions api/src/events/event.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,117 @@
package events

import (
"context"
"fmt"
"log"
"time"
"whatsgoingon/data"
"whatsgoingon/helpers"
myStore "whatsgoingon/store"

"github.com/ztrue/tracerr"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/types/events"

"whatsgoingon/data"
"whatsgoingon/helpers"
"whatsgoingon/store"
myStore "whatsgoingon/store"
)

// Initialize the listener by getting all device IDs and adding them to the listener.
func InitListener() {
helpers.BulkUpdateDeviceHandlerOff()
clientIds, err := helpers.GetAllClientIDs()
failOnError(err, "Get clientIds failed")
store.BulkUpdateDeviceHandlerOff()

whatsappIDs, err := helpers.GetAllWhatsappIDs()
helpers.FailOnError(err, "Get deviceIDs failed")

for _, clientId := range clientIds {
AddToListeners(clientId)
for _, jid := range whatsappIDs {
AddToListeners(jid)
}

}
// Add client to message listeners
func AddToListeners(whatsappID string) {
_, err := StartMessageListener(whatsappID)
if err != nil {
helpers.FailOnError(err, fmt.Sprintf("Error starting message listener for %s", whatsappID))
return
}

func failOnError(err error, msg string) {
device, err := store.GetDeviceByJID(whatsappID)
if err != nil {
log.Fatalf("%s: %s", msg, err)
helpers.FailOnError(err, fmt.Sprintf("Error getting device by JID: %v", whatsappID))
return
}

// Insert into table if the listener started successfully.
deviceHandler := &data.DeviceHandler{
DeviceID: device.DeviceID(),
ActiveAt: time.Now(),
Active: true,
}

if res, err := store.InsertIntoTable(deviceHandler); err != nil {
helpers.FailOnError(err, fmt.Sprintf("Error inserting into table: %v", res))
}
}

func AddToListeners(clientID string) {
err, client := StartMessageListener(clientID)
// Start the message listener for the device ID.
func StartMessageListener(whatsappID string) (*whatsmeow.Client, error) {
client, err := helpers.GetWhatsAppClientByJID(whatsappID)
if err != nil {
log.Printf("Error starting message listener for %s: %v", clientID, err)
} else {
res, err := helpers.InsertIntoTable(&data.DeviceHandler{
Device: data.Device{
DeviceID: clientID,
PushName: client.Store.PushName,
BusinessName: client.Store.BusinessName,
Timestamp: time.Now(),
},
Active: true,
})
if err != nil {
log.Printf("Error inserting into table: %v", err)
} else {
log.Printf("Inserted into table: %v", res)
}
helpers.FailOnError(err, fmt.Sprintf("Error getting client by ID: %v", whatsappID))
}

device, _ := store.GetDeviceByJID(whatsappID)
webhookUrl, webhookActive, _ := store.GetWebhookURLByDeviceID(device.DeviceID())

//Add event handler for incoming messages
client.AddEventHandler(func(evt interface{}) {
log.Printf("2 ---> Event received %T", evt)

// Handle the message event.
if msgEvent, ok := evt.(*events.Message); ok {
handleMessageEvent(msgEvent, client, device.DeviceID(), webhookUrl, webhookActive)
}
})

log.Printf("Starting message listener for %s", device.DeviceID())
return client, nil
}

func StartMessageListener(clientID string) (error, *whatsmeow.Client) {
client, err := helpers.GetClientById(clientID)
// handle the message event, save it to he store, and send async tasks
func handleMessageEvent(msgEvent *events.Message, client *whatsmeow.Client, deviceID, webhhookURL string, webhookActive bool) {
ctx := context.Background()
// Save message to store.
err, content := myStore.SaveMessage(*msgEvent, client)
if err != nil {
tracerr.Print(err)
return err, nil
} else {
webhookUrl, _ := helpers.GetWebhookURLForClientID(clientID)
client.AddEventHandler(func(evt interface{}) {
log.Printf("Event received %T", evt)
switch v := evt.(type) {
case *events.Message:
if err, content := myStore.SaveMessage(*v, client); err != nil {
tracerr.Print(err)
} else {
helpers.SendMessageToRedis(*content, clientID) // async
helpers.SendWebhook(*content, clientID, webhookUrl) // async
helpers.FailOnError(err, "Error saving message to store")
return
}

// Process sending to Redis and Webhook concurrently.
go helpers.SendMessageToRedis(ctx, *content, deviceID)
go helpers.SendWebhook(*content, deviceID, webhhookURL, webhookActive)
}

// NewClientHandler returns a function that handles the client events.
func NewClientHandler(client *whatsmeow.Client) func(interface{}) {
return func(evt interface{}) {
log.Printf("1 ---> Event received %T", evt)
if _, ok := evt.(*events.Connected); ok {
storeDevice := client.Store
device := &data.Device{
JID: storeDevice.ID.String(),
PushName: storeDevice.PushName,
BusinessName: storeDevice.BusinessName,
CreatedAt: time.Now(),
Active: true,
}
device, err := store.InsertDeviceIfNotExists(device)
if err != nil {
helpers.FailOnError(err, "Error inserting new device or device already exists")
return
}
})
log.Printf("Starting message listener for %s", clientID)
AddToListeners(device.JID)

}
}
return nil, client
}
}
43 changes: 0 additions & 43 deletions api/src/helpers/bun_device.go

This file was deleted.

50 changes: 0 additions & 50 deletions api/src/helpers/bun_pg.go

This file was deleted.

11 changes: 11 additions & 0 deletions api/src/helpers/error_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package helpers

import (
"log"
)

func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
10 changes: 10 additions & 0 deletions api/src/helpers/json_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package helpers

import (
"encoding/json"
"whatsgoingon/data"
)

func MarshalMessageToJSON(content data.StoredMessage) ([]byte, error) {
return json.Marshal(content)
}
Loading

0 comments on commit f8e81ae

Please sign in to comment.