import { Injectable } from '@angular/core';
import {
  BehaviorSubject,
  filter,
  fromEventPattern,
  last,
  Observable,
  pairwise,
  startWith,
  Subject,
  Subscription,
  take,
} from 'rxjs';
import { Collaboration } from '@pncl/common-models';
import { Mutex } from 'async-mutex';
import { environment } from '../../environments/environment';
import { Session } from '../models/session';
import { MessageUpdate, RealtimeToken } from '../models/messaging';
import { Context, encodeContext } from '../models/context';
import { NotificationsService } from './notifications.service';
import { MessageSignal } from './messenger.service';
import { FLAGS, FlagsService } from './flags.service';

import FayeClient = Collaboration.Realtime.FayeClient;
import GalactusClient = Collaboration.Realtime.GalactusClient;
import IRealtimeClient = Collaboration.Realtime.IRealtimeClient;
/**
 * Application-wide facade over the realtime transport (Faye or Galactus).
 *
 * The concrete client is created lazily once the `USE_FAYE` feature flag has
 * been resolved. Subscriptions requested before that point are buffered and
 * replayed as soon as the client exists. Every access to the client is
 * serialized through `clientMutex`.
 */
@Injectable({
  providedIn: 'root',
})
export class RealtimeService {
  /** Channel names subscribed through `subscribeMessages`. */
  subscriptionSet = new Set<string>();
  /** Comment subjects keyed by the channel string passed to `subscribeComments`. */
  commentSubjectsMap = new Map<string, BehaviorSubject<any>>();
  /** Space-comment subjects keyed by the channel string passed to `subscribeSpaceComments`. */
  spaceCommentSubjectsMap = new Map<string, BehaviorSubject<any>>();
  /** One shared subject per session id, handed out by `subscribeSession`. */
  private sessions: { [key: string]: Subject<Session | null> } = {};
  /** Number of `subscribeSession` calls not yet balanced by `unsubscribeSession`. */
  private sessionsListenerCount: { [key: string]: number } = {};
  /** Latest payload received on the `/rtm/<userId>` channel. */
  realtimeSignalingRTM = new BehaviorSubject(null);
  /** Latest payload received on the `/<userId>` channel. */
  userSignal = new BehaviorSubject<MessageSignal | null>(null);

  private currentConversationMessageUpdate = new Subject<MessageUpdate>();
  /** `null` until the feature flag has been resolved in the constructor. */
  private client: IRealtimeClient | null = null;
  /** Serializes client creation and all subscribe/unsubscribe calls. */
  private clientMutex = new Mutex();
  private useFaye = false;

  private _realtimeDisconnected$ = new Subject<boolean>();

  /** Subscriptions requested while `client` was still `null`. */
  private bufferedSubscriptions: {
    channel: string;
    token: RealtimeToken;
    callback: (msg: any) => void;
  }[] = [];

  private realtimeClientDisconnectedSubscription?: Subscription;

  /**
   * Builds the transport client selected by the feature flag.
   *
   * @param useFaye `true` for a Faye client, `false` for a Galactus client.
   */
  private realtimeClientFactory(useFaye: boolean) {
    if (!useFaye) {
      return new GalactusClient(environment.galactusServer);
    }

    return new FayeClient(environment.fayeServer, {
      outgoing: (tokens) => (message: any, next: (message: any) => void) => {
        if (tokens) {
          if (!message.ext) {
            message.ext = {};
          }
          message.ext.authToken = tokens[message.subscription];
        }
        // Always hand the message on: an extension that never calls `next`
        // drops the message, and a subscribe that never settles would keep
        // `clientMutex` locked forever.
        next(message);
      },
    });
  }

  /**
   * Emits on every transition of the disconnected state from `true` to
   * `false`. Because the stream is seeded with `true, false`, each subscriber
   * also receives one emission immediately, representing the first connection.
   */
  public realtimeInitOrReconnect$ = this._realtimeDisconnected$.pipe(
    startWith(true, false),
    pairwise(),
    filter(([prev, cur]) => prev === true && cur === false),
  );

