reduce
Applies a reducer function over the stream, extending each signal with accumulated state.
Signature
function reduce<T extends Signal, A extends object>( reducer: (acc: A, signal: T) => A, seed?: A): Operator<T, ExtendSignalValue<T, A>>Example
import { singlePointer } from "cereb";import { reduce } from "cereb/operators";
singlePointer(element) .pipe( reduce( (acc, signal) => ({ count: acc.count + 1, totalDistance: acc.totalDistance + Math.abs(signal.value.x - acc.lastX), lastX: signal.value.x }), { count: 0, totalDistance: 0, lastX: 0 } ) ) .on((signal) => { console.log(signal.value.count, signal.value.totalDistance); });Difference from RxJS scan
Unlike RxJS’s scan which replaces the entire value, Cereb’s reduce extends the signal value while preserving signal metadata (kind, deviceId, createdAt).
Use Cases
Track Cumulative Distance
pan(element) .pipe( reduce( (acc, s) => ({ cumulativeX: acc.cumulativeX + Math.abs(s.value.deltaX), cumulativeY: acc.cumulativeY + Math.abs(s.value.deltaY) }), { cumulativeX: 0, cumulativeY: 0 } ) ) .on((signal) => { // signal.value has both pan properties AND accumulated values });Count Events
keyboard(window) .pipe( reduce((acc) => ({ keyPressCount: acc.keyPressCount + 1 }), { keyPressCount: 0 }) ) .on((signal) => { console.log(`Key #${signal.value.keyPressCount}: ${signal.value.key}`); });