Skip to content

Commit

Permalink
Working threads, IM rooms, and backfill commands. Reactions still nee…
Browse files Browse the repository at this point in the history
…d work
  • Loading branch information
Pinny Markowitz committed Feb 23, 2024
1 parent 56b904f commit 58c247a
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 5 deletions.
49 changes: 49 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"fmt"
"go.mau.fi/mautrix-slack/database"
"net/url"
"strings"

Expand All @@ -40,6 +41,8 @@ func (br *SlackBridge) RegisterCommands() {
cmdLogout,
cmdSyncTeams,
cmdDeletePortal,
cmdBackfillPortal,
cmdBackfillAllPortals,
)
}

Expand Down Expand Up @@ -197,3 +200,49 @@ func fnDeletePortal(ce *WrappedCommandEvent) {
ce.Portal.cleanup(false)
ce.Log.Infofln("Deleted portal")
}

var cmdBackfillPortal = &commands.FullHandler{
Func: wrapCommand(fnBackfillPortal),
Name: "backfill-portal",
RequiresPortal: true,
}

func fnBackfillPortal(ce *WrappedCommandEvent) {
userTeam := ce.User.GetUserTeam(ce.Portal.Key.TeamID)
ce.User.establishTeamClient(userTeam)

backfillSinglePortal(ce.Portal, userTeam)
ce.Log.Infofln("Backfilled portal")
}

var cmdBackfillAllPortals = &commands.FullHandler{
Func: wrapCommand(fnBackfillAllPortals),
Name: "backfill-all-portals",
RequiresLogin: true,
}

func fnBackfillAllPortals(ce *WrappedCommandEvent) {
if len(ce.Args) != 2 {
ce.Reply("**Usage**: $cmdprefix backfill-all-portals <email> <domain>")
return
}

domain := strings.TrimSuffix(ce.Args[1], ".slack.com")
userTeam := ce.Bridge.DB.UserTeam.GetBySlackDomain(ce.User.MXID, ce.Args[0], domain)
ce.User.establishTeamClient(userTeam)

portals := ce.Bridge.DB.Portal.GetAllForUserTeam(userTeam.Key)
portalCount := len(portals)
for i, dbPortal := range portals {
portal := ce.Bridge.GetPortalByID(dbPortal.Key)
backfillSinglePortal(portal, userTeam)
ce.Log.Infofln("Completed backfill for %d of %d portals", i+1, portalCount)
}
}

func backfillSinglePortal(portal *Portal, userTeam *database.UserTeam) {
portal.slackMessageLock.Lock()
defer portal.slackMessageLock.Unlock()

portal.traditionalBackfill(userTeam)
}
13 changes: 13 additions & 0 deletions database/messagequery.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,16 @@ func (mq *MessageQuery) GetLast(key PortalKey) *Message {

return mq.New().Scan(row)
}

func (mq *MessageQuery) ClearAllForPortal(key PortalKey) error {
query := "DELETE FROM message WHERE team_id=$1 AND channel_id=$2"

resp, err := mq.db.Exec(query, key.TeamID, key.ChannelID)
if err != nil {
mq.log.Errorfln("failed to clear all message rows for portal: %#v", key)
return err
}
rowCount, _ := resp.RowsAffected()
mq.log.Debugfln("cleared %d message rows for portal: %#v", rowCount, key)
return nil
}
83 changes: 80 additions & 3 deletions historysync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,38 @@ func (bridge *SlackBridge) handleHistorySyncsLoop() {
}

