Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: changes registration of dispatchers #4511

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class RemoteMessageDispatcherRegistryImpl implements RemoteMessageDispatc
private final Map<String, RemoteMessageDispatcher> dispatchers = new HashMap<>();

@Override
public void register(RemoteMessageDispatcher dispatcher) {
dispatchers.put(dispatcher.protocol(), dispatcher);
public void register(String protocol, RemoteMessageDispatcher dispatcher) {
dispatchers.put(protocol, dispatcher);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void shouldDispatchEventsOnProviderContractNegotiationStateChanges(EventRouter e
ContractDefinitionStore contractDefinitionStore,
PolicyDefinitionStore policyDefinitionStore,
AssetIndex assetIndex) {
dispatcherRegistry.register(succeedingDispatcher());
dispatcherRegistry.register("test", succeedingDispatcher());

when(identityService.verifyJwtToken(eq(tokenRepresentation), isA(VerificationContext.class))).thenReturn(Result.success(token));
eventRouter.register(ContractNegotiationEvent.class, eventSubscriber);
Expand Down Expand Up @@ -143,7 +143,6 @@ private ContractRequestMessage createContractOfferRequest(Policy policy, String
@NotNull
private RemoteMessageDispatcher succeedingDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
when(testDispatcher.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any")));
return testDispatcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@
public class TransferProcessEventDispatchTest {

public static final Duration TIMEOUT = Duration.ofSeconds(30);
private final EventSubscriber eventSubscriber = mock();

@RegisterExtension
static final RuntimeExtension RUNTIME = new RuntimePerClassExtension()
.setConfiguration(Map.of(
Expand All @@ -100,6 +98,7 @@ public class TransferProcessEventDispatchTest {
.registerServiceMock(ContractNegotiationStore.class, mock())
.registerServiceMock(ParticipantAgentService.class, mock())
.registerServiceMock(DataPlaneClientFactory.class, mock());
private final EventSubscriber eventSubscriber = mock();

@Test
void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService service,
Expand Down Expand Up @@ -128,7 +127,7 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se

when(agent.getIdentity()).thenReturn(providerId);

dispatcherRegistry.register(getTestDispatcher());
dispatcherRegistry.register("test", getTestDispatcher());
when(policyArchive.findPolicyForContract(matches(transferRequest.getContractId()))).thenReturn(Policy.Builder.newInstance().target("assetId").build());
when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement);
when(agentService.createFor(token)).thenReturn(agent);
Expand Down Expand Up @@ -194,7 +193,7 @@ void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService servi
.policy(Policy.Builder.newInstance().build())
.build();
when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement);
dispatcherRegistry.register(getTestDispatcher());
dispatcherRegistry.register("test", getTestDispatcher());
eventRouter.register(TransferProcessEvent.class, eventSubscriber);

var initiateResult = service.initiateTransfer(transferRequest);
Expand All @@ -213,7 +212,7 @@ void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService servi
@Test
void shouldDispatchEventOnTransferProcessFailure(TransferProcessService service, EventRouter eventRouter, RemoteMessageDispatcherRegistry dispatcherRegistry,
ContractNegotiationStore negotiationStore, PolicyArchive policyArchive) {
dispatcherRegistry.register(getFailingDispatcher());
dispatcherRegistry.register("test", getFailingDispatcher());
eventRouter.register(TransferProcessEvent.class, eventSubscriber);
var transferRequest = createTransferRequest();
var agreement = ContractAgreement.Builder.newInstance()
Expand All @@ -233,7 +232,6 @@ void shouldDispatchEventOnTransferProcessFailure(TransferProcessService service,
@NotNull
private RemoteMessageDispatcher getTestDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
var ack = TransferProcessAck.Builder.newInstance().build();
when(testDispatcher.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack)));
return testDispatcher;
Expand All @@ -242,7 +240,6 @@ private RemoteMessageDispatcher getTestDispatcher() {
@NotNull
private RemoteMessageDispatcher getFailingDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
when(testDispatcher.dispatch(any(), any())).thenReturn(failedFuture(new EdcException("cannot send message")));
return testDispatcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;

import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_SCOPE;
import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD;

Expand Down Expand Up @@ -127,7 +128,7 @@ public DspHttpRemoteMessageDispatcher dspHttpRemoteMessageDispatcher(ServiceExte
registerNegotiationPolicyScopes(dispatcher);
registerTransferProcessPolicyScopes(dispatcher);
registerCatalogPolicyScopes(dispatcher);
dispatcherRegistry.register(dispatcher);
dispatcherRegistry.register(DATASPACE_PROTOCOL_HTTP, dispatcher);
return dispatcher;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.DspHttpRemoteMessageDispatcher;
import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.DspHttpRequestFactory;
import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.response.DspHttpResponseBodyExtractor;
import org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.iam.AudienceResolver;
import org.eclipse.edc.spi.iam.IdentityService;
Expand Down Expand Up @@ -78,11 +77,6 @@ public DspHttpRemoteMessageDispatcherImpl(EdcHttpClient httpClient,
this.audienceResolver = audienceResolver;
}

@Override
public String protocol() {
return HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
}

@Override
public <T, M extends RemoteMessage> CompletableFuture<StatusResult<T>> dispatch(Class<T> responseType, M message) {
var handler = (MessageHandler<M, T>) this.handlers.get(message.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;
import static org.mockito.AdditionalMatchers.and;
Expand Down Expand Up @@ -100,11 +99,6 @@ void setUp() {
when(tokenDecorator.decorate(any())).thenAnswer(a -> a.getArgument(0));
}

@Test
void protocol_returnDsp() {
assertThat(dispatcher.protocol()).isEqualTo(DATASPACE_PROTOCOL_HTTP);
}

@Test
void dispatch_noScope() {
var authToken = "token";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;

import static org.eclipse.edc.connector.controlplane.callback.dispatcher.http.GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP;

@Extension(value = CallbackEventDispatcherHttpExtension.NAME)
public class CallbackEventDispatcherHttpExtension implements ServiceExtension {

Expand Down Expand Up @@ -55,13 +57,13 @@ public void initialize(ServiceExtensionContext context) {
var baseDispatcher = new GenericHttpRemoteDispatcherImpl(client);
baseDispatcher.registerDelegate(new CallbackEventRemoteMessageDispatcher(typeManager.getMapper(), vault));

registry.register(baseDispatcher);
registry.register(CALLBACK_EVENT_HTTP, baseDispatcher);
}


private String resolveScheme(String scheme) {
if (scheme.equalsIgnoreCase("https") || scheme.equalsIgnoreCase("http")) {
return GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP;
return CALLBACK_EVENT_HTTP;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,11 @@ protected GenericHttpRemoteDispatcherImpl(EdcHttpClient httpClient) {
this.httpClient = httpClient;
}

@Override
public String protocol() {
return CALLBACK_EVENT_HTTP;
}

@Override
public <T, M extends RemoteMessage> CompletableFuture<StatusResult<T>> dispatch(Class<T> responseType, M message) {
var delegate = (GenericHttpDispatcherDelegate<M, T>) delegates.get(message.getClass());
if (delegate == null) {
throw new EdcException(format("No %s message dispatcher found for message type %s", protocol(), message.getClass()));
throw new EdcException(format("No %s message dispatcher found for message type %s", CALLBACK_EVENT_HTTP, message.getClass()));
}
var request = delegate.buildRequest(message);
return httpClient.executeAsync(request, emptyList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatcher;

import static org.eclipse.edc.connector.controlplane.callback.dispatcher.http.GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

Expand All @@ -50,10 +50,7 @@ void setUp(ServiceExtensionContext context, ObjectFactory factory) {
void initialize_shouldRegisterBothDispatcher(ServiceExtensionContext context) {

extension.initialize(context);
verify(registry).register(argThat(dispatcher(CALLBACK_EVENT_HTTP)));
verify(registry).register(eq(CALLBACK_EVENT_HTTP), isA(GenericHttpRemoteDispatcherImpl.class));
}

private ArgumentMatcher<GenericHttpRemoteDispatcherImpl> dispatcher(String scheme) {
return dispatcher -> dispatcher.protocol().equals(scheme);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
*/
public interface RemoteMessageDispatcher {

/**
* Return the protocol this dispatcher uses.
*/
String protocol();


/**
* Binds and sends the message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface RemoteMessageDispatcherRegistry {
/**
* Registers a dispatcher.
*/
void register(RemoteMessageDispatcher dispatcher);
void register(String protocol, RemoteMessageDispatcher dispatcher);

/**
* Sends the message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

/**
* The resolver translate the scheme part {@link CallbackAddress#getUri()} to an internal
* naming of {@link RemoteMessageDispatcher#protocol()} ()}
* naming of {@link RemoteMessageDispatcher}
*/
@FunctionalInterface
public interface CallbackProtocolResolver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

/**
* Registry for {@link CallbackProtocolResolver} resolvers. The registry resolves the scheme part {@link CallbackAddress#getUri()} to an internal
* naming of {@link RemoteMessageDispatcher#protocol()}
* naming of {@link RemoteMessageDispatcher}
*/
@ExtensionPoint
public interface CallbackProtocolResolverRegistry {
Expand Down
Loading