/*
* Copyright (c) 2017-2018 Thomas Otterson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* A set of channel utilities for routing channels to other channels in different ways.
*
* **In every one of these functions** the source channel will not be available to be taken from, as all of the source
* channels will have their values taken by the processes within these functions. The lone exception is `tap`, where
* the regular function of the source channel will be restored if all taps are removed. Even so, while at least one tap
* is in place, the source channel cannot be taken from.
*
* @module cispy/utils/flow
* @private
*/
const { chan, close, CLOSED } = require('../channel');
const { put, take, alts, putAsync, takeAsync } = require('../ops');
// Symbol keys used to attach private bookkeeping to channel objects. `taps` keys the array of channels currently
// tapping a source channel.
const protocols = { taps: Symbol('multitap/taps') };
function isNumber(x) {
  // The class tag matches both number primitives and Number wrapper objects.
  const tag = Object.prototype.toString.call(x);
  if (tag !== '[object Number]') {
    return false;
  }
  // The coercing global isFinite is used on purpose so that Number wrapper objects are unwrapped before the
  // NaN/Infinity check.
  return isFinite(x);
}
/**
* **Pipes the values from one channel onto another channel.**
*
* This ties two channels together so that puts on the source channel can be taken off the destination channel. This
* does not duplicate values in any way - if another process takes a value off the source channel, it will never appear
* on the destination channel. In most cases you will not want to take values off a channel once it's piped to another
* channel, since it's difficult to know which values will go to which channel.
*
* Closing either channel will break the connection between the two. If the source channel is closed, the destination
* channel will by default also be closed. However, passing `true` as the third parameter will cause the destination
* channel to remain open even when the source channel is closed (the connection is still broken however).
*
* Because of the ability to leave the destination channel open, a possible use case for this function is to wrap the
* destination channel(s) of one of the other flow control functions below to have a channel that survives the source
* channel closing. The rest of those functions (aside from the special-case
* `{@link module:cispy/utils~CispyUtils.tap|tap}`) automatically close their destination channels when the
* source channels close.
*
* ```
* const { go, chan, put, take, close, utils } = cispy;
* const { pipe } = utils;
*
* const input = chan();
* const output = pipe(input, chan());
*
* go(async () => {
* await put(input, 1);
* await put(input, 2);
* close(input);
* });
*
* go(async () => {
* console.log(await take(output)); // -> 1
* console.log(await take(output)); // -> 2
* console.log(output.closed); // -> true
* });
* ```
*
* @memberOf module:cispy/utils~CispyUtils
* @param {module:cispy/channel~Channel} src The source channel.
* @param {module:cispy/channel~Channel} dest The destination channel.
* @param {boolean} [keepOpen=false] A flag to indicate that the destination channel should be kept open after the
* source channel closes. By default the destination channel will close when the source channel closes.
* @return {module:cispy/channel~Channel} The destination channel.
*/
function pipe(src, dest, keepOpen) {
  // Background process that moves values from `src` to `dest`. It stops either when `src` yields CLOSED or when a
  // put onto `dest` resolves to a falsy value (presumably because `dest` has been closed).
  const forward = async () => {
    let value = await take(src);
    while (value !== CLOSED) {
      const accepted = await put(dest, value);
      if (!accepted) {
        // The destination refused the value; leave it alone and just stop forwarding.
        return;
      }
      value = await take(src);
    }
    // The source is closed: propagate the close unless the caller asked to keep `dest` open.
    if (!keepOpen) {
      close(dest);
    }
  };
  forward();
  return dest;
}
/**
* **Creates two new channels and routes values from a source channel to them according to a predicate function.**
*
* The supplied function is invoked with every value that is put onto the source channel. Those that return `true` are
* routed to the first destination channel; those that return `false` are routed to the second.
*
* The new channels are created by the function based on the buffer values passed as the third and fourth parameters. If
* one or both of these are missing, `null`, or `0`, the corresponding destination channel is unbuffered. If one is a
* positive integer, the corresponding channel is buffered by a fixed buffer of that size. If the parameter for a
* channel is a buffer, then that buffer is used to buffer the new channel.
*
* Both new channels are closed when the source channel is closed.
*
*
* ```
* const { go, chan, put, take, utils } = cispy;
* const { partition } = utils;
*
* const input = chan();
* const [even, odd] = partition(x => x % 2 === 0, input);
*
* go(async () => {
* await put(input, 1);
* await put(input, 2);
* await put(input, 3);
* await put(input, 4);
* });
*
* go(async () => {
* console.log(await take(even)); // -> 2
* console.log(await take(even)); // -> 4
* });
*
* go(async () => {
* console.log(await take(odd)); // -> 1
* console.log(await take(odd)); // -> 3
* });
* ```
*
* @memberOf module:cispy/utils~CispyUtils
* @param {module:cispy/utils~predicate} fn A predicate function used to test each value on the input channel.
* @param {module:cispy/channel~Channel} src The source channel.
* @param {(number|module:cispy/buffers~Buffer)} [tBuffer=0] A buffer used to create the destination channel which
* receives all values that passed the predicate. If this is a number, a
* {@link module:cispy/buffers~FixedBuffer} of that size will be used. If this is `0` or not present, the
* channel will be unbuffered.
* @param {(number|module:cispy/buffers~Buffer)} [fBuffer=0] A buffer used to create the destination channel which
* receives all values that did not pass the predicate. If this is a number, a
* {@link module:cispy/buffers~FixedBuffer} of that size will be used. If this is `0` or not present, the
* channel will be unbuffered.
* @return {module:cispy/channel~Channel[]} An array of two channels. The first is the destination channel with all
* of the values that passed the predicate, the second is the destination channel with all of the values that did
* not pass the predicate.
*/
function partition(fn, src, tBuffer = 0, fBuffer = 0) {
  const passed = chan(tBuffer);
  const failed = chan(fBuffer);
  // Background router: each value taken from `src` goes to `passed` when `fn` returns a truthy result and to
  // `failed` otherwise. Once `src` yields CLOSED, both destinations are closed.
  const route = async () => {
    for (let value = await take(src); value !== CLOSED; value = await take(src)) {
      const target = fn(value) ? passed : failed;
      await put(target, value);
    }
    close(passed);
    close(failed);
  };
  route();
  return [passed, failed];
}
/**
* **Merges one or more channels into a single destination channel.**
*
* Values are given to the destination channel as they are put onto the source channels. If `merge` is called when there
* are already values on multiple source channels, the order that they're put onto the destination channel is random.
*
* The destination channel is created by the function based on the buffer value passed as the second parameter. If this
* is missing, `null`, or `0`, the destination channel will be unbuffered. If it's a positive integer, the destination
* channel is buffered by a fixed buffer of that size. If the parameter is a buffer, then that buffer is used to buffer
* the destination channel.
*
* As each source channel closes, it is removed from the merge, leaving the channels that are still open to continue
* merging. When *all* of the source channels close, then the destination channel is closed.
*
* ```
* const { go, chan, put, take, utils } = cispy;
* const { merge } = utils;
*
* const input1 = chan();
* const input2 = chan();
* const input3 = chan();
* const output = merge([input1, input2, input3]);
*
* go(async () => {
* // Because we're putting to all three channels in the same
* // process, we know the order in which the values will be
* // put on the output channel; in general, we won't know this
* await put(input1, 1);
* await put(input2, 2);
* await put(input3, 3);
* });
*
* go(async () => {
* console.log(await take(output)); // -> 1
* console.log(await take(output)); // -> 2
* console.log(await take(output)); // -> 3
* });
* ```
*
* @memberOf module:cispy/utils~CispyUtils
* @param {module:cispy/channel~Channel[]} srcs An array of source channels.
* @param {(number|module:cispy/buffers~Buffer)} [buffer=0] A buffer used to create the destination channel. If
* this is a number, a {@link module:cispy/buffers~FixedBuffer} of that size will be used. If this is `0` or
* not present, the channel will be unbuffered.
* @return {module:cispy/channel~Channel} The destination channel, which will receive all values put onto every
* source channel.
*/
function merge(srcs, buffer = 0) {
  const dest = chan(buffer);
  // Work on a copy so the caller's array is left untouched while closed sources are removed.
  const open = srcs.slice();
  // Background process: keep selecting across the still-open sources until none remain, then close `dest`.
  const drain = async () => {
    while (open.length > 0) {
      const { value, channel } = await alts(open);
      if (value !== CLOSED) {
        await put(dest, value);
      } else {
        // The selected source has closed, so it no longer takes part in the merge.
        open.splice(open.indexOf(channel), 1);
      }
    }
    close(dest);
  };
  drain();
  return dest;
}
/**
* **Splits a single channel into multiple destination channels, with each destination channel receiving every value put
* onto the source channel.**
*
* Every parameter after the first represents the buffer for a single destination channel. Each `0` or `null` will
* produce an unbuffered channel, while each positive integer will produce a channel buffered by a fixed buffer of that
* size. Each buffer will produce a buffered channel backed by that buffer. If there are no parameters after the first,
* then two unbuffered channels will be produced as a default.
*
* When the source channel is closed, all destination channels will also be closed. However, if destination channels are
* closed, they do nothing to the source channel.
*
* ```
* const { go, chan, put, take, utils } = cispy;
* const { split } = utils;
*
* const input = chan();
* const outputs = split(input, 3);
*
* go(async () => {
* await put(input, 1);
* await put(input, 2);
* await put(input, 3);
* });
*
* go(async () => {
* for (const output of outputs) { // Each output will happen 3 times
* console.log(await take(output)); // -> 1
* console.log(await take(output)); // -> 2
* console.log(await take(output)); // -> 3
* }
* });
* ```
*
* This function moves its values to the output channels asynchronously. This means that even when using unbuffered
* channels, it is not necessary for all output channels to be taken from before the next put to the input channel can
* complete.
*
* @memberOf module:cispy/utils~CispyUtils
* @param {module:cispy/channel~Channel} src The source channel.
* @param {...(number|module:cispy/buffers~Buffer)} [buffers=2] The buffers used to create the destination
* channels. Each entry is treated separately. If one is a number, then a
* {@link module:cispy/buffers~FixedBuffer|FixedBuffer} of that size will be used. If one is a `0`, then the
* corresponding channel will be unbuffered. **Exception:** if a single number is passed, then that number of
* unbuffered channels will be created. This means that the default is to create two unbuffered channels. To create
* a single channel with a fixed buffer, use `{@link cispy~Cispy.fixedBuffer|fixedBuffer}` explicitly.
* @return {module:cispy/channel~Channel[]} An array of destination channels.
*/
function split(src, ...buffers) {
  // No buffer arguments means "two unbuffered channels".
  let specs = buffers.length === 0 ? [2] : buffers;
  // A lone numeric argument is a channel count rather than a buffer size: expand it into that many unbuffered
  // (`0`) entries.
  if (specs.length === 1 && isNumber(specs[0])) {
    const channelCount = specs[0];
    specs = [];
    while (specs.length < channelCount) {
      specs.push(0);
    }
  }
  const dests = specs.map(spec => chan(spec));
  // Private channel used to tell the loop that every destination has handled the current value.
  const done = chan();
  // Number of destinations whose put of the current value has not yet completed.
  let pending = 0;
  const onDelivered = () => {
    pending -= 1;
    if (pending === 0) {
      putAsync(done);
    }
  };
  async function loop() {
    for (;;) {
      const value = await take(src);
      if (value === CLOSED) {
        for (const dest of dests) {
          close(dest);
        }
        break;
      }
      // With zero destinations (e.g. `split(src, 0)`) no delivery callback will ever fire, so waiting on `done`
      // would block this loop forever. There is nothing to deliver; move on to the next value.
      if (dests.length === 0) {
        continue;
      }
      pending = dests.length;
      // The puts are issued without awaiting each one, so a slow destination does not hold up the others; the
      // loop resumes only after all of them have reported back through `onDelivered`.
      for (const dest of dests) {
        putAsync(dest, value, onDelivered);
      }
      await take(done);
    }
  }
  loop();
  return dests;
}
/**
* Utility function to add the ability to be tapped to a channel that is being tapped. This will add a single property
* to that channel only (keyed by a unique symbol described as 'multitap/taps', so that it cannot collide with any
* other property), but the tapping functionality itself is provided outside the channel object. This new property is
* an array of the channels tapping this channel, and it will be removed if all taps are removed.
*
* @param {module:cispy/channel~Channel} src The source channel to be tapped.
* @private
*/
function tapped(src) {
  const key = protocols.taps;
  // `enumerable` is left at defineProperty's default of false, so the tap list stays out of enumeration.
  // `configurable` allows the property to be deleted again once tapping ends.
  Object.defineProperty(src, key, { value: [], writable: true, configurable: true });
  // Private channel used to tell the loop that every tap has handled the current value.
  const done = chan();
  // Number of taps whose put of the current value has not yet completed.
  let outstanding = 0;
  const acknowledge = () => {
    outstanding -= 1;
    if (outstanding === 0) {
      putAsync(done);
    }
  };
  // Background process: hands each value taken from `src` to every current tap. It ends when `src` yields CLOSED
  // or when a value arrives while the tap list is empty, and then removes the tap list from `src`.
  const distribute = async () => {
    let value = await take(src);
    while (value !== CLOSED && src[key].length > 0) {
      const taps = src[key];
      outstanding = taps.length;
      for (const tap of taps) {
        putAsync(tap, value, acknowledge);
      }
      await take(done);
      value = await take(src);
    }
    delete src[key];
  };
  distribute();
}
/**
* **Taps a channel, sending all of the values put onto it to the destination channel.**
*
* A source channel can be tapped multiple times, and all of the tapping (destination) channels receive each value put
* onto the tapped (source) channel.
*
* This is different from `{@link module:cispy/utils~CispyUtils.split|split}` in that it's temporary. Channels
* can tap a channel and then untap it, multiple times, as needed. If a source channel has all of its taps removed, then
* it reverts to a normal channel, just as it was before it was tapped.
*
* Also unlike `{@link module:cispy/utils~CispyUtils.split|split}`, each call can only tap once. For multiple
* channels to tap a source channel, `tap` has to be called multiple times.
*
* Closing either the source or any of the destination channels has no effect on any of the other channels.
*
* ```
* const { go, chan, put, take, utils } = cispy;
* const { tap } = utils;
*
* const input = chan();
* const tapper = chan();
* tap(input, tapper);
*
* go(async () => {
* await put(input, 1);
* await put(input, 2);
* });
*
* go(async () => {
* console.log(await take(tapper)); // -> 1
* console.log(await take(tapper)); // -> 2
* });
*
* ```
*
* @memberOf module:cispy/utils~CispyUtils
* @param {module:cispy/channel~Channel} src The channel to be tapped.
* @param {module:cispy/channel~Channel} [dest] The channel tapping the source channel. If this is not present,
* a new unbuffered channel will be created.
* @return {module:cispy/channel~Channel} The destination channel. This is the same as the second argument, if
* present; otherwise it is the newly-created channel tapping the source channel.
*/
function tap(src, dest = chan()) {
  // The first tap on a channel installs the tap list and starts the distributing process.
  if (!src[protocols.taps]) {
    tapped(src);
  }
  const taps = src[protocols.taps];
  // Registering the same destination twice would deliver every value to it twice, so only add it once.
  if (taps.indexOf(dest) === -1) {
    taps.push(dest);
  }
  return dest;
}
/**
* **Untaps a previously tapping destination channel from its source channel.**
*
* This removes a previously created tap. The destination (tapping) channel will stop receiving the values put onto the
* source channel.
*
* If the destination channel was not, in fact, tapping the source channel, this function will do nothing. If all taps
* are removed, the source channel reverts to normal (i.e., it no longer has the tapping code applied to it and can be
* taken from as normal).
*
* @memberOf module:cispy/utils~CispyUtils
* @param {module:cispy/channel~Channel} src The tapped channel.
* @param {module:cispy/channel~Channel} dest The channel that is tapping the source channel that should no longer
* be tapping the source channel.
*/
function untap(src, dest) {
  const taps = src[protocols.taps];
  // No tap list: `src` is not being tapped, so there is nothing to remove.
  if (!taps) {
    return;
  }
  const position = taps.indexOf(dest);
  // `dest` was not tapping `src`.
  if (position === -1) {
    return;
  }
  taps.splice(position, 1);
  // When the last tap is removed, a value-less put wakes the tapping loop so that it can notice the empty list
  // and shut itself down.
  if (taps.length === 0) {
    putAsync(src);
  }
}
/**
* **Removes all taps from a source channel.**
*
* The previously-tapped channel reverts to a normal channel, while any channels that might have been tapping it no
* longer receive values from the source channel. If the source channel had no taps, this function does nothing.
*
* @memberOf module:cispy/utils~CispyUtils
* @param {module:cispy/channel~Channel} src The tapped channel. All taps will be removed from this channel.
*/
function untapAll(src) {
  const taps = src[protocols.taps];
  // A missing tap list means `src` is not tapped. An empty list means the last tap has already been removed, and
  // the code in this file that empties the list also issues the wake-up put below. Issuing it a second time would
  // leave a stray `undefined` value sitting on `src` for an unrelated taker to receive.
  if (!taps || taps.length === 0) {
    return;
  }
  // Drop every tap at once by installing a fresh, empty list.
  src[protocols.taps] = [];
  // A value-less put wakes the tapping loop so that it can notice the empty list and shut itself down.
  putAsync(src);
}
/**
* **Maps the values from one or more source channels through a function, putting the results on a new channel.**
*
* The mapping function is given one value from each source channel, after at least one value becomes available on every
* source channel. The output value from the function is then put onto the destination channel.
*
* The destination channel is created by the function based on the buffer value passed as the third parameter. If this
* is missing, `null`, or `0`, the destination channel will be unbuffered. If it's a positive integer, the destination
* channel is buffered by a fixed buffer of that size. If the parameter is a buffer, then that buffer is used to buffer
* the destination channel.
*
* Once *any* source channel is closed, the mapping ceases and the destination channel is also closed.
*
* This is obviously similar to a map transducer, but unlike a transducer, this function works with multiple input
* channels. This is something that a transducer on a single channel is unable to do.
*
* ```
* const { go, chan, put, take, close, utils } = cispy;
* const { map } = utils;
*
* const input1 = chan();
* const input2 = chan();
* const input3 = chan();
* const output = map((x, y, z) => x + y + z, [input1, input2, input3]);
*
* go(async () => {
* await put(input1, 1);
* await put(input1, 2);
* await put(input1, 3);
* });
*
* go(async () => {
* await put(input2, 10);
* await put(input2, 20);
* close(input2);
* });
*
* go(async () => {
* await put(input3, 100);
* await put(input3, 200);
* await put(input3, 300);
* });
*
* go(async () => {
* console.log(await take(output)); // -> 111
* console.log(await take(output)); // -> 222
* // Output closes now because input2 closes after 2 values
* console.log(output.closed); // -> true
* });
* ```
*
* @memberOf module:cispy/utils~CispyUtils
* @param {module:cispy/utils~mapper} fn The mapping function. This should have one parameter for each source
* channel and return a single value.
* @param {module:cispy/channel~Channel[]} srcs The source channels.
* @param {(number|module:cispy/buffers~Buffer)} [buffer=0] A buffer used to create the destination channel. If
* this is a number, a {@link module:cispy/buffers~FixedBuffer} of that size will be used. If this is `0` or
* not present, the channel will be unbuffered.
* @return {module:cispy/channel~Channel} The destination channel.
*/
function map(fn, srcs, buffer = 0) {
  const dest = chan(buffer);
  const srcLen = srcs.length;
  // A mapping over zero channels can never complete a round of values, so the loop below would wait forever and
  // `dest` would never close. Close it immediately instead.
  if (srcLen === 0) {
    close(dest);
    return dest;
  }
  // Values gathered during the current round, indexed by the position of their source channel.
  const gathered = [];
  // Private channel that hands each completed round of values to the loop.
  const rounds = chan();
  // Number of source channels that have not yet delivered a value in the current round.
  let outstanding = 0;
  // One callback per source channel. `let` gives every iteration its own `index` binding, so no wrapper function
  // is needed to capture it.
  const callbacks = [];
  for (let index = 0; index < srcLen; ++index) {
    callbacks[index] = value => {
      gathered[index] = value;
      if (--outstanding === 0) {
        // Hand over a copy so that the next round cannot overwrite values that are still being processed.
        putAsync(rounds, gathered.slice());
      }
    };
  }
  async function loop() {
    for (;;) {
      outstanding = srcLen;
      // Request one value from every source; the round completes when all of them have answered.
      for (let i = 0; i < srcLen; ++i) {
        takeAsync(srcs[i], callbacks[i]);
      }
      const args = await take(rounds);
      // Any closed source ends the mapping: close the destination and stop.
      if (args.some(arg => arg === CLOSED)) {
        close(dest);
        return;
      }
      await put(dest, fn(...args));
    }
  }
  loop();
  return dest;
}
// Public API of the flow utilities. The `tapped` and `isNumber` helpers and the `protocols` table are not exported.
module.exports = { pipe, partition, merge, split, tap, untap, untapAll, map };