import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Centrifuge, HistoryOptions, Subscription } from 'centrifuge';
import {
  BehaviorSubject,
  Observable,
  Subject,
  catchError,
  finalize,
  firstValueFrom,
  map,
  retry,
  throwError,
  timer
} from 'rxjs';
import { environment } from 'src/environments/environment';
import { ApiService } from './api.service';

/**
 * Wrapper around a token payload.
 *
 * When an initial object is supplied, all of its own enumerable properties are
 * copied onto the instance and `token` is then set explicitly from
 * `init.token`. Without an argument the instance is left untouched.
 */
class SecurityToken {
  /** The token string taken from the initial payload. */
  public token: string;

  /**
   * @param init Optional partial payload to copy onto the new instance.
   */
  constructor(init?: Partial<SecurityToken>) {
    if (init) {
      Object.assign(this, init);
      this.token = init.token;
    }
  }
}

/**
 * Generic envelope carrying a single payload.
 *
 * @typeParam T Type of the wrapped payload.
 */
export class Message<T> {
  /** The wrapped payload. */
  public data: T;
}

/**
 * Message shape accepted by `PubSubWsService.Send`: a string discriminator
 * plus an untyped payload.
 */
export class ShellMessage {
  /** Discriminator describing what kind of message this is. */
  public type: string;

  /** Message body; its structure is not constrained by this class. */
  public payload: unknown;
}

/**
 * String-valued identifiers for packet kinds.
 *
 * NOTE(review): not referenced in the code visible here; presumably compared
 * against `ShellMessage.type` by consumers — confirm against callers.
 */
export enum packetType {
  PingMessage = 'ping-message',
  TestPacket = 'test-packet',
  PacketError = 'packet-error',
  // NOTE(review): the value has a capital "P", unlike the other all-lowercase
  // values — verify against the producer before normalising.
  TraceroutePak = 'traceroute-Pak'
}

/**
 * Root-provided service wrapping a single Centrifuge WebSocket client.
 *
 * `connect()` fetches a connection token over HTTPS, creates the client and
 * mirrors its `connected` / `disconnected` events into `IsConnectedSubject$`.
 * `listen`, `publish`, `Send` and `history` operate on that client and throw
 * if it has not been created yet.
 */
@Injectable({ providedIn: 'root' })
export class PubSubWsService extends ApiService {
  private readonly wsHost = environment.wsHost;
  /** Delay in milliseconds between attempts to fetch the connection token. */
  private readonly RECONNECT_INTERVAL = 2000;
  private readonly url = `wss://${this.wsHost}/connection/websocket/console`;
  private token: string;
  /** Created by `connectToPubSub()`; undefined until the first token arrives. */
  private client?: Centrifuge;
  /** True while the initial token request started by `connect()` is pending. */
  private connecting = false;

  private _isConnectedSubject = new BehaviorSubject(false);
  /** Emits `true` on the client's `connected` event, `false` on `disconnected`. */
  IsConnectedSubject$ = this._isConnectedSubject.asObservable();

  // NOTE(review): the two initializers below read `this.http`, a constructor
  // parameter property. That only works when parameter properties are assigned
  // before field initializers (`useDefineForClassFields: false`) — confirm the
  // project's tsconfig keeps that setting.

  /** Cold observable that requests a connection token and emits its string. */
  private getWebSocketToken$ = this.http
    .get(`https://${this.wsHost}/connection/websocket/console/token`)
    .pipe(
      map((response) => new SecurityToken(response).token),
      catchError((err) => {
        // NOTE(review): the return value of handleError is discarded. If
        // ApiService.handleError returns a handler function rather than
        // logging directly, nothing is reported here — confirm.
        this.handleError<SecurityToken>('pubsubws.service.getWebSocketToken');

        return throwError(() => err);
      })
    );

  /**
   * Builds a cold observable that requests a subscription token for `channel`.
   * The channel name is percent-encoded so characters such as `#`, `&` or `+`
   * cannot truncate or alter the query string.
   */
  private getSubscriptionToken$ = (channel: string) =>
    this.http
      .get(
        `https://${this.wsHost}/connection/websocket/console/subToken?channel=${encodeURIComponent(
          channel
        )}`
      )
      .pipe(
        map((response) => new SecurityToken(response).token),
        catchError((err) => {
          // NOTE(review): see the note in getWebSocketToken$ about the
          // discarded return value of handleError.
          this.handleError<SecurityToken>('pubsubws.service.getSubscriptionToken');

          return throwError(() => err);
        })
      );