  constructor(
    private notificationsService: NotificationsService,
    private flagsService: FlagsService,
  ) {
    // TODO: remove after deprecating Faye
    // `take(2), last()` yields a single value: the second flag emission, or
    // the only one if the flag stream completes early.
    // NOTE(review): presumably the first emission is a default and the second
    // the resolved flag value - confirm against FlagsService.
    this.flagsService
      .featureFlagChanged(FLAGS.USE_FAYE, true)
      .pipe(take(2), last())
      .subscribe(async (curUseFaye) => {
        await this.clientMutex.runExclusive(async () => {
          this.useFaye = curUseFaye;
          const client = this.realtimeClientFactory(curUseFaye);
          this.client = client;

          // The subject is passed as the observer, so completion and errors of
          // the client's stream are forwarded as well as values.
          this.realtimeClientDisconnectedSubscription?.unsubscribe();
          this.realtimeClientDisconnectedSubscription = client
            .realtimeClientDisconnected$()
            .subscribe(this._realtimeDisconnected$);

          // Drain the buffer so the callbacks are not retained, then replay
          // the requests while still holding the lock. The mutex is not
          // reentrant, so going through `subscribeToChannel` here would defer
          // the replay until after other already-queued callers (e.g. a
          // queued unsubscribe for one of these very channels).
          const pending = this.bufferedSubscriptions;
          this.bufferedSubscriptions = [];
          for (const { channel, token, callback } of pending) {
            await this.subscribeWithClient(client, channel, token, callback);
          }
        });
      });
  }

  /** Stream of the disconnected state reported by the realtime client. */
  get realtimeDisconnected$(): Observable<boolean> {
    return this._realtimeDisconnected$.asObservable();
  }

  /**
   * Subscribes to the user's notification channel and increments the
   * notification counter by one for every message received.
   */
  subscribeNotification(userId: string, token: RealtimeToken): void {
    const { notificationCount } = this.notificationsService;
    void this.subscribeToChannel(`/notification/notification_${userId}`, token, () => {
      notificationCount.next(notificationCount.getValue() + 1);
    });
  }

  /** Subscribes to `/<userId>` and publishes each payload on `userSignal`. */
  subscribeUserNotification(userId: string, token: RealtimeToken): void {
    void this.subscribeToChannel(`/${userId}`, token, (value) => {
      this.userSignal.next(value);
    });
  }

  /** Subscribes to `/rtm/<userId>` and publishes each payload on `realtimeSignalingRTM`. */
  subscribeSignalingRTM(userId: string, token: RealtimeToken): void {
    void this.subscribeToChannel(`/rtm/${userId}`, token, (value) => {
      this.realtimeSignalingRTM.next(value);
    });
  }

  /**
   * Returns the shared update stream for a session. The channel subscription
   * is made only for the first listener; later calls just increase the
   * listener count. Balance every call with one `unsubscribeSession` call.
   *
   * @param sessionId Session id; only the part before the first `=` is used
   *   for the channel name.
   * @param token Realtime tokens used when the channel is first subscribed.
   */
  subscribeSession(sessionId: string, token: RealtimeToken): Observable<any> {
    const existing = this.sessions[sessionId];
    if (existing) {
      this.sessionsListenerCount[sessionId]++;
      return existing.asObservable();
    }

    const subject = new Subject<Session | null>();
    this.sessions[sessionId] = subject;
    this.sessionsListenerCount[sessionId] = 1;
    void this.subscribeToChannel(`/sessions/${sessionId.split('=')[0]}`, token, (value: any) => {
      // The entry is removed synchronously by `unsubscribeSession` while the
      // channel itself is unsubscribed asynchronously, so a late message may
      // arrive after the subject is gone.
      this.sessions[sessionId]?.next(value);
    });

    return subject.asObservable();
  }

  /**
   * Releases one listener of a session. Call this exactly once for each
   * `subscribeSession` call you made, and only after subscribing: an extra
   * call can drive the listener count to zero, which unsubscribes from the
   * session channel even though other listeners still exist, and they will
   * stop receiving updates.
   *
   * When the last listener is released, the channel is unsubscribed and `null`
   * is emitted on the session stream.
   */
  unsubscribeSession(sessionId: string): void {
    const listeners = this.sessionsListenerCount[sessionId];
    if (!listeners) {
      return;
    }

    if (listeners > 1) {
      this.sessionsListenerCount[sessionId] = listeners - 1;
      return;
    }

    void this.unsubscribeFromChannel(`/sessions/${sessionId.split('=')[0]}`);
    this.sessions[sessionId]?.next(null);
    delete this.sessions[sessionId];
    // Drop the counter too so finished sessions do not accumulate zeroed keys.
    delete this.sessionsListenerCount[sessionId];
  }

