import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import negate from 'lodash-es/negate';
import { takeRightWhile } from 'lodash/fp';
import { ToastrService } from 'ngx-toastr';
import { combineLatest, merge, Observable, of, Subject } from 'rxjs';
import {
  catchError,
  concatMap,
  distinctUntilChanged,
  filter,
  map,
  pluck,
  scan,
  shareReplay,
  startWith,
  switchMap,
  tap,
} from 'rxjs/operators';
import {
  buildConversationMessageUpdate,
  encodeContextParam,
  filterNonNullable,
  filterOutCurrentUserAddedMessage,
  idEquals,
  updateConversationContent,
} from '../common/utils/messaging';
import { Context } from '../models/context';
import {
  ContentMessage,
  ConversationContent,
  ConversationContentResponse,
  ConversationHistoryUpdate,
  ConversationUpdateType,
  MessageAction,
  MessageUpdate,
  TypedMessage,
} from '../models/messaging';
import { User, UserInfo } from '../models/user';
import { URLService } from './dynamic-url.service';
import { MessengerService } from './messenger.service';
import { UserService } from './user.service';
import { RealtimeService } from './realtime.service';
/**
 * Root-scoped service that assembles the content of a single conversation.
 *
 * It exposes the current user and the list of other users, and builds a live
 * `ConversationContent` stream from an initial HTTP fetch combined with
 * realtime message updates, on-demand history pages and messages pushed
 * through `currentUserSentPublicMessage$`.
 */
@Injectable({
  providedIn: 'root',
})
export class ConversationService {
  /**
   * The `user` property of each non-null `userService.user` emission,
   * de-duplicated with `idEquals` and replayed to late subscribers.
   */
  currentUser$: Observable<User>;

  /** Profiles from `userService.allUsers` with the current user filtered out. */
  otherUserInfo$: Observable<UserInfo[]>;

  /**
   * Receives `total_messages` every time `getConversationContent` produces a
   * fresh initial content. Plain `Subject`: values are not replayed.
   */
  public totalMessagesCount$ = new Subject<number>();

  /** Requests queued by `fetchMoreMessages`, consumed by `getConversationContent`. */
  private fetchMoreMessagesRequests$ = new Subject<TypedMessage>();

  /**
   * Message updates pushed from outside this service; each one is converted with
   * `buildConversationMessageUpdate` and folded into the conversation content.
   */
  public currentUserSentPublicMessage$ = new Subject<MessageUpdate>();

  private _pendingGroupChatDeleteEvent$ = new Subject<TypedMessage[]>();

  /**
   * Emits the `messages` array of every initial content response fetched by
   * `getConversationContent` (an empty array when that fetch failed).
   */
  public pendingGroupChatDeleteEvent$ = this._pendingGroupChatDeleteEvent$.asObservable();

  constructor(
    private urlService: URLService,
    private httpClient: HttpClient,
    private realtimeService: RealtimeService,
    private userService: UserService,
    private toastr: ToastrService,
    private messengerService: MessengerService,
  ) {
    this.currentUser$ = this.userService.user.pipe(
      filterNonNullable(),
      pluck('user'),
      distinctUntilChanged((previous, current) => idEquals(previous)(current)),
      shareReplay({ bufferSize: 1, refCount: false /* false for root service */ }),
    );

    // The current logged-in user is excluded from the list of other users.
    this.otherUserInfo$ = combineLatest([
      this.currentUser$,
      this.userService.allUsers.pipe(filterNonNullable(), pluck('profiles'), filterNonNullable()),
    ]).pipe(
      map(([currentUser, allUserInfo]) => allUserInfo.filter(negate(idEquals(currentUser)))),
      shareReplay({ bufferSize: 1, refCount: false /* false for root service */ }),
    );
  }

  /**
   * Fetches the initial conversation content each time
   * `realtimeService.realtimeInitOrReconnect$` emits.
   *
   * `switchMap` drops a still-pending request when a newer emission arrives.
   * HTTP errors are not caught here and propagate to the subscriber.
   *
   * @param context Conversation context to load.
   * @returns The raw response of every (re)fetch.
   */
  getInitialConversationContentWithFayeReconnect$(
    context: Context,
  ): Observable<ConversationContentResponse> {
    return this.realtimeService.realtimeInitOrReconnect$.pipe(
      switchMap(() => this.getInitialConversationContent(context)),
    );
  }

