import { Injectable, OnDestroy } from '@angular/core';
import { Store } from '@ngrx/store';
import { filterDefined } from 'app/app.utils';
import { AppState } from 'app/store';
import { fromSession } from 'app/store/session';
import { Subject, Subscription, filter, switchMap, take } from 'rxjs';
import { WebSocketSubject, webSocket } from 'rxjs/webSocket';

import { MessageService } from './message.service';

interface BaseSocketMessage {
  type: RR.WebsocketMessageType;
}

export interface CeleryNotificationSocketMessage extends BaseSocketMessage {
  type: 'REGISTER_CELERY_RESULT';
  task_id: string;
}

interface ReportEventMessage extends BaseSocketMessage {
  type: 'CLOSE_REPORT' | 'OPEN_REPORT' | 'USER_INACTIVE' | 'CONTINUE_REPORT' | 'REPORT_EDITED';
  report_id: number;
  user_id: number;
  user_name: string;
}

/**
 * Websocket messages that the client can send
 */
type SocketMessage = BaseSocketMessage | CeleryNotificationSocketMessage | ReportEventMessage;

interface CeleryCompleteResponse extends BaseSocketMessage {
  type: 'CELERY_TASK_COMPLETED' | 'CELERY_TASK_FAILED';
  task_id: string;
}

/**
 * Websocket messages that the server responds with
 */
interface BaseSocketResponse extends BaseSocketMessage {
  // TODO: expand to include all possible message types
  type: Exclude<RR.WebsocketMessageType, 'CELERY_TASK_COMPLETED'>;
  // For report concurrent editing
  editing?: string[]; // undefined for REPORT_UNBLOCKED
  // Number of unprocessed followup tasks
  count?: number;
}

export type SocketResponse = BaseSocketResponse | CeleryCompleteResponse;

@Injectable()
export class SocketService implements OnDestroy {
  /**
   * The rxjs/websocket subject
   * * next() - sends a message
   * * subscribe() - opens the socket and receives messages
   */
  private _ws: WebSocketSubject<SocketResponse | SocketMessage> | undefined;
  closed = true;
  pendingMessages: SocketMessage[] = [];
  /**
   * Separate Subject so components can subscribe early - before the websocket is created
   */
  messageReceivedEvent$ = new Subject<SocketResponse>();
  subscription = new Subscription();

  constructor(
    private store: Store<AppState>,
    private messageService: MessageService,
  ) {}

  ngOnDestroy(): void {
    // Unregister from push notification events
    this.send({ type: 'UNREGISTER_PUSH_NOTIFICATION' });
    this.subscription.unsubscribe();
  }

  createWebSocket() {
    this.closed = false;
    this.subscription.add(
      this.store
        .select(fromSession.selectRRConfig)
        .pipe(
          filterDefined(),
          take(1),
          filter((config) => !!config.WEBSOCKET_URL),
          switchMap((config) => {
            this._ws = webSocket({
              url: `${location.protocol === 'https:' ? 'wss' : 'ws'}://${location.host}${config.WEBSOCKET_URL}`,
              openObserver: {
                next: () => {
                  if (!this._ws) throw new Error('Unexpected undefined websocket');

                  this.closed = false;
                  // Register for push notification events
                  this._ws.next({ type: 'REGISTER_PUSH_NOTIFICATION' });
                  // Connection opened
                  while (this.pendingMessages.length > 0) {
                    const pendingMessage = this.pendingMessages.shift();
                    if (!pendingMessage) throw new Error('Unexpected undefined pendingMessage');
                    this._ws.next(pendingMessage);
                  }
                },
              },
            });

            return this._ws; // need to subscribe to it once, otherwise it never starts
          }),
        )
        .subscribe({
          next: (value) => {
            // TODO: can we remove this cast
            this.messageReceivedEvent$.next(value as SocketResponse);
          },
          error: () => {
            this.closed = true;
          },
          complete: () => {
            // We don't call `this._ws.complete()`. So this is unexpected.
            this.closed = true;
          },
        }),
    );
  }

  send(message: SocketMessage) {
    if (!this._ws) {
      console.warn("Dropped message because websocket isn't initialised yet");
      return;
    }
    if (this.closed) {
      this.pendingMessages.push(message);
      this.createWebSocket();
      return;
    }
    this._ws.next(message);
  }

  notifyCeleryResult(taskId: string) {
    this.send({ type: 'REGISTER_CELERY_RESULT', task_id: taskId });
    this.subscription.add(
      this.messageReceivedEvent$
        .pipe(
          filter(
            (message): message is CeleryCompleteResponse =>
              message.type === 'CELERY_TASK_COMPLETED' || message.type === 'CELERY_TASK_FAILED',
          ),
          filter((message) => message.task_id === taskId),
          take(1), // unsubscribe after the first message because it can only complete once
        )
        .subscribe({
          next: (_message) => {
            if (_message.type === 'CELERY_TASK_FAILED') {
              this.messageService.add({
                title: 'Task failed',
                message: `Rerun image similarity task has failed.`,
                type: 'danger',
              });
            } else {
              this.messageService.add({
                title: 'Task completed',
                message: `Rerun image similarity task has been completed.`,
                type: 'success',
              });
            }
          },
        }),
    );
  }
}
