Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2160] added some unit tests for gobblin temporal module #4059

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion gobblin-temporal/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ dependencies {
testCompile project(":gobblin-example")

testCompile externalDependency.testng
testCompile externalDependency.mockito
testCompile externalDependency.mockitoInline
testCompile externalDependency.powerMockApi
testCompile externalDependency.powerMockModule
testCompile externalDependency.hadoopYarnMiniCluster
testCompile externalDependency.curatorFramework
testCompile externalDependency.curatorTest


testCompile ('com.google.inject:guice:3.0') {
force = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;

import com.google.common.annotations.VisibleForTesting;


import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;

Expand Down Expand Up @@ -115,8 +118,9 @@ protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final WorkflowAddr
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
}

@VisibleForTesting
/** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */
protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
public Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) {
// (only pause when an appreciable number of leaves)
// TODO: use a configuration value, for simpler adjustment, rather than hard-code
return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT
Expand All @@ -130,11 +134,9 @@ protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChi
* List<Integer> naiveUniformity = Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren);
* @return each sub-tree's desired size, in ascending sub-tree order
*/
protected static List<Integer> consolidateSubTreeGrandChildren(
final int numSubTreesPerSubTree,
final int numChildrenTotal,
final int numSubTreeChildren
) {
@VisibleForTesting
public static List<Integer> consolidateSubTreeGrandChildren(final int numSubTreesPerSubTree,
Comment on lines +137 to +138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that you added @VisibleForTesting and changed modifier to public for testing this method. However, this change is not required if we follow the same package structure for the test class and we can access protected methods without exposing them publicly. We can revert the modifier change and place the test class in the same package instead.

final int numChildrenTotal, final int numSubTreeChildren) {
if (numSubTreesPerSubTree <= 0) {
return Lists.newArrayList();
} else if (isSqrt(numSubTreeChildren, numChildrenTotal)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,4 @@ public void testFetchesUniqueWorkDirsFromMultiWorkUnits() {
Set<String> output = GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
Assert.assertEquals(output.size(), 11);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.temporal.ddm.utils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;

import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;


public class JobStateUtilTest {

private JobState jobState;
private FileSystem fileSystem;

@BeforeMethod
public void setUp() {
jobState = Mockito.mock(JobState.class);
fileSystem = Mockito.mock(FileSystem.class);
}

@Test
public void testOpenFileSystem() throws IOException {

Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
Mockito.when(jobState.getProperties()).thenReturn(new Properties());

FileSystem fs = JobStateUtils.openFileSystem(jobState);

Assert.assertNotNull(fs);
Mockito.verify(jobState,Mockito.times(1)).getProp(Mockito.anyString(), Mockito.anyString());
}

@Test
public void testCreateSource() throws ReflectiveOperationException {
Mockito.when(jobState.getProp(Mockito.anyString()))
.thenReturn("org.apache.gobblin.source.extractor.filebased.TextFileBasedSource");
Source<?, ?> source = JobStateUtils.createSource(jobState);
Assert.assertNotNull(source);
}

@Test
public void testOpenTaskStateStoreUncached() throws URISyntaxException {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("file:///test");
Mockito.when(jobState.getJobId()).thenReturn("testJobId");
Mockito.when(jobState.getJobName()).thenReturn("testJobName");
Mockito.when(fileSystem.makeQualified(Mockito.any()))
.thenReturn(new Path("file:///test/testJobName/testJobId/output"));
Mockito.when(fileSystem.getUri()).thenReturn(new URI("file:///test/testJobName/testJobId/output"));

StateStore<TaskState> stateStore = JobStateUtils.openTaskStateStoreUncached(jobState, fileSystem);

Assert.assertNotNull(stateStore);
}

@Test
public void testGetFileSystemUri() {
Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
URI fsUri = JobStateUtils.getFileSystemUri(jobState);
Assert.assertEquals(URI.create("file:///test"), fsUri);
Mockito.verify(jobState).getProp(Mockito.anyString(), Mockito.anyString());
}

@Test
public void testGetWorkDirRoot() {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path rootPath = JobStateUtils.getWorkDirRoot(jobState);
Assert.assertEquals(new Path("/tmp/testJob/jobId123"), rootPath);
Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString());
}

@Test
public void testGetWorkUnitsPath() {
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path workUnitsPath = JobStateUtils.getWorkUnitsPath(jobState);
Assert.assertEquals(new Path("/tmp/testJob/jobId123/input"), workUnitsPath);
}

@Test
public void testGetTaskStateStorePath() throws IOException {
Mockito.when(fileSystem.makeQualified(Mockito.any(Path.class))).thenReturn(new Path("/qualified/path"));
Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fileSystem);
Assert.assertEquals(new Path("/qualified/path"), taskStateStorePath);
}

@Test
public void testWriteJobState() throws IOException {
Path workDirRootPath = new Path("/tmp");
FSDataOutputStream dos = Mockito.mock(FSDataOutputStream.class);
Mockito.when(fileSystem.create(Mockito.any(Path.class))).thenReturn(dos);

JobStateUtils.writeJobState(jobState, workDirRootPath, fileSystem);

Mockito.verify(fileSystem).create(Mockito.any(Path.class));
Mockito.verify(jobState).write(Mockito.any(DataOutputStream.class), Mockito.anyBoolean(), Mockito.anyBoolean());
}

@Test
public void testGetSharedResourcesBroker() {
Mockito.when(jobState.getProperties()).thenReturn(System.getProperties());
Mockito.when(jobState.getJobName()).thenReturn("testJob");
Mockito.when(jobState.getJobId()).thenReturn("jobId123");
Assert.assertNotNull(JobStateUtils.getSharedResourcesBroker(jobState));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.workflow.impl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update package to org.apache.gobblin.temporal.util.nesting.workflow


import java.time.Duration;
import java.util.List;
import java.util.Optional;

import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;

import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;


@RunWith(PowerMockRunner.class)
@PrepareForTest(Workflow.class)
public class AbstractNestingExecWorkflowImplTest {

@Mock
private Workload<String> mockWorkload;

@Mock
private WorkflowAddr mockWorkflowAddr;

@Mock
private Workload.WorkSpan<String> mockWorkSpan;

@Mock
private Promise<Object> mockPromise;

private AbstractNestingExecWorkflowImpl<String, Object> workflow;

@BeforeClass
public void setup() {
// PowerMockito is required to mock static methods in the Workflow class
Mockito.mockStatic(Workflow.class);
Mockito.mockStatic(Async.class);
Mockito.mockStatic(Promise.class);
this.mockWorkload = Mockito.mock(Workload.class);
this.mockWorkflowAddr = Mockito.mock(WorkflowAddr.class);
this.mockWorkSpan = Mockito.mock(Workload.WorkSpan.class);
this.mockPromise = Mockito.mock(Promise.class);

workflow = new AbstractNestingExecWorkflowImpl<String, Object>() {
@Override
protected Promise<Object> launchAsyncActivity(String task) {
return mockPromise;
}
};
}

@Test
public void testPerformWorkload_NoWorkSpan() {
// Arrange
Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.empty());

// Act
int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty());

// Assert
Assert.assertEquals(0, result);
Mockito.verify(mockWorkload, Mockito.times(2)).getSpan(0, 5);
}

@Test
public void testCalcPauseDurationBeforeCreatingSubTree_NoPause() {
// Act
Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(50);

// Assert
Assert.assertEquals(Duration.ZERO, result);
}

@Test
public void testCalcPauseDurationBeforeCreatingSubTree_PauseRequired() {
// Act
Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(150);

// Assert
Assert.assertEquals(
Duration.ofSeconds(AbstractNestingExecWorkflowImpl.NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT),
result);
}

@Test
public void testConsolidateSubTreeGrandChildren() {
// Act
List<Integer> result = AbstractNestingExecWorkflowImpl.consolidateSubTreeGrandChildren(3, 10, 2);

// Assert
Assert.assertEquals(3, result.size());
Assert.assertEquals(Integer.valueOf(0), result.get(0));
Assert.assertEquals(Integer.valueOf(0), result.get(1));
Assert.assertEquals(Integer.valueOf(6), result.get(2));
}

@Test(expectedExceptions = AssertionError.class)
public void testPerformWorkload_LaunchesChildWorkflows() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you've covered some scenarios, like handling an empty WorkSpan and testing the pause duration. However, the current tests don't fully capture the functionality(creating child workflows, subtrees handling, result calculation and edge cases) of the performWorkload method. It would be useful to add overall coverage for a method when we are adding tests for it, this ensures that the tests fulfill the original intent of thoroughly validating the behavior of missing tests. This would also help avoid the need for additional work on this method again later, unless we are making changes to it.

// Arrange
Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.of(mockWorkSpan));
Mockito.when(mockWorkSpan.getNumElems()).thenReturn(5);
Mockito.when(mockWorkSpan.next()).thenReturn("task1");
Mockito.when(mockWorkload.isIndexKnownToExceed(Mockito.anyInt())).thenReturn(false);

// Mock the child workflow
Mockito.when(Async.function(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.anyInt(),
Mockito.anyInt(), Mockito.any())).thenReturn(mockPromise);
Mockito.when(mockPromise.get()).thenReturn(5);
// Act
int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty());
}
}