Skip to content

Commit

Permalink
Merge pull request #19 from misterbykl/pr-branch
Browse files Browse the repository at this point in the history
Merging PR for pluggable PII-Detection function enhancement to RTDL.
  • Loading branch information
dipanjanb authored Mar 16, 2022
2 parents 1d5cf01 + 2141313 commit ce4eaa5
Show file tree
Hide file tree
Showing 14 changed files with 599 additions and 0 deletions.
Binary file added pii-detection/.DS_Store
Binary file not shown.
2 changes: 2 additions & 0 deletions pii-detection/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target/
/pii-detection.iml
10 changes: 10 additions & 0 deletions pii-detection/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM maven:3.6.3-jdk-11 AS builder
COPY src /usr/src/app/src
COPY pom.xml /usr/src/app
RUN mvn -f /usr/src/app/pom.xml clean package

FROM openjdk:8
WORKDIR /
COPY --from=builder /usr/src/app/target/pii-detection*jar-with-dependencies.jar pii-detection.jar
EXPOSE 5000
CMD java -jar pii-detection.jar
88 changes: 88 additions & 0 deletions pii-detection/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
## Pii Detection
This module adds pii detection functionality to rtdl system as a seperate stateful function.
PII detection algorithm identifies **SSN** and **phone number** patterns in the value and applies masking where
PII values are replaced with **###**

### How the module works?
* Message that's been received through `/ingest` function in _ingest-service.go_ is produced to Kafka topic, **ingress**.
* _pii-detection_ consumes Kafka topic, **pii-detection**
* `PiiDetectionFn` checks incoming message and identifies if it includes **SSN** and **phone number** patterns
* The result is produced to Kafka topic, **ingest**

### The flow
`ingest` -> `pii-detection` -> `statefun-functions`

### How to add stateful function (pii-detection) in the chain
In order to introduce a stateful function between the `ingest` and the `statefun-functions` services without
touching the code of either:

1. Introduce another topic in Kafka and let `stateful-functions` use that as the input topic rather than `ingress` (**pii-detection**)
2. Implement a stateful function that serves its own service which reads from `ingress` and
writes to the Kafka topic that `stateful-functions` is listening to (**PiiDetectionFn**)
3. Add **module.yaml** including ingress, egress configuration as below


