Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with takeUntil in nested stream #189

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 231 additions & 1 deletion examples/drag-and-drop/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -994,8 +994,238 @@ StreamTransformer.prototype['@@transducer/step'] = function(s, v) { return v; };

var lib = flyd;

/**
* Tests whether or not an object is an array.
*
* @private
* @param {*} val The object to test.
* @return {Boolean} `true` if `val` is an array, `false` otherwise.
* @example
*
* _isArray([]); //=> true
* _isArray(null); //=> false
* _isArray({}); //=> false
*/
var _isArray = Array.isArray || function _isArray(val) {
return val != null && val.length >= 0 && Object.prototype.toString.call(val) === '[object Array]';
};

function _isTransformer(obj) {
return typeof obj['@@transducer/step'] === 'function';
}
var _isTransformer_1 = _isTransformer;

/**
* Returns a function that dispatches with different strategies based on the
* object in list position (last argument). If it is an array, executes [fn].
* Otherwise, if it has a function with one of the given method names, it will
* execute that function (functor case). Otherwise, if it is a transformer,
* uses transducer [xf] to return a new transformer (transducer case).
* Otherwise, it will default to executing [fn].
*
* @private
* @param {Array} methodNames properties to check for a custom implementation
* @param {Function} xf transducer to initialize if object is transformer
* @param {Function} fn default ramda implementation
* @return {Function} A function that dispatches on object in list position
*/


function _dispatchable(methodNames, xf, fn) {
return function () {
if (arguments.length === 0) {
return fn();
}
var args = Array.prototype.slice.call(arguments, 0);
var obj = args.pop();
if (!_isArray(obj)) {
var idx = 0;
while (idx < methodNames.length) {
if (typeof obj[methodNames[idx]] === 'function') {
return obj[methodNames[idx]].apply(obj, args);
}
idx += 1;
}
if (_isTransformer_1(obj)) {
var transducer = xf.apply(null, args);
return transducer(obj);
}
}
return fn.apply(this, arguments);
};
}
var _dispatchable_1 = _dispatchable;

var _xfBase = {
init: function () {
return this.xf['@@transducer/init']();
},
result: function (result) {
return this.xf['@@transducer/result'](result);
}
};

var XDrop = /*#__PURE__*/function () {

function XDrop(n, xf) {
this.xf = xf;
this.n = n;
}
XDrop.prototype['@@transducer/init'] = _xfBase.init;
XDrop.prototype['@@transducer/result'] = _xfBase.result;
XDrop.prototype['@@transducer/step'] = function (result, input) {
if (this.n > 0) {
this.n -= 1;
return result;
}
return this.xf['@@transducer/step'](result, input);
};

return XDrop;
}();

var _xdrop = /*#__PURE__*/_curry2_1(function _xdrop(n, xf) {
return new XDrop(n, xf);
});
var _xdrop_1 = _xdrop;

/**
* This checks whether a function has a [methodname] function. If it isn't an
* array it will execute that function otherwise it will default to the ramda
* implementation.
*
* @private
* @param {Function} fn ramda implemtation
* @param {String} methodname property to check for a custom implementation
* @return {Object} Whatever the return value of the method is.
*/


function _checkForMethod(methodname, fn) {
return function () {
var length = arguments.length;
if (length === 0) {
return fn();
}
var obj = arguments[length - 1];
return _isArray(obj) || typeof obj[methodname] !== 'function' ? fn.apply(this, arguments) : obj[methodname].apply(obj, Array.prototype.slice.call(arguments, 0, length - 1));
};
}
var _checkForMethod_1 = _checkForMethod;

/**
* Optimized internal three-arity curry function.
*
* @private
* @category Function
* @param {Function} fn The function to curry.
* @return {Function} The curried function.
*/


function _curry3(fn) {
return function f3(a, b, c) {
switch (arguments.length) {
case 0:
return f3;
case 1:
return _isPlaceholder_1(a) ? f3 : _curry2_1(function (_b, _c) {
return fn(a, _b, _c);
});
case 2:
return _isPlaceholder_1(a) && _isPlaceholder_1(b) ? f3 : _isPlaceholder_1(a) ? _curry2_1(function (_a, _c) {
return fn(_a, b, _c);
}) : _isPlaceholder_1(b) ? _curry2_1(function (_b, _c) {
return fn(a, _b, _c);
}) : _curry1_1(function (_c) {
return fn(a, b, _c);
});
default:
return _isPlaceholder_1(a) && _isPlaceholder_1(b) && _isPlaceholder_1(c) ? f3 : _isPlaceholder_1(a) && _isPlaceholder_1(b) ? _curry2_1(function (_a, _b) {
return fn(_a, _b, c);
}) : _isPlaceholder_1(a) && _isPlaceholder_1(c) ? _curry2_1(function (_a, _c) {
return fn(_a, b, _c);
}) : _isPlaceholder_1(b) && _isPlaceholder_1(c) ? _curry2_1(function (_b, _c) {
return fn(a, _b, _c);
}) : _isPlaceholder_1(a) ? _curry1_1(function (_a) {
return fn(_a, b, c);
}) : _isPlaceholder_1(b) ? _curry1_1(function (_b) {
return fn(a, _b, c);
}) : _isPlaceholder_1(c) ? _curry1_1(function (_c) {
return fn(a, b, _c);
}) : fn(a, b, c);
}
};
}
var _curry3_1 = _curry3;

