/** Callback that receives a single event of type `E`; any value it returns is ignored. */
export type Processor<E> = (event: E) => void;

/**
 * A subscriber description: a predicate paired with the callback to run for
 * the events that predicate accepts.
 */
export interface Listener<E> {
  /** Predicate evaluated against an event; `process` is only run when this returns `true`. */
  filter: (delta: E) => boolean;
  /** Callback run with the event once `filter` has accepted it. */
  process: Processor<E>;
}

/**
 * Fan-out hub that delivers each broadcast event to every registered listener
 * whose `filter` accepts it.
 */
export class EventStreamer<E> {
  /** Registered listeners, keyed by the subscription key assigned in `listen`. */
  readonly listeners: Map<number, Listener<E>> = new Map();
  /** Next subscription key to hand out; incremented on every `listen` call. */
  subKeyCounter = 0;

  /**
   * Registers a listener.
   *
   * @param listener Predicate/callback pair to register.
   * @returns An unsubscribe function. Calling it removes the listener and
   *   returns `true` if it was still registered, `false` otherwise.
   */
  listen(listener: Listener<E>): () => boolean {
    const subKey = this.subKeyCounter++;
    this.listeners.set(subKey, listener);
    return () => this.listeners.delete(subKey);
  }

  /**
   * Delivers `event` to every listener that was registered when the broadcast
   * started and whose `filter` returns `true` for it.
   *
   * Listeners added while the broadcast is running do not receive the
   * in-flight event; listeners removed while it is running are skipped if
   * they have not been reached yet.
   *
   * @param event The event to deliver.
   */
  broadcast(event: E): void {
    // Iterate over a snapshot: iterating the live Map would also visit entries
    // added by a listener during this broadcast, so a listener that subscribes
    // again from inside `process` would keep the loop running forever.
    const snapshot = Array.from(this.listeners);
    for (const [subKey, listener] of snapshot) {
      // Skip subscriptions that an earlier listener removed during this broadcast.
      if (this.listeners.get(subKey) !== listener) {
        continue;
      }
      if (listener.filter(event)) {
        listener.process(event);
      }
    }
  }
}

/**
 * Accumulates events and forwards them to an `EventStreamer` as a batch when
 * `commit` is called.
 */
export class EventBatcher<E> {
  /** Events queued via `event` that have not been broadcast yet. */
  queuedEvents: E[] = [];
  /** Streamer that receives the queued events on `commit`. */
  streamer: EventStreamer<E>;

  /**
   * @param streamer Streamer the batched events are broadcast through.
   */
  constructor(streamer: EventStreamer<E>) {
    this.streamer = streamer;
  }

  /**
   * Appends an event to the pending batch without broadcasting it.
   *
   * @param event Event to queue.
   * @returns This batcher, so calls can be chained.
   */
  event(event: E): this {
    this.queuedEvents.push(event);
    return this;
  }

  /**
   * Broadcasts every queued event, in insertion order, then leaves the queue
   * holding only events that were queued while the commit was running.
   *
   * If a broadcast throws, the error propagates and the whole batch (followed
   * by anything queued during the commit) stays queued.
   */
  commit(): void {
    // Detach the batch first: events queued by a listener during the commit
    // land in the fresh array and are kept for the next commit instead of
    // being discarded, and a re-entrant commit() cannot replay this batch.
    const pending = this.queuedEvents;
    this.queuedEvents = [];
    try {
      pending.forEach((queued) => this.streamer.broadcast(queued));
    } catch (err) {
      // Restore the queue so a failed commit loses nothing.
      this.queuedEvents = pending.concat(this.queuedEvents);
      throw err;
    }
  }
}
