Skip to content

Commit

Permalink
[Improvement-16985][Registry] SubscribeListener support set scope
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jan 24, 2025
1 parent 25108c8 commit 27fa00b
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public void notify(Event event) {
try {
// make sure the event is processed in order
synchronized (this) {
Event.Type type = event.type();
T server = parseServerFromHeartbeat(event.data());
Event.Type type = event.getType();
T server = parseServerFromHeartbeat(event.getEventData());
if (server == null) {
log.error("Unknown cluster change event: {}", event);
return;
Expand All @@ -58,6 +58,11 @@ public void notify(Event event) {
}
}

@Override
public SubscribeScope getSubscribeScope() {
return SubscribeScope.CHILDREN_ONLY;
}

abstract T parseServerFromHeartbeat(String serverHeartBeatJson);

public abstract void onServerAdded(T serverHeartBeat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,115 +17,30 @@

package org.apache.dolphinscheduler.registry.api;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

@Getter
@ToString
@Builder
@AllArgsConstructor
public class Event {

// The prefix which is watched
private String key;
// The path which is watched
private final String watchedPath;
// The full path where the event was generated
private String path;
private final String eventPath;
// The value corresponding to the path
private String data;
private final String eventData;
// The event type {ADD, REMOVE, UPDATE}
private Type type;

public Event(String key, String path, String data, Type type) {
this.key = key;
this.path = path;
this.data = data;
this.type = type;
}

public Event() {
}

public static EventBuilder builder() {
return new EventBuilder();
}

public String key() {
return this.key;
}

public String path() {
return this.path;
}

public String data() {
return this.data;
}

public Type type() {
return this.type;
}

public Event key(String key) {
this.key = key;
return this;
}

public Event path(String path) {
this.path = path;
return this;
}

public Event data(String data) {
this.data = data;
return this;
}

public Event type(Type type) {
this.type = type;
return this;
}

public String toString() {
return "Event(key=" + this.key() + ", path=" + this.path() + ", data=" + this.data() + ", type=" + this.type()
+ ")";
}

public enum Type {
ADD,
REMOVE,
UPDATE
}

public static class EventBuilder {

private String key;
private String path;
private String data;
private Type type;

EventBuilder() {
}

public EventBuilder key(String key) {
this.key = key;
return this;
}

public EventBuilder path(String path) {
this.path = path;
return this;
}

public EventBuilder data(String data) {
this.data = data;
return this;
}

public EventBuilder type(Type type) {
this.type = type;
return this;
}

public Event build() {
return new Event(key, path, data, type);
}

public String toString() {
return "Event.EventBuilder(key=" + this.key + ", path=" + this.path + ", data=" + this.data + ", type="
+ this.type + ")";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,23 @@

public interface SubscribeListener {

void notify(Event event);
void notify(final Event event);

SubscribeScope getSubscribeScope();

enum SubscribeScope {
/**
* Only watch the path itself
*/
PATH_ONLY,
/**
* Only watch the children of the path
*/
CHILDREN_ONLY,
/**
* Watch the path and all its children and the parent path
*/
ALL

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;

import java.util.List;

Expand Down Expand Up @@ -56,16 +57,25 @@ public AbstractHAServer(final Registry registry, final String selectorPath, fina

@Override
public void start() {
registry.subscribe(selectorPath, event -> {
if (Event.Type.REMOVE.equals(event.type())) {
if (serverIdentify.equals(event.data())) {
statusChange(ServerStatus.STAND_BY);
} else {
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
registry.subscribe(selectorPath, new SubscribeListener() {

@Override
public void notify(Event event) {
if (Event.Type.REMOVE.equals(event.getType())) {
if (serverIdentify.equals(event.getEventData())) {
statusChange(ServerStatus.STAND_BY);
} else {
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
}
}
}
}

@Override
public SubscribeScope getSubscribeScope() {
return SubscribeScope.PATH_ONLY;
}
});

if (participateElection()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -158,7 +159,24 @@ public void subscribe(String path, SubscribeListener listener) {
watcherMap.computeIfAbsent(path,
$ -> client.getWatchClient().watch(watchKey, watchOption, watchResponse -> {
for (WatchEvent event : watchResponse.getEvents()) {
listener.notify(new EventAdaptor(event, path));
final String eventPath = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
switch (listener.getSubscribeScope()) {
case PATH_ONLY:
if (eventPath.equals(path)) {
listener.notify(toEvent(event, path));
}
break;
case CHILDREN_ONLY:
if (!eventPath.equals(path)) {
listener.notify(toEvent(event, path));
}
break;
case ALL:
listener.notify(toEvent(event, path));
break;
default:
throw new RegistryException("Unknown event scope: " + listener.getSubscribeScope());
}
}
}));
} catch (Exception e) {
Expand Down Expand Up @@ -373,30 +391,31 @@ private static ByteSequence byteSequence(String val) {
return ByteSequence.from(val, StandardCharsets.UTF_8);
}

static final class EventAdaptor extends Event {

public EventAdaptor(WatchEvent event, String key) {
key(key);

switch (event.getEventType()) {
case PUT:
if (event.getPrevKV().getKey().isEmpty()) {
type(Type.ADD);
} else {
type(Type.UPDATE);
}
break;
case DELETE:
type(Type.REMOVE);
break;
default:
break;
}
KeyValue keyValue = event.getKeyValue();
if (keyValue != null) {
path(keyValue.getKey().toString(StandardCharsets.UTF_8));
data(keyValue.getValue().toString(StandardCharsets.UTF_8));
}
private Event toEvent(final WatchEvent watchEvent, final String watchedPath) {
Event.Type eventType = null;
switch (watchEvent.getEventType()) {
case PUT:
if (watchEvent.getPrevKV().getKey().isEmpty()) {
eventType = Event.Type.ADD;
} else {
eventType = Event.Type.UPDATE;
}
break;
case DELETE:
eventType = Event.Type.REMOVE;
break;
default:
break;
}
final KeyValue keyValue = watchEvent.getKeyValue();
return Event.builder()
.type(eventType)
.watchedPath(watchedPath)
.eventPath(Optional.ofNullable(keyValue).map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
.orElse(null))
.eventData(Optional.ofNullable(keyValue).map(kv -> kv.getValue().toString(StandardCharsets.UTF_8))
.orElse(null))
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,24 @@ public void testSubscribe() {
final AtomicBoolean subscribeRemoved = new AtomicBoolean(false);
final AtomicBoolean subscribeUpdated = new AtomicBoolean(false);

SubscribeListener subscribeListener = event -> {
System.out.println("Receive event: " + event);
if (event.type() == Event.Type.ADD) {
subscribeAdded.compareAndSet(false, true);
final SubscribeListener subscribeListener = new SubscribeListener() {

@Override
public void notify(Event event) {
if (event.getType() == Event.Type.ADD) {
subscribeAdded.compareAndSet(false, true);
}
if (event.getType() == Event.Type.REMOVE) {
subscribeRemoved.compareAndSet(false, true);
}
if (event.getType() == Event.Type.UPDATE) {
subscribeUpdated.compareAndSet(false, true);
}
}
if (event.type() == Event.Type.REMOVE) {
subscribeRemoved.compareAndSet(false, true);
}
if (event.type() == Event.Type.UPDATE) {
subscribeUpdated.compareAndSet(false, true);

@Override
public SubscribeScope getSubscribeScope() {
return SubscribeScope.PATH_ONLY;
}
};
String key = "/nodes/master" + System.nanoTime();
Expand Down
Loading

0 comments on commit 27fa00b

Please sign in to comment.