Skip to content

Commit

Permalink
Merge pull request #179 from entur/improve/mdc
Browse files Browse the repository at this point in the history
Improve/mdc
  • Loading branch information
testower authored Sep 9, 2024
2 parents 46e726a + 0cf8c9c commit 7235e48
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 59 deletions.
49 changes: 44 additions & 5 deletions src/main/java/org/entur/gbfs/GbfsSubscriptionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,26 @@ public String subscribeV2(
return subscribe(new GbfsV2Subscription(options, consumer));
}

/**
* Start a subscription on a GBFS v2.x feed
* <p>
* Since v2.x is backwards-compatible with v1.x, v1.x feeds can also be
* consumed with this subscription.
* </p>
*
* @param options Options
* @param consumer A consumer that will handle receiving updates from the loader
* @param updateInterceptor A subscription update interceptor
* @return A string identifier
*/
public String subscribeV2(
GbfsSubscriptionOptions options,
Consumer<GbfsV2Delivery> consumer,
SubscriptionUpdateInterceptor updateInterceptor
) {
return subscribe(new GbfsV2Subscription(options, consumer, updateInterceptor));
}

/**
* Start a subscription on a GBFS v3.x feed
*
Expand All @@ -80,15 +100,34 @@ public String subscribeV3(
return subscribe(new GbfsV3Subscription(options, consumer));
}

/**
* Start a subscription on a GBFS v3.x feed
*
* @param options Options
* @param consumer A consumer that will handle receiving updates from the loader}
* @param updateInterceptor A subscription update interceptor
* @return A string identifier
*/
public String subscribeV3(
GbfsSubscriptionOptions options,
Consumer<GbfsV3Delivery> consumer,
SubscriptionUpdateInterceptor updateInterceptor
) {
return subscribe(new GbfsV3Subscription(options, consumer, updateInterceptor));
}

