import {
  defer,
  EMPTY,
  filter,
  map,
  MonoTypeOperatorFunction,
  Observable,
  of,
  OperatorFunction,
  scan,
  ThrottleConfig,
} from "rxjs";

// our trick, to avoid writing the raw observable constructor, is to pass a
// mutable object through the system
type Acc<T> = {
  batch: T[];
};

/**
 * Takes one of the time-control observable operators like `debounce` or `audit`
 * and changes them from dropping unused inputs to batching all the inputs into
 * one array.
 *
 * @param operator the operator to together-ify. (eg. `debounceTime(100)`)
 * @returns a new operator you can pipe with
 */
export function together<T>(
  operator: MonoTypeOperatorFunction<Acc<T>>
): OperatorFunction<T, T[]> {
  return (source: Observable<T>) =>
    source.pipe(
      scan<T, Acc<T>>((acc: null | Acc<T>, x) => {
        // the first run we have to make a new accumulator
        if (!acc) return { batch: [x] };

        acc.batch.push(x);
        return acc;

        // we start with "null" as an accumulator instead of {batch: []} because
        // that {batch: []} value would be shared across multiple subscriptions
        // of this observable (we use mutation)
      }, null as any),
      operator,
      map((acc) => {
        const batch = acc.batch;
        acc.batch = []; // new inputs will start accumulating in a new array
        return batch;
      })
    );
}

/**
 * This is sort of like `scan`, except that it allows for state that is not sent
 * down the observable.
 *
 * @param mapper The code that remaps the data. It needs to output `[nextState, output]`
 */
export function mapWithState<State, In, Out>(
  mapper: (state: void | State, arg: In) => [State, Out]
): OperatorFunction<In, Out> {
  return (source) =>
    defer(() => {
      let state: void | State;
      return source.pipe(
        map((value) => {
          const result = mapper(state, value);
          state = result[0];
          return result[1];
        })
      );
    });
}

/**
 * Takes all the null/undefined values out of the observable
 */
export function dropNullish<T>(): OperatorFunction<
  T,
  Exclude<T, void | undefined | null>
> {
  // @ts-expect-error no way to type this I think
  return filter((x) => x != null);
}

/**
 * Emits a value from the source as an observable and then ignores subsequent
 * values until the emitted value is subscribed to.
 *
 * @param op
 */
export function throttled<T>(
  config?: ThrottleConfig
): OperatorFunction<T, Observable<T>> {
  type Scheduled = {
    isLeading: boolean;
    input: T;
  };

  const leading = !config || config.leading;
  const trailing = config && config.trailing;

  return (source) =>
    new Observable((sink) => {
      let scheduled: null | Scheduled = null;

      const sub = source.subscribe({
        next(input) {
          if (!scheduled || (leading && trailing && scheduled.isLeading)) {
            const myScheduled: Scheduled = {
              isLeading: !scheduled,
              input,
            };
            scheduled = myScheduled;

            sink.next(
              // lazily reads the input (it may update if trailing is true)
              defer(() => {
                // mark that this has now happened
                if (scheduled === myScheduled) {
                  scheduled = null;
                }

                // the user asking for neither leading or trailing is pretty
                // pathological, but the case is handled for correctness
                return leading || trailing ? of(myScheduled.input) : EMPTY;
              })
            );
          } else if (trailing) {
            scheduled.input = input;
          }
        },

        complete() {
          sink.complete();
        },

        error(error) {
          sink.error(error);
        },
      });

      return () => {
        sub.unsubscribe();
      };
    });
}

/**
 * Allows for some cleanup code to run on unsubscription.
 *
 * @param onUnsubscribe Called
 * @returns
 */
export function tapUnsubscribed<T>(
  onUnsubscribe: () => void
): MonoTypeOperatorFunction<T> {
  return (source) =>
    new Observable((sink) => {
      const sub = source.subscribe(sink);
      return () => {
        sub.unsubscribe();
        onUnsubscribe();
      };
    });
}
