import { Socket, io } from 'socket.io-client';
import { IRealtimeClient } from './realtime-client.interface';
import { Observable, Subject } from 'rxjs';

interface RealtimeMessageResponse {
  channel: string;
  payload: any;
}

export class GalactusClient implements IRealtimeClient {
  private client: Socket;
  private _galactusClientDisconnected$ = new Subject<boolean>();
  private tokens = new Map<string, string>();
  private subscriptionCallbacks = new Map<string, Array<(msg: any) => void>>();
  private MAX_ACK_TIMEOUT = 10000;
  private MAX_RETRY_ATTEMPTS = 5;
  private EXP_BACKOFF_SCALING_FACTOR = 200;

  constructor(url: string) {
    this.client = io(url, {
      ackTimeout: this.MAX_ACK_TIMEOUT,
      path: '/galactus',
      transports: ['websocket']
    });

    this.client.on('disconnect', (reason, desc) => {
      console.log('[GALACTUS] DISCONNECT: ', reason, desc);
      this._galactusClientDisconnected$.next(true);
    });

    this.client.on('connect', async () => {
      console.log('[GALACTUS] CONNECTED');

      const resubscriptions: Promise<void>[] = [];
      for (const channel of this.subscriptionCallbacks.keys()) {
        const resubscribe = async (): Promise<void> => {
          try {
            const token = this.tokens.get(channel) ?? '';
            await this._subscribeToChannel(channel, token);
            console.log(
              `[GALACTUS] Resubscribed to channel: ${channel} successfully`
            );
          } catch (err) {
            console.error(
              `[GALACTUS] Could not subscribe to channel: ${channel}, reason: `,
              err
            );
          }
        };
        resubscriptions.push(resubscribe());
      }
      await Promise.allSettled(resubscriptions);
      this._galactusClientDisconnected$.next(false);
    });

    this.client.on('message', (data: RealtimeMessageResponse) => {
      const { channel, payload } = data;
      const callbacks = this.subscriptionCallbacks.get(channel);
      callbacks?.forEach((callback) => callback(payload));
    });
  }

  unsubscribeFromChannel(channel: string): void {
    this.client.emit('unsubscribe', {
      channel: channel,
      token: this.tokens.get(channel)
    });
    this.subscriptionCallbacks.delete(channel);
    this.tokens.delete(channel);
    console.log(`Galactus unsubscribed from ${channel}`);
  }
  async subscribeToChannel(
    channel: string,
    token: string,
    callback: (message: any) => void
  ): Promise<void> {
    try {
      await this._subscribeToChannel(channel, token);
      // storing token
      this.tokens.set(channel, token);
      if (!this.subscriptionCallbacks.has(channel)) {
        this.subscriptionCallbacks.set(channel, []);
      }
      this.subscriptionCallbacks.get(channel)?.push(callback);
      console.log(`Subscribed to realtime channel: ${channel}`);
    } catch (err) {
      console.error('error: ', err);
    }
  }
  async publish(channel: string, token: string, message: any): Promise<void> {
    const { error } = await this.client.emitWithAck('publish', {
      channel: channel,
      token: token,
      payload: message
    });
    if (error) {
      throw new Error(error);
    }
  }

  realtimeClientDisconnected$(): Observable<boolean> {
    return this._galactusClientDisconnected$.asObservable();
  }

  disconnect(): void {
    this.client.disconnect();
  }

  private _sleep(time: number) {
    return new Promise<void>((resolve) => {
      setTimeout(resolve, time);
    });
  }

  private async _subscribeToChannel(
    channel: string,
    token: string
  ): Promise<void> {
    for (let attempt = 0; attempt < this.MAX_RETRY_ATTEMPTS; attempt++) {
      let result;
      try {
        result = await this.client.emitWithAck('subscribe', {
          channel: channel,
          token: token
        });
        break;
      } catch (err) {
        console.log(`Error subscribing to channel: ${channel}`, err);
        const backOffTime =
          Math.pow(2, attempt) * this.EXP_BACKOFF_SCALING_FACTOR;
        await this._sleep(backOffTime);
      }
      if (result?.status === 'failed') {
        throw new Error(result.error);
      }
    }
  }
}
