Skip to content

Commit

Permalink
added abstract RefreshableServiceProviderPlace
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-mlb committed Jan 14, 2025
1 parent 379db5e commit 40e4ccb
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 3 deletions.
141 changes: 141 additions & 0 deletions src/main/java/emissary/place/RefreshableServiceProviderPlace.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package emissary.place;

import emissary.config.ConfigUtil;
import emissary.config.Configurator;

import org.apache.commons.collections4.CollectionUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
* ServiceProviderPlace that supports on-demand refresh of its configuration
*/
public abstract class RefreshableServiceProviderPlace extends ServiceProviderPlace {

private final AtomicBoolean invalidated = new AtomicBoolean(false);

public RefreshableServiceProviderPlace() throws IOException {}

public RefreshableServiceProviderPlace(final String thePlaceLocation) throws IOException {
super(thePlaceLocation);
}

protected RefreshableServiceProviderPlace(final String configFile, @Nullable final String theDir, final String thePlaceLocation)
throws IOException {
super(configFile, theDir, thePlaceLocation);
}

protected RefreshableServiceProviderPlace(final InputStream configStream, @Nullable final String theDir, final String thePlaceLocation)
throws IOException {
super(configStream, theDir, thePlaceLocation);
}

protected RefreshableServiceProviderPlace(final InputStream configStream) throws IOException {
super(configStream);
}

protected RefreshableServiceProviderPlace(final String configFile, final String placeLocation) throws IOException {
super(configFile, placeLocation);
}

protected RefreshableServiceProviderPlace(final InputStream configStream, final String placeLocation) throws IOException {
super(configStream, placeLocation);
}

/**
* Get the invalid flag of the place. An invalidated place may indicate that the place has changes, such as new
* configuration, and may trigger a follow-on process to reconfigure, reinitialize, or re-create the place.
*
* @return true if the place has been invalidated, false otherwise
*/
public final boolean isInvalidated() {
return this.invalidated.get();
}

/**
* Invalidate a place that need to be refreshed.
*/
public final void invalidate() {
setInvalidated(true);
}

/**
* Set the invalid flag of the place
*
* @param invalid true if place is invalid, false otherwise
*/
private void setInvalidated(final boolean invalid) {
this.invalidated.set(invalid);
}

/**
* Reinitialize the place by reloading the configurator and reconfiguring the place. Must call {@link #invalidate()}
* before attempting to refresh the place.
*/
public final synchronized void refresh() {
try {
if (isInvalidated()) {
this.configG = reloadConfigurator(this.configLocs);
reconfigurePlace();
setInvalidated(false);
} else {
logger.warn("Cannot refresh place configuration without first calling invalidate; no reconfiguration performed");
}
} catch (IOException e) {
logger.error("Failed to reload configurator");
}
}

/**
* Reinitialize the place by reloading the configurator and reconfiguring the place. Must call {@link #invalidate()}
* before attempting to refresh the place.
*
* @param configStream the config data as an {@link InputStream}
*/
public final synchronized void refresh(final InputStream configStream) {
try {
if (isInvalidated()) {
this.configG = reloadConfigurator(configStream);
reconfigurePlace();
setInvalidated(false);
} else {
logger.warn("Cannot refresh place configuration with configStream without first calling invalidate; no reconfiguration performed");
}
} catch (IOException e) {
logger.error("Failed to reload configStream");
}
}

protected abstract void reconfigurePlace() throws IOException;

/**
* Reload the {@link Configurator}
*
* @param configLocations the list of configuration files to load
* @throws IOException if there is an issue loading the config
*/
private static Configurator reloadConfigurator(@Nullable final List<String> configLocations) throws IOException {
if (CollectionUtils.isNotEmpty(configLocations)) {
return ConfigUtil.getConfigInfo(configLocations);
}
throw new IOException("No config locations specified");
}

/**
* Reload the {@link Configurator}
*
* @param configStream the stream of configuration data
* @throws IOException if there is an issue loading the config
*/
private static Configurator reloadConfigurator(@Nullable final InputStream configStream) throws IOException {
if (configStream != null) {
return ConfigUtil.getConfigInfo(configStream);
}
throw new IOException("Null config stream supplied");
}

}
8 changes: 5 additions & 3 deletions src/main/java/emissary/place/ServiceProviderPlace.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public abstract class ServiceProviderPlace implements IServiceProviderPlace,
@Nullable
protected Configurator configG;

protected final List<String> configLocs = new ArrayList<>();

/**
* A <i><b>local</b></i> reference to the directory that this place resides in. Every JVM that contains 'places' must
* have a local directory
Expand Down Expand Up @@ -110,6 +112,8 @@ public abstract class ServiceProviderPlace implements IServiceProviderPlace,
@Nullable
protected String serviceDescription;

protected String placeLocation;

/**
* Static context logger
*/
Expand Down Expand Up @@ -253,12 +257,11 @@ protected Configurator loadConfigurator(@Nullable String placeLocation) throws I
if (placeLocation == null) {
placeLocation = this.getClass().getSimpleName();
}

