Skip to content

Commit

Permalink
Add JobBuilder to JetService [CORE-31] (#1157)
Browse files Browse the repository at this point in the history
  • Loading branch information
burakgok authored and actions-user committed Mar 25, 2024
1 parent fe501b4 commit d122cb8
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 3 deletions.
41 changes: 41 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/jet/JetMemberSelector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2008-2024, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet;

import com.hazelcast.cluster.Member;
import com.hazelcast.function.PredicateEx;

/**
* Selects the members on which a specific job will run.
*
* @see JetService.JobBuilder#withMemberSelector(JetMemberSelector)
* @since 5.5
*/
@FunctionalInterface
public interface JetMemberSelector extends PredicateEx<Member> {

/**
* Selects all lite members on the cluster.
*/
JetMemberSelector ALL_LITE_MEMBERS = Member::isLiteMember;

/**
* Predicate to select members on which the job will run.
*/
@Override
boolean testEx(Member member);
}
58 changes: 55 additions & 3 deletions hazelcast/src/main/java/com/hazelcast/jet/JetService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.Collection;
import java.util.List;

Expand Down Expand Up @@ -154,8 +155,8 @@ default Job newJob(@Nonnull Pipeline pipeline) {
* #newLightJob(Pipeline, JobConfig)}.
*/
@Nonnull
default Job newLightJob(@Nonnull Pipeline p) {
return newLightJob(p, new JobConfig());
default Job newLightJob(@Nonnull Pipeline pipeline) {
return newLightJob(pipeline, new JobConfig());
}

/**
Expand Down Expand Up @@ -189,7 +190,7 @@ default Job newLightJob(@Nonnull Pipeline p) {
* You should not mutate the {@link JobConfig} or {@link Pipeline} instances
* after submitting them to this method.
*/
Job newLightJob(@Nonnull Pipeline p, @Nonnull JobConfig config);
Job newLightJob(@Nonnull Pipeline pipeline, @Nonnull JobConfig config);

/**
* Submits a job defined in the Core API with a default config.
Expand Down Expand Up @@ -299,4 +300,55 @@ default <T> Observable<T> newObservable() {
*/
@Nonnull
Collection<Observable<?>> getObservables();

/**
* Creates a {@code JobBuilder} for a new Jet job with {@link DAG} definition.
*
* @since 5.5
*/
JobBuilder newJobBuilder(@Nonnull DAG dag);

/**
* Creates a {@code JobBuilder} for a new Jet job with {@link Pipeline} definition.
*
* @since 5.5
*/
JobBuilder newJobBuilder(@Nonnull Pipeline pipeline);

/** @since 5.5 */
@NotThreadSafe
interface JobBuilder {

/**
* See {@link JobConfig} for details.
*/
JobBuilder withConfig(@Nonnull JobConfig jobConfig);

/**
* Selects the members on which the job will run.
*
* @see JetMemberSelector#ALL_LITE_MEMBERS
*/
JobBuilder withMemberSelector(@Nonnull JetMemberSelector memberSelector);

/**
* See {@link #newLightJob(Pipeline, JobConfig)} for details.
*/
JobBuilder asLightJob();

/**
* See {@link #newJob(Pipeline, JobConfig)} for details.
*
* @see #startIfAbsent()
*/
Job start();

/**
* See {@link #newJobIfAbsent(Pipeline, JobConfig)} for details.
*
* @throws UnsupportedOperationException for light jobs
* @see #start()
*/
Job startIfAbsent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.hazelcast.jet.config;

import com.hazelcast.jet.JetMemberSelector;

/**
* Some constants for the {@link JobConfig#getArgument(String)} method.
*/
Expand Down Expand Up @@ -52,6 +54,13 @@ public final class JobConfigArguments {
*/
public static final String KEY_JOB_IS_SUSPENDABLE = "__jet.jobIsSuspendable";

/**
* The key under which caller provides a {@link JetMemberSelector} for an isolated job.
* <p>
* Isolated jobs are available only in Enterprise Edition.
*/
public static final String KEY_ISOLATED_JOB_MEMBER_SELECTOR = "__jet.isolatedJobMemberSelector";

private JobConfigArguments() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetCacheManager;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.JetMemberSelector;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.JobAlreadyExistsException;
import com.hazelcast.jet.JobStateSnapshot;
Expand Down Expand Up @@ -52,11 +53,13 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.hazelcast.jet.config.JobConfigArguments.KEY_ISOLATED_JOB_MEMBER_SELECTOR;
import static com.hazelcast.jet.impl.JobRepository.exportedSnapshotMapName;
import static com.hazelcast.jet.impl.util.Util.distinctBy;
import static com.hazelcast.security.permission.ActionConstants.ACTION_ADD_RESOURCES;
Expand Down Expand Up @@ -410,4 +413,66 @@ public abstract Job newJobProxy(long jobId,
public abstract Map<M, GetJobIdsResult> getJobsInt(String onlyName, Long onlyJobId);

public abstract M getMasterId();

@Override
public JobBuilderImpl newJobBuilder(@Nonnull DAG dag) {
return new JobBuilderImpl(dag);
}

@Override
public JobBuilderImpl newJobBuilder(@Nonnull Pipeline pipeline) {
return new JobBuilderImpl(pipeline);
}

public class JobBuilderImpl implements JobBuilder {
private final @Nonnull Object jobDefinition;
private @Nullable JobConfig config;
private @Nullable JetMemberSelector memberSelector;
private boolean isLightJob;

JobBuilderImpl(@Nonnull DAG dag) {
jobDefinition = dag;
}

JobBuilderImpl(@Nonnull Pipeline pipeline) {
jobDefinition = pipeline;
}

@Override
public JobBuilderImpl withConfig(@Nonnull JobConfig jobConfig) {
this.config = jobConfig;
return this;
}

@Override
public JobBuilderImpl withMemberSelector(@Nonnull JetMemberSelector memberSelector) {
this.memberSelector = memberSelector;
return this;
}

@Override
public JobBuilderImpl asLightJob() {
isLightJob = true;
return this;
}

@Nonnull
private JobConfig getConfig() {
return Objects.requireNonNullElseGet(config, JobConfig::new)
.setArgument(KEY_ISOLATED_JOB_MEMBER_SELECTOR, memberSelector);
}

@Override
public Job start() {
return newJobInt(newJobId(), jobDefinition, getConfig(), isLightJob);
}

@Override
public Job startIfAbsent() {
if (isLightJob) {
throw new UnsupportedOperationException();
}
return newJobIfAbsent(jobDefinition, getConfig(), null);
}
}
}
Loading

0 comments on commit d122cb8

Please sign in to comment.