Skip to content

Commit

Permalink
Http perf improvements (#9)
Browse files Browse the repository at this point in the history
* http performance improvements
  • Loading branch information
v1r3n authored Dec 1, 2022
1 parent b4bbf39 commit 1334e07
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 8 deletions.
2 changes: 2 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ dependencies {
implementation project(":orkes-conductor-archive")
implementation project(":orkes-conductor-persistence")

implementation 'io.orkes.conductor:orkes-conductor-common-protos:0.9.2'

//aws
implementation "com.amazonaws:aws-java-sdk-core:${versions.revAwsSdk}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,8 @@ public class OrkesWorkflowExecutor extends WorkflowExecutor {
private final ExecutionDAOFacade orkesExecutionDAOFacade;
private final SystemTaskRegistry systemTaskRegistry;
private final ExecutorService taskUpdateExecutor;

private final RedisExecutionDAO executionDAO;

private final MetricsCollector metricsCollector;

public OrkesWorkflowExecutor(
DeciderService deciderService,
MetadataDAO metadataDAO,
Expand All @@ -83,8 +80,7 @@ public OrkesWorkflowExecutor(
@Lazy SystemTaskRegistry systemTaskRegistry,
ParametersUtils parametersUtils,
IDGenerator idGenerator,
RedisExecutionDAO executionDAO,
MetricsCollector metricsCollector) {
RedisExecutionDAO executionDAO) {
super(
deciderService,
metadataDAO,
Expand All @@ -102,7 +98,6 @@ public OrkesWorkflowExecutor(
this.orkesExecutionDAOFacade = executionDAOFacade;
this.systemTaskRegistry = systemTaskRegistry;
this.executionDAO = executionDAO;
this.metricsCollector = metricsCollector;

int threadPoolSize = Runtime.getRuntime().availableProcessors() * 10;
this.taskUpdateExecutor =
Expand All @@ -126,6 +121,8 @@ public boolean offer(Runnable runnable) {
log.info("OrkesWorkflowExecutor initialized");
}



@Override
public void retry(String workflowId, boolean resumeSubworkflowTasks) {
WorkflowModel workflowModel = orkesExecutionDAOFacade.getWorkflowModel(workflowId, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class HttpSync extends WorkflowSystemTask {
public HttpSync(RestTemplateProvider restTemplateProvider, ObjectMapper objectMapper) {
super(TASK_TYPE_HTTP);
httpTask = new HttpTask(restTemplateProvider, objectMapper);
log.info("Using {}", restTemplateProvider);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.orkes.conductor.execution.tasks;

import com.netflix.conductor.tasks.http.HttpTask;
import com.netflix.conductor.tasks.http.providers.RestTemplateProvider;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Primary;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.time.Duration;
import java.util.List;
import java.util.Optional;

/**
* Provider for a customized RestTemplateBuilder. This class provides a default {@link RestTemplateBuilder} which can be
* configured or extended as needed.
*/
@Component
@Primary
public class OrkesRestTemplateProvider implements RestTemplateProvider {

@AllArgsConstructor
private static class RestTemplateHolder {
RestTemplate restTemplate;
HttpComponentsClientHttpRequestFactory requestFactory;
int readTimeout;
int connectTimeout;
}

private final ThreadLocal<RestTemplateHolder> threadLocalRestTemplate;
private final int defaultReadTimeout;
private final int defaultConnectTimeout;

@Autowired
public OrkesRestTemplateProvider(@Value("${conductor.tasks.http.readTimeout:250ms}") Duration readTimeout,
@Value("${conductor.tasks.http.connectTimeout:250ms}") Duration connectTimeout,
Optional<ClientHttpRequestInterceptor> interceptor) {
this.defaultReadTimeout = (int) readTimeout.toMillis();
this.defaultConnectTimeout = (int) connectTimeout.toMillis();
this.threadLocalRestTemplate = ThreadLocal.withInitial(() ->
{
RestTemplate restTemplate = new RestTemplate();
HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
requestFactory.setReadTimeout(defaultReadTimeout);
requestFactory.setConnectTimeout(defaultConnectTimeout);
restTemplate.setRequestFactory(requestFactory);
interceptor.ifPresent(it -> addInterceptor(restTemplate, it));
return new RestTemplateHolder(restTemplate, requestFactory, defaultReadTimeout, defaultConnectTimeout);
});

}

@Override
public RestTemplate getRestTemplate(HttpTask.Input input) {
RestTemplateHolder holder = threadLocalRestTemplate.get();
RestTemplate restTemplate = holder.restTemplate;
HttpComponentsClientHttpRequestFactory requestFactory = holder.requestFactory;

int newReadTimeout = Optional.ofNullable(input.getReadTimeOut()).orElse(defaultReadTimeout);
int newConnectTimeout = Optional.ofNullable(input.getConnectionTimeOut()).orElse(defaultConnectTimeout);
if (newReadTimeout != holder.readTimeout || newConnectTimeout != holder.connectTimeout) {
holder.readTimeout = newReadTimeout;
holder.connectTimeout = newConnectTimeout;
requestFactory.setReadTimeout(newReadTimeout);
requestFactory.setConnectTimeout(newConnectTimeout);
}

return restTemplate;
}

private void addInterceptor(RestTemplate restTemplate, ClientHttpRequestInterceptor interceptor) {
List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
if (!interceptors.contains(interceptor)) {
interceptors.add(interceptor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
@Slf4j
public class SubWorkflowSync extends WorkflowSystemTask {

private static final String SUB_WORKFLOW_ID = "subWorkflowId";

private final SubWorkflow subWorkflow;
private final ObjectMapper objectMapper;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.orkes.conductor.rest;

import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.service.WorkflowService;
import io.orkes.conductor.common.model.WorkflowRun;
import io.swagger.v3.oas.annotations.Operation;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.concurrent.*;

import static com.netflix.conductor.rest.config.RequestMappingConstants.WORKFLOW;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;

@RestController
@RequestMapping(WORKFLOW)
@Slf4j
@RequiredArgsConstructor
public class WorkflowResourceSync {

public static final String REQUEST_ID_KEY = "_X-Request-Id";

private final WorkflowService workflowService;

private final ScheduledExecutorService executionMonitor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2);

@PostConstruct
public void startMonitor() {
log.info("Starting execution monitors");
}

@PostMapping(value = "execute/{name}/{version}", produces = APPLICATION_JSON_VALUE)
@Operation(summary = "Execute a workflow synchronously", tags = "workflow-resource")
@SneakyThrows
public WorkflowRun executeWorkflow(
@PathVariable("name") String name,
@PathVariable(value = "version", required = false) Integer version,
@RequestParam(value = "requestId", required = true) String requestId,
@RequestParam(value = "waitUntilTaskRef", required = false) String waitUntilTaskRef,
@RequestBody StartWorkflowRequest request) {

request.setName(name);
request.setVersion(version);
String workflowId = workflowService.startWorkflow(request);
request.getInput().put(REQUEST_ID_KEY, requestId);
Workflow workflow = workflowService.getExecutionStatus(workflowId, true);
if(workflow.getStatus().isTerminal() || workflow.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equalsIgnoreCase(waitUntilTaskRef))) {
return toWorkflowRun(workflow);
}
int maxTimeInMilis = 5_000; //5 sec
int sleepTime = 100; //millis
int loopCount = maxTimeInMilis / sleepTime;
for (int i = 0; i < loopCount; i++) {
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
workflow = workflowService.getExecutionStatus(workflowId, true);
if(workflow.getStatus().isTerminal() || workflow.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equalsIgnoreCase(waitUntilTaskRef))) {
return toWorkflowRun(workflow);
}
}
workflow = workflowService.getExecutionStatus(workflowId, true);
return toWorkflowRun(workflow);
}

public static WorkflowRun toWorkflowRun(Workflow workflow) {
WorkflowRun run = new WorkflowRun();

run.setWorkflowId(workflow.getWorkflowId());
run.setRequestId((String) workflow.getInput().get(REQUEST_ID_KEY));
run.setCorrelationId(workflow.getCorrelationId());
run.setInput(workflow.getInput());
run.setCreatedBy(workflow.getCreatedBy());
run.setCreateTime(workflow.getCreateTime());
run.setOutput(workflow.getOutput());
run.setTasks(new ArrayList<>());
workflow.getTasks().forEach(task -> run.getTasks().add(task));
run.setPriority(workflow.getPriority());
if(workflow.getUpdateTime() != null) {
run.setUpdateTime(workflow.getUpdateTime());
}
run.setStatus(Workflow.WorkflowStatus.valueOf(workflow.getStatus().name()));
run.setVariables(workflow.getVariables());

return run;
}
}

0 comments on commit 1334e07

Please sign in to comment.