@thi.ng/csp

@thi.ng/csp

npm versionnpm downloads Mastodon Follow

[!NOTE] This is one of 190 standalone projects, maintained as part of the @thi.ng/umbrella monorepo and anti-framework.

🚀 Please help me to work full-time on these projects by sponsoring me on GitHub. Thank you! ❤️

About

ES6 promise based CSP primitives & operations.

  • Channel with/without buffering and/or transducers
    • optional channel IDs
    • choice of buffer behaviors (fixed, sliding, dropping)
    • channel selection
    • channel merging (many-to-one, serial or parallel)
    • channel piping (w/ transducers)
    • timeouts / sleeping / throttling / delaying
    • prepopulated channel ctors (iterators, ranges, promise, constants etc.)
  • Mult for channel multiplexing (one-to-many splitting)
    • individual transducers per tap
    • dynamic add/removal of taps
  • PubSub for topic subscriptions
    • each topic implemented as Mult
    • wildcard topic for processing fallthrough messages

Status

DEPRECATED - superseded by other package(s)

Search or submit any issues for this package

This package is deprecated. Please see the following actively maintained alternatives providing similar functionality:

Related packages

  • @thi.ng/rstream - Reactive streams & subscription primitives for constructing dataflow graphs / pipelines

Installation

yarn add @thi.ng/csp

ES module import:

<script type="module" src="https://cdn.skypack.dev/@thi.ng/csp"></script>

Skypack documentation

For Node.js REPL:

const csp = await import("@thi.ng/csp");

Package sizes (brotli'd, pre-treeshake): ESM: 2.58 KB

Dependencies

API

Generated API docs

File loading & word frequency analysis

import { Channel } from "@thi.ng/csp";
import * as tx from "@thi.ng/transducers";

// compose transducer to split source file into words
// and filter out short strings
const proc: tx.Transducer<string, string> = tx.comp(
tx.mapcat((src: string) => src.toLowerCase().split(/[^\w]+/g)),
tx.filter((w: string) => w.length > 1)
);

// define a channel which receives file paths
// and resolves them with their contents
const paths = new Channel<any>(
tx.map((path: string) =>
new Promise<string>(
resolve => fs.readFile(path, (_, data) => resolve(data.toString()))
)
)
);

// define multiplexed output channel
// items in this channel will have this form: `[word, count]`
const results = new Mult("results");

// tap result channel and sum word counts
const counter = results
.tap(tx.map(x => x[1]))
.reduce(tx.add());

// 2nd output channel with streaming sort transducer
// (using a sliding window size of 500 items) and dropping
// words with < 20 occurrences
const sorted = results.tap(
tx.comp(
tx.streamSort(500, x => x[1]),
tx.dropWhile(x => x[1] < 20)
)
);

// define workflow:
// pipe source files into a new channel and
// reduce this channel using `frequencies` reducer
// finally stream the result map (word frequencies)
// into the `sorted` channel
// (`freqs` is a JS Map and is iterable)
paths.pipe(proc)
.reduce(tx.frequencies())
.then(freqs => results.channel().into(freqs));

// start tracing sorted outputs and
// wait for all to finish
Promise
.all([sorted.consume(), counter])
.then(([_, num]) => console.log("total words:", num));

// no real work has been executed thus far (only scheduled via promises)
// now kick off entire process by writing file paths into the 1st channel
paths.into(["src/channel.ts", "src/mult.ts", "src/pubsub.ts"]);

// results-tap1 : [ 'let', 20 ]
// results-tap1 : [ 'topic', 20 ]
// results-tap1 : [ 'chan', 20 ]
// results-tap1 : [ 'number', 22 ]
// results-tap1 : [ 'buf', 23 ]
// results-tap1 : [ 'length', 23 ]
// results-tap1 : [ 'tx', 25 ]
// results-tap1 : [ 'state', 25 ]
// results-tap1 : [ 'from', 27 ]
// results-tap1 : [ 'close', 28 ]
// results-tap1 : [ 'new', 33 ]
// results-tap1 : [ 'args', 34 ]
// results-tap1 : [ 'id', 36 ]
// results-tap1 : [ 'any', 36 ]
// results-tap1 : [ 'src', 38 ]
// results-tap1 : [ 'if', 40 ]
// results-tap1 : [ 'return', 47 ]
// results-tap1 : [ 'channel', 57 ]
// results-tap1 : [ 'this', 120 ]
// results-tap1 done
// total words: 1607

Channel merging

import { Channel } from "@thi.ng/csp";
import { push } from "@thi.ng/transducers";

Channel.merge([
Channel.range(0, 3),
Channel.range(10, 15),
Channel.range(100, 110)
]).reduce(push()).then(console.log);

// [ 0, 100, 101, 102, 103, 1, 2, 104, 105, 10, 11, 12, 13, 106, 14, 107, 108, 109 ]

// emit tuples of values read from all inputs
// preserves ordering of all inputs, but
// throughput controlled by slowest input
// by default stops & closes when any of the inputs closes
Channel.mergeTuples([
Channel.from([1, 2, 3]),
Channel.from([10, 20, 30, 40]),
Channel.from([100, 200, 300, 400, 500])
], null, false).consume();

// chan-3 : [ 1, 10, 100 ]
// chan-3 : [ 2, 20, 200 ]
// chan-3 : [ 3, 30, 300 ]
// chan-3 done

// same as above, however continues until all inputs are closed
Channel.mergeTuples([
Channel.from([1, 2, 3]),
Channel.from([10, 20, 30, 40]),
Channel.from([100, 200, 300, 400, 500])
], null, false).consume();

// chan-3 : [ 1, 10, 100 ]
// chan-3 : [ 2, 20, 200 ]
// chan-3 : [ 3, 30, 300 ]
// chan-3 : [ undefined, 40, 400 ]
// chan-3 : [ undefined, undefined, 500 ]
// chan-3 done

PubSub

import { Channel, PubSub } from "@thi.ng/csp";
import { map } from "@thi.ng/transducers";

// define a channel publisher with transducer and topic function applied to each item
// the input channel receives names and transforms them into indexable objects
const pub = new PubSub(
new Channel<any>("users", map((x: string) => ({ type: x.charAt(0), val: x }))),
(x) => x.type
);

// create subscriptions (channel + debug consumer)
// under the hood each topic is a Mult (multiplexed channel)
// sub channels are automatically named:
// `<src-id>-<topic>-tap<tapid>` (see below)
for (let i of "abc") {
pub.sub(i).consume();
}

// start processing, then close everything down
// (pubsubs & mults are closed recursively once the input channel is closed)
pub.channel().into(["alice", "bert", "bella", "charlie", "arthur"]);

// users-a-tap0 : { type: 'a', val: 'alice' }
// users-b-tap0 : { type: 'b', val: 'bert' }
// users-b-tap0 : { type: 'b', val: 'bella' }
// users-c-tap0 : { type: 'c', val: 'charlie' }
// users-a-tap0 : { type: 'a', val: 'arthur' }
// users-b-tap0 done
// users-c-tap0 done
// users-a-tap0 done

Authors

If this project contributes to an academic publication, please cite it as:

@misc{thing-csp,
title = "@thi.ng/csp",
author = "Karsten Schmidt",
note = "https://thi.ng/csp",
year = 2016
}

License

© 2016 - 2024 Karsten Schmidt // Apache License 2.0

Generated using TypeDoc