Use callbag operators on normal JS variables in React components. Easily debounce, flatten, throttle, maintain order of async operations (like data fetching), etc.
npm i react-callbag-streams
import { useStream } from 'react-callbag-streams'
import { debounce, filter, flatten, map, fromPromise } from 'callbag-common'
function App() {
const [q, setQ] = useState('')
const [info, loading] =
useStream(q, // π take the stream of values of q
debounce(200), // π debounce by 200 ms
map(q => fromPromise(pokeInfo(q))), // π get the pokemon info (which is an async function)
flatten, // π flatten requests (only keep one request in-flight)
)
return (
<>
<input type='text'
placeholder='pokemon name ...'
onInput={e => setQ((e.target as any).value)}/>
<br/>
<pre>
{ loading ? 'loading ...' : JSON.stringify(info, undefined, 2) }
</pre>
</>
)
}
Some extremely basic reactive programming stuff are weirdly difficult and migrain inducing in React, while they are pretty trivial with any reactive programming library.
react-callbag-streams
provides hooks that allow you to treat variables as streams, use these super-convenient
operators on the stream, and then treat the whole thing again as a plain variable (with also a loading indicator
as a bonus).
Imagine you fetch data like this:
function App() {
const [q, setQ] = useState('')
const [info, setInfo] = useState({})
useEffect(async () => {
setInfo(await pokeInfo(q))
}, [q])
...
}
Imagine you want to fetch info for "charizard"
and then for "snorlax"
, but the responses
to these requests come out of order. Now your query is for "snorlax"
but you are displaying information
for "charizard"
.
Fixing this issue is trivial with callbag-flatten
:
import { flatten, map, fromPromise } from 'callbag-common'
import { useStream } from 'react-callbag-streams'
function App() {
const [q, setQ] = useState('')
const [info] = useStream(
q,
map(q => fromPromise(pokeInfo(q))),
flatten
)
...
}
Debouncing, throttling, etc. become extremely easy operations when you treat your data as streams:
import { flatten, map, fromPromise, debounce } from 'callbag-common'
import { useStream } from 'react-callbag-streams'
function App() {
const [q, setQ] = useState('')
const [info] = useStream(
q,
debounce(200),
map(q => fromPromise(pokeInfo(q))),
flatten
)
...
}
- They are extremely lightweight. For example, callbag-merge is under
350B
, while themerge()
factory in RxJS is about4.3KB
in size. - They are pritty simple (for example compare callbag-map with RxJS's
map()
). - Callbags is a specification, not a library. This, combined with the simplicity, allows for fully decentralized and community-driven development of utilities and libraries, while for something like RxJS most of the utilities come from the library itself.
The whole point of this library is that it allows using callbag operators on variables inside React components. You can find a collection of useful and commonly used callbag operators here or here, find a list of available community operators here or here, or even easily create your own operators.
π useStream()
allows you to treat a variable as a stream:
import { useStream } from 'react-callbag-streams'
import { filter } from 'callbag-common'
function MyComp({ prop }) {
//
// π even will only have even values of prop, and won't be updated
// for its odd values.
//
const [even] = useStream(prop, filter(x => x % 2 === 0))
...
}
import { useStream } from 'react-callbag-streams'
import { debounce } from 'callbag-common'
function MyComp({ prop }) {
//
// π debounced will be the latest value of prop, 200ms after its last change.
// it will not be updated while prop is changing at intervals shorter than 200ms.
//
const [debounced] = useStream(prop, debounce(200))
...
}
import { useStream } from 'react-callbag-streams'
import { map, fromPromise, flatten } from 'callbag-common'
function MyComp({ prop }) {
//
// π fetched will be the result of asyncFetch() for latest value of prop,
// even if values for asyncFetch come out of order.
//
const [fetched] = useStream(prop, map(p => fromPromise(asyncFetch(p))), flatten)
...
}
π useStream()
also provides a loading indicator. The loading indicator is true
between the time that the
source variable changes until the next emission of the stream.
const [fetched, loading] = useStream(prop, map(p => fromPromise(asyncFetch(p))), flatten)
β‘β‘ Checkout this real-life example.
π useMergedStream()
allows you to treat multiple variables as one stream. Whenever any of the variables
has a new value, the stream will emit that value.
import { useMergedStream } from 'react-callbag-streams'
import { debounce } from 'callbag-common'
function MyComp() {
const [a] = useState()
const [b] = useState()
//
// π merged will be the latest value of either a or b (based on which changed later),
// 200ms after the latest change to either.
//
const [merged] = useMergedStream([a, b], debounce(200))
...
}
π useCombinedStream()
is similar to useMergedStream()
, except that it emits an array of latest values of all provided
variables, every time any of them changes:
import { useCombinedStream } from 'react-callbag-streams'
//
// this app finds repositories on github based on a query and language
//
function App() {
const [q, setQ] = useState('')
const [l, setL] = useState('javascript')
const [repos, loading] = useCombinedStream(
[q, l], // π a combined stream of query and language
filter(([q]) => q.length >= 2), // π filter out when query is too short
debounce(1000), // π debounce the combo by a second (github API will block us o.w.)
map(([q, l]) => fromPromise(search(q, l))), // π search in github api using query and language
flatten, // π flatten the stream (preserve order of responses)
map(res => // π format the incoming result ...
res.items.map(item => // .. take each repository ...
({ name: item.name, url: item.html_url }) // .. get its name and its url
)
),
)
return <>
<input type='text'
placeholder='keywords ....'
value={q}
onInput={e => setQ((e.target as any).value)}/>
<input type='text'
placeholder='language'
value={l}
onInput={e => setL((e.target as any).value)}/>
<br/>
{ q.length >= 2 ?
(
loading ? 'loading ...' : (
<ul>
{ repos?.map(repo => (
<li key={repo.url}><a href={repo.url}>{repo.name}</a></li>
))}
</ul>
)
) : ''
}
</>
}
π useSource()
provides access to the underlying callbags created from variables / parameters. It is useful for situations
where advanced stream combination is needed:
import { useStream, useSource } from 'react-callbag-streams'
import fetch from './fetch'
//
// in this example we have a list of entries that is fetched in a paginated manner,
// and is based on a given query.
//
// - when the user changes the query, we want the pagination to reset and result set to be refreshed
// - when the user loads more content, we want them to be added to current data
//
function App() {
const [query, setQuery] = useState('')
const [page, setPage] = useState(0)
const page$ = useSource(page) // π direct access to a stream from values of page
const [entries, loading] = useStream(
query, // π for each query
tap(() => setPage(0)), // .. reset the page counter
map(q => // .. and map the query to a query-specific sub-stream
pipe(
combine(of(q), page$), // π combine query value and values from page stream
map(([q, p]) => fromPromise(fetch(q, p))), // π for each new pair, fetch data
flatten,
scan((all, page) => [...all, ...page], []), // π accumulate results (this is query-specific stream)
)
),
flatten // π flatten the query-speicifc substream
)
return <>
<input
type='text'
placeholder='type something ...'
onInput={e => setQuery((e.target as any).value)}
/>
{ entries?.map(entry => <div>{entry}</div>) : '' }
<br />
<button onClick={() => setPage(page + 1)}>Load More</button>
</>
}
π useSignal()
provides a callbag source you can utilize in your streams. In the pagination example provided above, instead of a state
for the page
, we can have a loadMoreSignal
and calculate the page in the stream accordingly:
import { useStream, useSignal } from 'react-callbag-streams'
// ...
function App() {
// ...
const [loadMoreSignal, loadMore] = useSignal()
const [entries, loading] = useStream(
query,
map(q =>
pipe(
combine(
of(q),
pipe(
loadMoreSignal, // π so for each signal
scan(p => ++p, 0), // .. we increase the page number
startWith(0) // .. starting with 0
)
),
map(([q, p]) => fromPromise(fetch(q, p))),
flatten,
scan((all, page) => [...all, ...page], []),
)
),
flatten
)
return (
// ...
<button onClick={() => loadMore()}>Load More</button>
)
}
useSource()
and useSignal()
return callbags (basically raw streams) and not plain values, so you cannot use their
returned result as raw values. For turning them into raw values, you can simply use the useCallbag()
hook:
import useCallbag from 'use-callbag'
// ...
const [signal, loadMore] = useSignal()
const page = useCallbag(
0,
(initial) =>
pipe(signal, scan(p => ++p, initial))
)
// ...
return (
// ...
<label>Page: {page}</label>
// ...
)
Essentially, useStream()
is equivalent to a useSource()
followed by a useCallbag()
:
const [streamed] = useStream(prop, ...)
// -- OR --
const stream = useSource(prop)
const streamed = useCallbag(undefined, () => pipe(stream, ...))
Be nice and respectful, more importantly super open and welcoming to all.
π Useful commands for working on this repo:
git clone https://github.com/loreanvictor/react-callbag-streams.git
npm i # --> install dependencies
npm start # --> serves `samples/index.tsx` on localhost:3000
npm test # --> run all tests
npm run cov:view # --> view code coverage