reduce

Applies a reducer function over the stream, extending each signal with accumulated state.

Signature

function reduce<T extends Signal, A extends object>(
  reducer: (acc: A, signal: T) => A,
  seed?: A
): Operator<T, ExtendSignalValue<T, A>>
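
To make the signature concrete, the following is a minimal, library-independent model of the behavior described above. It is a sketch under stated assumptions, not Cereb's implementation: `SketchSignal`, `Extended`, and `reduceOver` are hypothetical names, the model runs over an array rather than a live stream, and it requires a seed because the behavior of an omitted seed is not specified here.

```typescript
// Hypothetical stand-in for Cereb's Signal type; the real definition may differ.
interface SketchSignal<V extends object> {
  kind: string;
  deviceId: string;
  createdAt: number;
  value: V;
}

// Rough model of ExtendSignalValue<T, A>: same metadata, value widened with A.
type Extended<V extends object, A extends object> = SketchSignal<V & A>;

// Hypothetical helper: applies the reducer to each signal in order and
// merges the running accumulator into that signal's value.
function reduceOver<V extends object, A extends object>(
  signals: SketchSignal<V>[],
  reducer: (acc: A, signal: SketchSignal<V>) => A,
  seed: A
): Extended<V, A>[] {
  let acc = seed;
  return signals.map((signal) => {
    acc = reducer(acc, signal); // the reducer sees the original, unextended signal
    return { ...signal, value: { ...signal.value, ...acc } };
  });
}

const result = reduceOver(
  [
    { kind: "pointer", deviceId: "d1", createdAt: 1, value: { x: 3 } },
    { kind: "pointer", deviceId: "d1", createdAt: 2, value: { x: 7 } },
  ],
  (acc, s) => ({ count: acc.count + 1, sum: acc.sum + s.value.x }),
  { count: 0, sum: 0 }
);
// The second item keeps its metadata and has value { x: 7, count: 2, sum: 10 }.
```

In this sketch, an accumulator key that collides with an existing value key overwrites it. Whether Cereb behaves the same way is not stated here, so distinct key names are the safer choice.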

Example

import { singlePointer } from "cereb";
import { reduce } from "cereb/operators";

// `element` is assumed to be an existing DOM element.
singlePointer(element)
  .pipe(
    reduce(
      (acc, signal) => ({
        count: acc.count + 1,
        // Horizontal distance since the previous signal. Because lastX is
        // seeded with 0, the first signal adds its full x coordinate.
        totalDistance: acc.totalDistance + Math.abs(signal.value.x - acc.lastX),
        lastX: signal.value.x
      }),
      { count: 0, totalDistance: 0, lastX: 0 }
    )
  )
  .on((signal) => {
    console.log(signal.value.count, signal.value.totalDistance);
  });

Difference from RxJS scan

Unlike RxJS’s scan, which emits the accumulator in place of the source value, Cereb’s reduce merges the accumulated state into the signal’s value and preserves the signal metadata (kind, deviceId, createdAt).
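
The contrast can be shown for a single step without either library. This is only an illustration of the two output shapes: `DemoSignal` and the sample values are hypothetical, and the merge is written as a plain object spread rather than taken from Cereb's source.

```typescript
// Hypothetical signal shape for illustration; Cereb's real Signal type may differ.
interface DemoSignal<V extends object> {
  kind: string;
  deviceId: string;
  createdAt: number;
  value: V;
}

const incoming: DemoSignal<{ x: number }> = {
  kind: "pointer",
  deviceId: "mouse-1",
  createdAt: 1000,
  value: { x: 42 },
};

// Accumulator after processing `incoming` with a simple counting reducer.
const acc = { count: 1 };

// scan-style: the emitted item is the accumulator itself,
// so x, kind, deviceId, and createdAt are no longer available downstream.
const scanOutput = acc;

// reduce-style, as described above: the accumulator is merged into value,
// and kind, deviceId, and createdAt carry over unchanged.
const reduceOutput = { ...incoming, value: { ...incoming.value, ...acc } };
```

Here `scanOutput` is `{ count: 1 }`, while `reduceOutput` is still a complete signal whose value is `{ x: 42, count: 1 }`.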

Use Cases

Track Cumulative Distance

pan(element)
  .pipe(
    reduce(
      (acc, s) => ({
        cumulativeX: acc.cumulativeX + Math.abs(s.value.deltaX),
        cumulativeY: acc.cumulativeY + Math.abs(s.value.deltaY)
      }),
      { cumulativeX: 0, cumulativeY: 0 }
    )
  )
  .on((signal) => {
    // signal.value has both pan properties AND accumulated values
  });

Count Events

keyboard(window)
  .pipe(
    reduce((acc) => ({ keyPressCount: acc.keyPressCount + 1 }), { keyPressCount: 0 })
  )
  .on((signal) => {
    console.log(`Key #${signal.value.keyPressCount}: ${signal.value.key}`);
  });