Skip to content

Commit

Permalink
Improve slack plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
slietar committed Jun 29, 2023
1 parent e0c61b4 commit 2d1abe2
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 128 deletions.
2 changes: 1 addition & 1 deletion units/slack/client/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"scripts": {
"build": "npm run build:js && (npm test || true)",
"build:js": "esbuild src/index.tsx --bundle --external:pr1 --external:react --format=esm --minify --outdir=../src/pr1_s3/client",
"build:js": "esbuild src/index.tsx --bundle --external:pr1 --external:react --format=esm --minify --outdir=../src/pr1_slack/client",
"test": "tsc"
},
"devDependencies": {
Expand Down
44 changes: 8 additions & 36 deletions units/slack/client/src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,27 @@ import { PluginName, ProtocolBlockName } from 'pr1-shared';


export interface ProcessData {
bucket: DynamicValue;
multipart: DynamicValue;
target: DynamicValue;

}

export interface ProcessLocation {
paused: boolean;
phase: 'complete' | 'create' | 'done' | 'part_upload' | 'upload';
progress: number;
body: string;
fileCount: number;
phase: number;
}

export default {
namespace: ('s3' as PluginName),
blocks: {
['_' as ProtocolBlockName]: createProcessBlockImpl<ProcessData, ProcessLocation>({
Component(props) {
let progress = (() => {
switch (props.location.phase) {
case 'complete':
return 0.9;
case 'create':
return 0;
case 'done':
return 1;
case 'part_upload':
return 0.1 + props.location.progress * 0.8;
case 'upload':
return props.location.progress;
}
})();

return (
<div>
<ProgressBar
description={() => (
<ExpandableText expandedValue="100%">
{(progress * 100).toFixed() + '%'}
</ExpandableText>
)}
paused={props.location.paused}
value={progress} />
</div>
);
return <p>Progress: {props.location.phase}/{props.location.fileCount + 1}</p>
},
createFeatures(data, location) {
return [
{ icon: 'cloud_upload',
description: 'Upload to S3',
label: <>Bucket {formatDynamicValue(data.bucket)}</> }
{ icon: 'chat',
description: 'Slack message',
label: location?.body ?? '...' }
];
}
})
Expand Down
2 changes: 1 addition & 1 deletion units/slack/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies=[
"slack_sdk~=3.21.3"
]

[project.entry-points."pr1.units"]
[project.entry-points."automancer.plugins"]
slack = "pr1_slack"

[tool.setuptools.packages.find]
Expand Down
10 changes: 5 additions & 5 deletions units/slack/src/pr1_slack/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from importlib.resources import files

from pr1.units.base import Metadata, MetadataIcon, logger as parent_logger
import automancer as am


namespace = "slack"
namespace = am.PluginName("slack")
version = 0

metadata = Metadata(
metadata = am.Metadata(
description="Slack",
icon=MetadataIcon(kind='icon', value="chat"),
icon=am.MetadataIcon(kind='icon', value="chat"),
title="Slack",
version="1.0"
)

client_path = files(__name__ + '.client')
logger = parent_logger.getChild(namespace)
logger = am.logger.getChild(namespace)

from .parser import Parser
54 changes: 31 additions & 23 deletions units/slack/src/pr1_slack/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pr1.fiber.parser import BaseParser, ProcessTransformer

from . import namespace
from .process import Process
from .process import process


class Parser(BaseParser):
Expand All @@ -11,9 +11,9 @@ class Parser(BaseParser):
def __init__(self, fiber):
super().__init__(fiber)

self.transformers = [ProcessTransformer(Process, {
self.transformers = [ProcessTransformer(process, {
'slack_send': am.Attribute(
description="Sends a message to a Slack channel.",
description="Send a message to a Slack channel.",
type=am.EvaluableContainerType(
am.RecordType({
'body': am.Attribute(
Expand All @@ -40,27 +40,35 @@ def __init__(self, fiber):
})
),
default=list(),
description="A list of files to attach to the message."
description="A list of files to attach to the message.",
documentation=["The `files:write` scope is required when using this option."]
),
'settings': am.RecordType({
'channel_id': am.Attribute(
am.PotentialExprType(am.StrType()),
description="The Slack channel id, such as `C04NP6J8EMV`."
),
'icon_url': am.Attribute(
am.PotentialExprType(am.StrType()),
description="The URL of the icon to use for the message.",
default=None
),
'token': am.Attribute(
am.PotentialExprType(am.StrType()),
description="The Slack API token."
),
'user_name': am.Attribute(
am.PotentialExprType(am.StrType()),
description="The name of the user sending the message."
)
})
'settings': am.Attribute(
am.RecordType({
'channel_id': am.Attribute(
am.PotentialExprType(am.StrType()),
description="The Slack channel id, such as `C04NP6J8EMV`."
),
'icon_url': am.Attribute(
am.PotentialExprType(am.StrType()),
description="The URL of the icon to use for the message.",
default=None
),
'token': am.Attribute(
am.PotentialExprType(am.StrType()),
description="The Slack API token, starting with `xoxb-` (for bot tokens), `xoxp-` (for user tokens) or `xapp-` (for app-level tokens).",
documentation=[
"See [Access tokens](https://api.slack.com/authentication/token-types) for details."
]
),
'user_name': am.Attribute(
am.PotentialExprType(am.StrType()),
description="The name of the user sending the message.",
documentation=["The `chat:write.customize` scope is required when using this option."]
)
}),
description="Settings for the message.",
)
}),
depth=2
)
Expand Down
106 changes: 44 additions & 62 deletions units/slack/src/pr1_slack/process.py
Original file line number Diff line number Diff line change
@@ -1,122 +1,104 @@
import asyncio
from logging import Logger
import math
from asyncio import Event
from dataclasses import dataclass
from typing import Literal, Optional, Protocol
from typing import Any, Optional, Protocol

import pr1 as am
from pr1.fiber.expr import export_value
from pr1.fiber.process import (BaseProcess, BaseProcessPoint, ProcessExecEvent,
ProcessFailureEvent, ProcessPauseEvent, ProcessTerminationEvent)
from pr1.master.analysis import MasterAnalysis, MasterError
from pr1.util.asyncio import AsyncIteratorThread
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

from . import logger, namespace
from . import namespace


class SlackFile(Protocol):
contents: am.FileRef
contents: am.DataRef
format: Optional[str]
name: Optional[str]

class SlackSettings(Protocol):
channel_id: str
icon_url: Optional[str]
token: str
user_name: str
user_name: Optional[str]

class ProcessData(Protocol):
body: str
files: list[SlackFile]
settings: SlackSettings


class SlackError(MasterError):
def __init__(self, exception: Exception, /):
super().__init__(exception.args[0])

class SourceError(MasterError):
def __init__(self, exception: OSError, /):
super().__init__(str(exception))


@dataclass(kw_only=True)
@dataclass(frozen=True, kw_only=True, slots=True)
class ProcessLocation:
body: str
file_count: int
phase: int

def export(self):
return {
"body": self.body,
"fileCount": self.file_count,
"phase": self.phase
}

@dataclass(kw_only=True)
class ProcessPoint(BaseProcessPoint):
class ProcessPoint(am.BaseProcessPoint):
pass

@am.provide_logger(logger)
class Process(BaseProcess[ProcessData, ProcessPoint]):
class Process(am.BaseClassProcess[ProcessData, ProcessLocation, ProcessPoint]):
name = "_"
namespace = namespace

def __init__(self, data: ProcessData, /, master):
self._data = data
self._halted = False
self._resume_event: Optional[Event] = None
def duration(self, data):
return am.DurationTerm(5.0)

self._logger: Logger
def export_data(self, data):
return {
"body": "" # TODO: Fill in with something like export_value(data.get("body"))
}

async def run(self, point, stack):
client = WebClient(token=self._data.settings.token)
async def __call__(self, context: am.ProcessContext[ProcessData, ProcessLocation, ProcessPoint]):
client = WebClient(token=context.data.settings.token)
phase = 0

def create_location():
return ProcessLocation(
file_count=len(self._data.files),
def send_location():
context.send_location(ProcessLocation(
body=context.data.body,
file_count=len(context.data.files),
phase=phase
)
))

yield ProcessExecEvent(
location=create_location()
)
send_location()

try:
result = await asyncio.to_thread(lambda: client.chat_postMessage(
channel=self._data.settings.channel_id,
text=self._data.body
response = await asyncio.to_thread(lambda: client.chat_postMessage(
channel=context.data.settings.channel_id,
text=context.data.body,
username=context.data.settings.user_name
))

phase += 1
send_location()

for data_file in context.data.files:
with data_file.contents.open(text=False) as file:
# TODO: Test what happends when data_file.name is None

for data_file in self._data.files:
with data_file.contents.open("r") as file:
result = await asyncio.to_thread(lambda: client.files_upload(
channels=self._data.settings.channel_id,
response: Any = await asyncio.to_thread(lambda: client.files_upload(
channels=context.data.settings.channel_id,
file=file,
filename=data_file.name,
filetype=data_file.format # To be standardized
))

print(result)
context.send_effect(am.GenericEffect(
"Uploaded file to Slack",
description=am.RichText("Uploaded file ", am.RichTextLink(response['file']['title'], url=response['file']['permalink'])),
icon="upload_file"
))

phase += 1
send_location()
except SlackApiError as e:
yield ProcessFailureEvent(
analysis=MasterAnalysis(errors=[SlackError(e)]),
location=create_location()
)
else:
yield ProcessTerminationEvent(
# analysis=MasterAnalysis(effects=[Effect(...)]),
location=create_location()
)

@staticmethod
def export_data(data):
return {
"body": export_value(data['body'])
}
raise am.ProcessFailureError from e


process = Process()

0 comments on commit 2d1abe2

Please sign in to comment.