A framework that has the following features:
- Registration of multiple Task Orchestration flows of execution during startup
- Synchronous execution of a Task
- Asynchronous execution, which schedules the Task based on the registered Orchestration
Use the following maven dependency for the bare-minimum framework:
<dependency>
    <groupId>com.livetheoogway.teflon</groupId>
    <artifactId>teflon-framework</artifactId>
    <version>1.0.0</version>
</dependency>
Use the following maven dependency for the actor-based Scheduler:
<dependency>
    <groupId>com.livetheoogway.teflon</groupId>
    <artifactId>teflon-rmq-actor</artifactId>
    <version>1.0.0</version>
</dependency>
You will find this framework useful if you:
- prefer to visualize any piece of work as a Task
- want to divide the Task into mutually exclusive components
- want to mix, match, and, more importantly, reuse components of one Task in another Task
A TaskDeclaration is composed of the following Components:
- Name - Uniquely identifies the declaration; when a Task is executed, its name determines which declaration gets picked
- Source - A source that emits an Input / stream of Inputs
- Interpreter - The interpreter that takes the Input from the Source and emits an Output
- Sink - A sink that consumes the Output
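For orientation, the core interfaces behind these components look roughly like the sketch below. This is inferred from the example implementations that follow, not copied from the framework's source:

public interface Source<I> {
    // Returns the next batch of inputs, or null once the source is exhausted
    List<I> getInput() throws Exception;
}

public interface Interpreter<I, O> {
    // Transforms a batch of inputs into a batch of outputs
    List<O> interpret(List<I> input);
}

public interface Sink<O> {
    // Consumes a batch of interpreted outputs
    void sink(List<O> output);
}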
When a task is being executed -
- The Source, Interpreter, and Sink are initialized
- Inputs from Source are streamed (in batches)
- Batches are then passed onto the Interpreter
- The Interpreted elements are then passed onto the Sink for consumption
- All the while, Stats are collected as to how many elements were processed, time taken for execution, etc.
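Conceptually, that flow boils down to a loop like the following sketch. It is illustrative only: the method name runTask and the inline stats reporting are assumptions, not the framework's actual implementation.

// Illustrative sketch of the orchestration loop, assuming the interfaces above
static <I, O> void runTask(Source<I> source, Interpreter<I, O> interpreter, Sink<O> sink) throws Exception {
    long processed = 0;
    long startMillis = System.currentTimeMillis();
    List<I> batch;
    while ((batch = source.getInput()) != null) {      // a null batch marks the end of the stream
        List<O> outputs = interpreter.interpret(batch);
        sink.sink(outputs);
        processed += batch.size();
    }
    // stand-in for the framework's stats collection
    System.out.printf("Processed %d elements in %d ms%n", processed, System.currentTimeMillis() - startMillis);
}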
Define an implementation of Source.java
@SourceDeclaration(emits = Integer.class)
public class NumberStreamGenerator implements Source<Integer> {
    int i = 0, max = 10;

    @Override
    public List<Integer> getInput() throws Exception {
        if (i <= max)
            return Collections.singletonList(i++);
        return null; // null signals that the source is exhausted
    }
}
Define an Interpreter
@InterpreterDeclaration(takes = Integer.class, emits = String.class)
public class IterationInterpreter implements Interpreter<Integer, String> {
    @Override
    public List<String> interpret(List<Integer> integers) {
        return integers.stream().map(k -> "Iteration: " + k).collect(Collectors.toList());
    }
}
Define a Sink
@SinkDeclaration(takes = String.class)
public class ConsoleSink implements Sink<String> {
    @Override
    public void sink(List<String> items) {
        items.forEach(System.out::println);
    }
}
And finally, a TaskDeclaration
@TaskDeclaration(
        name = "number-generator",
        source = NumberStreamGenerator.class,
        interpreter = IterationInterpreter.class,
        sink = ConsoleSink.class,
        factoryType = FactoryType.INJECTION)
class SomeTask implements Task {
    @Override
    public String name() {
        return "number-generator";
    }
    ...
}
Build a Scheduler that allows you to trigger the task
TaskScheduler taskScheduler = TaskScheduler.builder()
        .classPath("com.livetheoogway.teflon.framework.factory")
        .injectorProvider(() -> Guice.createInjector(<your module>))
        .build();

// run it synchronously
taskScheduler.trigger(new SomeTask());

// or schedule it asynchronously
taskScheduler.schedule(new SomeTask(), new StatusConsumer(){});
taskScheduler.scheduleAtFixedRate(new SomeTask(), new StatusConsumer(){}, 0, 1, TimeUnit.SECONDS);
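Since factoryType is FactoryType.INJECTION, the Source, Interpreter, and Sink are expected to be resolvable through the injector you supply. A minimal Guice module for the number-generator example might look like this sketch; this is an assumption about the wiring, not code the framework mandates:

// Hypothetical Guice module for the example above.
// Guice can already construct these classes via their default constructors;
// the explicit bindings are shown only to make the wiring visible.
public class NumberTaskModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(NumberStreamGenerator.class);
        bind(IterationInterpreter.class);
        bind(ConsoleSink.class);
    }
}

// and then supply it to the scheduler builder:
TaskScheduler taskScheduler = TaskScheduler.builder()
        .classPath("com.livetheoogway.teflon.framework.factory")
        .injectorProvider(() -> Guice.createInjector(new NumberTaskModule()))
        .build();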
An integration with the Dropwizard RabbitMQ Bundle
@Singleton
@TaskDeclaration(
        name = "pdf-statement",
        source = QuerySource.class,
        interpreter = PdfDocumentCreator.class,
        sink = EmailSendSink.class,
        factoryType = FactoryType.INJECTION
)
public class StatementEngine extends TaskActor<MessageIdType, PdfQueryStatementTask> {

    @Inject
    public StatementEngine(TaskScheduler taskScheduler,
                           TeflonConfig config,
                           RMQConnection connection,
                           ObjectMapper mapper) {
        super(MessageIdType.QUERY_PDF_STATEMENT, taskScheduler, config, connection, mapper, PdfQueryStatementTask.class);
    }
}
Now, every new message in RMQ will automatically trigger the corresponding task, with its Source, Interpreter, and Sink brought into play.
Messages (i.e. Tasks) are acked automatically after successful execution of the task.
If execution fails, they are rejected/sidelined accordingly.
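The message payload itself is just a Task implementation. A minimal sketch of what PdfQueryStatementTask might look like follows; only name() comes from the Task interface shown earlier, the fields are purely illustrative:

// Hypothetical message/task payload consumed by StatementEngine.
// accountId and month are illustrative fields, not framework API.
public class PdfQueryStatementTask implements Task {
    private String accountId;
    private String month;

    @Override
    public String name() {
        return "pdf-statement";
    }
}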
Possible future enhancements:
- Scheduled execution
- Queued execution of Tasks using distributed Zookeeper queues
- Typical Source implementations in separate modules (HBase, ES, Redis, etc.)
- Use Artifactory-based paths for Sources