/**
* Update all subscriptions
*/
public void update() {
Optional
.ofNullable(customThreadPool)
.orElse(ForkJoinPool.commonPool())
.execute(() ->
subscriptions.values().parallelStream().forEach(GbfsSubscription::update)
subscriptions
.values()
.parallelStream()
.forEach(subscription ->
Optional
.ofNullable(customThreadPool)
.orElse(ForkJoinPool.commonPool())
.execute(subscription::update)
);
}

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/entur/gbfs/SubscriptionUpdateInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.entur.gbfs;

/**
* An interceptor that can be notified before and after a subscription's update
*/
public interface SubscriptionUpdateInterceptor {
/**
* Called before a subscription is updated
*/
void beforeUpdate();

/**
* Called after a subscription was updated
*/
void afterUpdate();
}
8 changes: 0 additions & 8 deletions src/main/java/org/entur/gbfs/http/FailedToGetException.java

This file was deleted.

8 changes: 0 additions & 8 deletions src/main/java/org/entur/gbfs/http/InvalidURLException.java

This file was deleted.

70 changes: 50 additions & 20 deletions src/main/java/org/entur/gbfs/loader/v2/GbfsV2Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.function.Consumer;
import org.entur.gbfs.GbfsSubscriptionOptions;
import org.entur.gbfs.SubscriptionUpdateInterceptor;
import org.entur.gbfs.loader.GbfsSubscription;
import org.entur.gbfs.validation.GbfsValidator;
import org.entur.gbfs.validation.GbfsValidatorFactory;
Expand All @@ -42,14 +43,19 @@
import org.mobilitydata.gbfs.v2_3.system_pricing_plans.GBFSSystemPricingPlans;
import org.mobilitydata.gbfs.v2_3.system_regions.GBFSSystemRegions;
import org.mobilitydata.gbfs.v2_3.vehicle_types.GBFSVehicleTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to represent a subscription to GBFS feeds for a single system
*/
public class GbfsV2Subscription implements GbfsSubscription {

private static final Logger LOG = LoggerFactory.getLogger(GbfsV2Subscription.class);

private final GbfsSubscriptionOptions subscriptionOptions;
private final Consumer<GbfsV2Delivery> consumer;
private final SubscriptionUpdateInterceptor updateInterceptor;
private GbfsV2Loader loader;

public GbfsV2Subscription(
Expand All @@ -58,6 +64,17 @@ public GbfsV2Subscription(
) {
this.subscriptionOptions = subscriptionOptions;
this.consumer = consumer;
this.updateInterceptor = null;
}

public GbfsV2Subscription(
GbfsSubscriptionOptions subscriptionOptions,
Consumer<GbfsV2Delivery> consumer,
SubscriptionUpdateInterceptor updateInterceptor
) {
this.subscriptionOptions = subscriptionOptions;
this.consumer = consumer;
this.updateInterceptor = updateInterceptor;
}

/**
Expand Down Expand Up @@ -87,26 +104,39 @@ public boolean getSetupComplete() {
* to the consumer if the update had changes
*/
public void update() {
if (loader.update()) {
GbfsV2Delivery delivery = new GbfsV2Delivery(
loader.getDiscoveryFeed(),
loader.getFeed(GBFSGbfsVersions.class),
loader.getFeed(GBFSSystemInformation.class),
loader.getFeed(GBFSVehicleTypes.class),
loader.getFeed(GBFSStationInformation.class),
loader.getFeed(GBFSStationStatus.class),
loader.getFeed(GBFSFreeBikeStatus.class),
loader.getFeed(GBFSSystemHours.class),
loader.getFeed(GBFSSystemCalendar.class),
loader.getFeed(GBFSSystemRegions.class),
loader.getFeed(GBFSSystemPricingPlans.class),
loader.getFeed(GBFSSystemAlerts.class),
loader.getFeed(GBFSGeofencingZones.class),
Boolean.TRUE.equals(subscriptionOptions.enableValidation())
? validateFeeds()
: null
);
consumer.accept(delivery);
if (updateInterceptor != null) {
updateInterceptor.beforeUpdate();
}

try {
if (loader.update()) {
GbfsV2Delivery delivery = new GbfsV2Delivery(
loader.getDiscoveryFeed(),
loader.getFeed(GBFSGbfsVersions.class),
loader.getFeed(GBFSSystemInformation.class),
loader.getFeed(GBFSVehicleTypes.class),
loader.getFeed(GBFSStationInformation.class),
loader.getFeed(GBFSStationStatus.class),
loader.getFeed(GBFSFreeBikeStatus.class),
loader.getFeed(GBFSSystemHours.class),
loader.getFeed(GBFSSystemCalendar.class),
loader.getFeed(GBFSSystemRegions.class),
loader.getFeed(GBFSSystemPricingPlans.class),
loader.getFeed(GBFSSystemAlerts.class),
loader.getFeed(GBFSGeofencingZones.class),
Boolean.TRUE.equals(subscriptionOptions.enableValidation())
? validateFeeds()
: null
);
consumer.accept(delivery);
}
} catch (RuntimeException e) {
LOG.error("Exception occurred during update", e);
throw e;
} finally {
if (updateInterceptor != null) {
updateInterceptor.afterUpdate();
}
}
}

Expand Down
66 changes: 48 additions & 18 deletions src/main/java/org/entur/gbfs/loader/v3/GbfsV3Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.function.Consumer;
import org.entur.gbfs.GbfsSubscriptionOptions;
import org.entur.gbfs.SubscriptionUpdateInterceptor;
import org.entur.gbfs.loader.GbfsSubscription;
import org.entur.gbfs.validation.GbfsValidator;
import org.entur.gbfs.validation.GbfsValidatorFactory;
Expand All @@ -40,14 +41,19 @@
import org.mobilitydata.gbfs.v3_0.system_regions.GBFSSystemRegions;
import org.mobilitydata.gbfs.v3_0.vehicle_status.GBFSVehicleStatus;
import org.mobilitydata.gbfs.v3_0.vehicle_types.GBFSVehicleTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to represent a subscription to GBFS feeds for a single system
*/
public class GbfsV3Subscription implements GbfsSubscription {

private static final Logger LOG = LoggerFactory.getLogger(GbfsV3Subscription.class);

private final GbfsSubscriptionOptions subscriptionOptions;
private final Consumer<GbfsV3Delivery> consumer;
private final SubscriptionUpdateInterceptor updateInterceptor;
private GbfsV3Loader loader;

public GbfsV3Subscription(
Expand All @@ -56,6 +62,17 @@ public GbfsV3Subscription(
) {
this.subscriptionOptions = subscriptionOptions;
this.consumer = consumer;
this.updateInterceptor = null;
}

public GbfsV3Subscription(
GbfsSubscriptionOptions subscriptionOptions,
Consumer<GbfsV3Delivery> consumer,
SubscriptionUpdateInterceptor updateInterceptor
) {
this.subscriptionOptions = subscriptionOptions;
this.consumer = consumer;
this.updateInterceptor = updateInterceptor;
}

/**
Expand Down Expand Up @@ -84,24 +101,37 @@ public boolean getSetupComplete() {
* to the consumer if the update had changes
*/
public void update() {
if (loader.update()) {
GbfsV3Delivery delivery = new GbfsV3Delivery(
loader.getDiscoveryFeed(),
loader.getFeed(GBFSGbfsVersions.class),
loader.getFeed(GBFSSystemInformation.class),
loader.getFeed(GBFSVehicleTypes.class),
loader.getFeed(GBFSStationInformation.class),
loader.getFeed(GBFSStationStatus.class),
loader.getFeed(GBFSVehicleStatus.class),
loader.getFeed(GBFSSystemRegions.class),
loader.getFeed(GBFSSystemPricingPlans.class),
loader.getFeed(GBFSSystemAlerts.class),
loader.getFeed(GBFSGeofencingZones.class),
Boolean.TRUE.equals(subscriptionOptions.enableValidation())
? validateFeeds()
: null
);
consumer.accept(delivery);
if (updateInterceptor != null) {
updateInterceptor.beforeUpdate();
}

try {
if (loader.update()) {
GbfsV3Delivery delivery = new GbfsV3Delivery(
loader.getDiscoveryFeed(),
loader.getFeed(GBFSGbfsVersions.class),
loader.getFeed(GBFSSystemInformation.class),
loader.getFeed(GBFSVehicleTypes.class),
loader.getFeed(GBFSStationInformation.class),
loader.getFeed(GBFSStationStatus.class),
loader.getFeed(GBFSVehicleStatus.class),
loader.getFeed(GBFSSystemRegions.class),
loader.getFeed(GBFSSystemPricingPlans.class),
loader.getFeed(GBFSSystemAlerts.class),
loader.getFeed(GBFSGeofencingZones.class),
Boolean.TRUE.equals(subscriptionOptions.enableValidation())
? validateFeeds()
: null
);
consumer.accept(delivery);
}
} catch (RuntimeException e) {
LOG.error("Exception occurred during update", e);
throw e;
} finally {
if (updateInterceptor != null) {
updateInterceptor.afterUpdate();
}
}
}

Expand Down
51 changes: 51 additions & 0 deletions src/test/java/org/entur/gbfs/GBFSSubscriptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.entur.gbfs.loader.v2.GbfsV2Delivery;
import org.entur.gbfs.loader.v3.GbfsV3Delivery;
Expand All @@ -27,6 +28,31 @@ void testSubscription() throws URISyntaxException, InterruptedException {
loader.unsubscribe(subscriber);
}

@Test
void testSubscriptionUpdateInterceptor()
throws URISyntaxException, InterruptedException {
waiter = new CountDownLatch(3);
GbfsSubscriptionManager loader = new GbfsSubscriptionManager();
String subscriber = loader.subscribeV2(
getTestOptions("file:src/test/resources/gbfs/lillestrombysykkel/gbfs.json", "nb"),
getTestConsumer(),
new SubscriptionUpdateInterceptor() {
@Override
public void beforeUpdate() {
waiter.countDown();
}

@Override
public void afterUpdate() {
waiter.countDown();
}
}
);
loader.update();
Assertions.assertTrue(waiter.await(1, TimeUnit.SECONDS));
loader.unsubscribe(subscriber);
}

@Test
void testSubscriptionWithCustomThreadPool()
throws URISyntaxException, InterruptedException {
Expand Down Expand Up @@ -83,6 +109,31 @@ void testV3Subscription() throws URISyntaxException, InterruptedException {
loader.unsubscribe(subscriber);
}

@Test
void testV3SubscriptionUpdateInterceptor()
throws URISyntaxException, InterruptedException {
waiter = new CountDownLatch(3);
GbfsSubscriptionManager loader = new GbfsSubscriptionManager();
String subscriber = loader.subscribeV3(
getV3TestOptions("file:src/test/resources/gbfs/v3/getaroundstavanger/gbfs.json"),
getV3TestConsumer(),
new SubscriptionUpdateInterceptor() {
@Override
public void beforeUpdate() {
waiter.countDown();
}

@Override
public void afterUpdate() {
waiter.countDown();
}
}
);
loader.update();
Assertions.assertTrue(waiter.await(1, TimeUnit.SECONDS));
loader.unsubscribe(subscriber);
}

Consumer<GbfsV3Delivery> getV3TestConsumer() {
return delivery -> {
Assertions.assertNotNull(delivery);
Expand Down

0 comments on commit 7235e48

Please sign in to comment.