  /**
   * Builds the live content stream for a conversation.
   *
   * Whenever the current user changes or the realtime connection is
   * (re)initialised, the initial content is rebuilt and the accumulated state
   * restarts from it. Subsequent realtime updates, history pages requested via
   * `fetchMoreMessages` and `currentUserSentPublicMessage$` emissions are folded
   * into that state with `updateConversationContent`.
   *
   * Side effects per initial content: `totalMessagesCount$` and
   * `pendingGroupChatDeleteEvent$` emit, and the realtime channel for `context`
   * is subscribed if `realtimeService` reports it is not already active.
   *
   * @param context Conversation context to load and follow.
   * @returns A stream that emits the initial content followed by each updated state.
   */
  getConversationContent(context: Context): Observable<ConversationContent> {
    return combineLatest([
      this.currentUser$,
      this.realtimeService.realtimeInitOrReconnect$.pipe(
        switchMap(() =>
          this.getInitialConversationContent(context).pipe(
            // Best effort: a failed fetch degrades to an empty conversation
            // instead of terminating the outer stream.
            catchError(() =>
              of({
                user: <User>{},
                messages: [],
                total_messages: 0,
                realtime: {
                  realtime_user_message: { faye: '', galactus: '' },
                },
              }),
            ),
            // Placed after catchError, so the fallback's empty list is emitted too.
            tap((chat) => this._pendingGroupChatDeleteEvent$.next(chat.messages)),
          ),
        ),
      ),
    ]).pipe(
      map(([currentUser, initialContentResponse]) => ({
        currentUser,
        context,
        messages: initialContentResponse.messages,
        totalMessages: initialContentResponse.total_messages,
        realtimeToken: initialContentResponse.realtime.realtime_user_message,
      })),
      tap((initialContent) => {
        this.totalMessagesCount$.next(initialContent.totalMessages);
        if (!this.realtimeService.channelSubscrptionIsActive(context)) {
          this.realtimeService.subscribeMessages(
            initialContent.context,
            initialContent.realtimeToken,
          );
        }
      }),
      // A new initial content discards the previous accumulation and starts over.
      switchMap((initialContent) =>
        merge(
          this.realtimeService.getCurrentConversationMessageUpdates().pipe(
            // NOTE(review): presumably drops echoes of messages the current user
            // added (they arrive via currentUserSentPublicMessage$) — confirm
            // against filterOutCurrentUserAddedMessage.
            filter((update) =>
              filterOutCurrentUserAddedMessage(update, this.userService.user.value?.user._id || ''),
            ),
            map(buildConversationMessageUpdate),
          ),
          this.fetchMoreMessagesRequests$.pipe(
            // Consecutive requests judged equal by idEquals are collapsed into one.
            distinctUntilChanged((previous, current) => idEquals(previous)(current)),
            // concatMap keeps history pages in request order.
            concatMap((updateRequest) => this.getConversationHistoryUpdate(updateRequest)),
          ),
          this.currentUserSentPublicMessage$.pipe(map(buildConversationMessageUpdate)),
        ).pipe(scan(updateConversationContent, initialContent), startWith(initialContent)),
      ),
    );
  }

  /**
   * Asks the realtime service to unsubscribe from the channel of `context`.
   *
   * @param context Conversation context whose channel should be released.
   */
  unsubscribeFromConversation(context: Context): void {
    this.realtimeService.unsubscribeFromChannelUsingContext(context);
  }

  /**
   * Requests the most recent messages of a conversation.
   *
   * Should be private to components, and shared only with MessagingService. Not
   * sure if there's a way to express this.
   *
   * @param context Conversation context, sent as the `filters` query parameter.
   * @param limit Value of the `num` query parameter (defaults to 25).
   * @returns The raw search response; HTTP errors are not handled here.
   */
  getInitialConversationContent(
    context: Context,
    limit = 25,
  ): Observable<ConversationContentResponse> {
    return this.httpClient.get<ConversationContentResponse>(
      `${this.urlService.getDynamicUrl()}/tutor/messages/search?filters=${encodeContextParam(
        context,
      )}&num=${limit}`,
    );
  }

  /**
   * Requests a page of history, using the given message's `createdAt` as the
   * `startDate` query parameter.
   *
   * Should be private to components, and shared only with MessagingService. Not
   * sure if there's a way to express this.
   *
   * @param message Message whose `_id`, `context` and `createdAt` anchor the
   *   request (`fetchMoreMessages` names it the oldest message).
   * @param limit Value of the `num` query parameter (defaults to 25, matching
   *   `getInitialConversationContent`).
   * @returns A history update holding the trailing run of returned messages
   *   that follows the anchor message, if the anchor is present in the page.
   */
  getConversationHistoryUpdate(
    { _id, context, createdAt }: TypedMessage,
    limit = 25,
  ): Observable<ConversationHistoryUpdate> {
    // Percent-encode the date so characters such as '+', '&' or spaces cannot
    // alter the query string; String() yields the same text the template
    // literal produced before.
    const startDate = encodeURIComponent(String(createdAt));

    return this.httpClient
      .get<ConversationContentResponse>(
        `${this.urlService.getDynamicUrl()}/tutor/messages/search?filters=${encodeContextParam(
          context,
        )}&startDate=${startDate}&num=${limit}`,
      )
      .pipe(
        pluck('messages'),
        /* Returned messages will have slight overlap with current messages,
         * since their creation dates are not guaranteed to be unique.
         */
        map(takeRightWhile(negate(idEquals({ _id })))),
        map((messages) => ({ type: ConversationUpdateType.HistoryUpdateType, messages })),
      );
  }

  /**
   * Delegates to `userService.updateComment`.
   *
   * @param message Message to update.
   * @returns The observable returned by the user service; nothing is subscribed here.
   */
  updateMessage(message: ContentMessage): Observable<void> {
    return this.userService.updateComment(message);
  }

  /**
   * Delegates to `userService.deleteComment`.
   *
   * @param message Message to delete.
   * @returns The observable returned by the user service; nothing is subscribed here.
   */
  deleteMessage(message: ContentMessage): Observable<void> {
    return this.userService.deleteComment(message);
  }

  /**
   * Queues a request for a further page of history. It is only acted upon while
   * a `getConversationContent` stream is subscribed.
   *
   * @param oldestMessage Message used as the anchor of the history request.
   */
  fetchMoreMessages(oldestMessage: TypedMessage): void {
    this.fetchMoreMessagesRequests$.next(oldestMessage);
  }

  /**
   * Deletes all comments of a space (fire-and-forget).
   *
   * When the request completes, the space-level unread counter on
   * `messengerService` is reset to 0; on failure the error's `err` field is
   * shown in a toast.
   *
   * @param spaceId Identifier passed to `userService.deleteAllComments`.
   */
  deleteAllMessages(spaceId: string): void {
    this.userService.deleteAllComments(spaceId, MessageAction.DeleteAll).subscribe({
      complete: () => {
        this.messengerService.unreadSpaceLevelMessageCount$.next(0);
      },
      error: (error) => {
        this.toastr.error(error.err);
      },
    });
  }
}
