Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into remote-detector-calls
Browse files Browse the repository at this point in the history
  • Loading branch information
engechas authored Feb 10, 2024
2 parents 3a88a0c + 9c4dd59 commit 4a20f75
Show file tree
Hide file tree
Showing 81 changed files with 2,218 additions and 502 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/kafka-plugin-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaStartIT
- name: Run Kafka integration tests
run: |
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests '*kafka.buffer*'
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests KafkaBufferIT --tests KafkaBufferOTelIT
- name: Upload Unit Test Results
if: always()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class DefaultPluginFactory implements PluginFactory {
@Inject
DefaultPluginFactory(
final PluginProviderLoader pluginProviderLoader,
final PluginCreator pluginCreator,
@Named("pluginCreator") final PluginCreator pluginCreator,
final PluginConfigurationConverter pluginConfigurationConverter,
final PluginBeanFactoryProvider pluginBeanFactoryProvider,
final PluginConfigurationObservableFactory pluginConfigurationObservableFactory,
Expand Down Expand Up @@ -113,7 +113,7 @@ private <T> ComponentPluginArgumentsContext getConstructionContext(final PluginS
final Class<?> pluginConfigurationType = pluginAnnotation.pluginConfigurationType();
final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting);
final PluginConfigObservable pluginConfigObservable = pluginConfigurationObservableFactory
.createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginClass, pluginSetting);
.createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting);

return new ComponentPluginArgumentsContext.Builder()
.withPluginSetting(pluginSetting)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,25 @@
public class ExtensionLoader {
private final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter;
private final ExtensionClassProvider extensionClassProvider;
private final PluginCreator pluginCreator;
private final PluginCreator extensionPluginCreator;

@Inject
ExtensionLoader(
final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter,
final ExtensionClassProvider extensionClassProvider,
final PluginCreator pluginCreator) {
@Named("extensionPluginCreator") final PluginCreator extensionPluginCreator) {
this.extensionPluginConfigurationConverter = extensionPluginConfigurationConverter;
this.extensionClassProvider = extensionClassProvider;
this.pluginCreator = pluginCreator;
this.extensionPluginCreator = extensionPluginCreator;
}

List<? extends ExtensionPlugin> loadExtensions() {
return extensionClassProvider.loadExtensionPluginClasses()
.stream()
.map(extensionClass -> {
final PluginArgumentsContext pluginArgumentsContext = getConstructionContext(extensionClass);
return pluginCreator.newPluginInstance(extensionClass, pluginArgumentsContext, convertClassToName(extensionClass));
return extensionPluginCreator.newPluginInstance(
extensionClass, pluginArgumentsContext, convertClassToName(extensionClass));
})
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher;
import org.springframework.context.annotation.DependsOn;

import javax.inject.Inject;
import javax.inject.Named;
Expand All @@ -15,6 +16,7 @@
import java.util.Set;

@Named
@DependsOn("extensionsApplier")
public class PluginConfigurationObservableRegister {
private final Set<PluginConfigPublisher> pluginConfigPublishers;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Named;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
Expand All @@ -22,13 +20,15 @@
import java.util.Optional;
import java.util.stream.Collectors;

@Named
class PluginCreator {
private static final Logger LOG = LoggerFactory.getLogger(PluginCreator.class);

private final PluginConfigurationObservableRegister pluginConfigurationObservableRegister;

@Inject
PluginCreator() {
this.pluginConfigurationObservableRegister = null;
}

PluginCreator(final PluginConfigurationObservableRegister pluginConfigurationObservableRegister) {
this.pluginConfigurationObservableRegister = pluginConfigurationObservableRegister;
}
Expand All @@ -45,7 +45,9 @@ <T> T newPluginInstance(final Class<T> pluginClass,

final Object[] constructorArguments = pluginArgumentsContext.createArguments(constructor.getParameterTypes(), args);

pluginConfigurationObservableRegister.registerPluginConfigurationObservables(constructorArguments);
if (pluginConfigurationObservableRegister != null) {
pluginConfigurationObservableRegister.registerPluginConfigurationObservables(constructorArguments);
}

try {
return (T) constructor.newInstance(constructorArguments);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opensearch.dataprepper.plugin;

import org.springframework.context.annotation.Bean;

import javax.inject.Named;

@Named
public class PluginCreatorContext {
@Bean(name = "extensionPluginCreator")
public PluginCreator observablePluginCreator() {
return new PluginCreator();
}

@Bean(name = "pluginCreator")
public PluginCreator pluginCreator(
final PluginConfigurationObservableRegister pluginConfigurationObservableRegister) {
return new PluginCreator(pluginConfigurationObservableRegister);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found() {

assertThat(createObjectUnderTest().loadPlugin(baseClass, pluginSetting),
equalTo(expectedInstance));
verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter),
eq(PluginSetting.class), eq(pluginSetting));
verify(beanFactoryProvider).get();
}

Expand Down Expand Up @@ -254,6 +256,8 @@ void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_
baseClass, pluginSetting, c -> 1);

verify(beanFactoryProvider).get();
verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter),
eq(PluginSetting.class), eq(pluginSetting));
final ArgumentCaptor<ComponentPluginArgumentsContext> pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class);
verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName));
final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue();
Expand Down Expand Up @@ -281,6 +285,8 @@ void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_number
final Object plugin = createObjectUnderTest().loadPlugin(baseClass, pluginSetting, object);

