import { Subject, Observable } from 'rxjs';
import * as faye from 'faye';
import { IRealtimeClient } from './realtime-client.interface';
import { Mutex } from 'async-mutex';

/**
 * Factory for a single Faye extension hook.
 *
 * It receives an optional string-to-string map and returns the hook itself:
 * a function that is handed a message together with a `next` continuation
 * accepting a message.
 */
type FayeExtensionFn = (
  tokens?: Record<string, string>
) => (message: any, next: (message: any) => void) => void;

/**
 * `IRealtimeClient` implementation that wraps a Faye client.
 *
 * The instance keeps a channel → token map. That map is passed by reference to
 * the extension factories given to the constructor, so the hooks they return
 * observe tokens stored later by {@link FayeClient.subscribeToChannel}.
 */
export class FayeClient implements IRealtimeClient {
  /** Underlying Faye client (untyped). */
  private readonly client: any;
  /** Emits `true` on `transport:down` and `false` on `transport:up`. */
  private readonly _fayeDisconnected$ = new Subject<boolean>();
  /** Token stored per channel; written by `subscribeToChannel`. */
  private readonly tokens: { [key: string]: string } = {};
  /** Ensures only one `subscribeToChannel` call runs its body at a time. */
  private readonly mutex = new Mutex();

  /**
   * Creates the Faye client, wires transport events to the disconnect stream
   * and registers the optional extension hooks.
   *
   * @param url Endpoint passed to `faye.Client`.
   * @param extensions Optional hook factories. Each factory is called once
   *   with this instance's token map and must return the hook function.
   */
  constructor(
    url: string,
    extensions?: {
      ingoing?: FayeExtensionFn;
      outgoing?: FayeExtensionFn;
    }
  ) {
    this.client = new faye.Client(url, {});
    this.client.disable('autodisconnect');

    this.client.on('transport:down', () => {
      this._fayeDisconnected$.next(true);
    });

    // Plain (non-async) listener: nothing is awaited here and the emitter
    // does not consume a returned promise.
    this.client.on('transport:up', () => {
      this._fayeDisconnected$.next(false);
    });

    const fayeExtensions: any = {};
    if (extensions?.ingoing) {
      // Faye calls the extension method named `incoming` for inbound
      // messages; a property called `ingoing` would never be invoked. The
      // public option keeps its `ingoing` name for existing callers.
      fayeExtensions.incoming = extensions.ingoing(this.tokens);
    }
    if (extensions?.outgoing) {
      fayeExtensions.outgoing = extensions.outgoing(this.tokens);
    }
    this.client.addExtension(fayeExtensions);
  }

  /**
   * Unsubscribes the underlying client from `channel` and logs it.
   *
   * The token stored for the channel is left in the token map.
   *
   * @param channel Channel to unsubscribe from.
   */
  unsubscribeFromChannel(channel: string): void {
    this.client.unsubscribe(channel);
    console.log(`Faye unsubscribed from ${channel}`);
  }

  /**
   * Stores `token` for `channel`, then subscribes `callback` to the channel.
   *
   * Calls are serialized through the mutex. A subscription failure is logged
   * with `console.error` and NOT rethrown, so the returned promise resolves
   * in both the success and the failure case.
   *
   * @param channel Channel to subscribe to.
   * @param token Token recorded in the token map under `channel`.
   * @param callback Handler passed to the Faye subscription.
   */
  async subscribeToChannel(
    channel: string,
    token: string,
    callback: (message: any) => void
  ): Promise<void> {
    const release = await this.mutex.acquire();

    try {
      // The token must be in the map before subscribing so that extension
      // hooks sharing the map can see it while the subscribe call runs.
      this.tokens[channel] = token;
      await this.client.subscribe(channel, callback);
      console.log(`Faye subscribed to ${channel}`);
    } catch (e) {
      console.error(`fayeError: ${e}`);
    } finally {
      // Always release, otherwise later subscriptions would wait forever.
      release();
    }
  }

  /**
   * Publishes `message` on `channel` and waits for the publish to settle.
   *
   * @param channel Channel to publish to.
   * @param _token Unused by this implementation.
   * @param message Payload handed to the Faye client.
   */
  async publish(channel: string, _token: string, message: any): Promise<void> {
    await this.client.publish(channel, message);
  }

  /**
   * @returns Stream of transport state changes: `true` when the transport
   *   went down, `false` when it came back up.
   */
  realtimeClientDisconnected$(): Observable<boolean> {
    return this._fayeDisconnected$.asObservable();
  }

  /** Disconnects the underlying Faye client. */
  disconnect(): void {
    this.client.disconnect();
  }
}
