Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added ability to work on device groups (partitions) #13

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 52 additions & 11 deletions samples/blocks/HttpOutputBlock.mon
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,33 @@ using apama.analyticsbuilder.Activation;
using apama.analyticsbuilder.ABConstants;
using apama.analyticsbuilder.L10N;
using apama.analyticsbuilder.Value;
using apama.analyticsbuilder.TimerParams;
using com.apama.util.AnyExtractor;

using com.softwareag.connectivity.httpclient.HttpTransport;
using com.softwareag.connectivity.httpclient.RequestType;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.Response;


event HTTPHandler
{
string deviceId;
string host;
BlockBase base;
/** Handle the HTTP response.*/
action handleResponse(Response res) {
// $base.createTimer(0.01, res); // Creates a timer with the specified duration.
TimerParams tp := TimerParams.relative(0.01).withPayload(res).withPartition(deviceId);
base.createTimerWith(tp);

log "Called handleResponse: " + deviceId + "/" + res.payload.data.toString() at DEBUG;
if not res.isSuccess() {
log "Unable to connect " +host+". Error code: " + res.statusMessage at WARN;
}
}
}

/**
* Event definition of the parameters for the HTTP Output block.
*/
Expand Down Expand Up @@ -60,6 +80,16 @@ event HTTPOutput_$Parameters {
/**Default value for tlsEnabled.*/
constant boolean $DEFAULT_tlsEnabled := false;

/**
* Wrap body.
*
* If selected, the body will be wrapped as described above otherwise only the pure payload is sent.
*/
boolean wrapBody;

/**Default value for wrapBody.*/
constant boolean $DEFAULT_wrapBody := true;

/** Validate that the values for all the parameters have been provided. */
action $validate() {
BlockBase.throwsOnEmpty(host, "host", self);
Expand All @@ -77,9 +107,9 @@ event HTTPOutput_$Parameters {
* An example of HTTP request from the block:
* <code>
Content-Type: application/json

{
"modelName":"model_0",
"deviceId":"84588865186",
"value": {
"value":true,
"timestamp":"1563466239",
Expand Down Expand Up @@ -139,24 +169,30 @@ event HTTPOutput {
action $process(Activation $activation, Value $input_value, dictionary<string, any> $modelScopeParameters) {
string modelName := $modelScopeParameters.getOrDefault(ABConstants.MODEL_NAME_IDENTIFIER).valueToString();

any data := {"modelName":<any>modelName, "value":$input_value }; // $input_value is a Value object with fields value, timestamp, properties - this will be output as a JSON object.
any data;
string deviceId := AnyExtractor($activation.partition).getString("");

if $parameters.wrapBody {
data := {"modelName":<any>modelName, "value":$input_value, "deviceId": deviceId }; // $input_value is a Value object with fields value, timestamp, properties - this will be output as a JSON object.
} else {
if $input_value.properties.size() = 0 {
// if properties are empty the use the input value
data := $input_value.value;
} else {
data := $input_value.properties;
}
}


// Create the request event.
Request req := transport.createPOSTRequest($parameters.path, data);

// Execute the request and pass the callback action.
req.execute(handleResponse);
log "Processing for partition: " + deviceId at DEBUG;
req.execute(HTTPHandler(deviceId, $parameters.host, $base).handleResponse);
$base.profile(BlockBase.PROFILE_OUTPUT);
}

/** Handle the HTTP response.*/
action handleResponse(Response res) {
$base.createTimer(0.01, res); // Creates a timer with the specified duration.

if not res.isSuccess() {
log "Unable to connect " +$parameters.host+". Error code: " + res.statusMessage at WARN;
}
}

/**
* This action is called by the framework when the timer is triggered. The framework provides the value of the payload which was passed while creating the timer.
Expand All @@ -167,9 +203,14 @@ event HTTPOutput {
Response response := <Response> $payload;
dictionary<string, any> propertyValues := {};
any k;

log "Response raw: " + response.payload.data.toString() at DEBUG;
for k in response.payload.data.getKeys() {
propertyValues[k.valueToString()] := response.payload.data.getEntry(k);
}

log "Response parsed: " + propertyValues.toString() at DEBUG;

$setOutput_responseBody($activation, Value(true, $activation.timestamp, propertyValues));
$setOutput_statusCode($activation, response.statusCode.toFloat());
}
Expand Down