kind: io.statefun.endpoints.v2/http
spec:
functions: com.rtdl.sf.pii/*
urlPathTemplate: http://pii-detection:5000/
transport:
type: io.statefun.transports.v1/async
---
kind: io.statefun.kafka.v1/ingress
spec:
id: com.rtdl.sf.pii/ingress
address: redpanda:29092
consumerGroupId: pii-group
startupPosition:
type: latest
topics:
- topic: ingress
valueType: com.rtdl.sf.pii/IncomingMessage
targets:
- com.rtdl.sf.pii/pii-detection
---
kind: io.statefun.kafka.v1/egress
spec:
id: com.rtdl.sf.pii/egress
address: redpanda:29092
deliverySemantic:
type: exactly-once
transactionTimeout: 5sec

4. Modify **docker-compose.yml** as below


statefun-worker:
......
volumes:
......
- ./pii-detection/module.yaml:/opt/statefun/modules/pii-detection/module.yaml
statefun-manager:
......
volumes:
......
- ./pii-detection/module.yaml:/opt/statefun/modules/pii-detection/module.yaml


pii-detection:
build:
context: ./pii-detection
expose:
- 5000
5. Add **Dockerfile** as below


FROM maven:3.6.3-jdk-11 AS builder
COPY src /usr/src/app/src
COPY pom.xml /usr/src/app
RUN mvn -f /usr/src/app/pom.xml clean package

FROM openjdk:8
WORKDIR /
COPY --from=builder /usr/src/app/target/pii-detection*jar-with-dependencies.jar pii-detection.jar
EXPOSE 5000
CMD java -jar pii-detection.jar
28 changes: 28 additions & 0 deletions pii-detection/module.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
kind: io.statefun.endpoints.v2/http
spec:
functions: com.rtdl.sf.pii/*
urlPathTemplate: http://pii-detection:5000/
transport:
type: io.statefun.transports.v1/async
---
kind: io.statefun.kafka.v1/ingress
spec:
id: com.rtdl.sf.pii/ingress
address: redpanda:29092
consumerGroupId: pii-group
startupPosition:
type: latest

topics:
- topic: ingress
valueType: com.rtdl.sf.pii/IncomingMessage
targets:
- com.rtdl.sf.pii/pii-detection
---
kind: io.statefun.kafka.v1/egress
spec:
id: com.rtdl.sf.pii/egress
address: redpanda:29092
deliverySemantic:
type: exactly-once
transactionTimeout: 5sec
152 changes: 152 additions & 0 deletions pii-detection/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.rtdl.sf</groupId>
<artifactId>pii-detection</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>

<properties>
<statefun.version>3.2.0</statefun.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<!-- StateFun Java SDK -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-sdk-java</artifactId>
<version>${statefun.version}</version>
</dependency>

<!-- For custom type serialization (JSON and Protobuf) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.1</version>
</dependency>

<!-- Undertow web server -->
<dependency>
<groupId>io.undertow</groupId>
<artifactId>undertow-core</artifactId>
<version>2.2.16.Final</version>
</dependency>

<!-- Logging -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.11</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Build a fat executable jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.rtdl.sf.piidetection.PiiDetectionAppServer</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Java code style -->
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>1.20.0</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.7</version>
<style>GOOGLE</style>
</googleJavaFormat>
<removeUnusedImports/>
</java>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Protoc plugin to auto-generate Protobuf message files -->
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.1</version>
<executions>
<execution>
<id>generate-protobuf-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<includeStdTypes>true</includeStdTypes>
<protocVersion>3.11.4</protocVersion>
<cleanOutputFolder>true</cleanOutputFolder>
<outputDirectory>${basedir}/target/generated-sources/protoc-jar</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Binary file added pii-detection/src/.DS_Store
Binary file not shown.
Binary file added pii-detection/src/main/.DS_Store
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.rtdl.sf.piidetection;

import com.rtdl.sf.piidetection.function.PiiDetectionFn;
import com.rtdl.sf.piidetection.undertow.UndertowHttpHandler;
import io.undertow.Undertow;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.StatefulFunctions;
import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;

/**
* Pii detection main class.
* <p>
* - Defines stateful function type - PII_TYPE
* - Registers statefun function
* - Obtains a request-reply handler based on the spec
* - Builds HTTP server that hands off the request-body to the handler
*/
public class PiiDetectionAppServer {

public static void main(String[] args) {

StatefulFunctionSpec spec =
StatefulFunctionSpec.builder(PiiDetectionFn.PII_TYPE)
.withSupplier(PiiDetectionFn::new)
.build();

final StatefulFunctions functions = new StatefulFunctions();
functions.withStatefulFunction(spec);

final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();

final Undertow httpServer =
Undertow.builder()
.addHttpListener(5000, "0.0.0.0")
.setHandler(new UndertowHttpHandler(requestReplyHandler))
.build();

httpServer.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.rtdl.sf.piidetection.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rtdl.sf.piidetection.types.IncomingMessage;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.CompletableFuture;

/**
* PII detection stateful function class that implements the StatefulFunction interface.
* <p>
* - Checks if the incoming message is a type of IncomingMessage
* - Checks if the message_type is 'rtdl_205' (control-message)
* - If the message_type is NOT 'rtdl_205', then identifies SSN and phone number patterns in the value and applies masking
* - Produces the result into the Kafka topic, 'pii-detection'
*/
public class PiiDetectionFn implements StatefulFunction {

public static final TypeName PII_TYPE = TypeName.typeNameFromString("com.rtdl.sf.pii/pii-detection");
public static final TypeName PII_EGRESS = TypeName.typeNameFromString("com.rtdl.sf.pii/egress");
private static final Logger LOG = LoggerFactory.getLogger(PiiDetectionFn.class);

@Override
public CompletableFuture<Void> apply(Context context, Message message) {
try {
if (!message.is(IncomingMessage.TYPE)) {
LOG.error("Unknown type");
throw new IllegalStateException("Unknown type");
}

IncomingMessage incomingMessage = message.as(IncomingMessage.TYPE);
byte[] byteValue;

if ("rtdl_205".equalsIgnoreCase(incomingMessage.getMessage_type())) {
LOG.info("Incoming message type is 'rtdl_205' (control-message).");
byteValue = message.rawValue().toByteArray();
} else {
ObjectMapper mapper = new ObjectMapper();
String jsonString = mapper.writeValueAsString(incomingMessage);
String maskedJsonString = new PiiDetector().maskPII(jsonString);
IncomingMessage outgoingMessage = mapper.readValue(maskedJsonString, IncomingMessage.class);
byteValue = new ObjectMapper().writeValueAsBytes(outgoingMessage);
}

context.send(
KafkaEgressMessage.forEgress(PII_EGRESS)
.withTopic("pii-detection")
.withUtf8Key("message")
.withValue(byteValue)
.build());

} catch (Exception e) {
StringWriter sw = new StringWriter();
try (PrintWriter pw = new PrintWriter(sw)) {
e.printStackTrace(pw);
LOG.error(sw.toString());
}
}

return context.done();
}
}
Loading

0 comments on commit ce4eaa5

Please sign in to comment.