verify(beanFactoryProvider).get();
verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter),
eq(PluginSetting.class), eq(pluginSetting));
final ArgumentCaptor<ComponentPluginArgumentsContext> pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class);
verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName), eq(object));
final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ class ExtensionLoaderTest {
@Mock
private ExtensionClassProvider extensionClassProvider;
@Mock
private PluginCreator pluginCreator;
private PluginCreator extensionPluginCreator;
@Captor
private ArgumentCaptor<PluginArgumentsContext> pluginArgumentsContextArgumentCaptor;

private ExtensionLoader createObjectUnderTest() {
return new ExtensionLoader(extensionPluginConfigurationConverter, extensionClassProvider, pluginCreator);
return new ExtensionLoader(extensionPluginConfigurationConverter, extensionClassProvider, extensionPluginCreator);
}

@Test
Expand All @@ -74,7 +74,7 @@ void loadExtensions_returns_single_extension_for_single_plugin_class() {
when(extensionClassProvider.loadExtensionPluginClasses()).thenReturn(Collections.singleton(pluginClass));

final ExtensionPlugin expectedPlugin = mock(ExtensionPlugin.class);
when(pluginCreator.newPluginInstance(
when(extensionPluginCreator.newPluginInstance(
eq(pluginClass),
any(PluginArgumentsContext.class),
startsWith("extension_plugin")))
Expand All @@ -98,15 +98,15 @@ void loadExtensions_returns_single_extension_with_config_for_single_plugin_class
final String expectedPluginName = "test_extension_with_config";
when(extensionPluginConfigurationConverter.convert(eq(true), eq(TestExtensionConfig.class),
eq("/test_extension"))).thenReturn(testExtensionConfig);
when(pluginCreator.newPluginInstance(
when(extensionPluginCreator.newPluginInstance(
eq(TestExtensionWithConfig.class),
any(PluginArgumentsContext.class),
eq(expectedPluginName)))
.thenReturn(expectedPlugin);

final List<? extends ExtensionPlugin> extensionPlugins = createObjectUnderTest().loadExtensions();

verify(pluginCreator).newPluginInstance(eq(TestExtensionWithConfig.class),
verify(extensionPluginCreator).newPluginInstance(eq(TestExtensionWithConfig.class),
pluginArgumentsContextArgumentCaptor.capture(), eq(expectedPluginName));
assertThat(pluginArgumentsContextArgumentCaptor.getValue(), instanceOf(
ExtensionLoader.SingleConfigArgumentArgumentsContext.class));
Expand All @@ -128,7 +128,7 @@ void loadExtensions_returns_multiple_extensions_for_multiple_plugin_classes() {
final String expectedPluginName = ExtensionLoader.classNameToPluginName(pluginClass.getSimpleName());
final ExtensionPlugin extensionPlugin = mock((Class<ExtensionPlugin>)pluginClass);

when(pluginCreator.newPluginInstance(
when(extensionPluginCreator.newPluginInstance(
eq(pluginClass),
any(PluginArgumentsContext.class),
eq(expectedPluginName)))
Expand Down Expand Up @@ -156,7 +156,7 @@ void loadExtensions_invokes_newPluginInstance_with_PluginArgumentsContext_not_su

when(extensionClassProvider.loadExtensionPluginClasses()).thenReturn(Collections.singleton(pluginClass));

when(pluginCreator.newPluginInstance(
when(extensionPluginCreator.newPluginInstance(
any(Class.class),
any(PluginArgumentsContext.class),
anyString()))
Expand All @@ -166,7 +166,7 @@ void loadExtensions_invokes_newPluginInstance_with_PluginArgumentsContext_not_su

final ArgumentCaptor<PluginArgumentsContext> contextArgumentCaptor =
ArgumentCaptor.forClass(PluginArgumentsContext.class);
verify(pluginCreator).newPluginInstance(
verify(extensionPluginCreator).newPluginInstance(
eq(pluginClass),
contextArgumentCaptor.capture(),
anyString());
Expand All @@ -183,7 +183,7 @@ void loadExtensions_invokes_newPluginInstance_with_PluginArgumentsContext_which_

when(extensionClassProvider.loadExtensionPluginClasses()).thenReturn(Collections.singleton(pluginClass));

when(pluginCreator.newPluginInstance(
when(extensionPluginCreator.newPluginInstance(
any(Class.class),
any(PluginArgumentsContext.class),
anyString()))
Expand All @@ -193,7 +193,7 @@ void loadExtensions_invokes_newPluginInstance_with_PluginArgumentsContext_which_

final ArgumentCaptor<PluginArgumentsContext> contextArgumentCaptor =
ArgumentCaptor.forClass(PluginArgumentsContext.class);
verify(pluginCreator).newPluginInstance(
verify(extensionPluginCreator).newPluginInstance(
eq(pluginClass),
contextArgumentCaptor.capture(),
any());
Expand Down
Loading

0 comments on commit 4a20f75

Please sign in to comment.