  constructor(private http: HttpClient) {
    super();
  }

  private setConnect(b: boolean) {
    this._isConnectedSubject.next(b);
  }

  /**
   * Returns the Centrifuge client or throws a descriptive error when
   * `connect()` has not produced one yet.
   */
  private requireClient(): Centrifuge {
    if (!this.client) {
      throw new Error(
        'PubSubWsService: client is not initialised; call connect() and wait for the token request to complete'
      );
    }

    return this.client;
  }

  /**
   * Fire-and-forget publish of `m` to channel `ch`.
   *
   * @throws Error if the client has not been created yet.
   */
  public Send(ch: string, m: ShellMessage): void {
    // The caller cannot observe the promise, so log rejections here instead of
    // leaving them unhandled.
    this.requireClient()
      .publish(ch, m)
      .catch((err: unknown) => console.error(err));
  }

  /**
   * Fetches a connection token (retrying every `RECONNECT_INTERVAL` ms on
   * failure) and then creates and connects the Centrifuge client.
   *
   * Safe to call repeatedly: an existing client is reused rather than replaced,
   * and a second call while the token request is pending is ignored.
   */
  public connect(): void {
    if (this.client) {
      // Replacing the client would leak its socket and orphan subscriptions.
      this.client.connect();
      return;
    }
    if (this.connecting) {
      return;
    }
    this.connecting = true;

    this.getWebSocketToken$
      .pipe(retry({ delay: () => timer(this.RECONNECT_INTERVAL) }))
      .subscribe({
        next: (token) => {
          this.connecting = false;
          this.token = token;
          this.connectToPubSub();
        },
        error: (err) => {
          this.connecting = false;
          console.error(err);
        }
      });
  }

  /** Creates the client with the fetched token, wires state events and connects. */
  private connectToPubSub(): void {
    const client = new Centrifuge(this.url, {
      debug: false,
      // Used by the client whenever it needs a fresh connection token.
      getToken: (_ctx) => firstValueFrom(this.getWebSocketToken$)
    });

    client.setToken(this.token);

    client.on('connected', (_ctx) => {
      this.setConnect(true);
    });

    client.on('disconnected', (_ctx) => {
      this.setConnect(false);
    });

    this.client = client;
    client.connect();
  }

  /**
   * Subscribes to `channel` and returns an observable of each publication's
   * `data`, cast to `T`.
   *
   * The channel subscription starts immediately, not on first observer. When a
   * consumer of the returned observable is torn down, this call's listener is
   * removed, and the channel is unsubscribed only if no other `publication`
   * listeners remain on it.
   *
   * @throws Error if the client has not been created yet.
   */
  listen<T>(channel: string): Observable<T> {
    const subscription = this.getSubscription(channel);
    const subject = new Subject<T>();
    const onPublication = (ctx: { data: unknown }): void => subject.next(ctx.data as T);

    subscription.on('publication', onPublication);
    subscription.subscribe();

    this.log(`subscribed to channel ${channel}`);

    return subject.asObservable().pipe(
      finalize(() => {
        // Remove the handler so repeated listen() calls do not pile up listeners.
        subscription.removeListener('publication', onPublication);
        // The subscription is shared per channel; keep it while others listen.
        if (subscription.listenerCount('publication') === 0) {
          subscription.unsubscribe();
        }
      })
    );
  }

  /**
   * Publishes `data` to `channel` and returns the client's promise.
   *
   * @throws Error if the client has not been created yet.
   */
  publish = (channel: string, data: unknown) => this.requireClient().publish(channel, data);

  /**
   * Ensures `channel` is subscribed and requests its history.
   *
   * @throws Error if the client has not been created yet.
   */
  history = (channel: string, options?: HistoryOptions) => {
    const subscription = this.getSubscription(channel);

    subscription.subscribe();

    return subscription.history(options);
  };

  /**
   * Returns the client's existing subscription for `channel`, or registers a
   * new one whose token is fetched through `getSubscriptionToken$`.
   */
  private getSubscription(channel: string): Subscription {
    const client = this.requireClient();

    return (
      client.getSubscription(channel) ??
      client.newSubscription(channel, {
        getToken: (ctx) => firstValueFrom(this.getSubscriptionToken$(ctx.channel))
      })
    );
  }
}