  /** Whether a session stream currently exists for `sessionId`. */
  isListeningToSession(sessionId: string): boolean {
    return Boolean(this.sessions[sessionId]);
  }

  /**
   * Returns the subject for a comments channel, creating it if needed.
   *
   * NOTE(review): every call issues another channel subscription, even when
   * the subject already exists - confirm the client tolerates duplicates.
   *
   * @param channel Channel id; only the part before the first `=` is used for
   *   the channel name, while the full string keys the subject map.
   */
  subscribeComments(channel: string, token: RealtimeToken): BehaviorSubject<any> | undefined {
    if (!this.commentSubjectsMap.has(channel)) {
      this.commentSubjectsMap.set(channel, new BehaviorSubject(null));
    }
    void this.subscribeToChannel(`/${channel.split('=')[0]}`, token, (value: any) => {
      this.commentSubjectsMap.get(channel)?.next(value);
    });
    return this.commentSubjectsMap.get(channel);
  }

  /** Unsubscribes from a comments channel and completes and forgets its subject. */
  unsubscribeComments(channel: string): void {
    void this.unsubscribeFromChannel(`/${channel.split('=')[0]}`);
    // `complete()` rather than `unsubscribe()`: a closed subject makes any
    // retained reference throw ObjectUnsubscribedError when it is used.
    this.commentSubjectsMap.get(channel)?.complete();
    this.commentSubjectsMap.delete(channel);
  }

  /**
   * Returns the subject for a space-comments channel, creating it if needed.
   * The channel string is used verbatim as `/<channel>`.
   *
   * NOTE(review): every call issues another channel subscription, even when
   * the subject already exists - confirm the client tolerates duplicates.
   */
  subscribeSpaceComments(channel: string, token: RealtimeToken): BehaviorSubject<any> | undefined {
    if (!this.spaceCommentSubjectsMap.has(channel)) {
      this.spaceCommentSubjectsMap.set(channel, new BehaviorSubject(null));
    }
    void this.subscribeToChannel(`/${channel}`, token, (value: any) => {
      this.spaceCommentSubjectsMap.get(channel)?.next(value);
    });
    return this.spaceCommentSubjectsMap.get(channel);
  }

  /** Unsubscribes from a space-comments channel and completes and forgets its subject. */
  unsubscribeSpaceComments(channel: string): void {
    void this.unsubscribeFromChannel(`/${channel}`);
    // `complete()` rather than `unsubscribe()`: a closed subject makes any
    // retained reference throw ObjectUnsubscribedError when it is used.
    this.spaceCommentSubjectsMap.get(channel)?.complete();
    this.spaceCommentSubjectsMap.delete(channel);
  }

  /**
   * Subscribes `callback` to `channel`. If the client has not been created
   * yet, the request is buffered and replayed once it exists. Subscription
   * failures are logged, not thrown.
   *
   * @param channel Full channel name, including the leading `/`.
   * @param token Token pair; the entry matching the active transport is used.
   * @param callback Invoked with each message delivered on the channel.
   */
  async subscribeToChannel(
    channel: string,
    token: RealtimeToken,
    callback: (arg: any) => void,
  ): Promise<void> {
    await this.clientMutex.runExclusive(async () => {
      const client = this.client;
      if (!client) {
        this.bufferedSubscriptions.push({ channel, token, callback });
        return;
      }
      await this.subscribeWithClient(client, channel, token, callback);
    });
  }

  /**
   * Performs one subscription on a concrete client, logging the outcome.
   * Never rejects. Callers are expected to hold `clientMutex`.
   */
  private async subscribeWithClient(
    client: IRealtimeClient,
    channel: string,
    token: RealtimeToken,
    callback: (arg: any) => void,
  ): Promise<void> {
    try {
      await client.subscribeToChannel(
        channel,
        this.useFaye ? token.faye : token.galactus,
        callback,
      );
      console.log(`realtime subscribed to ${channel}`);
    } catch (e) {
      console.error(`realtimeError: ${e}`);
    }
  }

