-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #37 from Tradeshift/anyOf
Introduce "allOf", as antonym to the existing "alternatively"
- Loading branch information
Showing
8 changed files
with
170 additions
and
48 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
57 changes: 57 additions & 0 deletions
57
ts-reaktive-marshal/src/main/java/com/tradeshift/reaktive/marshal/CombinedProtocol.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package com.tradeshift.reaktive.marshal; | ||
|
||
import static com.tradeshift.reaktive.marshal.ReadProtocol.isNone; | ||
import static com.tradeshift.reaktive.marshal.ReadProtocol.none; | ||
|
||
import java.util.function.Function; | ||
|
||
import javaslang.collection.Seq; | ||
import javaslang.collection.Vector; | ||
import javaslang.control.Try; | ||
|
||
/** | ||
* Forwards read events to multiple alternative protocols, emitting whenever any of the alternatives emit. | ||
* If multiple alternatives emit for the same event, all results are emitted. | ||
* If at least one alternative emits for an event, any errors on other alternatives are ignored. | ||
* If all alternatives yield errors for an event, the errors are concatenated and escalated. | ||
*/ | ||
public class CombinedProtocol<E,T> implements ReadProtocol<E,Seq<T>> { | ||
private final Seq<ReadProtocol<E,T>> alternatives; | ||
|
||
public CombinedProtocol(Seq<ReadProtocol<E,T>> alternatives) { | ||
this.alternatives = alternatives; | ||
} | ||
|
||
@Override | ||
public Reader<E,Seq<T>> reader() { | ||
Seq<Reader<E,T>> readers = alternatives.map(p -> p.reader()); | ||
return new Reader<E,Seq<T>>() { | ||
@Override | ||
public Try<Seq<T>> reset() { | ||
return perform(r -> r.reset()); | ||
} | ||
|
||
@Override | ||
public Try<Seq<T>> apply(E evt) { | ||
return perform(r -> r.apply(evt)); | ||
} | ||
|
||
private Try<Seq<T>> perform(Function<Reader<E,T>, Try<T>> f) { | ||
Try<Seq<T>> result = none(); | ||
for (Reader<E,T> reader: readers) { | ||
Try<T> readerResult = f.apply(reader); | ||
if (!isNone(readerResult)) { | ||
if (isNone(result) || (result.isFailure() && readerResult.isSuccess())) { | ||
result = readerResult.map(Vector::of); | ||
} else if (!result.isFailure() && readerResult.isSuccess()) { | ||
result = result.map(seq -> seq.append(readerResult.get())); | ||
} else if (readerResult.isFailure() && result.isFailure()) { | ||
result = Try.failure(new IllegalArgumentException(result.failed().get().getMessage() + ", alternatively " + readerResult.failed().get().getMessage())); | ||
} | ||
} | ||
} | ||
return result; | ||
} | ||
}; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
51 changes: 51 additions & 0 deletions
51
ts-reaktive-marshal/src/test/java/com/tradeshift/reaktive/marshal/CombinedProtocolSpec.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package com.tradeshift.reaktive.marshal; | ||
|
||
import static com.tradeshift.reaktive.json.JSONProtocol.integerValue; | ||
import static com.tradeshift.reaktive.json.JSONProtocol.stringValue; | ||
import static com.tradeshift.reaktive.marshal.Protocol.combine; | ||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
import static org.forgerock.cuppa.Cuppa.describe; | ||
import static org.forgerock.cuppa.Cuppa.it; | ||
|
||
import org.forgerock.cuppa.junit.CuppaRunner; | ||
import org.junit.runner.RunWith; | ||
|
||
import com.tradeshift.reaktive.json.JSONEvent; | ||
import com.tradeshift.reaktive.json.jackson.Jackson; | ||
|
||
import javaslang.collection.Seq; | ||
|
||
@RunWith(CuppaRunner.class) | ||
public class CombinedProtocolSpec {{ | ||
describe("Protocol.allOf", () -> { | ||
Jackson jackson = new Jackson(); | ||
|
||
it("should emit multiple results if multiple readers emit on the same event", () -> { | ||
ReadProtocol<JSONEvent, Seq<Integer>> protocol = combine( | ||
integerValue, | ||
integerValue.map(i -> i * 2) | ||
); | ||
|
||
assertThat(jackson.parse("42", protocol.reader()).findFirst().get()).containsExactly(42, 84); | ||
}); | ||
|
||
it("should emit an event if one reader emits and another yields an error", () -> { | ||
ReadProtocol<JSONEvent, Seq<Object>> protocol = combine( | ||
stringValue.map(s -> (Object) s), | ||
integerValue.map(i -> (Object) i) | ||
); | ||
|
||
assertThat(jackson.parse("\"hello\"", protocol.reader()).findFirst().get()).containsExactly("hello"); | ||
}); | ||
|
||
it("should yield an error if all readers yield errors", () -> { | ||
ReadProtocol<JSONEvent, Seq<Integer>> protocol = combine( | ||
integerValue, | ||
integerValue.map(i -> i * 2) | ||
); | ||
|
||
assertThatThrownBy(() -> jackson.parse("\"hello\"", protocol.reader())).hasMessageContaining("Expecting signed 32-bit integer"); | ||
}); | ||
}); | ||
}} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters