Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka messages consumer to Framework #2624

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/proto-sync.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ jobs:
cp aliecs/core/protos/o2control.proto Control/protobuf
cp aliecs/common/protos/events.proto Control/protobuf/protos
cp aliecs/common/protos/common.proto Control/protobuf/protos/protos
# Framework protos live in Backend/protobuf/protos: Backend/protobuf is the include dir, and events.proto imports "protos/common.proto"
cp aliecs/common/protos/events.proto Framework/Backend/protobuf/protos
cp aliecs/common/protos/common.proto Framework/Backend/protobuf/protos
cp aliecs/apricot/protos/apricot.proto Control/protobuf/o2apricot.proto
rm -rf aliecs
- name: Check if there are any differences and create PR
Expand Down
7 changes: 7 additions & 0 deletions Framework/Backend/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const {
} = require('./errors/updateAndSendExpressResponseFromNativeError.js');
const { Logger } = require('./log/Logger');

const { getWebUiProtoIncludeDir } = require('./protobuf/getWebUiProtoIncludeDir');
const { AliEcsEventMessagesConsumer } = require('./kafka/AliEcsEventMessagesConsumer.js');

exports.ConsulService = ConsulService;

exports.HttpServer = HttpServer;
Expand Down Expand Up @@ -82,3 +85,7 @@ exports.UnauthorizedAccessError = UnauthorizedAccessError;
exports.grpcErrorToNativeError = grpcErrorToNativeError;

exports.updateAndSendExpressResponseFromNativeError = updateAndSendExpressResponseFromNativeError;

exports.getWebUiProtoIncludeDir = getWebUiProtoIncludeDir;

exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;
62 changes: 62 additions & 0 deletions Framework/Backend/kafka/AliEcsEventMessagesConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
const protobuf = require('protobufjs');
const path = require('node:path');
const { KafkaMessagesConsumer } = require('./KafkaMessagesConsumer.js');
const { getWebUiProtoIncludeDir } = require('../protobuf/getWebUiProtoIncludeDir.js');

// The stock protobufjs loader is not given an import directory here, so relative imports are resolved by overriding
// `resolvePath`: absolute targets are kept untouched, anything else is looked up from the WebUI proto include directory
const root = new protobuf.Root();
root.resolvePath = (origin, target) => (
    path.isAbsolute(target)
        ? target
        : path.join(getWebUiProtoIncludeDir(), target)
);

// Load the ECS events definition synchronously at module load, then pick the top-level event type used for decoding
const eventsProtoPath = path.resolve(__dirname, '../protobuf/protos/events.proto');
root.loadSync(eventsProtoPath);
const EventMessage = root.lookupType('events.Event');

/**
* @callback MessageReceivedCallback
* @param {EventMessage} message received message
* @return {Promise<void>}
*/

/**
 * Kafka consumer dedicated to AliECS events: it hands the `events.Event` protobuf type to the generic
 * KafkaMessagesConsumer, which forwards every message it receives to the registered listeners
 */
class AliEcsEventMessagesConsumer extends KafkaMessagesConsumer {
    /**
     * Create a consumer bound to the ECS event protobuf type
     *
     * @param {import('kafkajs').Kafka} kafkaClient configured kafka client
     * @param {string} groupId the group id to use for the kafka consumer
     * @param {string[]} topics the list of topics to consume
     */
    constructor(kafkaClient, groupId, topics) {
        // Only the protobuf type is specific to this consumer, everything else is delegated to the parent
        super(kafkaClient, groupId, topics, EventMessage);
    }

    /**
     * Label under which this consumer's logs are emitted (overrides the parent's label)
     *
     * @return {string} the logger label
     */
    getLoggerLabel() {
        const label = 'ALI-ECS-EVENT-CONSUMER';
        return label;
    }
}

exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;

exports.EventMessage = EventMessage;
98 changes: 98 additions & 0 deletions Framework/Backend/kafka/KafkaMessagesConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
const { LogManager } = require('../log/LogManager.js');

/**
 * Generic Kafka message consumer: every received message is decoded according to a protobuf type definition and the
 * resulting plain object is passed to the registered listeners
 *
 * @template T extends import('protobufjs').Type
 */
class KafkaMessagesConsumer {
    /**
     * Constructor
     *
     * @param {import('kafkajs').Kafka} kafkaClient configured kafka client
     * @param {string} groupId the group id to use for the kafka consumer
     * @param {string[]} topics the list of topics to consume
     * @param {import('protobufjs').Type} protoType the type definition of the handled message
     */
    constructor(kafkaClient, groupId, topics, protoType) {
        this.consumer = kafkaClient.consumer({ groupId });
        this._topics = topics;
        this._protoType = protoType;

        /**
         * @type {MessageReceivedCallback[]}
         * @private
         */
        this._listeners = [];

        // The label comes from an overridable method so that subclasses log under their own name
        this._logger = LogManager.getLogger(this.getLoggerLabel());
    }