func (portal *Portal) traditionalBackfill(userTeam *database.UserTeam) (int, error) {
portal.log.Errorfln("Starting backfill for portal: %#v", portal.Key)
portal.backfillLock.Lock()
defer portal.backfillLock.Unlock()

// removing prior tracking records for this Team
txn, err := portal.bridge.DB.Begin()
if err == nil {
err = portal.bridge.DB.Message.ClearAllForPortal(portal.Key)
}
if err == nil {
err = txn.Commit()
}
if err != nil {
portal.log.Errorfln("Error while clearing existing messages for portal: %#v", err)
return 0, err
}

// collect only latest timestamp of each batch as ConversationBatch
batches, err := portal.collectBatchesForTraditionalBackfill(userTeam)
if err != nil {
return 0, err
}

// backfilling requires the bot to be present
if portal.IsPrivateChat() {
err := portal.bridge.Bot.EnsureJoined(portal.MXID, appservice.EnsureJoinedParams{BotOverride: portal.MainIntent().Client})
if err != nil {
portal.log.Errorln("Error while ensure join bot for backfilling IM")
return 0, err
}
}

threadCollection := &ThreadCollection{}
heap.Init(threadCollection)
idx := 0
Expand Down Expand Up @@ -234,6 +260,10 @@ func (portal *Portal) traditionalBackfillSingleMessage(userTeam *database.UserTe
return nil
}
converted := portal.ConvertSlackMessage(userTeam, &message.Msg)
convertedContent := converted.Event
if convertedContent != nil && converted.SlackTimestamp != converted.SlackThreadTs {
portal.addThreadMetadata(convertedContent, converted.SlackThreadTs)
}
ts := parseSlackTimestamp(converted.SlackTimestamp)

converted.SlackReactions = message.Reactions
Expand All @@ -248,11 +278,13 @@ func (portal *Portal) traditionalBackfillSingleMessage(userTeam *database.UserTe
return nil
}
intent := puppet.IntentFor(portal)

var fileEventIDs []id.EventID
for _, file := range converted.FileAttachments {
resp, err := portal.sendMatrixMessage(intent, event.EventMessage, file.Event, nil, ts.UnixMilli())
if err != nil {
portal.log.Errorfln("Error while backfilling attached file: %#v", file)
portal.log.Errorfln("Error details: %#v", err)
return err
}
fileEventIDs = append(fileEventIDs, resp.EventID)
Expand All @@ -262,28 +294,73 @@ func (portal *Portal) traditionalBackfillSingleMessage(userTeam *database.UserTe
resp, err := portal.sendMatrixMessage(intent, event.EventMessage, converted.Event, nil, ts.UnixMilli())
if err != nil {
portal.log.Errorfln("Error while backfilling message: %#v", converted)
portal.log.Errorfln("Error details: %#v", err)
return err
}
eventId = resp.EventID
}

//for _, reaction := range converted.SlackReactions {
// slackReaction := strings.Trim(reaction.Name, ":")
// emoji := portal.bridge.GetEmoji(slackReaction, userTeam)
//
// for _, user := range reaction.Users {
// var content event.ReactionEventContent
// content.RelatesTo = event.RelatesTo{
// Type: event.RelAnnotation,
// EventID: eventId,
// Key: emoji,
// }
// extraContent := map[string]any{}
// if strings.HasPrefix(emoji, "mxc://") {
// extraContent["fi.mau.slack.reaction"] = map[string]any{
// "name": slackReaction,
// "mxc": emoji,
// }
// if !portal.bridge.Config.Bridge.CustomEmojiReactions {
// content.RelatesTo.Key = slackReaction
// }
// }
// reactionPuppet := portal.bridge.GetPuppetByID(portal.Key.TeamID, user)
// if reactionPuppet == nil {
// portal.log.Errorfln("Not backfilling reaction: can't find puppet for Slack user %s", user)
// continue
// }
// reactionPuppet.UpdateInfo(userTeam, true, nil)
// eventContent := event.Content{
// Raw: map[string]interface{}{},
// Parsed: content,
// }
// if reactionPuppet.CustomMXID != "" {
// eventContent.Raw[doublePuppetKey] = doublePuppetValue
// }
// intent := reactionPuppet.IntentFor(portal)
// intent.SendMassagedMessageEvent(portal.MXID, event.EventReaction, eventContent, ts.UnixMilli())
// }
//}

txn, err := portal.bridge.DB.Begin()
if err != nil {
return err
}

effectiveThreadTimeStamp := ""
if converted.SlackTimestamp != converted.SlackThreadTs {
effectiveThreadTimeStamp = converted.SlackThreadTs
}

for ix, file := range converted.FileAttachments {
attachment := portal.bridge.DB.Attachment.New()
attachment.Channel = portal.Key
attachment.SlackFileID = file.SlackFileID
attachment.SlackMessageID = converted.SlackTimestamp
attachment.MatrixEventID = fileEventIDs[ix]
attachment.SlackThreadID = converted.SlackThreadTs
attachment.SlackThreadID = effectiveThreadTimeStamp
attachment.Insert(txn)
}
if converted.Event != nil {
portal.log.Debugfln("Committing convertedMessage: %#v", converted)
portal.markMessageHandled(txn, converted.SlackTimestamp, converted.SlackThreadTs, eventId, converted.SlackAuthor)
portal.markMessageHandled(txn, converted.SlackTimestamp, effectiveThreadTimeStamp, eventId, converted.SlackAuthor)
}

portal.Update(txn)
Expand Down Expand Up @@ -653,7 +730,7 @@ func (portal *Portal) backfill(userTeam *database.UserTeam, messages []slack.Mes
Content: event.Content{Parsed: &content},
})
}

if len(req.Events) == 0 {
return nil, nil
}
Expand Down
15 changes: 13 additions & 2 deletions user.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,7 @@ func (user *User) slackMessageHandler(userTeam *database.UserTeam) {
user.BridgeStates[userTeam.Key.TeamID].Send(status.BridgeState{StateEvent: status.StateUnknownError, Message: "Disconnected for unknown reason"})
}

func (user *User) connectTeam(userTeam *database.UserTeam) error {
user.log.Infofln("Connecting %s to Slack userteam %s (%s)", user.MXID, userTeam.Key, userTeam.TeamName)
func (user *User) establishTeamClient(userTeam *database.UserTeam) {
slackOptions := []slack.Option{
slack.OptionLog(SlackgoLogger{user.log.Sub(fmt.Sprintf("SlackGo/%s", userTeam.Key))}),
//slack.OptionDebug(user.bridge.Config.Logging.PrintLevel <= 0),
Expand All @@ -528,6 +527,11 @@ func (user *User) connectTeam(userTeam *database.UserTeam) error {
slackOptions = append(slackOptions, slack.OptionCookie("d", userTeam.CookieToken))
}
userTeam.Client = slack.New(userTeam.Token, slackOptions...)
}

func (user *User) connectTeam(userTeam *database.UserTeam) error {
user.log.Infofln("Connecting %s to Slack userteam %s (%s)", user.MXID, userTeam.Key, userTeam.TeamName)
user.establishTeamClient(userTeam)

// test Slack connection before trying to go further
_, err := userTeam.Client.GetUserProfile(&slack.GetUserProfileParameters{})
Expand Down Expand Up @@ -632,6 +636,13 @@ func (user *User) SyncPortals(userTeam *database.UserTeam, force bool) error {
}
}

//for _, dbPortal := range portals {
// portal := user.bridge.GetPortalByID(dbPortal.Key)
// if portal.IsPrivateChat() {
// portal.traditionalBackfill(userTeam)
// }
//}

return nil
}

Expand Down

0 comments on commit 58c247a

Please sign in to comment.