/**
* Returns the elements of the given list or string (or object with a `slice`
* method) from `fromIndex` (inclusive) to `toIndex` (exclusive).
*
* Dispatches to the `slice` method of the third argument, if present.
*
* @func
* @memberOf R
* @since v0.1.4
* @category List
* @sig Number -> Number -> [a] -> [a]
* @sig Number -> Number -> String -> String
* @param {Number} fromIndex The start index (inclusive).
* @param {Number} toIndex The end index (exclusive).
* @param {*} list
* @return {*}
* @example
*
* R.slice(1, 3, ['a', 'b', 'c', 'd']); //=> ['b', 'c']
* R.slice(1, Infinity, ['a', 'b', 'c', 'd']); //=> ['b', 'c', 'd']
* R.slice(0, -1, ['a', 'b', 'c', 'd']); //=> ['a', 'b', 'c']
* R.slice(-3, -1, ['a', 'b', 'c', 'd']); //=> ['b', 'c']
* R.slice(0, 3, 'ramda'); //=> 'ram'
*/


var slice = /*#__PURE__*/_curry3_1( /*#__PURE__*/_checkForMethod_1('slice', function slice(fromIndex, toIndex, list) {
return Array.prototype.slice.call(list, fromIndex, toIndex);
}));
var slice_1 = slice;

/**
* Returns all but the first `n` elements of the given list, string, or
* transducer/transformer (or object with a `drop` method).
*
* Dispatches to the `drop` method of the second argument, if present.
*
* @func
* @memberOf R
* @since v0.1.0
* @category List
* @sig Number -> [a] -> [a]
* @sig Number -> String -> String
* @param {Number} n
* @param {*} list
* @return {*} A copy of list without the first `n` elements
* @see R.take, R.transduce, R.dropLast, R.dropWhile
* @example
*
* R.drop(1, ['foo', 'bar', 'baz']); //=> ['bar', 'baz']
* R.drop(2, ['foo', 'bar', 'baz']); //=> ['baz']
* R.drop(3, ['foo', 'bar', 'baz']); //=> []
* R.drop(4, ['foo', 'bar', 'baz']); //=> []
* R.drop(3, 'ramda'); //=> 'da'
*/


var drop = /*#__PURE__*/_curry2_1( /*#__PURE__*/_dispatchable_1(['drop'], _xdrop_1, function drop(n, xs) {
return slice_1(Math.max(0, n), Infinity, xs);
}));
var drop_1 = drop;

var dropCurrentValue = lib.transduce(drop_1(1));

var takeuntil = lib.curryN(2, function(src, term) {
return lib.endsOn(lib.merge(term, src.end), lib.combine(function(src, self) {
var end$ = term.hasVal ? dropCurrentValue(term) : term;
return lib.endsOn(lib.merge(end$, src.end), lib.combine(function(src, self) {
self(src());
}, [src]));
});
Expand Down
4 changes: 1 addition & 3 deletions module/switchlatest/index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
var flyd = require('../../lib');
var takeUntil = require('../takeuntil');
var drop = require('ramda/src/drop');

var dropCurrentValue = flyd.transduce(drop(1));

module.exports = function(s) {
return flyd.combine(function(stream$, self) {
var value$ = stream$();
flyd.on(self, takeUntil(value$, dropCurrentValue(stream$)));
flyd.on(self, takeUntil(value$, stream$));
}, [s]);
};
6 changes: 5 additions & 1 deletion module/takeuntil/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
var flyd = require('../../lib');
var drop = require('ramda/src/drop');

var dropCurrentValue = flyd.transduce(drop(1));

module.exports = flyd.curryN(2, function(src, term) {
return flyd.endsOn(flyd.merge(term, src.end), flyd.combine(function(src, self) {
var end$ = term.hasVal ? dropCurrentValue(term) : term;
return flyd.endsOn(flyd.merge(end$, src.end), flyd.combine(function(src, self) {
self(src());
}, [src]));
});
20 changes: 20 additions & 0 deletions module/takeuntil/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,24 @@ describe('takeUntil', function() {
assert.deepEqual(result, [1]);
assert(s.end());
});

it('works in nested streams', function() {
var source = stream(1);
var terminator = stream(true);

var value = stream(1).chain(function() {
return takeUntil(source, terminator);
})
.map(function(val) {
return val + 1;
});

source(2)(3)(4)(5);

terminator(true);

source(6)(7)(8)(9);

assert.equal(value(), 6);
})
});