Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load getTaskLocation on task startup in mmless ingestion #17350

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.fabric8.kubernetes.api.model.Pod;
Expand All @@ -31,6 +32,7 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand Down Expand Up @@ -177,6 +179,11 @@ private void writeTaskPayload(Task task) throws IOException
protected synchronized TaskStatus join(long timeout) throws IllegalStateException
{
try {
/* It's okay to store taskLocation because podIP only changes on pod restart, and we have to set restartPolicy to Never
since Druid doesn't support retrying tasks from an external system (K8s). We can explore adding a fabric8 watcher
if we decide we need to change this later.
**/
taskLocation = getTaskLocationFromK8s();
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
Expand Down Expand Up @@ -254,24 +261,8 @@ protected TaskLocation getTaskLocation()
if we decide we need to change this later.
**/
if (taskLocation == null) {
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
return TaskLocation.unknown();
}

Pod pod = maybePod.get();
PodStatus podStatus = pod.getStatus();

if (podStatus == null || podStatus.getPodIP() == null) {
return TaskLocation.unknown();
}
taskLocation = TaskLocation.create(
podStatus.getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")),
pod.getMetadata() != null ? pod.getMetadata().getName() : ""
);
log.warn("Unknown task location for [%s]", taskId);
return TaskLocation.unknown();
}

return taskLocation;
Expand Down Expand Up @@ -378,4 +369,28 @@ private void updateState(State[] acceptedStates, State targetState)
);
stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
}

/**
 * Builds the {@link TaskLocation} of this task from the peon pod returned by
 * {@code kubernetesClient.getPeonPodWithRetries} for the task's K8s job name.
 *
 * The host is the pod IP, the ports come from {@link DruidK8sConstants}, TLS is enabled only when the pod carries
 * the {@code DruidK8sConstants.TLS_ENABLED} annotation with a value that parses to {@code true}, and the pod name
 * falls back to an empty string when the pod has no metadata.
 *
 * @return the location derived from the pod's status and metadata
 * @throws ISE if the pod has no status or the status has no pod IP
 */
@VisibleForTesting
protected TaskLocation getTaskLocationFromK8s()
{
  final Pod peonPod = kubernetesClient.getPeonPodWithRetries(taskId.getK8sJobName());
  final PodStatus status = peonPod.getStatus();
  final String podIp = status == null ? null : status.getPodIP();

  // Without an IP there is nothing to point the location at.
  if (podIp == null) {
    throw new ISE("Could not find location of running task [%s]", taskId);
  }

  // Defaults used when the pod has no metadata (or no annotations).
  String tlsEnabled = "false";
  String podName = "";
  if (peonPod.getMetadata() != null) {
    if (peonPod.getMetadata().getAnnotations() != null) {
      tlsEnabled = peonPod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false");
    }
    podName = peonPod.getMetadata().getName();
  }

  return TaskLocation.create(
      podIp,
      DruidK8sConstants.PORT,
      DruidK8sConstants.TLS_PORT,
      Boolean.parseBoolean(tlsEnabled),
      podName
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Optional;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.dsl.LogWatch;
Expand Down Expand Up @@ -57,6 +58,7 @@
public class KubernetesPeonLifecycleTest extends EasyMockSupport
{
private static final String ID = "id";
private static final String IP = "ip";
private static final TaskStatus SUCCESS = TaskStatus.success(ID);

@Mock KubernetesPeonClient kubernetesClient;
Expand Down Expand Up @@ -286,6 +288,9 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);
replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -337,7 +342,9 @@ public void test_join() throws IOException
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -393,7 +400,9 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
).anyTimes();
replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -445,7 +454,9 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep
EasyMock.expectLastCall().once();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);
replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -493,7 +504,9 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -545,7 +558,9 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -585,7 +600,9 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -768,7 +785,7 @@ public void test_getTaskLocation_withPendingTaskState_returnsUnknown()
}

@Test
public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnknown()
public void test_getTaskLocation_withRunningTaskState_taskLocationUnset_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
Expand All @@ -780,8 +797,6 @@ public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnkn
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());

replayAll();

Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
Expand All @@ -790,35 +805,7 @@ public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnkn
}

@Test
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);

Pod pod = new PodBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.build();

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));

replayAll();

Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());

verifyAll();
}

@Test
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_returnsLocation()
public void test_getTaskLocationFromK8s()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
Expand All @@ -839,12 +826,11 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_retu
.endStatus()
.build();

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));
EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(pod).once();

replayAll();

TaskLocation location = peonLifecycle.getTaskLocation();

TaskLocation location = peonLifecycle.getTaskLocationFromK8s();
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(8100, location.getPort());
Assert.assertEquals(-1, location.getTlsPort());
Expand All @@ -854,43 +840,7 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_retu
}

@Test
public void test_getTaskLocation_saveTaskLocation()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
kubernetesClient,
taskLogs,
mapper,
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);

Pod pod = new PodBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.withNewStatus()
.withPodIP("ip")
.endStatus()
.build();

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)).once();

replayAll();

TaskLocation location = peonLifecycle.getTaskLocation();
peonLifecycle.getTaskLocation();
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(8100, location.getPort());
Assert.assertEquals(-1, location.getTlsPort());
Assert.assertEquals(ID, location.getK8sPodName());

verifyAll();
}

@Test
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
public void test_getTaskLocationFromK8s_withPeonPodWithStatusWithTLSAnnotation()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
Expand All @@ -912,11 +862,11 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithT
.endStatus()
.build();

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));
EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(pod).once();

replayAll();

TaskLocation location = peonLifecycle.getTaskLocation();
TaskLocation location = peonLifecycle.getTaskLocationFromK8s();

Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(-1, location.getPort());
Expand All @@ -938,7 +888,6 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown()
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();

replayAll();
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
Expand All @@ -952,4 +901,11 @@ private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, Kubern
stateField.setAccessible(true);
stateField.set(peonLifecycle, new AtomicReference<>(state));
}

/**
 * Test fixture: creates a fresh {@link PodStatus} whose pod IP is set to the {@code IP} constant.
 *
 * @return a new pod status carrying only the pod IP
 */
private PodStatus getPodStatusWithIP()
{
  final PodStatus statusWithIp = new PodStatus();
  statusWithIp.setPodIP(IP);
  return statusWithIp;
}
}
Loading