- Source:
A set of utility functions using processes to work with channels.
These are all accessed through the cispy.utils
namespace; e.g.,
reduce
can be called like this:
const output = cispy.utils.reduce((acc, value) => acc + value, ch, 0);
Methods
(static) debounce(src, bufferopt, delay, optionsopt) → {module:cispy/channel~Channel}
- Source:
Debounces an input channel.
Debouncing is the act of turning several input values into one. For example, debouncing a channel driven by a
mousemove
event would cause only the final mousemove
event to be put onto the channel, dropping all of the other
ones. This can be desirable because mousemove
events come in bunches, being produced continually while the mouse is
moving, and often all that we really care about is to learn where the mouse ended up.
This function does this by controlling which values that have been put onto the source channel are made available on the destination channel, and when.
The delay
parameter determines the debounce threshold. Once the first value is put onto the source channel,
debouncing is in effect until the number of milliseconds in delay
passes without any other value being put onto
that channel. In other words, the delay will be refreshed if another value is put onto the source channel before the
delay elapses. After a full delay interval passes without a value being placed on the source channel, the last value
put becomes available on the destination channel.
This behavior can be modified through four options: leading
, trailing
, maxDelay
, and cancel
.
If both leading
and trailing
are true
, values will not be duplicated. The first value put onto the source
channel will be put onto the destination channel immediately (per leading
) and the delay will begin, but a value
will only be made available on the destination channel at the end of the delay (per trailing
) if another input
value was put onto the source channel before the delay expired.
Parameters:
Name | Type | Attributes | Default | Description | |||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
src |
module:cispy/channel~Channel | The source channel. |
|||||||||||||||||||||||||||
buffer |
number | module:cispy/buffers~Buffer |
<optional> |
0
|
A buffer used to create the destination channel. If
this is a number, a FixedBuffer of that size will be used. If this is
|
|||||||||||||||||||||||||
delay |
number | The debouncing delay, in milliseconds. |
|||||||||||||||||||||||||||
options |
Object |
<optional> |
{}
|
A set of options to further configure the debouncing. Properties
|
Returns:
The newly-created destination channel, where all of the values will be debounced from the source channel.
(static) map(fn, srcs, bufferopt) → {module:cispy/channel~Channel}
- Source:
Maps the values from one or more source channels through a function, putting the results on a new channel.
The mapping function is given one value from each source channel, after at least one value becomes available on every source channel. The output value from the function is then put onto the destination channel.
The destination channel is created by the function based on the buffer value passed as the third parameter. If this
is missing, null
, or 0
, the destination channel will be unbuffered. If it's a positive integer, the destination
channel is buffered by a fixed buffer of that size. If the parameter is a buffer, then that buffer is used to buffer
the destination channel.
Once any source channel is closed, the mapping ceases and the destination channel is also closed.
This is obviously similar to a map transducer, but unlike a transducer, this function works with multiple input channels. This is something that a transducer on a single channel is unable to do.
const { go, chan, put, take, close, utils } = cispy;
const { map } = utils;
const input1 = chan();
const input2 = chan();
const input3 = chan();
const output = map((x, y, z) => x + y + z, [input1, input2, input3]);
go(async () => {
await put(input1, 1);
await put(input1, 2);
await put(input1, 3);
});
go(async () => {
await put(input2, 10);
await put(input2, 20);
close(input2);
});
go(async () => {
await put(input3, 100);
await put(input3, 200);
await put(input3, 300);
});
go(async () => {
console.log(await take(output)); // -> 111
console.log(await take(output)); // -> 222
// Output closes now because input2 closes after 2 values
console.log(output.closed); // -> true
});
Parameters:
Name | Type | Attributes | Default | Description |
---|---|---|---|---|
fn |
module:cispy/utils~mapper | The mapping function. This should have one parameter for each source channel and return a single value. |
||
srcs |
Array.<module:cispy/channel~Channel> | The source channels. |
||
buffer |
number | module:cispy/buffers~Buffer |
<optional> |
0
|
A buffer used to create the destination channel. If
this is a number, a module:cispy/buffers~FixedBuffer of that size will be used. If this is |
Returns:
The destination channel.
(static) partition(fn, src, tBufferopt, fBufferopt) → {Array.<module:cispy/core~Channel>}
- Source:
Creates two new channels and routes values from a source channel to them according to a predicate function.
The supplied function is invoked with every value that is put onto the source channel. Those that return true
are
routed to the first destination channel; those that return false
are routed to the second.
The new channels are created by the function based on the buffer values passed as the third and fourth parameters. If
one or both of these are missing, null
, or 0
, the corresponding destination channel is unbuffered. If one is a
positive integer, the corresponding channel is buffered by a fixed buffer of that size. If the parameter for a
channel is a buffer, then that buffer is used to buffer the new channel.
Both new channels are closed when the source channel is closed.
const { go, chan, put, take, utils } = cispy;
const { partition } = utils;
const input = chan();
const [even, odd] = partition(x => x % 2 === 0, input);
go(async () => {
await put(input, 1);
await put(input, 2);
await put(input, 3);
await put(input, 4);
});
go(async () => {
console.log(await take(even)); // -> 2
console.log(await take(even)); // -> 4
});
go(async () => {
console.log(await take(odd)); // -> 1
console.log(await take(odd)); // -> 3
});
Parameters:
Name | Type | Attributes | Default | Description |
---|---|---|---|---|
fn |
module:cispy/utils~predicate | A predicate function used to test each value on the input channel. |
||
src |
module:cispy/channel~Channel | The source channel. |
||
tBuffer |
number | module:cispy/buffers~Buffer |
<optional> |
0
|
A buffer used to create the destination channel which
receives all values that passed the predicate. If this is a number, a
module:cispy/buffers~FixedBuffer of that size will be used. If this is |
fBuffer |
number | module:cispy/buffers~Buffer |
<optional> |
0
|
A buffer used to create the destination channel which
receives all values that did not pass the predicate. If this is a number, a
module:cispy/buffers~FixedBuffer of that size will be used. If this is |
Returns:
An array of two channels. The first is the destination channel with all of the values that passed the predicate, the second is the destination channel with all of the values that did not pass the predicate.
- Type
- Array.<module:cispy/core~Channel>
(static) pipe(src, dest, keepOpenopt) → {module:cispy/channel~Channel}
- Source:
Pipes the values from one channel onto another channel.
This ties two channels together so that puts on the source channel can be taken off the destination channel. This does not duplicate values in any way - if another process takes a value off the source channel, it will never appear on the destination channel. In most cases you will not want to take values off a channel once it's piped to another channel, since it's difficult to know which values will go to which channel.
Closing either channel will break the connection between the two. If the source channel is closed, the destination
channel will by default also be closed. However, passing true
as the third parameter will cause the destination
channel to remain open even when the source channel is closed (the connection is still broken however).
Because of the ability to leave the destination channel open, a possible use case for this function is to wrap the
destination channel(s) of one of the other flow control functions below to have a channel that survives the source
channel closing. The rest of those functions (aside from the special-case
tap
) automatically close their destination channels when the
source channels close.
const { go, chan, put, take, close, utils } = cispy;
const { pipe } = utils;
const input = chan();
const output = pipe(input, chan());
go(async () => {
await put(input, 1);
await put(input, 2);
close(input);
});
go(async () => {
console.log(await take(output)); // -> 1
console.log(await take(output)); // -> 2
console.log(output.closed); // -> true
});
Parameters:
Name | Type | Attributes | Default | Description |
---|---|---|---|---|
src |
module:cispy/channel~Channel | The source channel. |
||
dest |
module:cispy/channel~Channel | The destination channel. |
||
keepOpen |
boolean |
<optional> |
false
|
A flag to indicate that the destination channel should be kept open after the source channel closes. By default the destination channel will close when the source channel closes. |
Returns:
The destination channel.
(static) reduce(fn, ch, init) → {module:cispy/channel~Channel}
- Source:
Creates a single value from a channel by running its values through a reducing function.
For every value put onto the input channel, the reducing function is called with two parameters: the accumulator that
holds the result of the reduction so far, and the new input value. The initial value of the accumulator is the third
parameter to reduce
. The reduction is not complete until the input channel closes.
This function returns a channel. When the final reduced value is produced, it is put onto this channel, and when that value is taken from it, the channel is closed.
const { go, chan, put, take, close, utils } = cispy;
const { reduce } = utils;
const input = chan();
const output = reduce((acc, value) => acc + value, input, 0);
go(async () => {
await put(input, 1);
await put(input, 2);
await put(input, 3);
close(input);
});
go(async () => {
const result = await take(output);
console.log(output); // -> 6
});
Note that the input channel must be closed at some point, or no value will ever appear on the output channel. The closing of the channel is what signifies that the reduction should be completed.
Parameters:
Name | Type | Description |
---|---|---|
fn |
module:cispy/utils~reducer | The reducer function responsible for turning the series of channel values into a single output value. |
ch |
module:cispy/channel~Channel | The channel whose values are being reduced into a single output value. |
init |
* | The initial value to feed into the reducer function for the first reduction step. |
Returns:
A channel that will, when the input channel closes, have the reduced value put into it. When this value is taken, the channel will automatically close.
(static) split(src, …buffersopt) → {Array.<module:cispy/channel~Channel>}
- Source:
Splits a single channel into multiple destination channels, with each destination channel receiving every value put onto the source channel.
Every parameter after the first represents the buffer from a single destination channel. Each 0
or null
will
produce an unbuffered channel, while each positive integer will produce a channel buffered by a fixed buffer of that
size. Each buffer will produce a buffered channel backed by that buffer. If there are no parameters after the first,
then two unbuffered channels will be produced as a default.
When the source channel is closed, all destination channels will also be closed. However, if destination channels are closed, they do nothing to the source channel.
const { go, chan, put, take, utils } = cispy;
const { split } = util;
const input = chan();
const outputs = split(input, 3);
go(async () => {
await put(input, 1);
await put(input, 2);
await put(input, 3);
});
go(async () => {
for (const output of outputs) { // Each output will happen 3 times
console.log(await take(output)); // -> 1
console.log(await take(output)); // -> 2
console.log(await take(output)); // -> 3
}
});
This function moves its values to the output channels asynchronously. This means that even when using unbuffered channels, it is not necessary for all output channels to be taken from before the next put to the input channel can complete.
Parameters:
Name | Type | Attributes | Default | Description |
---|---|---|---|---|
src |
module:cispy/channel~Channel | The source channel. |
||
buffers |
number | module:cispy/buffers~Buffer |
<optional> <repeatable> |
2
|
The buffers used to create the destination
channels. Each entry is treated separately. If one is a number, then a
FixedBuffer of that size will be used. If one is a |
Returns:
An array of destination channels.
- Type
- Array.<module:cispy/channel~Channel>
(static) tap(src, destopt) → {module:cispy/channel~Channel}
- Source:
Taps a channel, sending all of the values put onto it to the destination channel.
A source channel can be tapped multiple times, and all of the tapping (destination) channels receive each value put onto the tapped (source) channel.
This is different from split
in that it's temporary. Channels
can tap a channel and then untap it, multiple times, as needed. If a source channel has all of its taps removed, then
it reverts to a normal channel, just as it was before it was tapped.
Also unlike split
, each call can only tap once. For multiple
channels to tap a source channel, tap
has to be called multiple times.
Closing either the source or any of the destination channels has no effect on any of the other channels.
const { go, chan, put, take, utils } = cispy;
const { tap } = utils;
const input = chan();
const tapper = chan();
tap(input, tapper);
go(async () => {
await put(input, 1);
await put(input, 2);
});
go(async () => {
console.log(await take(tapper)); // -> 1
console.log(await take(tapper)); // -> 2
});
Parameters:
Name | Type | Attributes | Description |
---|---|---|---|
src |
module:cispy/channel~Channel | The channel to be tapped. |
|
dest |
module:cispy/channel~Channel |
<optional> |
The channel tapping the source channel. If this is not present, a new unbuffered channel will be created. |
Returns:
The destination channel. This is the same as the second argument, if present; otherwise it is the newly-created channel tapping the source channel.
(static) throttle(src, bufferopt, delay, optionsopt) → {module:cispy/channel~Channel}
- Source:
Throttles an input channel.
Throttling is the act of ensuring that something only happens once per time interval. In this case, it means that a value put onto the source channel is only made available to the destination channel once per a given number of milliseconds. An example usage would be with window scroll events; these events are nearly continuous as scrolling is happening, and perhaps we don't want to call an expensive UI updating function every time a scroll event is fired. We can throttle the input channel and make it only offer up the scroll events once every 100 milliseconds, for instance.
Throttling is effected by creating a new channel as a throttled destination for values put onto the source channel. Values will only appear on that destination channel once per delay interval; other values that are put onto the source channel in the meantime are discarded.
The delay
parameter controls how often a value can become available on the destination channel. When the first
value is put onto the source channel, it is immediately put onto the destination channel as well and the delay
begins. Any further values put onto the source channel during that delay are not passed through; only when the
delay expires is the last input value made available on the destination channel. The delay then begins again, so that
further inputs are squelched until that delay passes. Throttling continues, only allowing one value through per
interval, until an entire interval passes without input.
This behavior can be modified by three options: leading
, trailing
, and cancel
.
If both leading
and trailing
are true
, values will not be duplicated. The first value put onto the source
channel will be put onto the destination channel immediately (per leading
) and the delay will begin, but a value
will only be made available on the destination channel at the end of the delay (per trailing
) if another input
value was put onto the source channel before the delay expired.
Parameters:
Name | Type | Attributes | Default | Description | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
src |
module:cispy/channel~Channel | The source channel. |
||||||||||||||||||||||
buffer |
number | module:cispy/buffers~Buffer |
<optional> |
0
|
A buffer used to create the destination channel. If
this is a number, a FixedBuffer of that size will be used. If this is
|
||||||||||||||||||||
delay |
number | The throttling delay, in milliseconds. |
||||||||||||||||||||||
options |
Object |
<optional> |
{}
|
A set of options to further configure the throttling. Properties
|
Returns:
} The newly-created destination channel, where all of the values will be throttled from the source channel.
(static) untap(src, dest)
- Source:
Untaps a previously tapping destination channel from its source channel.
This removes a previously created tap. The destination (tapping) channel will stop receiving the values put onto the source channel.
If the destination channel was not, in fact, tapping the source channel, this function will do nothing. If all taps are removed, the source channel reverts to normal (i.e., it no longer has the tapping code applied to it and can be taken from as normal).
Parameters:
Name | Type | Description |
---|---|---|
src |
module:cispy/channel~Channel | The tapped channel. |
dest |
module:cispy/channel~Channel | The channel that is tapping the source channel that should no longer be tapping the source channel. |
(static) untapAll(src)
- Source:
Removes all taps from a source channel.
The previously-tapped channel reverts to a normal channel, while any channels that might have been tapping it no longer receive values from the source channel. If the source channel had no taps, this function does nothing.
Parameters:
Name | Type | Description |
---|---|---|
src |
module:cispy/channel~Channel | The tapped channel. All taps will be removed from this channel. |