// borrowed (mostly) from praxis-ui

import { map, Observable } from "rxjs";
import { BackoffConfig, retryOnBackoff } from "./backoff";

export type TypedSocket<OutT> = {
  sendJson(msg: OutT): void;
  ws: WebSocket;
};

type ActiveSocketNotification<InT, OutT> =
  | {
      type: "open";
      data: {
        iteration: number;
        socket: TypedSocket<OutT>;
      };
    }
  | {
      type: "msg";
      data: InT | ArrayBuffer;
    };

type SocketNotification<InT, OutT> =
  | ActiveSocketNotification<InT, OutT>
  | {
      type: "close";
    };

export function socketRx<InT, OutT>(
  backoffConfig: BackoffConfig,
  socketUrl: string
): Observable<SocketNotification<InT, OutT>> {
  return retryOnBackoff(
    backoffConfig,
    (iteration) =>
      new Observable<ActiveSocketNotification<InT, OutT>>((subscriber) => {
        const pingTimer: number = 0;

        // todo: adjustable ip
        const ws = new WebSocket(socketUrl);
        ws.binaryType = "arraybuffer";

        ws.addEventListener("open", () => {
          subscriber.next({
            type: "open",
            data: {
              iteration,
              socket: {
                sendJson(msg) {
                  ws.send(JSON.stringify(msg));
                },
                ws,
              },
            },
          });
        });

        ws.addEventListener("message", (event) => {
          const rawData = event.data;
          let data: InT | ArrayBuffer;
          if (typeof rawData === "string") {
            try {
              data = JSON.parse(rawData);
            } catch (error) {
              console.error(`Failure parsing json ${rawData}`);
              subscriber.complete();
              return;
            }
          } else if (Array.isArray(rawData)) {
            console.warn("BUFFER ARRAY");
            data = concatBuffers(rawData);
          } else {
            // arraybuffer
            data = rawData;
          }

          subscriber.next({
            type: "msg",
            data,
          });
        });

        ws.addEventListener("close", () => {
          subscriber.complete();
        });

        ws.addEventListener("error", () => {
          console.error("SOCKET ERROR");
          subscriber.complete();
        });

        return () => {
          if (pingTimer) {
            clearInterval(pingTimer);
          }
          ws.close();
        };
      })
  ).pipe(
    map((notification) =>
      notification.kind === "N" ? notification.value : { type: "close" }
    )
  );
}

function concatBuffers(buffers: Buffer[]): ArrayBuffer {
  const length = buffers.reduce((sum, buf) => sum + buf.byteLength, 0);

  const concatenated = new Uint8Array(length);
  buffers.reduce((index, buf) => {
    const bytes = new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
    concatenated.set(bytes, index);
    return index + bytes.length;
  }, 0);

  return concatenated.buffer;
}