this.placeLocation = placeLocation;
// Extract config data stream name from place location
// and try finding config info with and without the
// package name of this class (in that order)
String myPackage = this.getClass().getPackage().getName();
List<String> configLocs = new ArrayList<>();
// Dont use KeyManipulator for this, only works when hostname/fqdn has dots
int pos = placeLocation.lastIndexOf("/");
String serviceClass = (pos > -1 ? placeLocation.substring(pos + 1) : placeLocation);
Expand Down Expand Up @@ -925,7 +928,6 @@ protected void deregisterFromDirectory(List<String> keys) {
}
}


/**
* Remove a service proxy from the running place. Proxy strings not found registered will be ignored Will remove all
* keys that match the supplied proxy
Expand Down
116 changes: 116 additions & 0 deletions src/test/java/emissary/place/RefreshableServiceProviderPlaceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package emissary.place;

import emissary.core.IBaseDataObject;
import emissary.core.Namespace;
import emissary.directory.DirectoryEntry;
import emissary.test.core.junit5.UnitTest;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.Nullable;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class RefreshableServiceProviderPlaceTest extends UnitTest {

private static final byte[] cfgData = ("SERVICE_KEY = \"UNKNOWN.TEST_PLACE.ID.http://localhost:8001/RefreshablePlaceTest$6050\"\n" +
"KEY_1 = 200").getBytes();
private static final byte[] cfgDataReload = ("SERVICE_KEY = \"*.TEST_PLACE.ID.http://localhost:8001/RefreshablePlaceTest$5060\"\n" +
"KEY_1 = 300").getBytes();

@Nullable
private RefreshablePlaceTest place = null;

@Override
@BeforeEach
public void setUp() throws Exception {
InputStream config = new ByteArrayInputStream(cfgData);
place = new RefreshablePlaceTest(config, null, "http://localhost:8001/RefreshablePlaceTest");
}

@Override
@AfterEach
public void tearDown() throws Exception {
super.tearDown();
assertNotNull(place);
place.shutDown();
place = null;
}

@Test
void testReconfigure() {
assertNotNull(place, "Place created and configured");
assertEquals("RefreshablePlaceTest", place.getPlaceName(), "Configured place name");
assertEquals("UNKNOWN", place.getPrimaryProxy(), "Primary proxy");
assertEquals("UNKNOWN.TEST_PLACE.ID.http://localhost:8001/RefreshablePlaceTest", place.getKey(), "Key generation");
DirectoryEntry de = place.getDirectoryEntry();
assertNotNull(de, "Directory entry");
assertEquals(60, de.getCost(), "Cost in directory entry");
assertEquals(50, de.getQuality(), "Quality in directory entry");
assertEquals("Description not available", de.getDescription(), "Description in directory entry");
assertNotNull(place.configG);
assertEquals(200, place.configG.findIntEntry("KEY_1", 0));
assertDoesNotThrow(() -> Namespace.lookup("http://localhost:8001/RefreshablePlaceTest"));

place.invalidate();
place.refresh(new ByteArrayInputStream(cfgDataReload));
assertNotNull(place, "Place created and configured");
assertEquals("RefreshablePlaceTest", place.getPlaceName(), "Configured place name");
// assertEquals("*", placeTest.getPrimaryProxy(), "Primary proxy");
assertEquals("UNKNOWN", place.getPrimaryProxy(), "Primary proxy");
// assertEquals("*.TEST_PLACE.ID.http://localhost:8001/PlaceTest", placeTest.getKey(), "Key generation");
assertEquals("UNKNOWN.TEST_PLACE.ID.http://localhost:8001/RefreshablePlaceTest", place.getKey(), "Key generation");
de = place.getDirectoryEntry();
assertNotNull(de, "Directory entry");
// assertEquals(50, de.getCost(), "Cost in directory entry");
assertEquals(60, de.getCost(), "Cost in directory entry");
// assertEquals(40, de.getQuality(), "Quality in directory entry");
assertEquals(50, de.getQuality(), "Quality in directory entry");
assertEquals("Description not available", de.getDescription(), "Description in directory entry");
assertNotNull(place.configG);
assertEquals(300, place.configG.findIntEntry("KEY_1", 0));
assertDoesNotThrow(() -> Namespace.lookup("http://localhost:8001/RefreshablePlaceTest"));
}

@Test
void testInvalidate() {
assertNotNull(place, "Place created and configured");
assertFalse(place.isInvalidated());
assertNotNull(place.configG);
assertEquals(200, place.configG.findIntEntry("KEY_1", 0));

place.refresh(new ByteArrayInputStream(cfgDataReload));
assertFalse(place.isInvalidated());
assertEquals(200, place.configG.findIntEntry("KEY_1", 0));

place.invalidate();
assertTrue(place.isInvalidated());
place.refresh(new ByteArrayInputStream(cfgDataReload));
assertFalse(place.isInvalidated());
assertEquals(300, place.configG.findIntEntry("KEY_1", 0));
}

private static final class RefreshablePlaceTest extends RefreshableServiceProviderPlace {

public RefreshablePlaceTest(InputStream config, @Nullable String dir, @Nullable String loc) throws IOException {
super(config, dir, loc);
}

@Override
public void process(IBaseDataObject d) {
assertNotNull(d);
}

@Override
protected void reconfigurePlace() {}
}
}

0 comments on commit 40e4ccb

Please sign in to comment.