    /**
     * Register a listener to be called for every successfully decoded message
     *
     * Listeners are called sequentially, in registration order, each one being awaited before the next one is called.
     * An error thrown by a listener is caught and logged, it does not prevent the following listeners from being called
     *
     * @param {MessageReceivedCallback} listener the listener to register
     * @return {void}
     */
    onMessageReceived(listener) {
        this._listeners.push(listener);
    }

    /**
     * Start the kafka consumer: connect, subscribe to the configured topics then run the message handler
     *
     * @return {Promise<void>} Resolves once the consumer has been connected, subscribed and started; rejects with the
     * kafka client error if any of those steps fails
     */
    async start() {
        await this.consumer.connect();
        await this.consumer.subscribe({ topics: this._topics });
        await this.consumer.run({
            eachMessage: ({ message, topic }) => this._processMessage(message, topic),
        });

        // Logged only once every step above succeeded, so the log never claims a start that did not happen
        this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`);
    }

    /**
     * Decode one raw kafka message and dispatch the resulting object to the listeners
     *
     * Invalid messages are logged and dropped, this function does not throw for them
     *
     * @param {{value: (Buffer|null)}} message the raw kafka message
     * @param {string} topic the topic the message has been received on
     * @return {Promise<void>} resolves once all the listeners have handled the message
     * @private
     */
    async _processMessage(message, topic) {
        // A kafka message may carry no value at all, in which case there is nothing to decode
        if (message.value === null || message.value === undefined) {
            this._logger.errorMessage(`Received an invalid message on "${topic}": empty payload`);
            return;
        }
        this._logger.debugMessage(`Received message on ${topic}`);

        /*
         * protobufjs `verify` validates plain objects before encoding, it can not validate encoded bytes: the validity
         * of the payload is established by `decode`, which throws on malformed input
         */
        let event;
        try {
            event = this._protoType.toObject(
                this._protoType.decode(message.value),
                { enums: String },
            );
        } catch (error) {
            this._logger.errorMessage(`Failed to convert message to object on topic ${topic}: ${error}`);
            return;
        }

        await this._handleEvent(event);
    }

    /**
     * Call every registered listener, one after the other, by passing the given message to it
     *
     * @param {T} message the message to pass to listeners
     * @return {Promise<void>} resolves once every listener has been called and awaited
     */
    async _handleEvent(message) {
        for (const listener of this._listeners) {
            try {
                await listener(message);
            } catch (error) {
                // Listeners may throw anything, only actual errors have a message and a stack
                const details = error instanceof Error ? `${error.message}\n${error.stack}` : String(error);
                this._logger.errorMessage(`An error occurred when handling event: ${details}`);
            }
        }
    }

    /**
     * Return the label to be used by the logger
     *
     * @return {string} the logger label
     */
    getLoggerLabel() {
        return 'EVENT-CONSUMER';
    }
}

exports.KafkaMessagesConsumer = KafkaMessagesConsumer;
1 change: 1 addition & 0 deletions Framework/Backend/protobuf/getWebUiProtoIncludeDir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
exports.getWebUiProtoIncludeDir = () => __dirname;
48 changes: 48 additions & 0 deletions Framework/Backend/protobuf/protos/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2024 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/


syntax = "proto3";

package common;
option java_package = "ch.cern.alice.o2.control.common";
option go_package = "github.com/AliceO2Group/Control/common/protos;pb";

//////////////// Common types ///////////////

// Identity of a user; referenced from events.proto as `common.User` (last request user of environment and run events).
// The two identifiers are declared `optional`, so their presence is tracked explicitly; `name` is a plain proto3 string.
message User {
// The unique CERN identifier of this user.
optional int32 externalId = 1;
// The unique identifier of this entity.
optional int32 id = 2;
// Name of the user.
string name = 3;
}

// Description of a workflow template; referenced from events.proto as `common.WorkflowTemplateInfo` by environment events.
message WorkflowTemplateInfo {
string name = 1;
string description = 2;
string path = 3;
bool public = 4; // whether the environment is public or not
}
148 changes: 148 additions & 0 deletions Framework/Backend/protobuf/protos/events.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2024 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

syntax = "proto3";

package events;
option java_package = "ch.cern.alice.o2.control.events";
option go_package = "github.com/AliceO2Group/Control/common/protos;pb";

import public "protos/common.proto";

//////////////// Common event messages ///////////////

// Progress or outcome of an operation; used below for transition, call and integrated service (step) statuses.
// NULL is the zero value, hence the one read for a status field that has not been set (proto3 default).
enum OpStatus {
NULL = 0;
STARTED = 1;
ONGOING = 2;
DONE_OK = 3;
DONE_ERROR = 4;
DONE_TIMEOUT = 5;
}

// Meta event without any field; carried by Event as `mesosHeartbeatEvent`.
message Ev_MetaEvent_MesosHeartbeat {
}

// Meta event carrying only a framework id; carried by Event as `coreStartEvent`.
message Ev_MetaEvent_CoreStart {
string frameworkId = 1;
}

// Meta event carrying a framework id and a free-text message; carried by Event as `frameworkEvent`.
message Ev_MetaEvent_FrameworkEvent {
string frameworkId = 1;
string message = 2;
}

// Event about an environment (state, ongoing transition and its status); carried by Event as `environmentEvent`.
// `lastRequestUser` and `workflowTemplateInfo` use the message types declared in protos/common.proto.
message Ev_EnvironmentEvent {
string environmentId = 1;
string state = 2;
uint32 runNumber = 3; // only when the environment is in the running state
string error = 4;
string message = 5; // any additional message concerning the current state or transition
string transition = 6;
string transitionStep = 7;
OpStatus transitionStatus = 8;
map<string, string> vars = 9; // consolidated environment variables at the root role of the environment
common.User lastRequestUser = 10;
common.WorkflowTemplateInfo workflowTemplateInfo = 11;
}

// Set of traits attached to task events and call events (see the `traits` field of Ev_TaskEvent and Ev_CallEvent).
// NOTE(review): the expected content of trigger/await/timeout is not described in this file - confirm against the AliECS documentation.
message Traits {
string trigger = 1;
string await = 2;
string timeout = 3;
bool critical = 4;
}

// Event about a task; carried by Event as `taskEvent`.
message Ev_TaskEvent {
string name = 1; // task name, based on the name of the task class
string taskid = 2; // task id, unique
string state = 3; // state machine state for this task
string status = 4; // active/inactive etc.
string hostname = 5;
string className = 6; // name of the task class from which this task was spawned
Traits traits = 7;
string environmentId = 8;
string path = 9; // path to the parent taskRole of this task within the environment
}

// Event about a function call made within a workflow template context; carried by Event as `callEvent`.
message Ev_CallEvent {
string func = 1; // name of the function being called, within the workflow template context
OpStatus callStatus = 2; // progress or success/failure state of the call
string return = 3; // return value of the function
Traits traits = 4;
string output = 5; // any additional output of the function
string error = 6; // error value, if returned
string environmentId = 7;
string path = 8; // path to the parent callRole of this call within the environment
}

// Event about a role of an environment; carried by Event as `roleEvent`.
message Ev_RoleEvent {
string name = 1; // role name
string status = 2; // active/inactive etc., derived from the state of child tasks, calls or other roles
string state = 3; // state machine state for this role
string rolePath = 4; // path to this role within the environment
string environmentId = 5;
}

// Event about an operation performed on an integrated service; carried by Event as `integratedServiceEvent`.
// Both the operation and its current substep have their own OpStatus.
message Ev_IntegratedServiceEvent {
string name = 1; // name of the context, usually the path of the callRole that calls a given integrated service function e.g. readout-dataflow.dd-scheduler.terminate
string error = 2; // error message, if any
string operationName = 3; // name of the operation, usually the name of the integrated service function being called e.g. ddsched.PartitionTerminate()
OpStatus operationStatus = 4; // progress or success/failure state of the operation
string operationStep = 5; // if the operation has substeps, this is the name of the current substep, like an API call or polling phase
OpStatus operationStepStatus = 6; // progress or success/failure state of the current substep
string environmentId = 7;
string payload = 8; // any additional payload, depending on the integrated service; there is no schema, it can even be the raw return structure of a remote API call
}

// Event about a run of an environment (run number, state, ongoing transition and its status); carried by Event as `runEvent`.
// `lastRequestUser` uses the User message declared in protos/common.proto.
message Ev_RunEvent {
string environmentId = 1;
uint32 runNumber = 2;
string state = 3;
string error = 4;
string transition = 5;
OpStatus transitionStatus = 6;
map<string, string> vars = 7;
common.User lastRequestUser = 8;
}

// Top-level envelope of every event: two timestamp fields plus at most one payload, selected through the `Payload` oneof.
// Field numbers 3 to 10 and 17 to 100 are reserved and must not be used for new fields.
message Event {
// NOTE(review): units are not stated in this file; presumably an epoch timestamp and its nanosecond-precision counterpart - confirm upstream
int64 timestamp = 1;
int64 timestampNano = 2;
reserved 3 to 10;
reserved 17 to 100;

// Exactly one of the following payloads can be set on a given event
oneof Payload {
Ev_EnvironmentEvent environmentEvent = 11;
Ev_TaskEvent taskEvent = 12;
Ev_RoleEvent roleEvent = 13;
Ev_CallEvent callEvent = 14;
Ev_IntegratedServiceEvent integratedServiceEvent = 15;
Ev_RunEvent runEvent = 16;

// Meta events, numbered above the reserved 17 to 100 range
Ev_MetaEvent_FrameworkEvent frameworkEvent = 101;
Ev_MetaEvent_MesosHeartbeat mesosHeartbeatEvent = 102;
Ev_MetaEvent_CoreStart coreStartEvent = 103;
}
}
Loading
Loading