From c395bbf4a92dc91bcbea3b81280431d8755f7889 Mon Sep 17 00:00:00 2001 From: Connor Turland Date: Wed, 11 Mar 2020 13:14:12 -0400 Subject: [PATCH 1/2] first succes with pull-streams and callbags --- package-lock.json | 267 ++++++++++++++++++++++++++++++++++++++- package.json | 28 +++- test/matrix.test.js | 236 ++++++++++++++++++++++++++++++++++ ts-libs/matrix.ts | 302 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 829 insertions(+), 4 deletions(-) create mode 100644 test/matrix.test.js create mode 100644 ts-libs/matrix.ts diff --git a/package-lock.json b/package-lock.json index f3bc46b..ac9a0f7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -391,6 +391,236 @@ "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz", "integrity": "sha512-zauLjrfCG+xvoyaqLoV8bLVXXNGC4JqlxFCutSDWA6fJrTo2ZuvLYTqZ7aHBLZSMOopbzwv8f+wZcVzfVTI2Dg==" }, + "callbag": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/callbag/-/callbag-1.2.0.tgz", + "integrity": "sha512-RMcVOk2oyA8CyCt/HsLEvnvvttKlaGVaG0uOo5IY7RpdiPLhStjxv5oYre33VJfT8kJE+cuKLSNHooz7yCUdhQ==" + }, + "callbag-basics": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/callbag-basics/-/callbag-basics-3.2.0.tgz", + "integrity": "sha512-+wOwoQYvn+R9q6H/co0KP05pSL4dIQGWvz3BHSshRBf17Y8P8gr7H4kZdvUl7SJ5d0raa97mHfb4ZJRyvReynw==", + "requires": { + "callbag": "1", + "callbag-combine": "1", + "callbag-concat": "1", + "callbag-filter": "1", + "callbag-flatten": "1", + "callbag-for-each": "1", + "callbag-from-event": "1", + "callbag-from-iter": "1", + "callbag-from-obs": "1", + "callbag-from-promise": "1", + "callbag-interval": "1", + "callbag-map": "1", + "callbag-merge": "2", + "callbag-pipe": "1", + "callbag-scan": "1", + "callbag-share": "1", + "callbag-skip": "1", + "callbag-take": "1" + } + }, + "callbag-callback": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/callbag-callback/-/callbag-callback-1.0.0.tgz", + "integrity": "sha512-pjr2+SwDyOrxtjw9opt56SL51JZ5Efb4412M2VLGCIzuMgWMkC4y+jiTTww+StNf5VmsElFj5H2tpBGPV2oKgg==" + }, + "callbag-combine": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/callbag-combine/-/callbag-combine-1.2.0.tgz", + "integrity": "sha512-iDHfGNcBb9oIJLA//ytxU1e1W5MjjG+lIu2E2xtVs8fBba7+Zkp4wU6bnIf5II34hDxyEy8O8ct8TY6GO/W8Ig==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-concat": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/callbag-concat/-/callbag-concat-1.2.1.tgz", + "integrity": "sha512-iIC4FCUF7sKnt4gDKHNjMRHekebdaA1/udjON/mCLg31EzzTO/dE9nfxpcj+G/63KopFXVQdZKQDsu7UOYGpPg==", + "requires": { + "callbag": "^1.2.0" + } + }, + "callbag-concat-map": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/callbag-concat-map/-/callbag-concat-map-1.1.0.tgz", + "integrity": "sha512-K50diiflf3A9TwIT76OHZnb+Ryw19q5PEsYgWw3LTzTI7SoL2ITulU9M4jVMLatH58CXmI7S01rS7CLD33Zpdw==" + }, + "callbag-filter": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/callbag-filter/-/callbag-filter-1.1.0.tgz", + "integrity": "sha512-m6ZHEd9JYHBAYJAdeqgt3gIKSoZOizfJTxIPd8EvDwY8Dr44+slm090WvCrrlz2SZoU8mCqpmZ8TFrGKXHo/yw==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-flat-map": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/callbag-flat-map/-/callbag-flat-map-1.0.2.tgz", + "integrity": "sha512-YaGIGeCPXYKExKXyoPwKnbqsbxtH3bjxbyAASnA8Occ0FdHs0mEbOcFUgH87qNlXmUWHlspBZNQbD2QSEvLbXQ==" + }, + "callbag-flat-map-lazy": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/callbag-flat-map-lazy/-/callbag-flat-map-lazy-1.0.0.tgz", + "integrity": "sha512-DWx5gxOpO1AqXGYfTwIgV2c2UKYzX7snCbSFbS8n4wskMKzjXpeJRixJ2+X6QNiAT5sOozrmDY28fiMyn/o3XA==" + }, + "callbag-flatten": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/callbag-flatten/-/callbag-flatten-1.4.0.tgz", + "integrity": "sha512-2JM/jA0Ijnuu15kqcdepVd8Iv/L3VytTFOjP0rk/BKc4/yvUUPkQfR3U2QM6Uq+DO/roOWMfrVKaMAxsZ8ciww==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-for-each": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/callbag-for-each/-/callbag-for-each-1.1.0.tgz", + "integrity": "sha512-/FKC/dd3hnw/M1G5mlZrz/+UbtHYPz5PsmmBvpthNb67v6XuQlT93BzxQDxRHAalAjsOci8ZqoYFgmmJxwGhvg==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-from-event": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/callbag-from-event/-/callbag-from-event-1.3.0.tgz", + "integrity": "sha512-cAu82hKKFmMtKTmd50p/nlMfs1oKz+PGUZmmwhbzPbw4YtjNgTKg6pXjpcQprhBQdrqg/v8pHcAS8Qs6X7r8fw==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-from-iter": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/callbag-from-iter/-/callbag-from-iter-1.2.0.tgz", + "integrity": "sha512-9rWvHOnRGp01YMRHHwgVZOO1vu4IRR8GcoH3FpSB16AMzum5juFWJPCMX/XnkJ9j6cic/G+kvb1Grvi6IuSmIQ==" + }, + "callbag-from-obs": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/callbag-from-obs/-/callbag-from-obs-1.2.0.tgz", + "integrity": "sha512-InhdPC6P4Gdpg7nuXSkocDFlb+//sbwCrVCYhxOHhSVm1gDcw/zSA+IF1gHdYtk4RQKKaCymUFCkVVUVSRThVQ==", + "requires": { + "symbol-observable": "^1.2.0" + } + }, + "callbag-from-promise": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/callbag-from-promise/-/callbag-from-promise-1.3.0.tgz", + "integrity": "sha512-1e6Xloc5DavrPUYIUKUNLhYTTXFtrRH/tN0qbS+LAzNeu9whCJOLEExjVfntEfSbiuADClAuIiVDLRiPBhu1mg==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-from-pull-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/callbag-from-pull-stream/-/callbag-from-pull-stream-1.0.0.tgz", + "integrity": "sha512-kNZXSxUxCaDQo9XnxPnOkKxF7Ull7tp/Yzy/6o0foHAm617JpwTFtvF9ufhnVzgPZ0Q85dmwmyFZF16ZS5btHA==" + }, + "callbag-interval": { + "version": "github:felixdrp/callbag-interval#2d223c4db3e8be33bfd1a4d135ab3896e31a323b", + "from": "github:felixdrp/callbag-interval", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-map": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/callbag-map/-/callbag-map-1.1.0.tgz", + "integrity": "sha512-/xFzvJCtx1xulZpFeI4wq+ZD7tkg417XOeJ3wS9v8bHLLpt+fHKXgbZ8TBnnIxywJN6Uu/ZhL+c1DzUG6KXYFQ==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-map-promise": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/callbag-map-promise/-/callbag-map-promise-0.1.3.tgz", + "integrity": "sha512-RQlstnieRVGNTVl6Hgxso69ScCiiniMQxhqhTcJFNc29ZCZNE4VuWzeO+ttVrpfjYHyyNjGMFQX9VB+fxGJJRw==", + "requires": { + "callbag-flat-map": "^1.0.0", + "callbag-from-promise": "^1.0.0", + "callbag-pipe": "^1.1.1" + } + }, + "callbag-merge": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/callbag-merge/-/callbag-merge-2.0.1.tgz", + "integrity": "sha512-Kl6JQ8Swc7DBuIL+Yjkgm92l82vrD2yQCNZHO8M8o6zVI+a0/RAAPLD5qrnGA1WwHLOOD6xAYsm0gzfzAJ3hkA==" + }, + "callbag-merge-all": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/callbag-merge-all/-/callbag-merge-all-1.0.1.tgz", + "integrity": "sha512-cNmj8x6K+0iddqSGhpteyg1w/Ag/9b99kMeaFhRL8/349AhKOcFOUm0eA08cpijc6MMKg40i8lYpdwy7YPFaYg==" + }, + "callbag-of": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/callbag-of/-/callbag-of-2.0.0.tgz", + "integrity": "sha512-eqOUTQn2P/gzlPlRKo/PY/bVRASDFiFNI2+kfjyd5YHJERET4u6WP4xHNmKJlWcDaFQtaJdigzryu+Ja+W6L9A==" + }, + "callbag-pipe": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/callbag-pipe/-/callbag-pipe-1.2.0.tgz", + "integrity": "sha512-M+LdHUK0qJkm4CRe+I870ZzD/SnzcpTsfJRzFoB2hOkNXlrVdoj002KMkfdS4pHMfV6egzZFd8j+xJu6C4kwEQ==" + }, + "callbag-pull": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/callbag-pull/-/callbag-pull-1.0.1.tgz", + "integrity": "sha512-YYyN3qCh7stb8VIr+/c/OQW44WftGJk0+id5m2frB5kboclSN8nwYSccCqtp/Cw1Hz/7pOzsV321YGjd8vAVQQ==" + }, + "callbag-pump": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/callbag-pump/-/callbag-pump-1.2.0.tgz", + "integrity": "sha512-UgsqORgBZuh6CUgaZpZIR9wHCGoc+W4mG+6AYKaYskuQMNHYMDIADjckP7uJyruafRDOLJ2UzyXFcxJeI9Y5Uw==" + }, + "callbag-replay-all": { + "version": "github:Connoropolous/callbag-replay-all#98229f04a706920064ca4dd0035f7f135b495a08", + "from": "github:Connoropolous/callbag-replay-all" + }, + "callbag-scan": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/callbag-scan/-/callbag-scan-1.1.0.tgz", + "integrity": "sha512-5Aw9LR8b4Fg/gC++lWrq9sXEqgO0c5Gk+HWaFhFoVqgujSUVRxFw+c4bLw1eExWrBX6OeBDbVZ/2rMr/E0q9cQ==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-share": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/callbag-share/-/callbag-share-1.2.0.tgz", + "integrity": "sha512-Gt+D5lfJ/j3X5/3PiMPL8hHDPBmeQILZl2QYu+YJtSFz6PlNGQSL45aD4e4uoxMvhMZY9NpMHh7KRWia9PZA+g==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-skip": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/callbag-skip/-/callbag-skip-1.1.0.tgz", + "integrity": "sha512-KodTVJFA1h1Z7iFvs2ocwtyvb+q38i/FJTabdMTmmPucQV+XpGH8e6BYxmYRznY4FQUTLM9e5Y5HqbUxT7ma1w==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-subject": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/callbag-subject/-/callbag-subject-1.0.2.tgz", + "integrity": "sha512-pMIKoXxJdO6SypymWQyLJHW/idY0esgT68I6xjYQjGZch0IVPlM5Bd+YSJaQ17fGLJEvK3HsaRaDuU8nmAqWOQ==" + }, + "callbag-take": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/callbag-take/-/callbag-take-1.4.0.tgz", + "integrity": "sha512-SMN236uYqjZ+8ypAY12qkV2QwutjOe96GyYPzUF9RKKJR5nRyu0fON4r4XBZ3ZonOWnDW05CWmKmS0mVQg5TIg==", + "requires": { + "callbag": "^1.1.0" + } + }, + "callbag-tap": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/callbag-tap/-/callbag-tap-1.3.0.tgz", + "integrity": "sha512-K2SarqzmLgl2OuzPQY7sQc9JzJ/n3C9Xkug4bLJfuecnhTTv6+y7O1JJ5ky4yXyYZ1eWkMrztnheQhs62wBPhw==" + }, + "callbag-to-pull-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/callbag-to-pull-stream/-/callbag-to-pull-stream-1.0.0.tgz", + "integrity": "sha512-bbSz67aAonxGfXdGUR7iSVvggeil7YXwm6hhUyuBLb7qPNQx/Wc4Ybo7ayUPcMyHiHj7yasm27NZvEozDZnkSg==" + }, "callsite": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/callsite/-/callsite-1.0.0.tgz", @@ -1384,6 +1614,11 @@ "resolved": "https://registry.npmjs.org/lolex/-/lolex-4.2.0.tgz", "integrity": "sha512-gKO5uExCXvSm6zbF562EvM+rd1kQDnB9AZBbiQVzf1ZmdDpxUSvpnAaVOP83N/31mRK8Ml8/VE8DMvsAZQ+7wg==" }, + "looper": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/looper/-/looper-4.0.0.tgz", + "integrity": "sha1-dwat7VmpntygbmtUu4bI7BnJUVU=" + }, "loose-envify": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/loose-envify/-/loose-envify-1.4.0.tgz", @@ -1753,6 +1988,32 @@ "resolved": "https://registry.npmjs.org/psl/-/psl-1.4.0.tgz", "integrity": "sha512-HZzqCGPecFLyoRj5HLfuDSKYTJkAfB5thKBIkRHtGjWwY7p1dAyveIbXIq4tO0KYfDF2tHqPUgY9SDnGm00uFw==" }, + "pull-paramap": { + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/pull-paramap/-/pull-paramap-1.2.2.tgz", + "integrity": "sha1-UaQZPOnI1yFdla2tReK824STsjo=", + "requires": { + "looper": "^4.0.0" + } + }, + "pull-pushable": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/pull-pushable/-/pull-pushable-2.2.0.tgz", + "integrity": "sha1-Xy867UethpGfAbEqLpnW8b13ZYE=" + }, + "pull-stream": { + "version": "3.6.14", + "resolved": "https://registry.npmjs.org/pull-stream/-/pull-stream-3.6.14.tgz", + "integrity": "sha512-KIqdvpqHHaTUA2mCYcLG1ibEbu/LCKoJZsBWyv9lSYtPkJPBq8m3Hxa103xHi6D2thj5YXa0TqK3L3GUkwgnew==" + }, + "pull-tap": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/pull-tap/-/pull-tap-1.1.1.tgz", + "integrity": "sha512-KhXu0+OeITLFGiBBuJqJ54VNBNQI4uwmSmzW3+ssIpSUzjs91AHehQGTjOtpKsHJ+NBZd/CzrpAXzFupBrnYFg==", + "requires": { + "pull-stream": "^3.6.1" + } + }, "pump": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/pump/-/pump-2.0.1.tgz", @@ -2085,9 +2346,9 @@ } }, "rsf-types": { - "version": "0.0.33", - "resolved": "https://registry.npmjs.org/rsf-types/-/rsf-types-0.0.33.tgz", - "integrity": "sha512-kMx+9d5T9MrOSXsRmV8YAXtiNnYn+hebhbRuolGlxiO00ivZypVmGGMl3Y+V9bcJKrM4/QNieNg1psBTvbTfSA==", + "version": "0.0.35", + "resolved": "https://registry.npmjs.org/rsf-types/-/rsf-types-0.0.35.tgz", + "integrity": "sha512-EwlxA2QLoMOt8gRWQQMUUJAjOg5OCbB19b7+SmfMn/PJAqn2aahuTMXSRzjdzdN9RPxpL9jxRYVsxgiMpIaQhQ==", "requires": { "typescript": "^3.7.2" } diff --git a/package.json b/package.json index 3b96a1a..7ab7314 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ }, "description": "a collection of modules for rapid collective text-driven sensemaking", "scripts": { - "test": "npm run build && mocha", + "test": "npm run build && mocha test/matrix.test.js --timeout 20000", "build": "tsc --build ts-components", "build-watch": "tsc --build --watch ts-components", "postinstall": "npm run build" @@ -17,11 +17,37 @@ "dependencies": { "@types/mongodb": "^3.3.11", "@types/socket.io-client": "^1.4.32", + "callbag-basics": "^3.2.0", + "callbag-callback": "^1.0.0", + "callbag-concat-map": "^1.1.0", + "callbag-flat-map": "^1.0.2", + "callbag-flat-map-lazy": "^1.0.0", + "callbag-for-each": "^1.1.0", + "callbag-from-iter": "^1.2.0", + "callbag-from-promise": "^1.3.0", + "callbag-from-pull-stream": "^1.0.0", + "callbag-interval": "github:felixdrp/callbag-interval", + "callbag-map": "^1.1.0", + "callbag-map-promise": "^0.1.3", + "callbag-merge-all": "^1.0.1", + "callbag-of": "^2.0.0", + "callbag-pipe": "^1.2.0", + "callbag-pull": "^1.0.1", + "callbag-pump": "^1.2.0", + "callbag-replay-all": "github:Connoropolous/callbag-replay-all", + "callbag-subject": "^1.0.2", + "callbag-take": "^1.4.0", + "callbag-tap": "^1.3.0", + "callbag-to-pull-stream": "^1.0.0", "chai": "^4.2.0", "mocha": "^6.2.2", "moment": "2.24.0", "mongodb": "^3.3.2", "noflo": "^1.1.3", + "pull-paramap": "^1.2.2", + "pull-pushable": "^2.2.0", + "pull-stream": "^3.6.14", + "pull-tap": "^1.1.1", "rsf-contactable": "1.0.91", "rsf-types": "0.0.35", "sinon": "^7.5.0", diff --git a/test/matrix.test.js b/test/matrix.test.js new file mode 100644 index 0000000..88a4012 --- /dev/null +++ b/test/matrix.test.js @@ -0,0 +1,236 @@ +const { expect } = require('chai') +const sinon = require('sinon') +const { pipe, matrix, defaultMapper, forEach } = require('../libs/matrix') +const { newMockMakeContactable } = require('rsf-contactable') + +// get the callbag sink +const fromIter = require('callbag-from-iter') + +const pull = require('pull-stream') +const { tap } = require('pull-tap') +const Pushable = require('pull-pushable') + +/* +This system takes two sources, either realtime, or fixed in length +and is designed to produce one result (response) for every possible combination +of the items in the two streams +It will do this dynamically and ongoingly. +It will do this by performing a 'stream replication' of one of the streams + +sourceX: a pull-stream source +sourceY: a pull-stream source +preMergeTransformer: transform the stream, prior to merging it with the rest + +returns: a new pull-streams source, whatever the product of combining an X and a Y is... a Z? +*/ +const matrixify = ( + sourceX, + sourceY, + // default is to pass it through exactly as is + preMergeTransformer = pull.map(a => a) +) => { + //immediately create the source which will be returned, which is a Pushable + const zSource = Pushable() + + // cache the prompts that have streamed in so far + // for the purposes of forwarding them to new joiners + const pastY = [] + + // cache the existing yStreams too, for the purposes of + // immediately forwarding new Ys to them + const yStreams = [] + + let xDone = false + + // create a standalone pipeline meant to drain the sourceY, + // and in the process of doing so, replicates it for all existing + // yStreams, (and future ones by caching the old values for future joiners) + pull( + sourceY, + // tap is for side effects (does not modify the stream in any way) + tap(y => { + // drop the new Y back into the cache + pastY.push(y) + // feed the Y directly through to existing yStreams + yStreams.forEach(yStream => yStream.push(y)) + }), + pull.drain( + // do nothing here + () => {}, + // record completion + function done() { + console.log('yDone') + yStreams.forEach(yStream => yStream.end()) + } + ) + ) + + // create a second standalone pipeline, meant to drain + // the sourceX stream + pull( + sourceX, + pull.drain( + x => { + const yStream = Pushable() + // add the new yStream to the cache of yStreams + yStreams.push(yStream) + // load in the cached prompts, that came in before this contactable joined + pastY.forEach(yStream.push) + // pull prompts through a one-at-a-time (async) + // process to get someones response to a certain prompt, pipe that response out + pull( + yStream, + // map y back into the xStream + pull.map(y => [x, y]), + // pass it through the preMergeTransformer hook + preMergeTransformer, + // pipe new results through to the zSource + pull.drain(zSource.push, function done() { + // any time that anyone finishes, check if EVERY one has + // finished, and only if so then end the zSource stream + if (xDone && yStreams.every(yS => yS.buffer.length === 0)) { + zSource.end() + } + }) + ) + }, + function done() { + console.log('xDone') + xDone = true + } + ) + ) + + return zSource +} + +// consume a prompt & a contactable, and wait for the contactable to provide +// a valid response +// suitable for passing to pull.asyncMap +const getResponseForPrompt = ([contactable, prompt], cb) => { + console.log(contactable.id + ' received: ' + prompt) + /* +can do the juicy stuff in here ... +listen +wait +validate +teardown +*/ + setTimeout(function() { + cb(null, contactable.id + ' responded to: ' + prompt) + }, Math.floor(Math.random() * 5000)) +} + +describe('PromptPeopleResponse Matrix Pull Streams', function() { + context( + 'when you begin with a finite stream of prompts and contactables', + function() { + it('should deliver pairs of all combinations of prompts and contactables through', function(testDone) { + return testDone() + const mockMakeContactable = newMockMakeContactable(sinon.spy) + + // in real world use case, these are most likely to stream in + // via noflo piping, or a websocket, or something like that. + const contactablesSource = Pushable() + const promptsSource = Pushable() + + promptsSource.push('prompt 1') + setTimeout(() => { + const c1 = mockMakeContactable({ id: 'contactable1' }) + contactablesSource.push(c1) + }, 4000) + setTimeout(() => { + const c2 = mockMakeContactable({ id: 'contactable2' }) + contactablesSource.push(c2) + promptsSource.push('prompt 2') + contactablesSource.end() + promptsSource.end() + }, 6000) + + // const spy = sinon.spy() + pull( + matrixify( + // sourceX + contactablesSource, + // source Y + promptsSource, + // preMergeTransform + pull.asyncMap(getResponseForPrompt) + ), + pull.drain( + response => { + console.log(response) + }, + function done() { + console.log('done') + testDone() + } + ) + ) + }) + } + ) +}) + +describe('PromptPeopleResponse Matrix Callbags', function() { + context( + 'when you begin with a finite stream of prompts and contactables', + function() { + it('should deliver pairs of all combinations of prompts and contactables through', function(done) { + const mockMakeContactable = newMockMakeContactable(sinon.spy) + const contactables = [ + { id: 'contactable1' }, + { id: 'contactable2' } + ].map(mockMakeContactable) + // in real world use case, these are most likely to stream in + // via noflo piping, or a websocket, or something like that. + const contactablesSource = fromIter(contactables) + const promptsSource = fromIter(['prompt 1', 'prompt 2']) + + const results = [] + const resultsSource = matrix( + // sourceX + contactablesSource, + // sourceY + promptsSource, + // preMergeTransform is concatMap, which is like asyncMap + // aka one-at-a-time, in sequence that they came in + defaultMapper + ) + // sink / results handler + const eachResult = forEach( + result => { + console.log(result) + results.push(result) + // const { matrixOverview } = result + }, + function onEnd(e) { + expect(results.length).to.equal(4) + expect(e).to.be.undefined + done() + } + ) + // same as + // pipe(resultsSource, eachResult) + eachResult(resultsSource) + + const c1 = contactables[0] + const c2 = contactables[1] + c1.trigger('c1 first response') + expect(results.length).to.equal(1) + setTimeout(() => { + c2.trigger('c2 first response') + expect(results.length).to.equal(2) + }, 100) + setTimeout(() => { + c1.trigger('c1 second response') + expect(results.length).to.equal(3) + }, 200) + setTimeout(() => { + c2.trigger('c2 second response') + expect(results.length).to.equal(4) + }, 300) + }) + } + ) +}) diff --git a/ts-libs/matrix.ts b/ts-libs/matrix.ts new file mode 100644 index 0000000..19bf692 --- /dev/null +++ b/ts-libs/matrix.ts @@ -0,0 +1,302 @@ +import * as fromIter from 'callbag-from-iter' +import of from 'callbag-of' +import * as pull from 'callbag-pull' + +import * as tap from 'callbag-tap' + +import * as subject from 'callbag-subject' +// asyncMap +import * as concatMap from 'callbag-concat-map' +// full stream replication / backfill +import * as replay from 'callbag-replay-all' + +// merge a source of sources down into a single source +import { mergeAll } from 'callbag-merge-all' +// map, but in parallel +import * as flatMap from 'callbag-flat-map' +import { Statement, Contactable } from 'rsf-types' +// import * as flatMap from 'callbag-flat-map-lazy' + +const START = 0 +const DATA = 1 +const END = 2 + +// from callbag-take +const take = max => source => (start, sink) => { + if (start !== 0) return + let taken = 0 + let sourceTalkback + let end + function talkback(t, d) { + if (t === 2) { + end = true + sourceTalkback(t, d) + } else if (taken < max) sourceTalkback(t, d) + } + source(0, (t, d) => { + if (t === 0) { + sourceTalkback = d + sink(0, talkback) + } else if (t === 1) { + if (taken < max) { + taken++ + sink(t, d) + if (taken === max && !end) { + end = true + // switched the order of these two... I want + // the source to terminate first + // https://github.com/staltz/callbag-take/issues/6 + sourceTalkback(2) + sink(2) + } + } + } else { + sink(t, d) + } + }) +} + +// from callbag-pipe +function pipe(...callbags) { + let res = callbags[0] + for (let i = 1, n = callbags.length; i < n; i++) { + res = callbags[i](res) + } + return res +} +// from callbag-map +const map = mapFn => source => (start, sink) => { + if (start !== START) return + source(START, (type: number, data: any) => { + if (type === DATA) { + sink(type, mapFn(data)) + return + } else { + sink(type, data) + } + }) +} +// from callbag-from-promise +const fromPromise = promise => (start, sink) => { + if (start !== 0) return + let ended = false + const onfulfilled = val => { + if (ended) return + sink(1, val) + if (ended) return + sink(2) + } + const onrejected = (err = new Error()) => { + if (ended) return + sink(2, err) + } + promise.then(onfulfilled, onrejected) + sink(0, t => { + if (t === 2) ended = true + }) +} +// from callbag-for-each, modified +const forEach = (operation, onEnd = (e?: Error) => {}) => source => { + let talkback + source(0, (type: number, data) => { + if (type === START) talkback = data + if (type === DATA) operation(data) + if (type === DATA || type === START) talkback(1) + if (type === END) onEnd(data) + }) +} +// from callbag-scan +function scan(reducer, seed) { + let hasAcc = arguments.length === 2 + return source => (start, sink) => { + if (start !== 0) return + let acc = seed + source(0, (t, d) => { + if (t === 1) { + acc = hasAcc ? reducer(acc, d) : ((hasAcc = true), d) + sink(1, acc) + } else sink(t, d) + }) + } +} + +type Filter = (data: any) => boolean +type Callbag = (type: number, data?: any) => void +// from callbag-filter +const filter = (condition: Filter) => (source: Callbag) => ( + start: number, + sink: any +): void => { + if (start !== START) return + let talkback + source(START, (type, data) => { + if (type === START) { + talkback = data + sink(type, data) + } else if (type === DATA) { + if (condition(data)) sink(type, data) + else talkback(DATA) + } else sink(type, data) + }) +} + +const fromContactable = contactable => (start, sink) => { + if (start !== START) return + let ended = false + const onMessage = (message: string) => { + // prevent race conditions + if (ended) return + sink(DATA, message) + } + contactable.listen(onMessage) + sink(START, (type: number) => { + if (type === END) { + ended = true + contactable.stopListening() + } + }) +} + +const getResponseForPrompt = ( + mapper: (contactable: Contactable) => (message: string) => any +) => ( + validator: (contactable: Contactable) => (message: string) => boolean +) => ([contactable, prompt]: [Contactable, string]) => { + // send the prompt + contactable.speak(prompt) + return pipe( + fromContactable(contactable), // create a stream of messages from the contactable + filter(validator(contactable)), // use the given validator, add hooks here to send back messages + // anything can happen after it filters out invalid responses + map(mapper(contactable)), // use the given mapper, and fold in the contactable + map((result: any) => [result, prompt]), // fold the prompt back in + take(1) // limit to one response per prompt, could make this variable + ) +} + +const basicValidator = (contactable: Contactable) => ( + message: string +): boolean => { + // in the basic validator + // don't check the message at all, just + // let it through + + // can do special stuff in here with contactables though + // like sending a message to the contactable if the message + // was invalid (good practice) + return true +} + +// first level: options +// second level: contactable +// third level: message +const messageToStatement = (anonymous: boolean) => ( + contactable: Contactable +) => (message: string): Statement => { + const statement: Statement = { + text: message, + timestamp: Date.now() + } + if (!anonymous) { + statement.contact = contactable.config() + } + return statement +} + +// asyncMap +// we really want concatMap when dealing with `contactables` +// because they should only be dealing with one thing at a time +// given the constraints of text<>text channel +const defaultMapper = concatMap( + getResponseForPrompt(messageToStatement(false))(basicValidator) +) + +interface MatrixOverview { + xCountIn: number + yCountIn: number + countOut: number +} +interface MatrixResult { + matrixOverview: MatrixOverview + data: any +} + +const matrix = ( + sourceX: (start: any, sink: any) => void, // contactables + sourceY: (start: any, sink: any) => void, // prompts + // make it async so that xSource can finish + // before zSources + preMergeTransformer = flatMap((a: any) => fromPromise(Promise.resolve(a))) +) => { + const matrixOverview: MatrixOverview = { + xCountIn: 0, + yCountIn: 0, + countOut: 0 + } + // source to return + /* acts like a callbag + Call it with args (1, data) to send data into the subject + Call it with args (2, err) to send an error into the subject + Call it with args (2) to make the subject complete + */ + const zSource = subject() + let xDone = false + let yDone = false + let error: Error + let zSources = [] + + function forEachZ(z: any) { + matrixOverview.countOut++ + const data: MatrixResult = { + matrixOverview, + data: z + } + zSource(DATA, data) + } + + function forEachX(x: any) { + // count Xs + matrixOverview.xCountIn++ + let zIsOver = false + // lift up a function that can check if this + // stream is over + zSources.push(() => zIsOver) + pipe( + sourceY, + map((y: any) => [x, y]), + preMergeTransformer, + forEach(forEachZ, function onEnd(e) { + // e can be undefined + zIsOver = true + // check every time a stream finishes here + if (xDone && yDone && zSources.every(z => z())) { + zSource(END, e) + } + }) + ) + } + + forEach( + (y: any) => matrixOverview.yCountIn++, + function onEnd(e) { + // TODO: should errors + // here end the flow early? + error = e + yDone = true + } + )(sourceY) + + // calling sourceX as the input to forEach + // is the same as using pipe in the opposite order + // as arguments + forEach(forEachX, function onEnd(e) { + // TODO: should errors + // here end the flow early? + error = e + xDone = true + })(sourceX) + + return zSource +} + +export { pipe, forEach, matrix, getResponseForPrompt, defaultMapper } From d98874e1e87effbfcd86e0af9aa87023ae84d12f Mon Sep 17 00:00:00 2001 From: Connor Turland Date: Wed, 11 Mar 2020 13:19:20 -0400 Subject: [PATCH 2/2] fix broken import --- ts-libs/matrix.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ts-libs/matrix.ts b/ts-libs/matrix.ts index 19bf692..d4e5646 100644 --- a/ts-libs/matrix.ts +++ b/ts-libs/matrix.ts @@ -8,7 +8,7 @@ import * as subject from 'callbag-subject' // asyncMap import * as concatMap from 'callbag-concat-map' // full stream replication / backfill -import * as replay from 'callbag-replay-all' +// import * as replay from 'callbag-replay-all' // merge a source of sources down into a single source import { mergeAll } from 'callbag-merge-all'