import { interval, Observable, Subject, throwError, timer } from 'rxjs';
import {
  filter,
  finalize,
  mergeMap,
  retryWhen,
  takeUntil,
} from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { v4 as generateUuid } from 'uuid';

import { Inject, Injectable, Optional } from '@angular/core';
import {
  IS_ENV_PRODUCTION,
  WEB_SOCKET_CONFIG,
  WEB_SOCKET_URL,
  WebSocketConfig,
} from '@multisite-pv/environment.token';
import { AuthService } from '@oculus/auth';

const KEEP_ALIVE_INTERVAL_MS = 480000;
const MAX_RETRY_ATTEMPTS = 10;
const SCALING_DURATION_MS = 2000;

@Injectable({
  providedIn: 'root',
})
export class NotificationService<T = any> {
  protected webSocketSubject$: WebSocketSubject<T> | undefined;
  protected webSocketClosed$ = new Subject<boolean>();

  messages$ = new Subject<T>();

  constructor(
    @Optional()
    @Inject(WEB_SOCKET_CONFIG)
    private webSocketConfig: WebSocketConfig,
    @Inject(WEB_SOCKET_URL) private webSocketUrl: string,
    @Inject(IS_ENV_PRODUCTION) private production: boolean,
    private auth: AuthService,
  ) {
    this.initWebSocketConnection();
    this.keepWebSocketConnectionAlive();
  }

  sendMessage(message: T): void {
    if (!this.webSocketSubject$) {
      return;
    }

    this.webSocketSubject$.next(message);
  }

  protected initWebSocketConnection(): void {
    this.logDebug('initWebSocketConnection');
    this.auth.user$
      .pipe(
        filter((user) => !!user),
        mergeMap(({ signInUserSession: { accessToken, idToken } }) => {
          this.webSocketSubject$ = webSocket<T>({
            url: this.generateAuthUrl(
              this.webSocketUrl,
              idToken.jwtToken,
              accessToken.jwtToken,
            ),
          });
          return this.webSocketSubject$;
        }),
        retryWhen(this.genericRetryStrategy()),
      )
      .subscribe(
        (message) => {
          this.logDebug(message);
          this.messages$.next(message as T);
        },
        (error) => console.error(error),
        () => {
          this.webSocketClosed$.next(true);
          this.webSocketClosed$.complete();
        },
      );
  }

  protected keepWebSocketConnectionAlive(
    message: unknown = { action: 'keep' },
  ): void {
    this.logDebug('keepWebSocketConnectionAlive');
    interval(KEEP_ALIVE_INTERVAL_MS)
      .pipe(takeUntil(this.webSocketClosed$))
      .subscribe(() => this.sendMessage(message as T));
  }

  protected generateAuthUrl(
    webSocketUrl: string,
    idToken: string,
    accessToken: string,
  ): string {
    return (
      `${webSocketUrl}?idToken=${idToken}` +
      `&accessToken=${accessToken}` +
      `&x-correlation-id=${generateUuid()}`
    );
  }

  protected genericRetryStrategy =
    ({
      maxRetryAttempts = this.webSocketConfig?.maxRetryAttempts ??
        MAX_RETRY_ATTEMPTS,
      scalingDuration = this.webSocketConfig?.scalingDuration ??
        SCALING_DURATION_MS,
    }: {
      maxRetryAttempts?: number;
      scalingDuration?: number;
    } = {}) =>
    (attempts: Observable<any>) =>
      attempts.pipe(
        mergeMap((error, i) => {
          const retryAttempt = ++i;
          if (retryAttempt > maxRetryAttempts) {
            return throwError(error);
          }

          this.logDebug(
            `Attempt ${retryAttempt}: retrying to connect with Web Socket ` +
              `in ${retryAttempt * scalingDuration}ms`,
          );

          return timer(retryAttempt * scalingDuration);
        }),
        finalize(() => this.logDebug('Done retrying connection')),
      );

  protected logDebug(message: any) {
    if (!this.production) {
      console.debug('Notification:', message);
    }
  }
}
