Skip to content

Commit

Permalink
Adding virtual threads as default thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
switschel committed Oct 29, 2024
1 parent 1b3f8c1 commit 02c1a71
Show file tree
Hide file tree
Showing 14 changed files with 54 additions and 121 deletions.
15 changes: 5 additions & 10 deletions dynamic-mapping-service/src/main/java/dynamic/mapping/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import dynamic.mapping.model.MappingTreeNode;
import dynamic.mapping.model.MappingTreeNodeSerializer;
Expand Down Expand Up @@ -98,17 +99,11 @@ public TaskExecutor taskExecutor() {
executor.setQueueCapacity(25);
return executor;
}
//Assuming we can process 25 messages in parallel per CPU-Core
@Bean("processingCachePool")
public ExecutorService processingThreadPool() {
return Executors.newCachedThreadPool();
//return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*25);
}

//Assuming we can process 10 messages in parallel per CPU-Core
@Bean("cachedThreadPool")
public ExecutorService cachedThreadPool() {
return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*10);
@Bean("virtThreadPool")
public ExecutorService virtThreadPool() {
final ThreadFactory factory = Thread.ofVirtual().name("virtThread-",0).factory();
return Executors.newThreadPerTaskExecutor(factory);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public abstract class AConnectorClient {
protected ConfigurationRegistry configurationRegistry;

@Getter
protected ExecutorService cachedThreadPool;
protected ExecutorService virtThreadPool;

private Future<?> connectTask;
private ScheduledExecutorService housekeepingExecutor = Executors
Expand Down Expand Up @@ -170,7 +170,7 @@ public void submitInitialize() {
// test if init task is still running, then we don't need to start another task
log.debug("Tenant {} - Called initialize(): {}", tenant, initializeTask == null || initializeTask.isDone());
if ((initializeTask == null || initializeTask.isDone())) {
initializeTask = cachedThreadPool.submit(() -> initialize());
initializeTask = virtThreadPool.submit(() -> initialize());
}
}

Expand Down Expand Up @@ -198,7 +198,7 @@ public void submitConnect() {
log.debug("Tenant {} - Called connect(): connectTask.isDone() {}", tenant,
connectTask == null || connectTask.isDone());
if (connectTask == null || connectTask.isDone()) {
connectTask = cachedThreadPool.submit(() -> connect());
connectTask = virtThreadPool.submit(() -> connect());
}
}

Expand All @@ -209,7 +209,7 @@ public void submitDisconnect() {
log.debug("Tenant {} - Called submitDisconnect(): connectTask.isDone() {}", tenant,
connectTask == null || connectTask.isDone());
if (connectTask == null || connectTask.isDone()) {
connectTask = cachedThreadPool.submit(() -> disconnect());
connectTask = virtThreadPool.submit(() -> disconnect());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public KafkaClient(ConfigurationRegistry configurationRegistry,
this.connectorIdent = connectorConfiguration.ident;
this.connectorStatus = ConnectorStatusEvent.unknown(connectorConfiguration.name, connectorConfiguration.ident);
this.c8yAgent = configurationRegistry.getC8yAgent();
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
this.objectMapper = configurationRegistry.getObjectMapper();
this.additionalSubscriptionIdTest = additionalSubscriptionIdTest;
this.mappingServiceRepresentation = configurationRegistry.getMappingServiceRepresentations().get(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public MQTTClient(ConfigurationRegistry configurationRegistry,
this.connectorStatus = ConnectorStatusEvent.unknown(connectorConfiguration.name, connectorConfiguration.ident);
// this.connectorType = connectorConfiguration.connectorType;
this.c8yAgent = configurationRegistry.getC8yAgent();
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
this.objectMapper = configurationRegistry.getObjectMapper();
this.additionalSubscriptionIdTest = additionalSubscriptionIdTest;
this.mappingServiceRepresentation = configurationRegistry.getMappingServiceRepresentations().get(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public MQTTServiceClient(ConfigurationRegistry configurationRegistry,
this.connectorName = connectorConfiguration.name;
this.connectorStatus = ConnectorStatusEvent.unknown(connectorConfiguration.name, connectorConfiguration.ident);
this.c8yAgent = configurationRegistry.getC8yAgent();
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
this.objectMapper = configurationRegistry.getObjectMapper();
this.additionalSubscriptionIdTest = additionalSubscriptionIdTest;
this.mappingServiceRepresentation = configurationRegistry.getMappingServiceRepresentations().get(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public class BootstrapService {
@Autowired
ConnectorConfigurationComponent connectorConfigurationComponent;

@Qualifier("cachedThreadPool")
private ExecutorService cachedThreadPool;
@Qualifier("virtThreadPool")
private ExecutorService virtThreadPool;

@Autowired
public void setCachedThreadPool(ExecutorService cachedThreadPool) {
this.cachedThreadPool = cachedThreadPool;
public void setVirtThreadPool(ExecutorService virtThreadPool) {
this.virtThreadPool = virtThreadPool;
}

@Value("${APP.additionalSubscriptionIdTest}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,62 +331,6 @@ public AConnectorClient.Certificate loadCertificateByName(String certificateName
return null;
}
}
//TODO Change this to use ExecutorService + Virtual Threads when available
public CompletableFuture<AbstractExtensibleRepresentation> createMEAOAsync(ProcessingContext<?> context)
throws ProcessingException {
return CompletableFuture.supplyAsync(() -> {
String tenant = context.getTenant();
StringBuffer error = new StringBuffer("");
C8YRequest currentRequest = context.getCurrentRequest();
String payload = currentRequest.getRequest();
API targetAPI = context.getMapping().getTargetAPI();
AbstractExtensibleRepresentation result = subscriptionsService.callForTenant(tenant, () -> {
MicroserviceCredentials contextCredentials = removeAppKeyHeaderFromContext(contextService.getContext());
return contextService.callWithinContext(contextCredentials, () -> {
AbstractExtensibleRepresentation rt = null;
try {
if (targetAPI.equals(API.EVENT)) {
EventRepresentation eventRepresentation = configurationRegistry.getObjectMapper().readValue(
payload,
EventRepresentation.class);
rt = eventApi.create(eventRepresentation);
log.info("Tenant {} - New event posted: {}", tenant, rt);
} else if (targetAPI.equals(API.ALARM)) {
AlarmRepresentation alarmRepresentation = configurationRegistry.getObjectMapper().readValue(
payload,
AlarmRepresentation.class);
rt = alarmApi.create(alarmRepresentation);
log.info("Tenant {} - New alarm posted: {}", tenant, rt);
} else if (targetAPI.equals(API.MEASUREMENT)) {
MeasurementRepresentation measurementRepresentation = jsonParser
.parse(MeasurementRepresentation.class, payload);
rt = measurementApi.create(measurementRepresentation);
log.info("Tenant {} - New measurement posted: {}", tenant, rt);
} else if (targetAPI.equals(API.OPERATION)) {
OperationRepresentation operationRepresentation = jsonParser
.parse(OperationRepresentation.class, payload);
rt = deviceControlApi.create(operationRepresentation);
log.info("Tenant {} - New operation posted: {}", tenant, rt);
} else {
log.error("Tenant {} - Not existing API!", tenant);
}
} catch (JsonProcessingException e) {
log.error("Tenant {} - Could not map payload: {} {}", tenant, targetAPI, payload);
error.append("Could not map payload: " + targetAPI + "/" + payload);
} catch (SDKException s) {
log.error("Tenant {} - Could not sent payload to c8y: {} {}: ", tenant, targetAPI, payload, s);
error.append("Could not sent payload to c8y: " + targetAPI + "/" + payload + "/" + s);
}
return rt;
});
});
if (!error.toString().equals("")) {
throw new CompletionException(new ProcessingException(error.toString()));
}
return result;
});
}

public AbstractExtensibleRepresentation createMEAO(ProcessingContext<?> context)
throws ProcessingException {
String tenant = context.getTenant();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import dynamic.mapping.connector.kafka.KafkaClient;
import dynamic.mapping.connector.mqtt.MQTTClient;
import dynamic.mapping.connector.mqtt.MQTTServiceClient;
import dynamic.mapping.core.cache.InboundExternalIdCache;
import dynamic.mapping.model.MappingServiceRepresentation;
import dynamic.mapping.notification.C8YNotificationSubscriber;
import dynamic.mapping.processor.extension.ExtensibleProcessorInbound;
Expand Down Expand Up @@ -119,12 +118,7 @@ public void setServiceConfigurationComponent(@Lazy ServiceConfigurationComponent
@Getter
@Setter
@Autowired
private ExecutorService cachedThreadPool;

@Getter
@Setter
@Autowired
private ExecutorService processingCachePool;
private ExecutorService virtThreadPool;

public Map<MappingType, BasePayloadProcessorInbound<?>> createPayloadProcessorsInbound(String tenant) {
ExtensibleProcessorInbound extensibleProcessor = getExtensibleProcessors().get(tenant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class AsynchronousDispatcherInbound implements GenericMessageCallback {

private AConnectorClient connectorClient;

private ExecutorService cachedThreadPool;
private ExecutorService virtThreadPool;

private MappingComponent mappingComponent;

Expand All @@ -87,7 +87,7 @@ public class AsynchronousDispatcherInbound implements GenericMessageCallback {
public AsynchronousDispatcherInbound(ConfigurationRegistry configurationRegistry,
AConnectorClient connectorClient) {
this.connectorClient = connectorClient;
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();
this.mappingComponent = configurationRegistry.getMappingComponent();
this.configurationRegistry = configurationRegistry;
}
Expand All @@ -103,7 +103,7 @@ public static class MappingInboundTask<T> implements Callable<List<ProcessingCon
Timer inboundProcessingTimer;
Counter inboundProcessingCounter;
AConnectorClient connectorClient;
ExecutorService cachedThreadPool;
ExecutorService virtThreadPool;


public MappingInboundTask(ConfigurationRegistry configurationRegistry, List<Mapping> resolvedMappings,
Expand All @@ -123,7 +123,7 @@ public MappingInboundTask(ConfigurationRegistry configurationRegistry, List<Mapp
this.inboundProcessingCounter = Counter.builder("dynmapper_inbound_message_total")
.tag("tenant", connectorMessage.getTenant()).description("Total number of inbound messages")
.tag("connector", connectorMessage.getConnectorIdent()).register(Metrics.globalRegistry);
this.cachedThreadPool = configurationRegistry.getCachedThreadPool();
this.virtThreadPool = configurationRegistry.getVirtThreadPool();

}

Expand Down Expand Up @@ -261,7 +261,7 @@ public Future<List<ProcessingContext<?>>> processMessage(ConnectorMessage messag
return futureProcessingResult;
}

futureProcessingResult = cachedThreadPool.submit(
futureProcessingResult = virtThreadPool.submit(
new MappingInboundTask(configurationRegistry, resolvedMappings,
message, connectorClient));

Expand All @@ -275,7 +275,7 @@ public void onClose(String closeMessage, Throwable closeException) {

@Override
public void onMessage(ConnectorMessage message) {
Thread.startVirtualThread(() -> processMessage(message)).setName("vProcIn");
processMessage(message);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ public abstract class BasePayloadProcessorInbound<T> {
public BasePayloadProcessorInbound(ConfigurationRegistry configurationRegistry) {
this.objectMapper = configurationRegistry.getObjectMapper();
this.c8yAgent = configurationRegistry.getC8yAgent();
this.processingCachePool = configurationRegistry.getProcessingCachePool();
this.virtCachePool = configurationRegistry.getVirtThreadPool();
}

protected C8YAgent c8yAgent;

protected ObjectMapper objectMapper;

protected ExecutorService processingCachePool;
protected ExecutorService virtCachePool;

public abstract ProcessingContext<T> deserializePayload(ProcessingContext<T> context, ConnectorMessage message)
throws IOException;
Expand Down Expand Up @@ -110,7 +110,7 @@ public ProcessingContext<T> substituteInTargetAndSend(ProcessingContext<T> conte
for (int i = 0; i < deviceEntries.size(); i++) {
// for (MappingSubstitution.SubstituteValue device : deviceEntries) {
int finalI = i;
contextFutureList.add(processingCachePool.submit(() -> {
contextFutureList.add(virtCachePool.submit(() -> {
MappingSubstitution.SubstituteValue device = deviceEntries.get(finalI);
int predecessor = -1;
DocumentContext payloadTarget = JsonPath.parse(mapping.target);
Expand Down
Loading

0 comments on commit 02c1a71

Please sign in to comment.