The Bus
The Bus is the feature that makes hurried different from every other worker library.
One event map. Two endpoints. Zero any.
Declare a single type Events, and both sides of the worker boundary speak the same typed language.
The mental model
You declare an Events map once. The main thread uses thread.on / emit / once. The worker uses the same API via the bus argument (inline) or workerBus() (file). Payloads are inferred. Renaming a field breaks both sides at compile time.
The simplest possible example
import { Thread } from 'hurried';
type Events = {
progress: { done: number; total: number };
log: string;
cancel: void; // payload-less event
};
const thread = Thread.fromFunction<Events, number, number>((bus, n) => {
bus.emit('log', `starting with n=${n}`);
for (let i = 0; i < n; i++) {
if (i % 1_000_000 === 0) {
bus.emit('progress', { done: i, total: n });
}
}
return n;
});
thread.on('progress', (p) => console.log(`${p.done}/${p.total}`));
thread.on('log', (m) => console.log(`[worker] ${m}`));
await thread.run(50_000_000);
thread.emit('cancel'); // void → no payload arg
await thread.terminate();
Inline tasks with two or more declared parameters receive a Bus<TEvents> as the first argument. Single-param tasks ((n) => …) keep the simple no-bus shape. So either of these is valid:
Thread.fromFunction((n: number) => n * 2); // no bus
Thread.fromFunction<Events, number, number>((bus, n) => …); // typed bus
The Bus API
The five methods you'll use:
bus.emit(event, payload?) // send to the other side
const off = bus.on(event, listener) // subscribe; off() to remove
bus.once(event, listener) // one-shot subscribe
bus.off(event, listener) // manually remove
const payload = await bus.waitFor(event, { signal? }) // Promise<TEvents[event]>
Plus two helpers when you want them:
bus.clear() // remove every listener
bus.listenerCount(event?) // diagnostics
on() returns its own unsubscribe
const off = thread.on('progress', render);
// later:
off(); // clean detach
No need to keep the listener reference around.
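To see why the returned function removes the need for a listener reference, here is a minimal sketch of the pattern. TinyEmitter is a hypothetical stand-in written for this illustration, not hurried's implementation, which may differ.

```typescript
type Listener<T> = (payload: T) => void;

// Hypothetical single-event emitter, for illustration only.
class TinyEmitter<T> {
  private listeners = new Set<Listener<T>>();

  // Subscribe and hand back a closure that removes exactly this listener.
  on(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  emit(payload: T): void {
    // Copy first so a listener that unsubscribes mid-emit is safe.
    for (const listener of [...this.listeners]) listener(payload);
  }

  listenerCount(): number {
    return this.listeners.size;
  }
}

const seen: number[] = [];
const emitter = new TinyEmitter<number>();

// The listener is anonymous: nothing but `off` is needed to detach it.
const off = emitter.on((n) => seen.push(n));
emitter.emit(1);
off();
emitter.emit(2); // no listener left, so nothing is recorded
```

Because the closure captures the listener, an inline arrow function can be detached without ever being named.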
Void events
Some events don't need a payload. Use void:
type Events = { cancel: void; tick: void };
thread.emit('cancel'); // no second arg required
thread.emit('tick');
thread.on('cancel', () => console.log('bye'));
TypeScript enforces this with the EmitArgs helper type.
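One way such a helper could be written is with a conditional tuple type. The definition below is a guess for illustration; hurried's actual EmitArgs may be defined differently, and TinyBus is a hypothetical stand-in that only records what was emitted.

```typescript
// Assumed shape: void events take no payload, every other event takes one.
type EmitArgs<T> = [T] extends [void] ? [] : [payload: T];

type Events = { cancel: void; log: string };

// Hypothetical recorder, not hurried's Bus.
class TinyBus<E> {
  readonly sent: Array<{ event: keyof E; payload: unknown }> = [];

  emit<K extends keyof E>(event: K, ...args: EmitArgs<E[K]>): void {
    // args is either [] or [payload]; read the first slot if present.
    const payload: unknown = (args as unknown[])[0];
    this.sent.push({ event, payload });
  }
}

const bus = new TinyBus<Events>();
bus.emit('cancel');       // OK: void event, no second argument
bus.emit('log', 'hello'); // OK: payload must be a string
// bus.emit('log');       // compile error: payload missing
// bus.emit('cancel', 1); // compile error: unexpected payload
```

Wrapping T in a one-element tuple before the extends check keeps the conditional from distributing over union payload types.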
waitFor — promise-style awaits
type Events = { ready: { version: string } };
const { version } = await thread.bus().waitFor('ready', { signal });
console.log(`worker ${version} is ready`);
Combine with AbortSignal for cancellable awaits.
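As a rough illustration of a cancellable await, the sketch below builds a timeout from an AbortController. It uses Node's built-in events.once, which accepts the same { signal } option shape, as a stand-in for the bus; how hurried's waitFor rejects on abort is not specified here, so the AbortError behaviour shown belongs to Node's API. The helper name waitForEvent is hypothetical.

```typescript
import { EventEmitter, once } from 'node:events';

// Hypothetical helper: wait for one event, give up after `ms` milliseconds.
async function waitForEvent(
  emitter: EventEmitter,
  event: string,
  ms: number,
): Promise<unknown> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  try {
    // events.once resolves with the array of arguments passed to emit().
    const [payload] = await once(emitter, event, { signal: controller.signal });
    return payload;
  } finally {
    clearTimeout(timer); // do not keep the process alive after settling
  }
}
```

If the event arrives in time the promise resolves with its payload; otherwise Node rejects it with an error whose name is AbortError.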
Inline tasks vs file-based workers
You can use the Bus two ways depending on how complex the worker is.
Inline (simple)
The task is serialized to source, so it can't reference closure variables — but it gets a bus argument out of the box.
const thread = Thread.fromFunction<Events, number, number>((bus, n) => {
bus.emit('progress', { done: 0, total: n });
return n;
});
File-based (recommended for non-trivial logic)
You can import anything you want and reach for the bus via workerBus<Events>():
// worker.ts
import { defineWorker, workerBus } from 'hurried';
import { someBigDependency } from './deps.js';
export type Events = { progress: { done: number; total: number } };
const bus = workerBus<Events>();
export default defineWorker({
process(items: string[]) {
items.forEach((it, i) => {
someBigDependency(it);
bus.emit('progress', { done: i + 1, total: items.length });
});
return items.length;
},
});
// main.ts
import { Thread } from 'hurried';
import type { Events } from './worker.js';
const thread = Thread.fromFile<Events>(new URL('./worker.js', import.meta.url));
thread.on('progress', (p) => render(p));
await thread.run('process', ['a', 'b', 'c']);
Pools and the Bus
The Bus surface is identical on pools — events from any worker are aggregated, and pool.emit() broadcasts to all workers.
const pool = new Pool<Events, number, number>({ size: 4, task });
pool.on('progress', (p) => console.log(p)); // from ANY worker
pool.emit('cancel'); // to ALL workers
See Aggregated events for the full pattern.
Caveats
Tight loops won't process incoming bus messages
A Node worker can only handle incoming messages when its event loop gets a turn. While the worker is inside a synchronous loop, incoming events queue up, and bus.on() listeners don't run until the loop ends.
For cooperative cancellation, yield periodically:
async (bus, n) => {
let stop = false;
bus.on('cancel', () => { stop = true; });
const chunk = 5_000_000;
for (let i = 0; i < n; i += chunk) {
if (stop) return 'cancelled';
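const end = Math.min(i + chunk, n); // don't run past n on the last chunk
for (let j = i; j < end; j++) work(j); // work() stands in for your per-item computation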
await new Promise((r) => setImmediate(r)); // ← drain the queue
}
return 'done';
}
See the cooperative cancellation pattern for the full example.
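The queueing behaviour can be reproduced in a single thread, without hurried at all. In this sketch a setImmediate callback stands in for an incoming cancel message; scheduleCancel, runSync, and runYielding are hypothetical names used only here.

```typescript
// Stand-in for an incoming 'cancel' message: a callback queued on the event loop.
function scheduleCancel(flag: { stop: boolean }): void {
  setImmediate(() => {
    flag.stop = true;
  });
}

// Synchronous loop: the queued callback cannot run until the function returns.
function runSync(chunks: number): number {
  const flag = { stop: false };
  scheduleCancel(flag);
  let completed = 0;
  for (let i = 0; i < chunks; i++) {
    if (flag.stop) break; // never true inside this loop
    completed++;
  }
  return completed;
}

// Yielding loop: each await hands control back to the event loop,
// so the queued callback runs before the next chunk starts.
async function runYielding(chunks: number): Promise<number> {
  const flag = { stop: false };
  scheduleCancel(flag);
  let completed = 0;
  for (let i = 0; i < chunks; i++) {
    if (flag.stop) break;
    completed++;
    await new Promise<void>((resolve) => setImmediate(resolve));
  }
  return completed;
}
```

With five chunks, runSync(5) completes all five because the flag is never observed, while runYielding(5) stops after the first chunk.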
Payloads must be structured-cloneable
Bus payloads cross the worker boundary via Node's structured clone algorithm, with the same rules as postMessage. Functions and DOM nodes cannot be cloned (the clone step rejects them with a DataCloneError), and class instances lose their prototype, arriving as plain objects without their methods. Plain objects, typed arrays, Map, Set, and Date all work.
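A quick way to check a payload before sending it is Node's global structuredClone (available since Node 17), which applies the same algorithm. The sketch below is independent of hurried; Point and isCloneable are hypothetical names for this illustration.

```typescript
class Point {
  x: number;
  y: number;
  constructor(x: number, y: number) {
    this.x = x;
    this.y = y;
  }
  norm(): number {
    return Math.hypot(this.x, this.y);
  }
}

const original = {
  when: new Date(0),
  tags: new Set(['a']),
  lookup: new Map([['k', 1]]),
  bytes: new Uint8Array([1, 2, 3]),
  point: new Point(3, 4),
};

const copy = structuredClone(original);

// Built-in types keep their identity across the clone.
const builtinsSurvive =
  copy.when instanceof Date &&
  copy.tags.has('a') &&
  copy.lookup.get('k') === 1 &&
  copy.bytes instanceof Uint8Array;

// The class instance keeps its data fields but not its prototype or methods.
const pointIsPlain =
  !(copy.point instanceof Point) &&
  copy.point.x === 3 &&
  typeof (copy.point as { norm?: unknown }).norm === 'undefined';

// Returns false for values the algorithm rejects, such as functions.
function isCloneable(value: unknown): boolean {
  try {
    structuredClone(value);
    return true;
  } catch {
    return false; // structuredClone threw (DataCloneError for functions)
  }
}
```

If a payload needs behaviour on the other side, send the plain data and rebuild the class instance after it arrives.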