Skip to content

Commit

Permalink
Improve types and get rid of @@observable nonsense
Browse files Browse the repository at this point in the history
  • Loading branch information
kayahr committed Jan 16, 2025
1 parent 97ae447 commit 508309a
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 29 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ if (isUnsubscribable(something)) {

## RxJS compatibility

RxJS' Observable implementation is not standard-conform, especially because it uses a `pipe` method which is not part of the proposed standard. So in order to use Observable instances of this library in RxJS pipes you have to convert the observable with RxJS' `from` function:
RxJS' Observable implementation is not standard-conform, especially because it uses a `pipe` method which is not part of the proposed standard. RxJS unfortunately also does not use a polyfill for `Symbol.observable`, it just assumes that one is present or otherwise it falls back on using a non-standard `"@@observable"` string instead. To ensure compatibility between RxJS and any other observable implementation make sure to load a `Symbol.observable` polyfill like [symbol-observable] before importing RxJS:

```typescript
import "symbol-observable";
```

In order to use Observable instances of this library in RxJS pipes you have to convert the observable with RxJS' `from` function:

```typescript
import { from } from "rxjs";
Expand Down
21 changes: 3 additions & 18 deletions src/main/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "symbol-observable";
import { ObservableLike } from "./ObservableLike.js";
import { Observer } from "./Observer.js";
import { isSubscribable, Subscribable } from "./Subscribable.js";
import type { SubscribeArgs } from "./SubscribeArgs.js";
import { SubscriberFunction } from "./SubscriberFunction.js";
import { Subscription } from "./Subscription.js";
import { SubscriptionImpl } from "./SubscriptionImpl.js";
Expand Down Expand Up @@ -41,18 +42,6 @@ export interface ObservableConstructor<T> {
from(observable: Subscribable<T> | Iterable<T>): ObservableLike<T>;
}

/**
* Subscriber arguments.
*/
export type SubscribeArgs<T = unknown> =
| [ Observer<T> ]
| [ (value: T) => void, ((error: Error) => void)?, (() => void)? ]
| [
(((value: T) => void) | null | undefined),
(((error: any) => void) | null | undefined)?,
((() => void) | null | undefined)?
];

/**
* Observable implementation.
*/
Expand Down Expand Up @@ -92,14 +81,11 @@ export class Observable<T> implements ObservableLike<T> {
return createObservableFrom<T>(this, Observable, observable);
}

/** @inheritDoc */
public [Symbol.observable](): this {
return this;
}

public "@@observable"(): this {
return this;
}

/** @inheritDoc */
public subscribe(observer: Observer<T>): Subscription;

Expand Down Expand Up @@ -167,8 +153,7 @@ export function createObservableFrom<T>(thisConstructor: ObservableConstructor<T
observer.complete();
});
} else if (observable instanceof Object) {
const observableFactory = (observable as Observable<T>)["@@observable"]
?? (observable as Observable<T>)[Symbol.observable];
const observableFactory = (observable as Observable<T>)[Symbol.observable];
if (observableFactory instanceof Function) {
const observable = observableFactory();
if (observable instanceof Object) {
Expand Down
11 changes: 3 additions & 8 deletions src/main/ObservableLike.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { Observer } from "./Observer.js";
import { Subscribable } from "./Subscribable.js";
import type { SubscribeArgs } from "./SubscribeArgs.js";
import { Subscription } from "./Subscription.js";

/**
Expand All @@ -19,16 +20,10 @@ export interface ObservableLike<T = unknown> extends Subscribable<T> {
Subscription;

/** @inheritDoc */
subscribe(...args: [ Observer<T> ] | [ (value: T) => void, ((error: Error) => void)?, (() => void)? ]):
Subscription;
subscribe(...args: SubscribeArgs<T>): Subscription;

/**
* Returns itself.
*/
[Symbol.observable]?(): ObservableLike<T>;

/**
* Returns itself. This is used as a fallback for environments which don't support `Symbol.observable`.
*/
"@@observable"?(): ObservableLike<T>;
[Symbol.observable](): ObservableLike<T>;
}
18 changes: 18 additions & 0 deletions src/main/SubscribeArgs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2018 Klaus Reimer <[email protected]>
* See LICENSE.md for licensing information.
*/

import type { Observer } from "./Observer.js";

/**
* Subscriber arguments.
*/
export type SubscribeArgs<T = unknown> =
| [ Observer<T> ]
| [ (value: T) => void, ((error: Error) => void)?, (() => void)? ]
| [
(((value: T) => void) | null | undefined),
(((error: any) => void) | null | undefined)?,
((() => void) | null | undefined)?
];
3 changes: 2 additions & 1 deletion src/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
* See LICENSE.md for licensing information
*/

export { Observable, type SubscribeArgs } from "./Observable.js";
export { Observable } from "./Observable.js";
export { type ObservableLike } from "./ObservableLike.js";
export { type CompleteObserver, type ErrorObserver, type NextObserver, type Observer } from "./Observer.js";
export { SharedObservable } from "./SharedObservable.js";
export { isSubscribable, type Subscribable } from "./Subscribable.js";
export { type SubscribeArgs } from "./SubscribeArgs.js";
export { type SubscriberFunction } from "./SubscriberFunction.js";
export { type Subscription } from "./Subscription.js";
export { type SubscriptionObserver } from "./SubscriptionObserver.js";
Expand Down
2 changes: 2 additions & 0 deletions src/test/Observable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* See LICENSE.md for licensing information.
*/

import "symbol-observable";

import { runTests as runObservableTests } from "es-observable-tests";
import { from, merge, Subject } from "rxjs";
import { describe, expect, it, vi } from "vitest";
Expand Down
48 changes: 48 additions & 0 deletions src/test/ObservableLike.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2025 Klaus Reimer <[email protected]>
* See LICENSE.md for licensing information.
*/

import "symbol-observable";

import { from, merge, Subject } from "rxjs";
import { describe, expect, it } from "vitest";

import { Observable } from "../main/Observable.js";
import type { ObservableLike } from "../main/ObservableLike.js";
import type { SubscriptionObserver } from "../main/SubscriptionObserver.js";

describe("ObservableLike", () => {
it("can be converted into RxJS observable", () => {
let exposedObserver: SubscriptionObserver<number> | undefined;
const observable: ObservableLike<number> = new Observable<number>(observer => {
exposedObserver = observer;
});
const rxjsObservable = from(observable);
const values: number[] = [];
rxjsObservable.subscribe(value => values.push(value));
if (exposedObserver == null) {
throw new Error("Observer not exposed");
}
exposedObserver.next(1);
expect(values).toEqual([ 1 ]);
exposedObserver.next(2);
expect(values).toEqual([ 1, 2 ]);
});
it("can be used in RxJS operators", () => {
let exposedObserver: SubscriptionObserver<number> | undefined;
const observable: ObservableLike<number> = new Observable<number>(observer => {
exposedObserver = observer;
});
const subject = new Subject<number>();
const mergedObservable = merge(subject, observable);
const values: number[] = [];
mergedObservable.subscribe(value => values.push(value));
if (exposedObserver == null) {
throw new Error("Observer not exposed");
}
exposedObserver.next(1);
subject.next(2);
expect(values).toEqual([ 1, 2 ]);
});
});
2 changes: 2 additions & 0 deletions src/test/SharedObservable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* See LICENSE.md for licensing information
*/

import "symbol-observable";

import { from } from "rxjs";
import { describe, expect, it, vi } from "vitest";

Expand Down
3 changes: 2 additions & 1 deletion src/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import { describe, expect, it } from "vitest";

import * as exports from "../main/index.js";
import { Observable, type SubscribeArgs } from "../main/Observable.js";
import { Observable } from "../main/Observable.js";
import { type ObservableLike } from "../main/ObservableLike.js";
import { type CompleteObserver, type ErrorObserver, type NextObserver, type Observer } from "../main/Observer.js";
import { SharedObservable } from "../main/SharedObservable.js";
import { isSubscribable, type Subscribable } from "../main/Subscribable.js";
import { type SubscribeArgs } from "../main/SubscribeArgs.js";
import { type SubscriberFunction } from "../main/SubscriberFunction.js";
import { type Subscription } from "../main/Subscription.js";
import { type SubscriptionObserver } from "../main/SubscriptionObserver.js";
Expand Down

0 comments on commit 508309a

Please sign in to comment.