Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Dingtalk Notification #96

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datahub-actions/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def get_long_description():
"executor = datahub_actions.plugin.action.execution.executor_action:ExecutorAction",
"slack = datahub_actions.plugin.action.slack.slack:SlackNotificationAction",
"teams = datahub_actions.plugin.action.teams.teams:TeamsNotificationAction",
"dingtalk_datahub = datahub_actions.plugin.action.dingtalk.dingtalk.DingtalkNotification",
"metadata_change_sync = datahub_actions.plugin.action.metadata_change_sync.metadata_change_sync:MetadataChangeSyncAction",
"tag_propagation = datahub_actions.plugin.action.tag.tag_propagation_action:TagPropagationAction",
"term_propagation = datahub_actions.plugin.action.term.term_propagation_action:TermPropagationAction",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2021 Acryl Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import json
import logging

import requests
from datahub.configuration.common import ConfigModel
from datahub.metadata.schema_classes import EntityChangeEventClass as EntityChangeEvent
from pydantic.types import SecretStr
from ratelimit import limits, sleep_and_retry

from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext
from datahub_actions.utils.datahub_util import DATAHUB_SYSTEM_ACTOR_URN
from datahub_actions.utils.social_util import (
get_message_from_entity_change_event,
get_welcome_message,
)

logger = logging.getLogger(__name__)


@sleep_and_retry
@limits(calls=1, period=1)  # 1 call per second
def post_message(webhook_url, keyword, content):
    """Send a plain-text message to a DingTalk robot webhook.

    The text actually sent is ``keyword + content``.

    Args:
        webhook_url: Full webhook URL to POST to. It is never logged here
            because callers pass it in as an unwrapped secret.
        keyword: String prepended to the message body.
        content: Message text.

    Raises:
        requests.RequestException: On connection errors or when the request
            exceeds the timeout. A non-200 status or a non-zero ``errcode``
            in the response body is only logged, not raised.
    """
    headers = {"Content-Type": "application/json"}
    payload = {
        "msgtype": "text",
        "text": {"content": keyword + content},
    }
    # A timeout keeps a stalled webhook from blocking the caller forever.
    response = requests.post(
        webhook_url,
        headers=headers,
        data=json.dumps(payload),
        timeout=10,
    )
    if response.status_code != 200:
        logger.warning("Message send failed: HTTP %s", response.status_code)
        return

    # The webhook can answer HTTP 200 while reporting a failure in the JSON
    # body; treat an unparsable body as success, as before.
    try:
        body = response.json()
        errcode = body.get("errcode", 0)
        errmsg = body.get("errmsg", "")
    except (ValueError, AttributeError):
        errcode, errmsg = 0, ""

    if errcode:
        logger.warning(
            "Message send failed: errcode=%s errmsg=%s", errcode, errmsg
        )
    else:
        logger.info("Message send successfully")


class DingtalkNotificationConfig(ConfigModel):
    """Configuration for the DingTalk notification action."""

    # Webhook URL that messages are POSTed to; stored as a secret so it is
    # masked when the config object is logged.
    webhook_url: SecretStr
    # Text prepended to every outgoing message body.
    keyword: SecretStr
    # Not read by the action code in this module; given a default so that
    # configurations omitting it still validate.
    default_channel: str = "dingtalk"
    # Base URL handed to the message-building helpers.
    base_url: str = "http://localhost:9002/"
    # When True, events whose audit-stamp actor is the DataHub system actor
    # are dropped instead of being forwarded.
    suppress_system_activity: bool = True


class DingtalkNotification(Action):
    """Action that forwards entity change events to a DingTalk webhook.

    A welcome message is posted when the action is constructed. Afterwards
    every ``EntityChangeEvent_v1`` event is rendered to text and posted;
    all other event types are skipped.
    """

    def name(self) -> str:
        """Return the display name of this action."""
        return "DingtalkNotification"

    def close(self) -> None:
        """Nothing to release; present to satisfy the Action interface."""
        pass

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        """Build the action from a raw config mapping.

        Args:
            config_dict: Raw configuration; ``None`` or empty is treated as
                an empty mapping before validation.
            ctx: Pipeline context passed through to the instance.
        """
        action_config = DingtalkNotificationConfig.parse_obj(config_dict or {})
        logger.info(f"Dingtalk notification action configured with {action_config}")
        return cls(action_config, ctx)

    def __init__(self, action_config: DingtalkNotificationConfig, ctx: PipelineContext):
        """Store the config and context, then post the welcome message.

        Any exception raised while posting the welcome message propagates
        to the caller.
        """
        self.action_config = action_config
        self.ctx = ctx
        post_message(
            self.action_config.webhook_url.get_secret_value(),
            self.action_config.keyword.get_secret_value(),
            get_welcome_message(self.action_config.base_url).text,
        )

    def act(self, event: EventEnvelope) -> None:
        """Post a message for an entity change event.

        Events of any other type are skipped. System-actor events are
        dropped when ``suppress_system_activity`` is set. Failures are
        logged with their traceback and never propagate to the caller.
        """
        try:
            # Pretty-printing every event is wasted work unless DEBUG is on.
            if logger.isEnabledFor(logging.DEBUG):
                message = json.dumps(json.loads(event.as_json()), indent=4)
                logger.debug(f"Received event: {message}")

            if event.event_type != "EntityChangeEvent_v1":
                logger.debug("Skipping message because it didn't match our filter")
                return None

            # Narrows the payload type for static checkers.
            assert isinstance(event.event, EntityChangeEvent)
            if (
                event.event.auditStamp.actor == DATAHUB_SYSTEM_ACTOR_URN
                and self.action_config.suppress_system_activity
            ):
                return None

            semantic_message = get_message_from_entity_change_event(
                event.event,
                self.action_config.base_url,
                self.ctx.graph.graph if self.ctx.graph else None,
                channel="dingtalk",
            )
            post_message(
                self.action_config.webhook_url.get_secret_value(),
                self.action_config.keyword.get_secret_value(),
                semantic_message,
            )
        except Exception:
            # logger.exception records the traceback; the previous call passed
            # the exception as a stray format argument at DEBUG level.
            logger.exception("Failed to process event")
