Skip to content

Commit

Permalink
Merge pull request #3 from CloudSlang/master
Browse files Browse the repository at this point in the history
update
  • Loading branch information
genadi-hp committed May 8, 2016
2 parents 15c894e + 53a50eb commit 60a56f3
Show file tree
Hide file tree
Showing 13 changed files with 373 additions and 149 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*******************************************************************************
* (c) Copyright 2014 Hewlett-Packard Development Company, L.P.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License v2.0 which accompany this distribution.
*
* The Apache License is available at
* http://www.apache.org/licenses/LICENSE-2.0
*
*******************************************************************************/


package io.cloudslang.engine.queue.services;

public interface BusyWorkersService {
boolean isWorkerBusy(String workerId);
void findBusyWorkers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ public interface ExecutionQueueRepository {
Set<Long> getFinishedExecStateIds();

List<ExecutionMessage> findByStatuses(int maxSize, ExecStatus... statuses);
List<String> getBusyWorkers(ExecStatus... statuses);
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*******************************************************************************
* (c) Copyright 2014 Hewlett-Packard Development Company, L.P.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License v2.0 which accompany this distribution.
*
* The Apache License is available at
* http://www.apache.org/licenses/LICENSE-2.0
*
*******************************************************************************/

package io.cloudslang.engine.queue.services;
import io.cloudslang.engine.queue.entities.ExecStatus;
import io.cloudslang.engine.queue.repositories.ExecutionQueueRepository;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.List;

public class BusyWorkersServiceImpl implements BusyWorkersService{

private final Logger logger = Logger.getLogger(BusyWorkersServiceImpl.class);
private List<String> busyWorkers = new ArrayList<>();

@Autowired
private ExecutionQueueRepository executionQueueRepository;

@Override
@Transactional(readOnly = true)
public boolean isWorkerBusy(String workerId) {
return busyWorkers.contains(workerId);
}
@Override
@Transactional(readOnly = true)
public void findBusyWorkers() {
long startTime = 0;
if (logger.isDebugEnabled()) {
startTime = System.currentTimeMillis();
logger.debug("Querying for busy workers...");
}
busyWorkers = executionQueueRepository.getBusyWorkers(ExecStatus.ASSIGNED);

if (logger.isDebugEnabled()) {
long endTime = System.currentTimeMillis();
logger.debug("Queried for busy workers, the following workers are busy: " + this.busyWorkers + ". Query took: " + (endTime - startTime) + " ms to complete");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ final public class ExecutionQueueServiceImpl implements ExecutionQueueService {
@Autowired
private ExecutionAssignerService executionAssignerService;

@Autowired
private BusyWorkersService busyWorkersService;

@Autowired(required = false)
private List<QueueListener> listeners = Collections.emptyList();

Expand Down Expand Up @@ -136,7 +139,10 @@ private List<ExecutionMessage> filterToPersistMessages(List<ExecutionMessage> me
@Override
@Transactional(readOnly = true)
public List<ExecutionMessage> poll(String workerId, int maxSize, ExecStatus... statuses) {
List<ExecutionMessage> result = executionQueueRepository.poll(workerId, maxSize, statuses);
List<ExecutionMessage> result = new ArrayList<>();
//check if the worker has work before actually polling for work
if(busyWorkersService.isWorkerBusy(workerId))
result = executionQueueRepository.poll(workerId, maxSize, statuses);

for (QueueListener listener : listeners) {
listener.onPoll(result, result.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,20 @@ public void testPoll(){
Assert.assertFalse(result.isEmpty());
}

@Test
public void testGetBusyWorkersBusyWorker(){
List<ExecutionMessage> msg = new ArrayList<>();
ExecutionMessage execMsg = generateMessage("group1","msg1", 1);
execMsg.setWorkerId("worker1");
execMsg.setStatus(ExecStatus.IN_PROGRESS);
execMsg.incMsgSeqId();
msg.add(execMsg);
executionQueueRepository.insertExecutionStates(msg);
executionQueueRepository.insertExecutionQueue(msg,1L);
List<String> busyWorkers = executionQueueRepository.getBusyWorkers(ExecStatus.ASSIGNED);
Assert.assertNotNull(busyWorkers);
}

private ExecutionMessage generateMessage(String groupName,String msgId, int msg_seq_id) {
byte[] payloadData;
payloadData = "This is just a test".getBytes();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*******************************************************************************
* (c) Copyright 2014 Hewlett-Packard Development Company, L.P.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License v2.0 which accompany this distribution.
*
* The Apache License is available at
* http://www.apache.org/licenses/LICENSE-2.0
*
*******************************************************************************/


package io.cloudslang.engine.queue.services;

import io.cloudslang.engine.queue.entities.ExecStatus;
import io.cloudslang.engine.queue.repositories.ExecutionQueueRepository;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.ArrayList;
import java.util.List;

import static org.mockito.Mockito.*;

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration
public class BusyWorkersServiceTest {



@Autowired
private BusyWorkersService busyWorkersService;

@Autowired
private ExecutionQueueRepository executionQueueRepository;

@Before
public void setUp()
{
MockitoAnnotations.initMocks(this);
reset(executionQueueRepository);
}

@Test
public void testIdleWorker(){
List<String> busyWorkers = new ArrayList<>();
when(executionQueueRepository.getBusyWorkers(ExecStatus.ASSIGNED)).thenReturn(busyWorkers);
Assert.assertFalse(busyWorkersService.isWorkerBusy("worker1"));
}

@Test
public void testBusyWorker(){
List<String> busyWorkers = new ArrayList<>();
busyWorkers.add("worker1");
when(executionQueueRepository.getBusyWorkers(ExecStatus.ASSIGNED)).thenReturn(busyWorkers);
busyWorkersService.findBusyWorkers();
Assert.assertTrue(busyWorkersService.isWorkerBusy("worker1"));
}


@Configuration
static class EmptyConfig {
@Bean
public BusyWorkersService busyWorkersService(){
return new BusyWorkersServiceImpl();
}

@Bean
public ExecutionQueueRepository executionQueueRepository(){
return mock(ExecutionQueueRepository.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public class ExecutionQueueServiceTest {
@Autowired
public WorkerNodeService workerNodeService;

@Autowired
private BusyWorkersService busyWorkersService;

@Autowired
private JdbcTemplate jdbcTemplate;

Expand Down Expand Up @@ -190,7 +193,8 @@ public void pollWithoutAckTestMixMsg() throws Exception {
groupWorkerMap.put("group1", "worker1");
groupWorkerMap.put("group1", "worker2");
when(workerNodeService.readGroupWorkersMapActiveAndRunningAndVersion("")).thenReturn(groupWorkerMap);

when(busyWorkersService.isWorkerBusy("worker1")).thenReturn(true);
when(busyWorkersService.isWorkerBusy("worker1")).thenReturn(true);
List<ExecutionMessage> msgInQueue = executionQueueService.pollMessagesWithoutAck(100, 0);
Assert.assertEquals(0, msgInQueue.size());

Expand Down Expand Up @@ -314,6 +318,11 @@ public WorkerNodeService workerNodeService() {
return mock(WorkerNodeService.class);
}

@Bean
public BusyWorkersService busyWorkersService() {
return mock(BusyWorkersService.class);
}

@Bean
public VersionService queueVersionService() {
return mock(VersionService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class QueueDispatcherServiceTest {
@Mock
private ExecutionQueueService executionQueueService;

@Mock
private BusyWorkersService busyWorkersService;

@InjectMocks
private QueueDispatcherService queueDispatcherService = new QueueDispatcherServiceImpl();
Expand Down Expand Up @@ -75,6 +77,7 @@ public void testDispatch() throws Exception {
@Test
public void testPoll() throws Exception {
Date now = new Date();
when(busyWorkersService.isWorkerBusy("workerId")).thenReturn(true);
queueDispatcherService.poll("workerId",5);
verify(executionQueueService,times(1)).poll("workerId", 5, ExecStatus.ASSIGNED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.cloudslang.engine.queue.entities.Payload;
import io.cloudslang.engine.queue.repositories.ExecutionQueueRepository;
import io.cloudslang.engine.queue.repositories.ExecutionQueueRepositoryImpl;
import io.cloudslang.engine.queue.services.BusyWorkersService;
import io.cloudslang.engine.queue.services.ExecutionQueueService;
import io.cloudslang.engine.queue.services.ExecutionQueueServiceImpl;
import io.cloudslang.engine.queue.services.assigner.ExecutionAssignerService;
Expand Down Expand Up @@ -65,6 +66,9 @@ public class QueueCleanerServiceTest {
@Autowired
public QueueCleanerService queueCleanerService;

@Autowired
private BusyWorkersService busyWorkersService;

@Autowired
private JdbcTemplate jdbcTemplate;

Expand All @@ -83,7 +87,7 @@ public void cleanTest() throws Exception {

ExecutionMessage message25 = generateMessage(2, "group1", "2", ExecStatus.IN_PROGRESS, 1);
ExecutionMessage message26 = generateMessage(2, "group1", "2", ExecStatus.FINISHED, 2);

when(busyWorkersService.isWorkerBusy("myWorker")).thenReturn(true);
msgs.clear();
msgs.add(message15);
executionQueueService.enqueue(msgs);
Expand Down Expand Up @@ -184,6 +188,11 @@ WorkerNodeService workerNodeService() {
return mock(WorkerNodeService.class);
}

@Bean
public BusyWorkersService busyWorkersService() {
return mock(BusyWorkersService.class);
}

@Bean
VersionService queueVersionService() {
return mock(VersionService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
import io.cloudslang.engine.node.services.WorkersMBean;
import io.cloudslang.engine.queue.entities.ExecutionMessageConverter;
import io.cloudslang.engine.queue.repositories.ExecutionQueueRepositoryImpl;
import io.cloudslang.engine.queue.services.ExecutionQueueServiceImpl;
import io.cloudslang.engine.queue.services.QueueDispatcherServiceImpl;
import io.cloudslang.engine.queue.services.QueueListenerImpl;
import io.cloudslang.engine.queue.services.QueueStateIdGeneratorServiceImpl;
import io.cloudslang.engine.queue.services.ScoreEventFactoryImpl;
import io.cloudslang.engine.queue.services.*;
import io.cloudslang.engine.queue.services.assigner.ExecutionAssignerServiceImpl;
import io.cloudslang.engine.queue.services.cleaner.QueueCleanerServiceImpl;
import io.cloudslang.engine.queue.services.recovery.ExecutionRecoveryServiceImpl;
Expand Down Expand Up @@ -107,6 +103,7 @@ public class EngineBeanDefinitionParser extends AbstractBeanDefinitionParser {
put(WorkerDbSupportServiceImpl.class, null);
put(ScoreDeprecatedImpl.class, null);
put(ScoreEngineJobsImpl.class,"scoreEngineJobs");
put(BusyWorkersServiceImpl.class,"busyWorkersService");
}};

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
<task:scheduled ref="scoreEngineJobs" method="cleanQueueJob" fixed-delay="60000" initial-delay="120000" />
<task:scheduled ref="scoreEngineJobs" method="recoveryVersionJob" fixed-delay="30000" initial-delay="6000" />
<task:scheduled ref="scoreEngineJobs" method="executionRecoveryJob" fixed-delay="120000" initial-delay="120000" />
<task:scheduled ref="busyWorkersService" method="findBusyWorkers" fixed-delay="200" initial-delay="200" />
</task:scheduled-tasks>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class EngineTest {
private QueueDispatcherService dispatcherService;

@Test
public void baseEngineTest() {
public void baseEngineTest() throws InterruptedException {
// register worker
workerNodeService.create("uuid", "password", "host", "dir");
workerNodeService.activate("uuid");
Expand All @@ -79,7 +79,7 @@ public void baseEngineTest() {
ExecutionPlan executionPlan = createExecutionPlan();
TriggeringProperties triggeringProperties = TriggeringProperties.create(executionPlan);
score.trigger(triggeringProperties);

Thread.sleep(300);
List<ExecutionMessage> messages = dispatcherService.poll("uuid", 10);

assertThat(messages).hasSize(1);
Expand Down

0 comments on commit 60a56f3

Please sign in to comment.