  /**
   * Subscribes to the channel derived from `context`, records it in
   * `subscriptionSet` and forwards its messages to the stream returned by
   * `getCurrentConversationMessageUpdates`.
   */
  subscribeMessages(context: Context, channelToken: RealtimeToken): void {
    const channelName = this.getChannelNameFromContext(context);
    this.subscriptionSet.add(channelName);
    void this.subscribeToChannel(channelName, channelToken, (message) => {
      this.currentConversationMessageUpdate.next(message);
    });
  }

  /**
   * Cold observable variant of `subscribeMessages`: the channel is subscribed
   * each time the returned observable is subscribed to.
   *
   * The observable has no teardown logic, so unsubscribing from it does not
   * unsubscribe from the channel.
   */
  subscribeMessages$(context: Context, channelToken: RealtimeToken): Observable<any> {
    const channelName = this.getChannelNameFromContext(context);
    return new Observable((observer) => {
      void this.subscribeToChannel(channelName, channelToken, (value) => {
        observer.next(value);
      });
    });
  }

  /**
   * Wraps a channel as an observable. Subscribing subscribes to the channel;
   * unsubscribing calls `unsubscribeFromChannel` for the whole channel name.
   */
  private getMessageUpdates(
    channelName: string,
    channelToken: RealtimeToken,
  ): Observable<MessageUpdate> {
    return fromEventPattern(
      (handler) => this.subscribeToChannel(channelName, channelToken, handler),
      () => this.unsubscribeFromChannel(channelName),
      (messageUpdate: MessageUpdate) => messageUpdate,
    );
  }

  /** Message updates delivered on `/messaging/<userId>`. */
  getUserMessageUpdates(userId: string, channelToken: RealtimeToken): Observable<MessageUpdate> {
    return this.getMessageUpdates(`/messaging/${userId}`, channelToken);
  }

  /**
   * Maps a context to its channel name. For sessions, the first `=` is
   * removed from the encoded context.
   *
   * TODO: Verify that this is correct for questions/notes/sets. It is
   * currently only used for sessions.
   */
  private getChannelNameFromContext(context: Context): string {
    const encodedContext = encodeContext(context);
    if (context.note) {
      return `/note/${encodedContext}`;
    }
    if (context.worksheet) {
      return `/worksheet/${encodedContext}`;
    }
    if (context.session) {
      return `/sessions/${encodedContext.replace('=', '')}`;
    }
    if (context.appId) {
      return `/app/${encodedContext}`;
    }
    return encodedContext;
  }

  /** Message updates for the channel derived from `context`. */
  getConversationMessageUpdates(
    context: Context,
    channelToken: RealtimeToken,
  ): Observable<MessageUpdate> {
    return this.getMessageUpdates(this.getChannelNameFromContext(context), channelToken);
  }

  /** Stream fed by every channel subscribed through `subscribeMessages`. */
  getCurrentConversationMessageUpdates(): Observable<MessageUpdate> {
    return this.currentConversationMessageUpdate.asObservable();
  }

  /**
   * Unsubscribes from `channel`. If the client has not been created yet, any
   * buffered subscription requests for that channel are discarded instead.
   */
  async unsubscribeFromChannel(channel: string): Promise<void> {
    await this.clientMutex.runExclusive(() => {
      if (this.client) {
        this.client.unsubscribeFromChannel(channel);
        console.log(`realtime unsubscribed from ${channel}`);
        return;
      }
      this.bufferedSubscriptions = this.bufferedSubscriptions.filter(
        (buffered) => buffered.channel !== channel,
      );
    });
  }

  /** Whether the channel for `context` was subscribed through `subscribeMessages`. */
  channelSubscrptionIsActive(context: Context): boolean {
    return this.subscriptionSet.has(this.getChannelNameFromContext(context));
  }

  /** Unsubscribes from the channel derived from `context` and forgets it. */
  unsubscribeFromChannelUsingContext(context: Context): void {
    const channel = this.getChannelNameFromContext(context);
    this.subscriptionSet.delete(channel);
    void this.unsubscribeFromChannel(channel);
  }
}
