There are many ways to create an operator for RxJS. Performance was the primary consideration in this version of RxJS, so creating an operator in a way that adheres to the existing structures in this library may not be straightforward. This is an attempt to document how to create an operator, either for yourself or for this library.
For how to develop a custom operator for this library, see below.
In the most common case, users might like to create an operator to be used only by their app. These can be developed in any way the developer sees fit, but here are some guidelines:
- Operators should always return an Observable. You're performing operations on unknown sets of things to create new sets. It only makes sense to return a new set. If you create a method that returns something other than an Observable, it's not an operator, and that's fine.
- Be sure to manage subscriptions created inside of the Observable your operator returns. Your operator is going to have to subscribe to the source (or `this`) inside of the returned Observable, so be sure that the subscription is returned as part of the unsubscribe handler or subscription.
- Be sure to handle exceptions from passed functions. If you're implementing an operator that takes a function as an argument, you'll want to wrap the call to it in a `try/catch` and send the error down the `error()` path on the observable.
- Be sure to tear down scarce resources in the unsubscribe handler of your returned Observable. If you're setting up event handlers or a web socket, or something like that, the unsubscribe handler is a great place to remove that event handler or close that socket.
function mySimpleOperator(someCallback) {
  // We *could* do a `var self = this;` here to close over, but see next comment
  return Observable.create(subscriber => {
    // because we're in an arrow function `this` is from the outer scope.
    var source = this;

    // save our inner subscription
    var subscription = source.subscribe(value => {
      // important: catch errors from user-provided callbacks
      try {
        subscriber.next(someCallback(value));
      } catch(err) {
        subscriber.error(err);
      }
    },
    // be sure to handle errors and completions as appropriate and
    // send them along
    err => subscriber.error(err),
    () => subscriber.complete());

    // return the inner subscription so that unsubscribing also tears it down
    return subscription;
  });
}
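The guideline about tearing down scarce resources can be illustrated with a small sketch. It does not use RxJS itself: `TinyObservable`, `createEventSource`, and `fromEventSource` are hypothetical stand-ins invented here so the example runs on its own, and the real library's subscription handling is considerably richer.

```javascript
// Hypothetical minimal stand-in for Observable, so the sketch runs on its own.
class TinyObservable {
  constructor(subscribeFn) {
    this._subscribe = subscribeFn;
  }
  subscribe(observer) {
    const teardown = this._subscribe(observer);
    return { unsubscribe() { if (teardown) teardown(); } };
  }
}

// Hypothetical scarce resource: a bare-bones event source.
function createEventSource() {
  const handlers = new Set();
  return {
    addHandler(handler) { handlers.add(handler); },
    removeHandler(handler) { handlers.delete(handler); },
    emit(value) { handlers.forEach(handler => handler(value)); },
    handlerCount() { return handlers.size; }
  };
}

// Hypothetical helper: forwards each event and removes its handler on unsubscribe.
function fromEventSource(eventSource) {
  return new TinyObservable(observer => {
    const handler = value => observer.next(value);
    eventSource.addHandler(handler);
    // The returned function plays the role of the unsubscribe handler.
    return () => eventSource.removeHandler(handler);
  });
}

const eventSource = createEventSource();
const received = [];
const subscription = fromEventSource(eventSource).subscribe({
  next: value => received.push(value)
});

eventSource.emit('a');
subscription.unsubscribe();
eventSource.emit('b'); // not delivered: the handler was removed on unsubscribe
```

The point of the sketch is only that the function returned from the subscribe callback is where the handler gets released, so nothing stays registered after `unsubscribe()`.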
There are a few ways to add the operator to Observable. It's really down to needs and preference:
- Use the ES7 function bind operator (`::`) available in transpilers like BabelJS:
someObservable::mySimpleOperator(x => x + '!');
- Create your own Observable subclass and override `lift` to return it:
class MyObservable extends Observable {
  lift(operator) {
    const observable = new MyObservable(); //<-- important part here
    observable.source = this;
    observable.operator = operator;
    return observable;
  }

  // put it here .. or ..
  customOperator() {
    /* do things and return an Observable */
  }
}
// ... put it here...
MyObservable.prototype.mySimpleOperator = mySimpleOperator;
- Patch `Observable.prototype` directly:
Observable.prototype.mySimpleOperator = mySimpleOperator;
// ... and later ...
someObservable.mySimpleOperator(x => x + '!');
If you don't want to patch the Observable prototype, you can also write the operator as a pure function that takes the input Observable as an argument instead of relying on the `this` keyword.
Example implementation:
function mySimpleOperator(someCallback) {
  // notice that we return a function here
  return function mySimpleOperatorImplementation(source) {
    return Observable.create(subscriber => {
      var subscription = source.subscribe(value => {
        try {
          subscriber.next(someCallback(value));
        } catch(err) {
          subscriber.error(err);
        }
      },
      err => subscriber.error(err),
      () => subscriber.complete());

      return subscription;
    });
  }
}
This can now be used with the `pipe()` method on the Observable:
const obs = someObservable.pipe(mySimpleOperator(x => x + '!'));
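To see why the pure-function form composes so easily, here is a minimal sketch of the idea behind piping: each operator function receives the result of the previous one. This is not the library's implementation. `pipeThrough` and `exclaim` are hypothetical names, and plain arrays stand in for Observables so the example is self-contained.

```javascript
// Hypothetical stand-in for pipe(): feeds the source through each function in order.
function pipeThrough(source, ...operatorFns) {
  return operatorFns.reduce((current, operatorFn) => operatorFn(current), source);
}

// Same shape as the operator above: configuration in, "source => result" function out.
// Arrays stand in for Observables here (an assumption made to keep the sketch runnable).
const exclaim = suffix => source => source.map(value => value + suffix);

const result = pipeThrough(['a', 'b'], exclaim('!'), exclaim('?'));
// result is ['a!?', 'b!?']
```

Because each operator is just a function from one source to another, no prototype patching or subclassing is needed to chain them.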
We strongly recommend that you publish your custom operator as an npm package, written as a pure function to be used with `pipe()` (or `let`). See the sections above. RxJS core already has over 100 operators, and we should not add more operators unless they are absolutely essential and add functionality not possible with existing operators.
Publishing it as a separate library makes your operator immediately usable by the community, and lets us start growing an RxJS ecosystem as opposed to making the RxJS library thicker and heavier. There are cases, however, where the new operator should be added to the core library.
Please publish your operator as a separate library before proposing it for inclusion in RxJS. See the section above.
To create an operator for inclusion in this library, it's probably best to work from prior art. Something like the `filter` operator would be a good start. It's not expected that you'll be able to read this section and suddenly be an expert operator contributor.
If you find yourself confused, DO NOT worry. Follow prior examples in the repo, submit a PR, and we'll work with you.
Hopefully the information provided here will give context to decisions made while developing operators in this library. There are a few things to know and (try to) understand while developing operators:
- All operator methods are actually created in separate modules from `Observable`. This is so developers can "build their own observable" by pulling in operator methods and adding them to observable in their own module. It also means operators can be brought in ad hoc and used directly, either with the ES7 function bind operator in Babel (`::`) or by using it with `.call()`.
- Every operator has an `Operator` class. The `Operator` class is really a `Subscriber` "factory". It's what gets passed into the `lift` method to make the "magic" happen. Its sole job is to create the operation's `Subscriber` instance on subscription.
- Every operator has a `Subscriber` class. This class does all of the logic for the operation. Its job is to handle values being nexted in (generally by overriding `_next()`) and forward them along to the `destination`, which is the next observer in the chain.
  - It's important to note that the `destination` Observer set on any `Subscriber` serves as more than just the destination for the events passing through. If the `destination` is a `Subscriber`, it is also used to set up a shared underlying `Subscription`, which, in fact, is also a `Subscriber`, and is the first `Subscriber` in the chain.
  - Subscribers all have `add` and `remove` methods that are used for adding and removing inner subscriptions to the shared underlying subscription.
  - When you `subscribe` to an Observable, the functions or Observer you passed are used to create the final `destination` `Subscriber` for the chain. It's this `Subscriber` that is really also the shared `Subscription` for the operator chain.
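The relationship between `lift`, the `Operator` class, and the `Subscriber` class can be sketched as follows. This is a heavily simplified illustration, not the library's code: `TinyObservable`, `TinySubscriber`, `TinyFilterOperator`, `TinyFilterSubscriber`, and `tinyFilter` are hypothetical names, and the shared-subscription behavior described above is deliberately left out.

```javascript
// Hypothetical, heavily simplified stand-ins for the library's classes.
class TinySubscriber {
  constructor(destination) { this.destination = destination; }
  next(value) { this._next(value); }
  error(err) { this.destination.error(err); }
  complete() { this.destination.complete(); }
  _next(value) { this.destination.next(value); }
}

class TinyObservable {
  constructor(subscribeFn) { this._subscribe = subscribeFn; }
  // lift() returns a new Observable that remembers its source and operator.
  lift(operator) {
    const observable = new TinyObservable();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }
  subscribe(observer) {
    if (this.operator) {
      // The operator creates the Subscriber and subscribes it to the source.
      return this.operator.call(observer, this.source);
    }
    return this._subscribe(observer);
  }
}

// The Subscriber holds the operation's logic by overriding _next().
class TinyFilterSubscriber extends TinySubscriber {
  constructor(destination, predicate) {
    super(destination);
    this.predicate = predicate;
  }
  _next(value) {
    let keep;
    try {
      keep = this.predicate(value);
    } catch (err) {
      this.destination.error(err); // errors from user code go down the error path
      return;
    }
    if (keep) this.destination.next(value);
  }
}

// The Operator is only a Subscriber "factory", invoked on subscription.
class TinyFilterOperator {
  constructor(predicate) { this.predicate = predicate; }
  call(destination, source) {
    return source.subscribe(new TinyFilterSubscriber(destination, this.predicate));
  }
}

function tinyFilter(source, predicate) {
  return source.lift(new TinyFilterOperator(predicate));
}

const numbers = new TinyObservable(observer => {
  [1, 2, 3, 4].forEach(n => observer.next(n));
  observer.complete();
});
const evens = [];
let completed = false;
tinyFilter(numbers, n => n % 2 === 0).subscribe({
  next: value => evens.push(value),
  error: err => { throw err; },
  complete: () => { completed = true; }
});
```

Note that nothing runs until `subscribe` is called: `lift` only records the source and the operator, and the `Subscriber` instance is created per subscription.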
Please complete these steps for each new operator added to RxJS as a pull request:
- Add the operator to Rx
- It must have a `-spec.ts` tests file covering the canonical corner cases, with marble diagram tests
- If possible, write an `asDiagram` test case too, for PNG diagram generation purposes
- The spec file should have a type definition test at the end of the spec to verify type definition for various use cases
- The operator must be documented in JSDoc style in the implementation file, including also the PNG marble diagram image
- The operator should be listed in `doc/operators.md` in a category of operators
- It should also be inserted in the operator decision tree file `doc/decision-tree-widget/tree.yml`
- You may need to update `MIGRATION.md` if the operator differs from the corresponding one in RxJS v4
An "inner subscriber" or "inner subscription" is any subscription created inside of an operator's primary Subscriber. For example, if you were to create your own "merge" operator of some sort, the Observables that are arriving on the source observable that you want to "merge" will need to be subscribed to. Those subscriptions will be inner subscriptions. One interesting thing about inner subscriptions in this library is that if you pass and set a `destination` on them, they will try to use that `destination` for their `unsubscribe` calls. This means that if you call `unsubscribe` on them, it might not do anything. As such, it's usually desirable not to set the `destination` of inner subscriptions. An example of this might be the switch operators, which have a single underlying inner subscription that needs to unsubscribe independently of the main subscription.
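As a rough illustration of an inner subscription that is unsubscribed independently of the main one, here is a switch-like sketch. It avoids the library's `Subscriber`/`destination` machinery entirely: `TinyObservable`, `createManualSource`, and `switchLatest` are hypothetical stand-ins, and error and completion handling are omitted for brevity.

```javascript
// Hypothetical minimal stand-in for Observable, so the sketch runs on its own.
class TinyObservable {
  constructor(subscribeFn) { this._subscribe = subscribeFn; }
  subscribe(observer) {
    const teardown = this._subscribe(observer);
    return { unsubscribe() { if (teardown) teardown(); } };
  }
}

// Hypothetical helper: an Observable whose values are pushed by hand.
function createManualSource() {
  const observers = new Set();
  const observable = new TinyObservable(observer => {
    observers.add(observer);
    return () => observers.delete(observer);
  });
  return {
    observable,
    emit(value) { [...observers].forEach(observer => observer.next(value)); },
    observerCount() { return observers.size; }
  };
}

// Hypothetical switch-like operator: only the latest inner Observable stays subscribed.
function switchLatest(source) {
  return new TinyObservable(observer => {
    let innerSubscription = null;
    const outerSubscription = source.subscribe({
      next(inner) {
        // Drop the previous inner subscription without touching the outer one.
        if (innerSubscription) innerSubscription.unsubscribe();
        innerSubscription = inner.subscribe({ next: value => observer.next(value) });
      }
    });
    // Unsubscribing from the result tears down both inner and outer subscriptions.
    return () => {
      if (innerSubscription) innerSubscription.unsubscribe();
      outerSubscription.unsubscribe();
    };
  });
}

const outer = createManualSource();
const first = createManualSource();
const second = createManualSource();
const seen = [];
const subscription = switchLatest(outer.observable).subscribe({
  next: value => seen.push(value)
});

outer.emit(first.observable);
first.emit('a');
outer.emit(second.observable); // switches: the first inner subscription is dropped
first.emit('ignored');
const firstObserversAfterSwitch = first.observerCount();
second.emit('b');
subscription.unsubscribe();
second.emit('also ignored');
```

The inner subscription is tracked in its own variable and replaced on each switch, which is why it must be able to unsubscribe on its own rather than deferring to a shared subscription.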
If you find yourself creating inner subscriptions, it might also be worth checking to see if the observable being passed has `_isScalar` set, because if it does, you can pull the `value` out of it directly and improve the performance of your operator when it's operating over scalar observables. For reference, a scalar observable is any observable that has a single static value underneath it. `Observable.of('foo')` will return a `ScalarObservable`; likewise, resolved `PromiseObservable`s will act as scalars.