-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Attestation pool take 1: interop #176
base: interop
Are you sure you want to change the base?
Conversation
|
chain/src/main/java/org/ethereum/beacon/chain/pool/reactor/IdentificationProcessor.java
Outdated
Show resolved
Hide resolved
* <p>Implementor MAY throw an {@link AssertionError} if it's been called before inner state has | ||
* been initialised. | ||
*/ | ||
public interface StatefulProcessor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the purpose of this interface.
You either assert
that a component is inited or just skip an entry if not inited. Couldn't this be the internal logic of this component?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For instance, we have checkers which returns value of boolean
type saying whether check passed or not. These checkers may have several inputs and state initialized from either one or all of those inputs. Since, check MUST returns true/false
there is no room for merely discarding the value fed to a checker. We might use Optional<Boolean>
for that but I am not sure that it would be a better choice.
Regarding a component which is statefull and has several inbound streams - I'm still not sure what should be the proper pattern for it |
I agree with that. It looks like Reactor is not meant to handle stateful processors and was created for another purposes. This way it looks more like a misuse. Let's keep it for now as it is and replace Reactor with a proper thing later. |
public IdentificationProcessor( | ||
UnknownAttestationPool pool, | ||
Schedulers schedulers, | ||
Flux<ReceivedAttestation> source, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would consider using more general Publisher
as an input. I believe Flux.from(Publisher)
shortcuts the call when the parameter is Flux
so this should be for free
} | ||
} | ||
|
||
public DirectProcessor<ReceivedAttestation> getUnknownAttestations() { | ||
return unknownAttestations; | ||
public OutsourcePublisher<ReceivedAttestation> getIdentified() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't expose OutsourcePublisher
implementation until the caller is supposed to write items to this stream. Wouldn't returning Publisher
suits?
} | ||
} | ||
|
||
public Flux<ReceivedAttestation> getValid() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same here, m.b. just Publisher
?
public Flux<ReceivedAttestation> getInvalid() { | ||
return invalid; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why didn't you finally end up with the laconic form as we discussed?
validatedAtt = att.map(att -> Pair.of(att, checker.check(att))).publish()
validAtt = validatedAtt.filter(a -> a.right).map(a -> a.left)
invalidAtt = validatedAtt.filter(a -> !a.right).map(a -> a.left)
@Override | ||
public void subscribe(CoreSubscriber<? super ReceivedAttestation> actual) { | ||
out.subscribe(actual); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not expose the Publisher<ReceivedAttestation>
here to make it uniform across the whole lib?
The general feeling is that using RX shouldn't require such amount of boilerplate code |
…con-chain-java into feature/attestation-pool
…con-chain-java into feature/attestation-pool
…/fork-choice-issues
(i.e. lexicographically, from left to right, unsigned bytes).
Fix/fork choice issues
No description provided.