2 changes: 1 addition & 1 deletion datahub-actions/src/datahub_actions/utils/name_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class SchemaFieldNameResolver(DefaultNameResolver):
def get_entity_name(
self, entity_urn: Urn, datahub_graph: Optional[DataHubGraph]
) -> str:
return DatasetUrn._get_simple_field_path_from_v2_field_path(
return DatasetUrn.get_simple_field_path_from_v2_field_path(
entity_urn.get_entity_id()[1]
)

Expand Down
8 changes: 4 additions & 4 deletions datahub-actions/tests/unit/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ def test_create():
# Validate Pipeline is initialized
assert valid_pipeline.name is not None
assert valid_pipeline.source is not None
assert type(valid_pipeline.source) == TestEventSource
assert type(valid_pipeline.source) is TestEventSource
assert valid_pipeline.transforms is not None
assert len(valid_pipeline.transforms) == 2 # Filter + Custom
assert type(valid_pipeline.transforms[0]) == FilterTransformer
assert type(valid_pipeline.transforms[1]) == TestTransformer
assert type(valid_pipeline.transforms[0]) is FilterTransformer
assert type(valid_pipeline.transforms[1]) is TestTransformer
assert valid_pipeline.action is not None
assert type(valid_pipeline.action) == TestAction
assert type(valid_pipeline.action) is TestAction
assert valid_pipeline._shutdown is False
assert valid_pipeline._stats is not None
assert valid_pipeline._retry_count == 3
Expand Down
6 changes: 5 additions & 1 deletion docker/actions.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ SCHEMA_REGISTRY_URL=http://schema-registry:8081

# System Auth -- Required if Metadata Service Authentication is enabled.
# DATAHUB_SYSTEM_CLIENT_ID=__datahub_system
# DATAHUB_SYSTEM_CLIENT_SECRET=JohnSnowKnowsNothing
# DATAHUB_SYSTEM_CLIENT_SECRET=JohnSnowKnowsNothing

# DATAHUB_ACTIONS_DINGTALK_ENABLED
# DATAHUB_ACTIONS_DINGTALK_DATAHUB_WEBHOOK_URL
# DATAHUB_ACTIONS_DINGTALK_DATAHUB_BASE_URL
38 changes: 38 additions & 0 deletions docker/config/dingtalk_action.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2021 Acryl Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: datahub_dingtalk_action
enabled: ${DATAHUB_ACTIONS_DINGTALK_ENABLED:-false}
source:
type: "kafka"
config:
connection:
bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
topic_routes:
mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1}
pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1}
action:
type: dingtalk_datahub
config:
# Action-specific configs (map)
webhook_url: ${DATAHUB_ACTIONS_DINGTALK_DATAHUB_WEBHOOK_URL}
base_url: ${DATAHUB_ACTIONS_DINGTALK_DATAHUB_BASE_URL:-http://localhost:9002}
keyword: ${DATAHUB_ACTIONS_DINGTALK_KEYWORD:-Datahub}
default_channel: ${DATAHUB_ACTIONS_DINGTALK_CHANNEL:-dingtalk}
suppress_system_activity: ${DATAHUB_ACTIONS_DINGTALK_SUPPRESS_SYSTEM_ACTIVITY:-true}

datahub:
server: "http://${DATAHUB_GMS_HOST:-localhost}:${DATAHUB_GMS_PORT:-8080}"
extra_headers:
Authorization: "Basic ${DATAHUB_SYSTEM_CLIENT_ID:-__datahub_system}:${DATAHUB_SYSTEM_CLIENT_SECRET:-JohnSnowKnowsNothing}"
7 changes: 4 additions & 3 deletions docker/datahub-actions/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ ARG APP_ENV=prod

FROM acryldata/datahub-ingestion-base:latest as prod-install
COPY datahub-actions /actions-src
RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
RUN mkdir -p /etc/datahub/actions && mkdir -p /tmp/datahub/logs/actions/system
RUN cd /actions-src && \
pip install "." && \
pip install '.[all]' && \
pip install "." -i https://pypi.tuna.tsinghua.edu.cn/simple && \
pip install '.[all]' -i https://pypi.tuna.tsinghua.edu.cn/simple && \
# This is required to fix security vulnerability in htrace-core4
rm -f /usr/local/lib/python3.10/site-packages/pyspark/jars/htrace-core4-4.1.0-incubating.jar

Expand All @@ -46,4 +47,4 @@ FROM ${APP_ENV}-install as final
USER datahub
RUN curl -s "https://get.sdkman.io" | bash
RUN /bin/bash -c "source /$HOME/.sdkman/bin/sdkman-init.sh; sdk version; sdk install java 8.0.332-zulu"
CMD dockerize -wait ${DATAHUB_GMS_PROTOCOL:-http}://$DATAHUB_GMS_HOST:$DATAHUB_GMS_PORT/health -timeout 240s /start_datahub_actions.sh
CMD dockerize -wait ${DATAHUB_GMS_PROTOCOL:-http}://$DATAHUB_GMS_HOST:$DATAHUB_GMS_PORT/health -timeout 240s /start_datahub_actions.sh
Loading