import { HttpClient, HttpEventType, HttpHeaders, HttpResponse } from '@angular/common/http';
import { filterNil } from '@ngneat/elf';
import { UntilDestroy, untilDestroyed } from '@ngneat/until-destroy';
import { TranslateService } from '@ngx-translate/core';
import * as Sentry from '@sentry/browser';
import { Mutex } from 'async-mutex';
import { fabric } from 'fabric';
import { IEvent } from 'fabric/fabric-impl';
import { cloneDeep } from 'lodash';
import {
  Observable,
  ReplaySubject,
  Subject,
  Subscription,
  combineLatest,
  filter,
  first,
  firstValueFrom,
  fromEvent,
  of,
  startWith,
  throwError,
} from 'rxjs';
import {
  auditTime,
  catchError,
  distinctUntilChanged,
  map,
  pairwise,
  retry,
  take,
  tap,
} from 'rxjs/operators';
import { ManagerOptions, SocketOptions } from 'socket.io-client';
import { TelemetryService } from 'src/app/services/telemetry.service';
import { v4 as uuidv4 } from 'uuid';
import { Injectable, NgZone, OnDestroy } from '@angular/core';
import {
  MovedBoardsPlacement,
  SyncService,
  WebSocketEventType,
  WhiteboardServerErrors,
  customFabricFields,
} from '../common/interfaces/sync-service-interface';
import { ERRORS, SUCCESSES } from '../common/utils/notification-constants';
import { GestureName } from '../hand-gestures/models/gesture';
import { AnalyticsEventType } from '../models/analytics';
import { ConnectionStatus } from '../models/network';
import {
  BoardFolder,
  Frame,
  FrameItem,
  FrameType,
  ISession,
  Room,
  Session,
  permissionsSchema,
} from '../models/session';
import {
  DocumentUpdate,
  FrameEvent,
  FrameEventType,
  FrameObservableData,
  GcsSpaceDownload,
  Mouse,
  Point,
  ServerStateUpdate,
  SessionAuth,
  SessionUpdate,
  SessionsUpdate,
  SpaceChunk,
  SpaceChunkInfo,
  SpaceDownloadEnd,
  SpaceDownloadStart,
  UserEvent,
} from '../models/session-sync';
import { User } from '../models/user';
import { Manager } from '../sessions/common/session-data-structures';
import {
  SessionMetadata,
  UndoEnum,
  YCanvasItemObject,
  YObject,
  YSession,
} from '../sessions/common/y-session';
import {
  EventCategory,
  SessionEvent,
  SessionEventController,
  SessionEventSubscriber,
} from '../sessions/session/SessionEvent';
import { BaseCustomFabric } from '../sessions/session/custom-fabric-objects/base-custom-fabric';
import {
  BrowserLifecycleEvent,
  PencilSocket,
  SocketConnectionEvent,
  SocketManagerEvent,
} from '../socket-utilities/pencil-socket';
import { SocketIOWorkerWrapper, SocketIOWrapper } from '../socket-utilities/socket-io-wrapper';
import { SpaceRepository, filterNotSynced } from '../state/space.repository';
import { ToasterPopupStyle } from '../ui/notification-toaster/custom-notification-toastr/custom-notification-toastr.component';
import { IconMessageToasterElement } from '../ui/notification-toaster/icon-message-toaster-element/icon-message-toaster-element.component';
import { ObservableUtils } from '../utilities/ObservableUtils';
import { enterZone, modifiedSetInterval, modifiedSetTimeout } from '../utilities/ZoneUtils';
import { SpaceUtils } from '../utilities/space.utils';
import { ExpiringSet } from '../utilities/expiring-set';
import { LeaderModeRepositoryService } from '../state/leader-mode-repository.service';
import {
  TemporaryUserMetadataEntryBase,
  TemporaryUserMetadataEntryRedis,
  TemporaryUserMetadataRepositoryService,
} from '../state/temporary-user-metadata-repository.service';
import { ErrorInterceptorSkipHeader } from '../error.interceptor';
import { notEmpty } from '../common/utils/common-util';
import { AnalyticsService } from './analytics.service';
import { URLService } from './dynamic-url.service';
import { FLAGS, FlagsService } from './flags.service';
import { ClientNetworkState, NetworkService } from './network.service';
import {
  NotificationDataBuilder,
  NotificationToasterService,
  NotificationType,
} from './notification-toaster.service';
import { PresenceData, PresenceRoom } from './presence_v2.service';
import { ScribbleType, SessionSharedDataService } from './session-shared-data.service';
import { SpacesService } from './spaces.service';
import { emojiLibraryIndex } from './ui.service';
import { UploadFileService } from './upload-file.service';
import { UserIdleService } from './user-idle.service';
import { UserService } from './user.service';
import { VideoAIService } from './video-ai.service';
import { RealtimeService } from './realtime.service';
import { CountedMetric, ExternalMetric } from './performance.logger.service';
import { LessonProcessor } from './lesson-generator.service';

export enum NetworkStatus {
  CONNECTED = 'CONNECTED',
  DISCONNECTED = 'DISCONNECTED',
}

export interface ObjectWithBoard {
  object: YObject;
  boardUid: string | undefined;
}

class ChunkedSpace {
  static readonly CHUNK_SIZE = 1024 * 1024;
  private _spaceBuffer!: ArrayBufferLike;
  private _spaceContext!: ISession;
  private totalSize = 0;
  private readonly spaceDownloadProgress$: Subject<number> = new Subject<number>();
  private spaceUtils = new SpaceUtils();

  constructor(
    private socket: PencilSocket<WebSocketEventType>,
    spaceId: string,
    spaceSize: number,
    spaceContext: ISession,
  ) {
    this.socket = socket;
    this._spaceContext = spaceContext;
    this.totalSize = spaceSize;
    this._spaceBuffer = new Uint8Array();
    const spaceChunkInfo: SpaceChunkInfo = {
      spaceId,
      receivedBytes: 0,
      chunkSize: ChunkedSpace.CHUNK_SIZE,
    };
    this.socket.emit(WebSocketEventType.GET_SPACE_CHUNK, spaceChunkInfo);
    this.spaceDownloadProgress$.next(0);
  }

  appendChunk(spaceId: string, chunk: ArrayBuffer) {
    const currentChunk = new Uint8Array(this.spaceBuffer);
    const newChunk = new Uint8Array(chunk);
    this._spaceBuffer = this.spaceUtils.concatChunks([currentChunk, newChunk]);
    this.spaceDownloadProgress$.next(
      Math.ceil((this.spaceBuffer.byteLength / this.totalSize) * 100),
    );
    const spaceChunkInfo: SpaceChunkInfo = {
      spaceId,
      receivedBytes: this.spaceBuffer.byteLength,
      chunkSize: ChunkedSpace.CHUNK_SIZE,
    };
    this.socket.emit(WebSocketEventType.GET_SPACE_CHUNK, spaceChunkInfo);
  }

  resetDownloadProgress() {
    this.spaceDownloadProgress$.next(0);
  }

  public get spaceBuffer(): ArrayBufferLike {
    return this._spaceBuffer;
  }

  public get spaceContext() {
    return this._spaceContext;
  }

  public getSpaceDownloadProgressObservable(): Observable<number> {
    return this.spaceDownloadProgress$.asObservable();
  }
}

const extractObjectFromEvent = (eventWithFrame: {
  event: fabric.IEvent<Event>;
  boardUid: string | undefined;
}): { fabricObject: fabric.Object; boardUid: string | undefined } => ({
  fabricObject:
    eventWithFrame.event.target && !eventWithFrame.event.target.excludeFromExport
      ? (eventWithFrame.event.target as fabric.Object)
      : ({} as fabric.Object),
  boardUid: eventWithFrame.boardUid,
});

/**
 * custom operator that excludes objects with excludeFromExport === true
 * and event.target in case it's a custom fabric object
 */
const transformObject = (object: fabric.Object, boardUid: string | undefined): ObjectWithBoard => {
  if (object && BaseCustomFabric.isCustomFabricObject(object)) {
    return { object: object.toObject() as YObject, boardUid };
  }
  return { object: object as YObject, boardUid };
};

// custom rxjs operator prepares objects before getting updated
const prepareObjectForUpdate = (eventWithFrame: {
  event: fabric.IEvent<Event>;
  boardUid: string | undefined;
}): { fabricObjects: fabric.Object[]; boardUid: string | undefined } => {
  const target: any = eventWithFrame.event.target;
  let objects: fabric.Object[] = [];

  if (!target) {
    return { fabricObjects: objects, boardUid: eventWithFrame.boardUid };
  }

  if (BaseCustomFabric.isCustomFabricObject(target)) {
    objects = [target];
  } else if (target.objects) {
    objects = target.objects;
  } else if (target._objects) {
    objects = target._objects;
  } else {
    objects = [target];
  }
  const filteredObjects = objects
    .filter((object) => Boolean(object.excludeFromExport) === false)
    .flat();
  return { fabricObjects: filteredObjects, boardUid: eventWithFrame.boardUid };
};

const transformObjectsForUpdate = (
  objects: fabric.Object[],
  boardUid: string | undefined,
): Map<string | undefined, YObject[]> => {
  const objectsToBoardMap: Map<string | undefined, YObject[]> = new Map();
  if (objects.length === 0) {
    return objectsToBoardMap;
  }

  const yObjects: YObject[] = [];
  for (const object of objects) {
    if (object && BaseCustomFabric.isCustomFabricObject(object)) {
      yObjects.push(object.toObject() as YObject);
    } else {
      yObjects.push(object as YObject);
    }
  }
  objectsToBoardMap.set(boardUid, yObjects);
  return objectsToBoardMap;
};

@UntilDestroy()
@Injectable({
  providedIn: 'root',
})
export class SessionsSyncService extends SyncService implements SessionEventController, OnDestroy {
  /** Public members */

  /**
   * @deprecated backward compatibility
   */
  public get userJoinedEvent$(): Observable<Array<PresenceData>> {
    return this.onUserJoinPresenceEvent.pipe(enterZone(this.zone));
  }

  /**
   * @deprecated backward compatibility
   */
  public get userLeftEvent$(): Observable<Array<PresenceData>> {
    return this.onUserLeavePresenceEvent.pipe(enterZone(this.zone));
  }

  /**
   * @deprecated backward compatibility
   */
  public get userDisconnectedEvent$(): Observable<Array<PresenceData>> {
    return this.onUserDisconnectedPresenceEvent.pipe(enterZone(this.zone));
  }

  public get presenceUpdate$(): Observable<Array<PresenceRoom>> {
    return this.onPresenceUpdate.pipe(enterZone(this.zone));
  }

  public get userTemporaryMetadataUpdate$(): Observable<Array<TemporaryUserMetadataEntryRedis>> {
    return this.onUserTemporaryMetadataUpdate.pipe(enterZone(this.zone));
  }

  public get reconnectAttempt(): number {
    return this._reconnectAttempt;
  }

  public get socketIdHistory(): Set<string> {
    return this._socketIdHistory;
  }

  public get isOffline(): boolean {
    return this._isOffline;
  }

  // Public observables

  public get realTimeEvent$(): Observable<{ mouse: Mouse; user: string }[]> {
    return this.realTimeEventSubject;
  }

  public get undoStack$(): Observable<number> {
    return this.undoStackSubject.pipe(enterZone(this.zone));
  }

  public get redoStack$(): Observable<number> {
    return this.redoStackSubject.pipe(enterZone(this.zone));
  }

  public get usersLeftFrame$(): Observable<string[]> {
    return this.usersLeftFrame.pipe(enterZone(this.zone));
  }

  public get usersJoinedFrame$(): Observable<string[]> {
    return this.usersJoinedFrame.pipe(enterZone(this.zone));
  }

  public get userEvents$(): Observable<UserEvent | undefined> {
    return this.userEvents.pipe(startWith(undefined));
  }

  public get updateUserPresence$(): Observable<'reconnected' | 'disconnected'> {
    return this.updateUserPresence.pipe(enterZone(this.zone));
  }

  public get networkStatusObservable$(): Observable<NetworkStatus> {
    return this._networkStatusObservable;
  }

  /** Private members */

  // Signals when a user performs mouse activity on the canvas, or changes frames
  private readonly realTimeEventSubject: Subject<{ mouse: Mouse; user: string }[]> = new Subject();

  // Signals the number of element in the undo/redo stack on change.
  private readonly undoStackSubject = new Subject<number>();
  private readonly redoStackSubject = new Subject<number>();

  // Signals when a user joins or leaves a frame
  private readonly usersLeftFrame: Subject<string[]> = new Subject<string[]>();
  private readonly usersJoinedFrame: Subject<string[]> = new Subject<string[]>();

  // private readonly inZoneScheduler = enterZoneScheduler(this.zone);

  // Subject that produces all userEvents coming from remote clients
  private readonly userEvents: Subject<UserEvent> = new Subject<UserEvent>();

  private readonly ySessions: Manager<YSession>;

  public socket?: PencilSocket<WebSocketEventType>;

  private user?: User;
  private disconnected = true;
  private _reconnectAttempt = 0; // Count reconnect attemps
  private _socketIdHistory: Set<string> = new Set(); // Store socketID for every succussful connection attemp
  private _isOffline = false;

  private heartbeatInterval: NodeJS.Timer;

  // backward compatibility
  private onUserJoinPresenceEvent = new ReplaySubject<Array<PresenceData>>();
  // backward compatibility
  private onUserLeavePresenceEvent = new ReplaySubject<Array<PresenceData>>();
  // backward compatibility
  private onUserDisconnectedPresenceEvent = new ReplaySubject<Array<PresenceData>>();

  private onPresenceUpdate = new ReplaySubject<Array<PresenceRoom>>();

  private onUserTemporaryMetadataUpdate = new Subject<Array<TemporaryUserMetadataEntryRedis>>();

  // The current active session
  private activeSessionId?: string;

  // The rooms that have been joined by the socket server
  private joinedRooms: string[] = [];

  // Holds the object relationships
  private relationships: { [key: string]: any[] } = {};
  private rtFrameSubscription?: Subscription;
  private roomsSubscription?: Subscription;

  private sessionSubscriptions?: Subscription;

  private eventRegistry = {};

  // Needed to reconnect when we disconnect from the socket
  private userRoom?: string;
  private changeOfflineStateMutex = new Mutex();

  private sendUserMetadataUpdateMutex = new Mutex();
  private numberOfUserMetadataUpdatesQueue = 0;

  private userFaceDetected = true;
  private userHandDetected = false;
  private timerArray: Array<any> = [];
  private prevEmojiId = -1;
  private animationStartTime = 0;

  private readonly YDOC_PENDING_CHANGES_DELAY = 3000;
  private readonly WB_SERVER_ACK_TIMEOUT = 5000;
  private readonly WB_SERVER_ACK_TIMEOUT_THROTTLE = 6000;
  private readonly CURRENT_SESSION_HEARTBEAT_THROTTLE = 60000; // 1 minute
  private readonly CONSISTENCY_THRESHOLD = 3;

  private readonly wbServerAckTimeoutSubject: Subject<boolean> = new Subject<boolean>();
  private chunkedSpace!: ChunkedSpace;
  private _spaceDownloadProgress$: Subject<number> = new Subject<number>();

  private webSocketWorkerEnabled = false;
  private socketId!: string;

  private updateUserPresence: ReplaySubject<'disconnected' | 'reconnected'> = new ReplaySubject();
  private _syncSessionDataSubject = new Subject<void>();
  private readonly SYNC_SESSION_DATA_THROTTLE = 2000;
  private _syncSessionDataMutex = new Mutex();

  private networkStatus = new Subject<NetworkStatus>();
  private _networkStatusObservable = this.networkStatus.asObservable();

  private syncsInProgress = new ExpiringSet<string>();
  private readonly SYNC_TIMEOUT = 2 * 60 * 1_000; // 2 minutes

  private pendingYDocScribbleUids: string[] = [];

  /** Public functions */

  constructor(
    private sharedDataService: SessionSharedDataService,
    private flagsService: FlagsService,
    private userService: UserService,
    private zone: NgZone,
    private urlService: URLService,
    private networkService: NetworkService,
    private uploadService: UploadFileService,
    private translateService: TranslateService,
    private notificationToasterService: NotificationToasterService,
    private videoAIService: VideoAIService,
    private observerUtils: ObservableUtils,
    private spaceRepo: SpaceRepository,
    private leaderModeRepositoryService: LeaderModeRepositoryService,
    private temporaryUserMetadataRepository: TemporaryUserMetadataRepositoryService,
    private analyticsService: AnalyticsService,
    private spacesService: SpacesService,
    private userIdleService: UserIdleService,
    private telemetry: TelemetryService,
    private realtimeService: RealtimeService,
    private http: HttpClient,
  ) {
    super();

    this.flagsService.featureFlagsChanged.pipe(first()).subscribe(() => {
      // Update the rate at which socket events are processed
      this.socketEventProcessRate =
        (this.flagsService.featureFlagsVariables.event_buffering
          .receive_data_buffer_ms as number) ?? 1000; // default to 1000ms
      this.realtimeEventProcessRate =
        (this.flagsService.featureFlagsVariables.event_buffering
          .receive_realtime_buffer_ms as number) ?? 30; // default to 30ms
    });

    this.ySessions = new Manager<YSession>((sessionId) => {
      const ySession = new YSession(
        this.flagsService.isFlagEnabled(FLAGS.ENABLE_YJS_FABRIC_JSON),
        this.flagsService.isFlagEnabled(FLAGS.BREAKOUT_ROOMS),
      );
      ySession.updateObservable$.subscribe((e) => {
        //  Because array duplication of frames in yjs is quite common,
        // every time there is an update we attempt to fix the duplicated frames
        const didFrameDuplicationOccur = ySession.deduplicateYFrames(e.frames);

        if (didFrameDuplicationOccur) {
          const update = ySession.getUpdate();
          const serverUpdate = this.createYUpdate(update);
          this.sendUpdateToServer('update', serverUpdate);
        }

        if (didFrameDuplicationOccur) {
          this.telemetry.event('Frame duplication occurred, its been fixed');
        }

        this.spaceRepo.updateSpaceYjs(sessionId, e);
      });
      return ySession;
    });
    this.sharedDataService.sessionEventController = this;

    this.userService.user.subscribe((data) => {
      if (data && data?.user) {
        this.user = data.user;
      }
    });

    this.heartbeatInterval = modifiedSetInterval(() => {
      if (this.activeSessionId && this.activeSessionId !== '') {
        this.sendHeartbeat();
      }
    }, 300000);
    this.handleConnectionStatus();

    this.videoAIService.faceObservable().subscribe((faceDetected) => {
      if (faceDetected !== this.userFaceDetected) {
        const faceEvent = new SessionEvent(
          EventCategory.Users,
          'users:face-detected',
          faceDetected,
        );
        this.sendEvent(this, faceEvent);
      }
      this.userFaceDetected = faceDetected;
    });

    this.videoAIService.handRaisedObservable().subscribe((raisedHand) => {
      if (raisedHand !== this.userHandDetected) {
        const handEvent = new SessionEvent(EventCategory.Users, 'users:raise-hand', raisedHand);
        this.sendEvent(this, handEvent);
      }
      this.userHandDetected = raisedHand;
    });

    this.videoAIService
      .emoticonObservable()
      .pipe(distinctUntilChanged(), auditTime(1000))
      .subscribe((gesture: GestureName) => {
        this.analyticsService.addToAnalyticsEventsBatch(AnalyticsEventType.REACTIONS);
        this.sendEmote(gesture);
      });

    this.analyticsService.getCurrentSessionId$.subscribe(() => this.getCurrentAnalyticsSession());

    this.realtimeService.realtimeDisconnected$
      .pipe(untilDestroyed(this), distinctUntilChanged(), pairwise())
      .subscribe(async ([previousrealtimeDisconnected, currentrealtimeDisconnected]) => {
        // Sync session data in case realtime update is missed
        if (
          this.spaceRepo.activeSpaceId &&
          previousrealtimeDisconnected === true &&
          currentrealtimeDisconnected === false
        ) {
          this._syncSessionDataSubject.next();
        }
      });

    this.observerUtils
      .throttledObservableOf(this._syncSessionDataSubject, this.SYNC_SESSION_DATA_THROTTLE)
      .pipe(untilDestroyed(this))
      .subscribe(async () => {
        await this._syncSessionDataMutex.runExclusive(async () => {
          if (this.spaceRepo.activeSpaceId) {
            await this.syncSessionData(this.spaceRepo.activeSpaceId);
          }
        });
      });
  }

  ngOnDestroy(): void {
    clearInterval(this.heartbeatInterval);
  }

  private async _setupSocket(
    url: string,
    opts: Partial<ManagerOptions & SocketOptions>,
  ): Promise<void> {
    const socketWrapper = new SocketIOWrapper(url, opts);
    this.socket = socketWrapper;
  }

  private async _setupWebWorker(
    url: string,
    opts: Partial<ManagerOptions & SocketOptions>,
  ): Promise<void> {
    const socketWrapper: SocketIOWorkerWrapper<WebSocketEventType> = new SocketIOWorkerWrapper(
      url,
      opts,
    );
    this.socket = socketWrapper;

    // setup socket wrapper
    await socketWrapper.setup();

    addEventListener(BrowserLifecycleEvent.ONLINE, async () => {
      const connected = await socketWrapper.connected();
      if (connected) {
        this.sendSyncWithServer();
      }
    });
  }

  public async setupSocket(): Promise<void> {
    const socketUrl = this.urlService.getDynamicWhiteboardUrl();
    const socketPath = this.urlService.getDynamicWhiteboardPath();
    const transports = this.flagsService.isFlagEnabled(FLAGS.WB_NO_LONGPOLLING)
      ? ['websocket']
      : ['websocket', 'polling'];
    const dataBufferDuration: number =
      (this.flagsService.featureFlagsVariables.event_buffering.receive_data_buffer_ms as number) ??
      1000; // default to 1000ms
    const realtimeBufferDuration: number =
      (this.flagsService.featureFlagsVariables.event_buffering
        .receive_realtime_buffer_ms as number) ?? 30; // default to 30ms
    this.webSocketWorkerEnabled = this.isWsWorkerEnabled();

    // Run socket subscription outside angular to prevent buffering/signaling from triggering unncessary
    // CD cycles. This make it the responsibility of code in the socket callbacks to notify any public
    // observables "inside zone" so downstream observers can still update their state as part of a CD cycle.
    this.zone.runOutsideAngular(async () => {
      if (this.webSocketWorkerEnabled) {
        await this._setupWebWorker(socketUrl, {
          transports,
          forceNew: true,
          path: socketPath,
        });
      } else {
        await this._setupSocket(socketUrl, {
          transports,
          closeOnBeforeunload: false,
          forceNew: true,
          path: socketPath,
        });
      }

      if (!this.socket) {
        return;
      }

      // Send session heartbeat every 5 minutes to keep current session active
      this.observerUtils
        .throttledObservableOf(
          this.userIdleService.onHeartbeat$,
          this.CURRENT_SESSION_HEARTBEAT_THROTTLE,
        )
        .pipe(untilDestroyed(this))
        .subscribe(() => {
          this.socket?.emit(WebSocketEventType.CURRENT_SESSION_HEARTBEAT, this.activeSessionId);
        });

      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.USER_JOINED),
          realtimeBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((data: any) => {
          if (data) {
            this.onUserJoinPresenceEvent.next(data);
          }
        });

      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.USER_LEFT),
          realtimeBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((data: any) => {
          if (data) {
            this.onUserLeavePresenceEvent.next(data);
          }
        });

      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.USER_DISCONNECTED),
          realtimeBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((data: any) => {
          if (data) {
            this.onUserDisconnectedPresenceEvent.next(data);
          }
        });

      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.PRESENCE_UPDATE),
          realtimeBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((data: any) => {
          if (data) {
            this.onPresenceUpdate.next(data);
          }
        });

      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.USER_TEMPORARY_METADATA_UPDATE),
          realtimeBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((data: any) => {
          if (data) {
            this.onUserTemporaryMetadataUpdate.next(data);
          }
        });

      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.BROADCAST),
          dataBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((updates) => {
          this.applyBulkUpdates(updates as SessionsUpdate[]);
        });

      this.observerUtils
        .bufferedObservableOf(fromEvent(this.socket, WebSocketEventType.UPDATE), dataBufferDuration)
        .pipe(untilDestroyed(this))
        .subscribe((updates) => {
          this.applyBulkUpdates(updates as SessionsUpdate[]);
        });

      // serverStateObservable should be deprecated after using update ack to send serverStateVector
      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.SERVER_STATE_VECTOR),
          dataBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((updates) => {
          this.updateBulkServerStates(updates as ServerStateUpdate[]);
        });

      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.FRAME_EVENT),
          realtimeBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((events) => {
          this.handleBulkFrameEvents(events as FrameEvent[]);
        });

      this.observerUtils
        .bufferedObservableOf(
          fromEvent(this.socket, WebSocketEventType.USER_EVENT),
          dataBufferDuration,
        )
        .pipe(untilDestroyed(this))
        .subscribe((events) => {
          for (const event of events) {
            this.userEvents.next(event as UserEvent);
          }
        });

      // throttling to not send many sync requests within short period
      this.observerUtils
        .throttledObservableOf<boolean>(
          // interested only in timeout events
          this.wbServerAckTimeoutSubject.pipe(filter((value) => !!value)),
          this.WB_SERVER_ACK_TIMEOUT_THROTTLE,
        )
        .pipe(untilDestroyed(this))
        .subscribe({
          next: async (next) => {
            if (next && (await this.socket?.connected())) {
              this.sendSyncWithServer();
            }
          },
          error: (error) => {
            Sentry.captureException(new Error('wbServerAckTimeoutSubject error'), {
              extra: {
                error: error,
              },
            });
          },
        });

      fromEvent(this.socket, SocketManagerEvent.RECONNECT)
        .pipe(untilDestroyed(this))
        .subscribe(this.onReconnect);

      fromEvent(this.socket, SocketManagerEvent.RECONNECT_ATTEMPT)
        .pipe(untilDestroyed(this))
        .subscribe(this.onReconnectAttempt);

      fromEvent(this.socket, SocketManagerEvent.RECONNECT_ERROR)
        .pipe(untilDestroyed(this))
        .subscribe(this.onReconnectError);

      fromEvent(this.socket, SocketManagerEvent.RECONNECT_FAILED)
        .pipe(untilDestroyed(this))
        .subscribe(this.onReconnectFailed);

      // Deliver immediately (do not buffer/throttle)
      fromEvent(this.socket, WebSocketEventType.SPACE_DOWNLOAD_START)
        .pipe(untilDestroyed(this))
        .subscribe(async (spaceDownloadStart) => {
          const _spaceDownloadStart = spaceDownloadStart as SpaceDownloadStart;
          const spaceSize = _spaceDownloadStart.spaceSize;
          const spaceId = _spaceDownloadStart.spaceId;
          this.telemetry.setSessionVars({ activeSessionSize: spaceSize });

          if (!this.socket) {
            return;
          }

          this.joinedRooms.push(spaceId);
          this.chunkedSpace = new ChunkedSpace(
            this.socket,
            spaceId,
            spaceSize,
            _spaceDownloadStart.session,
          );
          this.chunkedSpace
            .getSpaceDownloadProgressObservable()
            .pipe(untilDestroyed(this))
            .pipe(enterZone(this.zone))
            .subscribe((progress) => {
              this._spaceDownloadProgress$.next(progress);
            });
        });

      // Deliver immediately (do not buffer/throttle)
      fromEvent(this.socket, WebSocketEventType.SPACE_DOWNLOAD_URL)
        .pipe(untilDestroyed(this))
        .subscribe(async (data: any) => {
          const gcsSpaceDownload = data as GcsSpaceDownload;
          this.getYjsUpdateFromGCS(gcsSpaceDownload)
            .pipe(retry({ count: 10, delay: 300 }))
            .pipe(take(1))
            .subscribe((update) => {
              this.spaceRepo.updateSpace(gcsSpaceDownload.sessionId, {
                initialGCSSpaceSize: update?.byteLength ?? 0,
              });
              if (update) {
                this._applyUpdateAndRemovePendingUpdates({
                  sessionId: gcsSpaceDownload.sessionId,
                  documentUpdate: {
                    yjs_update: update,
                  },
                });
              }

              // increment yDoc receive
              this.telemetry.incrementMetric(CountedMetric.yDocRx);
              this.spaceRepo.updateSpaceUI(gcsSpaceDownload.sessionId, {
                isInitialSpaceStateLoaded: true,
              });
              this.joinedRooms.push(gcsSpaceDownload.sessionId);
              this.sendSyncWithServer(gcsSpaceDownload.sessionId, false, true);
            });
        });

      // Deliver immediately (do not buffer/throttle)
      fromEvent(this.socket, WebSocketEventType.SPACE_CHUNK)
        .pipe(untilDestroyed(this))
        .subscribe((spaceChunk) => {
          this.zone.runOutsideAngular(() => {
            const _spaceChunk = spaceChunk as SpaceChunk;
            const chunk = _spaceChunk.chunk;
            const spaceId = _spaceChunk.spaceId;
            this.chunkedSpace.appendChunk(spaceId, chunk);
          });
        });

      // Deliver immediately (do not buffer/throttle)
      fromEvent(this.socket, WebSocketEventType.SPACE_DOWNLOAD_END)
        .pipe(untilDestroyed(this))
        .subscribe(async (spaceDownloadEnd) => {
          const _spaceDownloadEnd = spaceDownloadEnd as SpaceDownloadEnd;
          const sessionUpdate: SessionUpdate = _spaceDownloadEnd.spaceUpdate.sessionsUpdates[0];

          // Ensure that space is in the repo before setting synced
          await firstValueFrom(this.spaceRepo.getSpace$(sessionUpdate.sessionId).pipe(filterNil()));

          sessionUpdate.documentUpdate = {
            yjs_update: this.chunkedSpace.spaceBuffer,
          };
          this.applyYUpdates(_spaceDownloadEnd.spaceUpdate);
          this.spaceRepo.updateSpaceUI(sessionUpdate.sessionId, {
            hasSpaceSynced: true,
            isInitialSpaceStateLoaded: true,
          });

          this.sendSyncWithServer();
        });

      // Deliver immediately (do not buffer/throttle)
      fromEvent(this.socket, WebSocketEventType.SPACE_DOWNLOAD_ERROR)
        .pipe(untilDestroyed(this))
        .subscribe((spaceId) => {
          // rejoin in case of error
          this.joinSpaceRooms(this.populateSessionAuth([spaceId as string]) as SessionAuth[]);
        });

      // Deliver immediately (do not buffer/throttle)
      fromEvent(this.socket, WebSocketEventType.SYNC)
        .pipe(untilDestroyed(this))
        .subscribe((update) => {
          this.syncYUpdates(update as SessionsUpdate);
        });

      // Deliver immediately (do not buffer/throttle)
      fromEvent(this.socket, WebSocketEventType.JOIN_ROOMS_RES)
        .pipe(untilDestroyed(this))
        .subscribe((res) => {
          const _res = res as { update: SessionsUpdate };
          const { update } = _res;

          const sessionId = update.sessionsUpdates?.[0].sessionId;
          if (!sessionId) {
            throw new Error(`Expected sessionId to be in update ${update}`);
          }

          this.joinedRooms.push(sessionId);
          this.applyYUpdates(update);
        });

      fromEvent(this.socket, SocketConnectionEvent.CONNECT)!
        .pipe(untilDestroyed(this))
        .subscribe(async () => {
          this.handleDisconnect();
          if (!this.socket) {
            return;
          }

          this.socketId = await this.socket.getSocketId();
          this._socketIdHistory.add(this.socketId);
          this.sharedDataService.socketReconnected.next();
          this.updateWebSocketConnectionStatus(ConnectionStatus.CONNECTED);
        });

      fromEvent(this.socket, SocketConnectionEvent.DISCONNECT)
        .pipe(untilDestroyed(this))
        .subscribe((reason) => {
          this.updateWebSocketConnectionStatus(ConnectionStatus.DISCONNECTED, false);
          this.disconnected = true;
          this._reconnectAttempt = 0;
          this.syncUserPresence('disconnected');
          // if the disconnection was initiated by the server or client, then reconnect manually
          if (reason === 'io server disconnect' || reason === 'io client disconnect') {
            this.connectWebSocket();
            // this.socket.connect();
          }
          this.telemetry.event('Socket disconnected', { reason });
        });

      fromEvent(this.socket, SocketConnectionEvent.CONNECT_ERROR)
        .pipe(untilDestroyed(this))
        .subscribe((error) => {
          modifiedSetTimeout(() => {
            this.connectWebSocket();
          }, 1000);
          this.telemetry.event('Socket connection error', { error });
        });

      // fromEvent(this.socket, 'currentSessionId')
      fromEvent(this.socket, WebSocketEventType.CURRENT_SESSION_ID)
        .pipe(untilDestroyed(this))
        .subscribe((id) => {
          this.analyticsService.setCurrentSessionId(id as string);
        });

      fromEvent(this.socket, WebSocketEventType.ANALYTICS_SESSION_ENDED_MANUALLY)
        .pipe(untilDestroyed(this))
        .subscribe((res) => {
          this.analyticsService.handleManualEndSession(
            res as { newSessionId: string; oldSessionId: string },
          );
        });
    });
  }

  public subscribeToEvent(subscriber: SessionEventSubscriber, category: EventCategory): void {
    if (!this.eventRegistry[category]) {
      this.eventRegistry[category] = [subscriber];
    } else {
      this.eventRegistry[category].push(subscriber);
    }
  }

  public unSubscribeFromEvent(subscriber: SessionEventSubscriber, category: EventCategory): void {
    if (this.eventRegistry[category]) {
      this.eventRegistry[category] = this.eventRegistry[category].filter((s) => s !== subscriber);
    }
  }

  // Used for sendToBack and bringToFront fabric items
  public updateObjectIndex(uid: string, index: number): void {
    const [sessionId, frameUID] = this.getSessionData();

    if (!sessionId || !frameUID) {
      return;
    }

    const ySession = this.ySessions.get(sessionId);
    const documentUpdate = ySession.updateObjectsOrder(frameUID, uid, index);

    const update = this.createYUpdate(documentUpdate);
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
    return;
  }

  public sendEvent(sender: any, event: SessionEvent): void {
    if (sender === 'remote' || !this.user) {
      return;
    }

    this.publishEventToSubscribers(event, this.user._id);

    if (!this.getSessionId()) {
      return;
    }

    const sessionId = this.getSessionId();
    if (!sessionId) {
      return;
    }

    const update: SessionsUpdate = {
      userId: this.user._id,
      timestamp: new Date(),
      sessionsUpdates: [
        {
          sessionId: sessionId,
          events: [event],
        },
      ],
    };

    this.sendUpdateToServer('update', update);
  }

  private joinFrameRoom(spaceFrameID: string, breakOutRoomUid?: string) {
    if (
      !this.user ||
      !this.activeSessionId ||
      !(this.activeSessionId in this.sharedDataService.sessionAuthData)
    ) {
      return;
    }

    const authData = this.sharedDataService.sessionAuthData[this.activeSessionId].sessionAuth;
    const sessionAuth = { userId: this.user._id, ...authData };

    this.socket?.emit(
      WebSocketEventType.JOIN_FRAME_ROOM,
      sessionAuth,
      spaceFrameID,
      breakOutRoomUid,
    );
  }

  public leaveFrameRoom(): void {
    this.socket?.emit(WebSocketEventType.LEAVE_FRAME_ROOM);
  }

  /*
   * Joins a userRoom
   * A user can only subscribe to one user room at a time
   * This is an artificial restriction that can be extended in the future
   */
  public async joinUserRoom(userId: string): Promise<void> {
    if (
      !this.user ||
      !this.activeSessionId ||
      !(this.activeSessionId in this.sharedDataService.sessionAuthData)
    ) {
      return;
    }

    const authData = this.sharedDataService.sessionAuthData[this.activeSessionId].sessionAuth;
    const sessionAuth = { userId: this.user._id, ...authData };

    return new Promise((resolve, reject) => {
      this.socket?.emit(WebSocketEventType.JOIN_USER_ROOM, sessionAuth, userId, (res: boolean) => {
        if (res) {
          resolve();
          this.userRoom = userId;
        } else {
          reject();
        }
      });
    });
  }

  /*
   * Leaves a user room
   * We will no longer receive any user events for that user
   */
  public async leaveUserRoom(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.socket?.emit(WebSocketEventType.LEAVE_USER_ROOM, (res: boolean) => {
        if (res) {
          resolve();
          this.userRoom = undefined;
        } else {
          reject();
        }
      });
    });
  }

  /*
   * Sends a userEvent to this users UserRoom
   * The backend enforces that you cannot send userEvents as different users
   */
  public sendUserEvent(event: UserEvent) {
    const userId = this.user?._id;
    if (!userId) {
      return;
    }

    event.userId = userId;
    this.socket?.emit(WebSocketEventType.USER_EVENT, event);
  }

  public setActiveSession(sessionId: string) {
    this.telemetry.startPerfScenario('Spaces load animation');
    this.spaceRepo.updateSpaceUI(sessionId, {
      isInitialSpaceStateLoaded: false,
      hasSpaceSynced: false,
    });
    this.animationStartTime = new Date().getTime();
    this.sharedDataService.dataLoading.next(true);

    this.activeSessionId = sessionId;
    this.telemetry.setSessionVars({ activeSessionId: this.activeSessionId });
    this.spaceRepo.setActiveSpace(sessionId); // set active session in store
    this.joinRooms([this.activeSessionId]);
    this.createRTFrameSubscription();

    // Subscribe to get the first (non-null) value when it is available
    this.spaceRepo.activeSpace$.pipe(filterNotSynced(), first()).subscribe(() => {
      this.sharedDataService.dataLoading.next(false);
      this.logSpaceLoadingTimeToFS();
    });
  }

  /*
   * Tears down the connections and resets the data for the service
   */
  public stopService() {
    this.leaveRooms(this.joinedRooms);
    this.activeSessionId = undefined;

    this.rtFrameSubscription?.unsubscribe();
    this.sessionSubscriptions?.unsubscribe();

    this.cleanupSocket();
  }

  /*
   * Joins the rooms that are not already joined/cached and that have the authentication data
   *
   * If the session that is fetched from the socket server is the current session, that will be
   * emitted to the sharedDataService
   *
   * Otherwise it will be put into in inactive session list in the sharedDataService
   */
  public joinRooms(sessionIds: string[], callback?: () => void) {
    // syncing cached spaces with server
    this.syncSpaces(sessionIds);

    const joinableSessions = sessionIds.filter(
      (sessionId) =>
        this.joinedRooms.indexOf(sessionId) === -1 &&
        !this.spaceRepo.getSpace(sessionId)?.isInitialSpaceStateLoaded,
    );
    const sessionsWithAuth = this.populateSessionAuth(joinableSessions);
    if (!sessionsWithAuth?.length) {
      if (sessionIds.length > 0 && joinableSessions.length === 0) {
        if (callback) {
          callback();
        }
      }
      return;
    }
    this.joinSpaceRooms(sessionsWithAuth);
    this.getCurrentAnalyticsSession();
  }

  public leaveRooms(sessionIds: string[]) {
    this.socket?.emit(WebSocketEventType.LEAVE_ROOMS, sessionIds);
    const joinedRooms = this.joinedRooms.filter(
      (sessionId) => sessionIds.indexOf(sessionId) === -1,
    );
    this.joinedRooms = joinedRooms;
  }

  public getMostRecentFrameIndex(frames: FrameObservableData[]): number {
    if (frames.length === 0) {
      return -1;
    }

    let lastUpdate = frames[0].updatedAt;
    let recentIndex = 0;
    frames.forEach((frame, index) => {
      if (frame.updatedAt && lastUpdate) {
        if (frame.updatedAt > lastUpdate) {
          lastUpdate = frame.updatedAt;
          recentIndex = index;
        }
      } else if (frame.updatedAt) {
        lastUpdate = frame.updatedAt;
        recentIndex = index;
      }
    });
    return recentIndex;
  }

  public undo(): boolean {
    return this.handleUndoRedo(UndoEnum.UNDO);
  }

  public redo(): boolean {
    return this.handleUndoRedo(UndoEnum.REDO);
  }

  private assignBoardToEvent = (event: fabric.IEvent<Event>) => {
    const boardUid = this.getCurrentFrameUID();
    return { event, boardUid };
  };

  /*
   * flatten array of maps into a single map
   */
  private flattenMaps<K, V>(arrayOfMaps: Map<K, V[]>[]): Map<K, V[]> {
    const flattenedMap = new Map<K, V[]>();

    for (const _map of arrayOfMaps) {
      for (const [key, values] of _map.entries()) {
        if (!flattenedMap.has(key)) {
          flattenedMap.set(key, [...values]);
        } else {
          flattenedMap.get(key)?.push(...values);
        }
      }
    }
    return flattenedMap;
  }

  /*
   * Adds the canvas listeners to the current canvas
   */
  public createCanvasListeners(canvas: fabric.Canvas): void {
    // Run Canvas listeners outside Angular to prevent unnessary CDs
    this.zone.runOutsideAngular(() => {
      canvas.on('selection:created', () => {
        this.handleObjectSelection();
      });
      canvas.on('selection:updated', () => {
        this.handleObjectSelection();
      });

      const modifiedObjects$ = fromEvent(canvas, 'object:modified').pipe(
        untilDestroyed(this),
        tap(() => this.setObjectsSessionVars()),
        map((event) => this.assignBoardToEvent(event)),
        map((eventWithFrame) => prepareObjectForUpdate(eventWithFrame)),
        map((objects) => transformObjectsForUpdate(objects.fabricObjects, objects.boardUid)),
      );

      this.observerUtils
        .bufferedObservableOf(modifiedObjects$, this.socketEventProcessRate)
        .pipe(untilDestroyed(this))
        .subscribe((objectsToBoardMaps: Map<string | undefined, YObject[]>[]) => {
          this.handleObjectModified(objectsToBoardMaps);
        });

      const addedObejcts$ = fromEvent(canvas, 'object:added').pipe(
        untilDestroyed(this),
        tap(() => this.setObjectsSessionVars()),
        map((event) => this.assignBoardToEvent(event)),
        map((eventWithBoard) => extractObjectFromEvent(eventWithBoard)),
        map((object) => transformObject(object.fabricObject, object.boardUid)),
      );

      this.observerUtils
        .bufferedObservableOf(addedObejcts$, this.socketEventProcessRate)
        .pipe(untilDestroyed(this))
        .subscribe((objectsToAdd: ObjectWithBoard[]) => {
          this.handleObjectAdd(objectsToAdd);
        });

      const removedObejcts$ = fromEvent(canvas, 'object:removed').pipe(
        untilDestroyed(this),
        tap(() => this.setObjectsSessionVars()),
        map((event) => this.assignBoardToEvent(event)),
        map((eventWithBoard) => extractObjectFromEvent(eventWithBoard)),
        map((object) => transformObject(object.fabricObject, object.boardUid)),
      );

      this.observerUtils
        .bufferedObservableOf(removedObejcts$, this.socketEventProcessRate)
        .pipe(tap(() => this.setObjectsSessionVars()))
        .subscribe((objectsToRemove: ObjectWithBoard[]) => {
          this.handleObjectRemoved(objectsToRemove);
        });
    });
  }

  private setObjectsSessionVars(): void {
    this.telemetry.setSessionVars({
      count_board_objects_visible: (
        this.sharedDataService.fabricCanvas?.getObjects().filter((o) => o.visible) || []
      ).length,
    });
  }

  /*
   * Computes the transformation required to get a fabric object to have coords relative to the entire
   * canvas and not relative to the selection it is a part of.
   */
  public applyFabricMultTransform(
    obj: fabric.Object,
    activeTransform: any[],
    relationship: any[],
  ): fabric.Object {
    const newTransform = fabric.util.multiplyTransformMatrices(activeTransform, relationship);
    const opt = fabric.util.qrDecompose(newTransform);
    const { flipX, flipY } = obj;
    obj.set(opt);
    obj.setPositionByOrigin(new fabric.Point(opt.translateX, opt.translateY), 'center', 'center');

    // This is here because the setting of flipX and flipY is absolute and not based
    // on the previous state. This implements that logic.
    obj.flipX = obj.flipX ? !flipX : flipX;
    obj.flipY = obj.flipY ? !flipY : flipY;

    return obj;
  }

  public handleCanvasItemsAdded(
    items: YCanvasItemObject[],
    sessionId: string,
    frameUID: string,
    trackUndoRedo = true,
  ): void {
    if (!this.user || !sessionId || !frameUID) {
      return;
    }
    const ySession = this.ySessions.get(sessionId);
    for (let item of items) {
      if (item.fresh === undefined) {
        item = this.convertFrameItemToYCanvasItem(item);
        item.fresh = true;
      }

      if (item.fresh !== true) {
        return;
      }

      item.fresh = false;
    }
    const documentUpdate = ySession.addCanvasItems(frameUID, items, trackUndoRedo);

    const update = this.createYUpdate(documentUpdate);
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
    return;
  }

  // Try not to use this method directly when syncing canvasItems, instead use sendChangeStateEvent from canvas-items component
  public handleCanvasItemsModified(
    items: YCanvasItemObject[],
    sessionId: string,
    frameUID: string,
  ) {
    if (!sessionId || !frameUID) {
      return;
    }
    const ySession = this.ySessions.get(sessionId);
    for (const item of items) {
      // Update the version of the object
      const vuid = uuidv4();
      item.vuid = vuid;
    }

    const documentUpdate = ySession.modifyCanvasItems(frameUID, items);
    const update = this.createYUpdate(documentUpdate);
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
    return;
  }

  public handleCanvasItemRemoved(itemID: string, sessionId: string, frameUID: string) {
    if (!sessionId || !frameUID) {
      return;
    }

    const ySession = this.ySessions.get(sessionId);

    const documentUpdate = ySession.removeCanvasItem(frameUID, itemID);

    const update = this.createYUpdate(documentUpdate);
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
    return;
  }

  /*
   * Add a group of object to a frame in the yjs document
   */
  public handleObjectsAdd(
    sessionId: string,
    frameUID: string,
    fabricObjects: YObject[],
    trackUndoRedo = true,
  ) {
    for (const fabricObject of fabricObjects) {
      if (!this.user) {
        return;
      }
      fabricObject.userId = this.user._id;
      fabricObject.uid = uuidv4();

      // The version of this object in case it was modified
      fabricObject.vuid = uuidv4();
    }

    const ySession = this.ySessions.get(sessionId);
    const documentUpdate = ySession.addObjects(frameUID, fabricObjects, trackUndoRedo);
    const newUpdate = this.createYUpdate(documentUpdate);
    if (!newUpdate) {
      return;
    }
    this.sendUpdateToServer('update', newUpdate);
    return;
  }

  public createYUpdate(
    documentUpdate: Uint8Array,
    sessionId?: string,
    frameUID?: string,
  ): SessionsUpdate | null {
    frameUID = frameUID ?? this.getCurrentFrameUID();
    sessionId = sessionId ?? this.getSessionId();

    if (!sessionId || !this.user) {
      return null;
    }

    const sessionUpdate: SessionsUpdate = {
      userId: this.user._id,
      timestamp: new Date(),
      sessionsUpdates: [
        {
          sessionId,
          documentUpdate: {
            yjs_update: documentUpdate,
          },
        },
      ],
    };
    return sessionUpdate;
  }

  public sendMouseFrameEvent(
    type: string,
    events: (IEvent | undefined)[],
    metadata: any = null,
    volatile = false,
  ) {
    if (!metadata) {
      metadata = {};
    }

    // Add uid for realtime scribbles
    if (type == 'draw') {
      metadata.uid = this.getRealtimeScribbleUid(metadata.state);
    }
    metadata.userName = this.user?.name;
    metadata.email = this.user?.email;

    const filteredEvents = <fabric.IEvent[]>events.filter((x) => x?.pointer);
    const mouse: Mouse[] = filteredEvents.map((e) => {
      const m: Mouse = {
        type: type,
        event: undefined,
        // Note: this cast is safe because we filter out all events without a point above
        point: <Point>e.pointer,
        metadata,
      };
      return m;
    });

    if (!mouse.length) {
      return;
    }

    const frameEvent: FrameEvent = { type: FrameEventType.Realtime, mouse };

    frameEvent.activeToolbar = this.getRTEventToolbar();

    // save the last mouse event to be sent for new joined users
    this.sharedDataService.lastMouseMoveEvent = frameEvent;
    this.sendFrameEvent(frameEvent, volatile);
  }

  getRTEventToolbar(): string {
    const isLeaderMode = this.leaderModeRepositoryService.getCurrentState().leaderModeEnabled;

    const isBoardLocked = !this.sharedDataService.canModifySession();

    if (isBoardLocked && isLeaderMode) {
      return 'pointer';
    }

    if (this.sharedDataService.createComment$.getValue()) {
      return 'text';
    }

    const activeObject = this.sharedDataService.fabricCanvas
      ?.getActiveObjects()
      .find((object) => (object as any).isEditing);

    if (activeObject) {
      return 'text';
    }

    return this.sharedDataService.mainToolbar?.activeToolName || 'pointer';
  }

  /*
   * Send a frameEvent to the frameUID that this user is on
   */
  public sendFrameEvent(frameEvent: FrameEvent, volatile = false) {
    if (!volatile) {
      this.socket?.emit(WebSocketEventType.FRAME_EVENT, frameEvent);
    } else {
      this.socket?.volatile().emit(WebSocketEventType.FRAME_EVENT, frameEvent);
    }
  }

  /*
   * Adds a frame to the yjs document and propagates that change to all other clients
   */

  public addFrame(frame: Frame, sessionId: string, roomId: string) {
    frame = new Frame(frame);

    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }

    const update = this.createYUpdate(
      this.ySessions.get(sessionId).addFrame(roomId, frame),
      sessionId,
      frame.uid,
    );
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
  }

  /*
   * Adds multiple frames to the yjs document and propagates that change to all other clients
   */

  public addFrames(frames: Frame[], sessionId: string, roomId: string) {
    const update = this.createYUpdate(
      this.ySessions.get(sessionId).addFrames(roomId, frames),
      sessionId,
    );
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
  }

  /* er
   * Modifies a frame to the yjs document and propagates that change to all other clients
   */
  public modifyFrame(frame: Frame, sessionId: string, roomId: string) {
    frame = new Frame(frame);

    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }

    const update = this.createYUpdate(this.ySessions.get(sessionId).modifyFrame(roomId, frame));
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  /**
   * move a frame to a new position
   * data saved inside the Yjs document for the session
   * @param frame
   * @param fromIndex
   * @param toIndex
   * @param sessionId
   * @param roomId
   */
  public moveFrame(
    frame: Frame,
    fromIndex: number,
    toIndex: number,
    sessionId: string,
    roomId: string,
  ) {
    frame = new Frame(frame);
    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }
    const moveUpdate = this.ySessions.get(sessionId).moveFrame(roomId, frame, fromIndex, toIndex);
    if (!moveUpdate) {
      return;
    }
    const update = this.createYUpdate(moveUpdate);
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  /**
   * clear all the frame content
   * @param frame
   * @param sessionId
   * @param roomId
   */
  public clearFrameContent(frame: Frame, sessionId: string, roomId: string): void {
    frame = new Frame(frame);

    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }

    const update = this.createYUpdate(
      this.ySessions.get(sessionId).clearFrameContent(roomId, frame.uid),
      sessionId,
      frame.uid,
    );
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  /*
   * Removes a frame to the yjs document and propagates that change to all other clients
   */
  public removeFrame(frame: Frame, sessionId: string, roomId: string) {
    frame = new Frame(frame);

    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }

    const update = this.createYUpdate(
      this.ySessions.get(sessionId).removeFrame(roomId, frame.uid),
      sessionId,
      frame.uid,
    );
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  public updateFrameBoardFolder(frame: Frame, sessionId: string, roomId: string) {
    frame = new Frame(frame);

    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }

    const boardFolderUpdate = this.ySessions.get(sessionId).updateFrameBoardFolder(roomId, frame);

    if (!boardFolderUpdate) {
      return;
    }

    const update = this.createYUpdate(boardFolderUpdate, sessionId, frame.uid);
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }
  /**
   * update the frame privacy inside ySession
   * frame can be public => anyone in the space can access it
   * frame can be private => only hosts and  users in the privateUsers will be able to access it
   * @param frame
   * @param sessionId
   * @param roomId
   */
  public updateFramePrivacy(frame: Frame, sessionId: string, roomId: string) {
    frame = new Frame(frame);
    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }
    const update = this.createYUpdate(
      this.ySessions.get(sessionId).updateFramePrivacy(roomId, frame),
    );
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  /**
   * will call the ySession to get the session and update the lock state for the frame
   * @param roomId
   * @param frame
   * @param sessionId
   */
  public updateFrameLockState(frame: Frame, sessionId: string, roomId: string): void {
    frame = new Frame(frame);
    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }
    const update = this.createYUpdate(
      this.ySessions.get(sessionId).updateFrameLockState(roomId, frame),
    );
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  /**
   * will call the ySession to get the session
   * and update the background color and pattern for the frame
   * @param roomId
   * @param frame
   * @param sessionId
   */
  public updateFrameBackground(frame: Frame, sessionId: string, roomId: string): void {
    frame = new Frame(frame);
    if (!frame.validate()) {
      this.captureInvalidFrame(frame);
      return;
    }
    const update = this.createYUpdate(
      this.ySessions.get(sessionId).updateFrameBackground(roomId, frame),
    );
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  // Modifies the current users metadata for the current session
  public async modifyTemporaryUserMetadata(
    partialTemporaryMetadata: Partial<TemporaryUserMetadataEntryBase>,
  ) {
    const userId = this.user?._id;
    const sessionId = this.getSessionData()[0];

    if (!userId || !sessionId) {
      return;
    }

    if (!this.flagsService.isFlagEnabled(FLAGS.ENABLE_REDIS_USER_TEMPORARY_METADATA)) {
      this.modifyTemporaryUserMetadataYJS(partialTemporaryMetadata);
    } else {
      this.modifyTemporaryUserMetadataRedis(partialTemporaryMetadata);
    }
  }

  private async modifyTemporaryUserMetadataRedis(
    partialTemporaryMetadata: Partial<TemporaryUserMetadataEntryBase>,
  ) {
    const curState = this.temporaryUserMetadataRepository.getCurrentUserMetadata(
      this.userService.userUniqueHash,
      this.spaceRepo.activeSpaceId!,
    ) as TemporaryUserMetadataEntryRedis;
    if (partialTemporaryMetadata.leader === undefined) {
      partialTemporaryMetadata.leader = curState.leader;
    }

    // update temporary data only when
    // 1) leader mode is on
    // 2) leader mode changed
    // 3) raise hand changed
    if (
      curState.leader ||
      partialTemporaryMetadata.leader ||
      !!partialTemporaryMetadata.raiseHand !== !!curState.raiseHand
    ) {
      const dataEvent: TemporaryUserMetadataEntryRedis = {
        ...curState,
        ...partialTemporaryMetadata,
        frameUID: this.spaceRepo.activeSpace?.selectedBoardUid,
        breakoutRoomUid: this.spaceRepo.activeSpaceCurrentRoomUid,
        sessionView: this.sharedDataService.sessionView.getSessionView(),
        timestamp: Date.now(),
      };
      this.temporaryUserMetadataRepository.setNewEvents([dataEvent]);
      this.sendUpdatedUserMetadataToRedis();
    }
  }

  private modifyTemporaryUserMetadataYJS(
    partialTemporaryMetadata: Partial<TemporaryUserMetadataEntryBase>,
  ) {
    const sessionId = this.getSessionData()[0]!;
    const userId = this.user!._id;

    for (const socketId of this.socketIdHistory) {
      const yTemporaryUserMetadata = this.ySessions
        .get(sessionId)
        .getTemporaryUserSocketMetadata(socketId);
      if (
        !Object.prototype.hasOwnProperty.call(partialTemporaryMetadata, 'leader') &&
        yTemporaryUserMetadata?.leader
      ) {
        partialTemporaryMetadata.leader = yTemporaryUserMetadata.leader;
      }

      if (socketId !== this.socketId) {
        this.modifyYSession((ySession) => ySession.removeTemporaryUserMetadata(socketId));
        this.socketIdHistory.delete(socketId);
      }
    }

    // update temporary data only when
    // 1) leader mode is on
    // 2) leader mode changed
    // 3) raise hand changed
    if (
      partialTemporaryMetadata.leader ||
      this.ySessions.get(sessionId).getTemporaryUserSocketMetadata(this.socketId)?.leader ||
      partialTemporaryMetadata.raiseHand !==
        this.ySessions.get(sessionId).getTemporaryUserSocketMetadata(this.socketId)?.raiseHand
    ) {
      this.modifyYSession((ySession) =>
        ySession.modifyTemporaryUserMetadata(this.socketId, userId, {
          ...partialTemporaryMetadata,
          frameUID: this.spaceRepo.activeSpace?.selectedBoardUid,
          breakoutRoomUid: this.spaceRepo.activeSpaceCurrentRoomUid,
          sessionView: this.sharedDataService.sessionView.getSessionView(),
        }),
      );
    }
  }

  private sendUpdatedUserMetadataToRedis() {
    this.numberOfUserMetadataUpdatesQueue++;
    this.sendUserMetadataUpdateMutex.runExclusive(() => {
      const curState = this.temporaryUserMetadataRepository.getCurrentUserMetadata(
        this.userService.userUniqueHash,
        this.spaceRepo.activeSpaceId!,
      );
      this.socket
        ?.timeout(this.WB_SERVER_ACK_TIMEOUT)
        .emit(WebSocketEventType.USER_TEMPORARY_METADATA_UPDATE, curState, (err: unknown) => {
          this.numberOfUserMetadataUpdatesQueue--;
          if (err) {
            this.telemetry.errorEvent('WB server update user metadata ack timeout');
            if (this.numberOfUserMetadataUpdatesQueue === 0) {
              // no other updates exist so we will try doing the update again
              this.sendUserMetadataUpdateMutex.cancel();
              this.sendUpdatedUserMetadataToRedis();
            }
          }
        });
    });
  }

  public modifySessionMetadata(sessionMetadata: Partial<SessionMetadata>) {
    this.modifyYSession((ySession) => ySession.modifySessionMetadata(sessionMetadata));
  }

  public async clearTemporaryUserMetadata(sessionId: string) {
    if (this.flagsService.isFlagEnabled(FLAGS.ENABLE_REDIS_USER_TEMPORARY_METADATA)) {
      this.modifyTemporaryUserMetadata({ leader: false, raiseHand: false });
    } else {
      this.modifyYSession(
        (ySession) => ySession.clearTemporaryUserMetadata(this.socketId),
        sessionId,
      );
    }
  }

  public celebrate(celebrationId: number): void {
    const event = new SessionEvent(EventCategory.Users, 'users:celebrate', {
      celebrationId: celebrationId,
    });
    this.sendEvent(this, event);
  }

  public emote(emojiId: number): void {
    const eventType =
      emojiId === emojiLibraryIndex.HAND_RAISED
        ? AnalyticsEventType.HAND_RAISED
        : AnalyticsEventType.REACTIONS;
    this.analyticsService.addToAnalyticsEventsBatch(eventType);
    const event = new SessionEvent(EventCategory.Users, 'users:emote', { emojiId: emojiId });
    this.sendEvent(this, event);

    // to handle spamming of emotes store timerIds
    this.timerArray.push(
      modifiedSetTimeout(() => {
        const resetEvent = new SessionEvent(EventCategory.Users, 'users:emote', { emojiId: -1 });
        this.sendEvent(this, resetEvent);
        this.timerArray = [];
      }, 2500),
    );

    if (this.prevEmojiId >= 0 && this.timerArray.length - 1 > 0) {
      // clear older timeouts
      for (let i = 0; i < this.timerArray.length - 1; i++) {
        clearTimeout(this.timerArray[i]);
      }
    }
    this.prevEmojiId = emojiId;
  }

  /**
   * send the pointers of the user to be emitted to the new joined user
   * @param frameEvent
   * @private
   */
  private sendMousePointerEvent(frameEvent: FrameEvent): void {
    this.socket?.volatile().emit(WebSocketEventType.POINTER_USER_EVENT, frameEvent);
    // this.socket.volatile.emit('pointerUserEvent', frameEvent);
  }

  public isConvertedToRooms(sessionId?: string): boolean {
    if (!sessionId) {
      return false;
    }
    return this.ySessions.get(sessionId).isConvertedToRooms();
  }

  removeAllFrames(): void {
    const sessionId = this.getSessionId();
    const currentRoomUid = this.spaceRepo.activeSpace?.currentRoomUid;
    if (!sessionId || currentRoomUid === undefined || currentRoomUid === null) {
      return;
    }
    const ySession = this.ySessions.get(sessionId);

    const newFrame = new Frame({
      name: `${this.translateService.instant('Board')} 1`,
      frameType: FrameType.WHITEBOARD,
    });
    ySession.addFrame(currentRoomUid, newFrame);

    const frames = this.spaceRepo.getSpaceFrames(sessionId, currentRoomUid);

    if (frames) {
      for (const frame of frames) {
        if (frame.uid !== newFrame.uid) {
          ySession.removeFrame(currentRoomUid, frame.uid);
        }
      }
    }

    const update = this.createDifferentialSessionUpdate(sessionId) ?? null;

    const sessionUpdate: SessionsUpdate = {
      userId: this.user!._id,
      timestamp: new Date(),
      sessionsUpdates: [update!],
    };
    this.sendUpdateToServer('update', sessionUpdate);
  }

  /* Add the newly created breakout room to yjs doc and propagates the updates to other clients */
  addRooms(sessionId: string, rooms: Room[]) {
    rooms.forEach((room) => this.validateRoomSchema(room));
    const documentUpdate = this.ySessions.get(sessionId).addRooms(rooms);
    const update = this.createYUpdate(documentUpdate, sessionId);

    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
  }

  /**
   * add new board folder to the yjs doc board folders array
   * @param sessionId
   * @param boardFolder
   */
  addBoardFolder(sessionId: string, boardFolder: BoardFolder): void {
    const documentUpdate = this.ySessions.get(sessionId).addBoardFolder(boardFolder);
    const update = this.createYUpdate(documentUpdate, sessionId);

    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  removeAllBoardFolders(sessionId: string | undefined): void {
    const currentRoomUid = this.spaceRepo.activeSpace?.currentRoomUid;
    if (!sessionId || !currentRoomUid) {
      return;
    }

    this.ySessions.get(sessionId).removeAllBoardFolders(currentRoomUid);
  }

  /**
   * remove the board folder with its frames
   * @param boardFolder
   * @param sessionId
   * @param roomId
   */
  removeBoardFolder(boardFolder: BoardFolder, sessionId: string, roomId: string) {
    if (!sessionId || !roomId) {
      return;
    }
    const ySession = this.ySessions.get(sessionId);
    // remove board folder frames first
    const frames = this.spaceRepo.getSpaceFrames(sessionId, roomId);
    const boardFolderFrames = frames?.filter((frame) => frame.boardFolderUid === boardFolder.uid);
    if (boardFolderFrames?.length) {
      for (const frame of boardFolderFrames) {
        if (frame) {
          ySession.removeFrame(roomId, frame.uid);
        }
      }
    }
    // remove board folder itself
    ySession.removeBoardFolder(boardFolder.uid);
    // create the update with the changes
    const update = this.createDifferentialSessionUpdate(sessionId) ?? null;

    const sessionUpdate: SessionsUpdate = {
      userId: this.user!._id,
      timestamp: new Date(),
      sessionsUpdates: [update!],
    };
    // send the update
    this.sendUpdateToServer('update', sessionUpdate);
  }

  /* er
   * Modifies a boardFolder to the yjs document and propagates that change to all other clients
   */
  public modifyBoardFolder(boardFolder: BoardFolder, sessionId: string, roomId: string) {
    boardFolder = new BoardFolder(boardFolder);

    if (!sessionId || !roomId) {
      return;
    }

    const modifyUpdate = this.ySessions.get(sessionId).modifyBoardFolder(boardFolder);
    if (!modifyUpdate) {
      return;
    }

    const update = this.createYUpdate(modifyUpdate);
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  /* Modify a breakout room in yjs doc and propagates the updates to other clients*/
  public modifyRoom(room: Room, sessionId: string) {
    room = new Room(room);
    this.validateRoomSchema(room);
    const update = this.createYUpdate(this.ySessions.get(sessionId).modifyRoom(room));
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  public moveFrameBetweenRooms(sessionId: string, movedBoardsPlacement: MovedBoardsPlacement[]) {
    const documentUpdate = this.ySessions
      .get(sessionId)
      .moveFrameBetweenRooms(movedBoardsPlacement);
    const update = this.createYUpdate(documentUpdate, sessionId);

    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
  }

  /* Delete breakout room from yjs doc and propagates the updates to other clients */
  deleteRooms(sessionId: string, roomIds: string[]) {
    roomIds = roomIds.filter((roomId) => Session.getMainRoomId(sessionId, true) !== roomId);

    const update = this.createYUpdate(
      this.ySessions.get(sessionId).deleteRooms(roomIds),
      sessionId,
    );
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
  }

  /*
   * Returns the list of authData structures to pass to the socket server to get the session data
   */
  public populateSessionAuth(sessionIds: string[]) {
    if (!this.user) {
      return;
    }
    const res: { id: string; token: string; userId: string }[] = [];
    for (const sessionId of sessionIds) {
      const sessionData = this.sharedDataService.sessionAuthData;
      if (!(sessionId in sessionData)) {
        continue;
      }
      const sessionAuth = sessionData[sessionId].sessionAuth;
      res.push({
        id: sessionId,
        token: sessionAuth.token,
        userId: this.user._id,
      });
    }
    return res;
  }

  setupSessionListeners() {
    // listen for frame changes and emit undo/redo stack length for each undo
    let undoStackSubscriber: Subscription | undefined;
    let yDocPendingChangesSubscriber: Subscription | undefined;
    this.sessionSubscriptions = combineLatest([
      this.spaceRepo.activeSpaceId$,
      this.spaceRepo.activeSpaceSelectedBoardUid$,
    ]).subscribe(([spaceId, selectedBoardUid]) => {
      if (!spaceId || !selectedBoardUid) {
        return;
      }
      const ySession = this.ySessions.get(spaceId);
      undoStackSubscriber?.unsubscribe();
      undoStackSubscriber = ySession
        .getUndoStackObservable(selectedBoardUid)
        .subscribe((stacks) => {
          this.undoStackSubject.next(stacks.undoStack);
          this.redoStackSubject.next(stacks.redoStack);
        });
      this.sessionSubscriptions?.add(undoStackSubscriber);

      this.sharedDataService.clearFrameContent$.pipe(untilDestroyed(this)).subscribe(() => {
        this.undoStackSubject.next(0);
        this.redoStackSubject.next(0);
      });

      yDocPendingChangesSubscriber = this.observerUtils
        .throttledObservableOf<boolean>(
          this.ySessions.get(spaceId).yDocPendingChangesSubject,
          this.YDOC_PENDING_CHANGES_DELAY,
        )
        .pipe(untilDestroyed(this))
        .subscribe({
          next: (next) => {
            if (next) {
              this.handleYDocPendingChages();
            }
          },
          error: (error) => {
            Sentry.captureException(new Error('yDocPendingChangesSubject error'), {
              extra: {
                error: error,
              },
            });
          },
        });
      this.sessionSubscriptions?.add(yDocPendingChangesSubscriber);
    });
  }

  private validateRoomSchema(room: Room): void {
    const result = permissionsSchema.safeParse(room.permissions);
    if (!result.success) {
      Sentry.captureException(new Error('Room Permission Validation Error'));
      this.telemetry.errorEvent('Room Permission Validation Error', { room, result });
    }
  }

  private handleYDocPendingChages() {
    this.telemetry.event('YJS Missing Update', { user: this.user?._id });
    this.sendSyncWithServer();
  }

  private handleDisconnect() {
    if (this.disconnected) {
      const oldRooms = [...this.joinedRooms];
      this.joinedRooms = [];

      // When we merge there may be a lot of differences in frames
      // If possible we should try to remain on the frame we are on
      const oldFrameUID = this.spaceRepo.activeSpace?.selectedBoardUid;
      if (oldFrameUID) {
        this.sharedDataService.frameUIDBeforeDisconnect.push(oldFrameUID);
      }

      // Once we join again we should publish any updates we made in the interim
      this.joinRooms(oldRooms);

      if (this.userRoom) {
        this.joinUserRoom(this.userRoom);
      }
    }
  }

  private cleanupSocket() {
    this.socket?.disconnect();
  }

  /*
   * Broadcasts an event to all remote users that have joined the session
   */

  private sendHeartbeat() {
    if (!this.user) {
      return;
    }
    this.socket?.emit(WebSocketEventType.HEARTBEAT, this.activeSessionId);
  }

  private createRTFrameSubscription() {
    if (this.rtFrameSubscription) {
      this.rtFrameSubscription.unsubscribe();
    }

    if (!this.socket) {
      throw new Error('creating rt subcsription before socket has been created');
    }

    this.roomsSubscription?.unsubscribe();
    // Listen to changes in activeSpaceId, selectedBoard, and currentRoom and:
    // 1. Call join room when a board/room combination changes
    // 2. When activeSpaceId/currentRoom changes, "listen" to frame changes to update Space store
    this.rtFrameSubscription = combineLatest([
      this.spaceRepo.activeSpaceId$,
      this.spaceRepo.activeSpaceSelectedBoardUid$,
      this.spaceRepo.activeSpaceCurrentRoomUid$,
      fromEvent(this.socket, SocketConnectionEvent.CONNECT).pipe(startWith({})),
    ]).subscribe(([spaceId, selectedBoardUid, currentRoomUid]) => {
      if (spaceId && selectedBoardUid && currentRoomUid) {
        this.joinFrameRoom(`${spaceId}_${selectedBoardUid}`, currentRoomUid);
      }
    });
  }

  private getSessionId() {
    return this.spaceRepo.activeSpace?._id;
  }

  private handleBulkFrameEvents(events: FrameEvent[]) {
    const realTimeEvents = events.filter(
      (event) => event.userId && event.type === FrameEventType.Realtime,
    );
    const userEvents = events.filter(
      (event) =>
        event.userId &&
        (event.type === FrameEventType.UserLeft ||
          event.type === FrameEventType.UserJoined ||
          event.type === FrameEventType.RequestFrame),
    );

    // handle RealTime events in bulk
    const bulkEvents: { mouse: Mouse; user: string; activeToolbar?: string }[] = [];
    for (const realtimeEvent of realTimeEvents) {
      realtimeEvent?.mouse?.forEach((m) => {
        if (m) {
          m.metadata.timestamp = performance.now(); // add timestamp when event was received
          bulkEvents.push({
            mouse: m,
            user: realtimeEvent.userId!,
            activeToolbar: realtimeEvent.activeToolbar,
          });
        }
      });
    }

    if (bulkEvents.length) {
      this.realTimeEventSubject.next(bulkEvents);
    }

    // handle User events next
    this.handleUserEvents(userEvents);
  }

  private handleUserEvents(userEvents: FrameEvent[]) {
    const usersLeft: string[] = [];
    const usersJoined: string[] = [];
    for (const userEvent of userEvents) {
      if (userEvent.type === FrameEventType.UserLeft) {
        usersLeft.push(userEvent.userId!);
      } else if (userEvent.type === FrameEventType.UserJoined) {
        usersJoined.push(userEvent.userId!);
        // send the current mouse pointers to the user joined, so we can draw the mouses
        if (this.flagsService.isFlagEnabled(FLAGS.PROPAGATE_POINTER_LOCATIONS_ON_JOIN)) {
          this.sendMousePointerEvent({
            ...this.sharedDataService.lastMouseMoveEvent,
            receiverSocketId: userEvent.receiverSocketId,
          });
        }
      } else if (userEvent.type === FrameEventType.RequestFrame) {
        this.sendMousePointerEvent({
          ...this.sharedDataService.lastMouseMoveEvent,
          receiverSocketId: userEvent.receiverSocketId,
        });
      }
    }
    if (usersLeft.length) {
      this.usersLeftFrame.next(usersLeft);
    }
    if (usersJoined.length) {
      this.usersJoinedFrame.next(usersJoined);
    }
  }

  private applyUpdate(update: SessionsUpdate) {
    this.applyYUpdates(update);
    this.applyEventUpdates(update);
  }

  private applyBulkUpdates(updates: SessionsUpdate[]) {
    for (const update of updates) {
      this.applyUpdate(update);
    }
  }

  /*
   * Applies the documentUpdate for all frames, then rerenders the current frame
   */
  private applyYUpdates(update: SessionsUpdate) {
    this.logYMetrics(update);
    update.sessionsUpdates.forEach((sessionUpdate) => {
      try {
        if (sessionUpdate.documentUpdate && sessionUpdate.documentUpdate.yjs_update) {
          const ySession = this.ySessions.get(sessionUpdate.sessionId);
          ySession.applyUpdate(sessionUpdate);
          this.updateServerState(sessionUpdate);
        }
      } catch (err) {
        Sentry.captureException(err);
        this.telemetry.errorEvent('corrupted_update');
      }
    });
    this.sharedDataService.socketYUpdate.next();
  }

  private logYMetrics(update: SessionsUpdate) {
    const e2eLatency = new Date().getTime() - new Date(update.timestamp).getTime();
    this.telemetry.updateMetric(ExternalMetric.yjsUpdateLatency, e2eLatency);

    update.sessionsUpdates
      .map((sessionUpdate) => sessionUpdate.documentUpdate?.yjs_update?.byteLength)
      .filter(notEmpty)
      .forEach((updateSize) => {
        this.telemetry.updateMetric(ExternalMetric.yjsUpdateSize, updateSize);
        this.telemetry.incrementMetric(CountedMetric.yDocRx);
      });
  }

  private _applyUpdateAndRemovePendingUpdates(update: SessionUpdate) {
    const ySession = this.ySessions.get(update.sessionId);
    const pendingUpdatesRemoved = ySession.applyUpdateAndRemovePendingUpdates(update);
    if (pendingUpdatesRemoved) {
      this.telemetry.event('space has pending updates on load');
    }
  }

  private updateServerState(update: SessionUpdate | ServerStateUpdate) {
    const sessionId = update.sessionId;
    if (update.serverStateVector) {
      this.ySessions.get(sessionId).updateServerState(update.serverStateVector);
    }
  }

  private updateBulkServerStates(updates: SessionUpdate[] | ServerStateUpdate[]) {
    for (const update of updates) {
      this.updateServerState(update);
    }
  }

  /*
   * Sends the update from the remote client to the local client
   */
  private applyEventUpdates(update: SessionsUpdate) {
    update.sessionsUpdates.forEach((sessionUpdate) => {
      // Don't want to update the UI if it is for a different session
      if (sessionUpdate.sessionId !== this.activeSessionId) {
        return;
      }

      if (!sessionUpdate.events) {
        return;
      }
      sessionUpdate.events.forEach((event) => {
        this.publishEventToSubscribers(event, update.userId);
      });
    });
  }

  private publishEventToSubscribers(event: SessionEvent, userId: string) {
    // Update any local subscribers
    if (!this.eventRegistry[event.category]) {
      return;
    }

    this.zone.run(() => {
      this.eventRegistry[event.category].forEach((subscriber: SessionEventSubscriber) => {
        subscriber.sessionEventReceived(event, userId);
      });
    });
  }

  private getCurrentFrameUID(): string | undefined {
    return this.spaceRepo.activeSpace?.selectedBoardUid;
  }

  private getSessionData(sessionId?: string, frameUID?: string) {
    sessionId = sessionId || this.getSessionId();
    frameUID = frameUID || this.getCurrentFrameUID();

    return [sessionId, frameUID];
  }

  // Response is true if an undo/redo occurred,
  // false if you are at the end of the undo/redo stack or data is missing
  private handleUndoRedo(type: UndoEnum): boolean {
    const activeSpace = this.spaceRepo.activeSpace;
    if (!activeSpace || !activeSpace.selectedBoardUid) {
      return false;
    }

    const beforeUndoRedoObjects = cloneDeep(this.sharedDataService.fabricCanvas?.getObjects());

    const ySession = this.ySessions.get(activeSpace._id);
    const undoRes = ySession.undoRedo(activeSpace.selectedBoardUid, type);

    // If no objects have changed then there was no op
    if (!undoRes) {
      return false;
    }

    const documentUpdate = undoRes.updates;
    const update = this.createYUpdate(documentUpdate);
    if (!update) {
      return false;
    }

    this.sendUpdateToServer('update', update);
    this.sharedDataService.trackUndoRedo.next({
      beforeUndoRedoObjects: beforeUndoRedoObjects as fabric.Object[],
      changedUIDs: new Set(undoRes.updatedIds),
    });
    return true;
  }

  /*
   * When a selection is created if it is a group selection
   * then we need to calculate the reference transform
   * we store it on a property called relationship
   * reference: http://fabricjs.com/using-transformations
   */
  private handleObjectSelection() {
    const activeObject = this.sharedDataService.fabricCanvas?.getActiveObject();
    const targets: any[] | undefined = this.sharedDataService.fabricCanvas?.getActiveObjects();
    if (!activeObject || !targets || targets.length === 1) {
      return;
    }
    const activeTransform = activeObject.calcTransformMatrix();
    const invertedActiveTransform = fabric.util.invertTransform(activeTransform);

    targets.forEach((o) => {
      const desiredTransform = fabric.util.multiplyTransformMatrices(
        invertedActiveTransform,
        o.calcTransformMatrix(),
      );
      this.relationships[o.uid] = desiredTransform;
    });
  }

  /*
   * When an object is modified we need to apply the relationship relative to the selection
   * this returns the objects to the non-relative space
   *
   * To modify the yjs doc state we delete at the index of the object
   * then insert the new transformed object
   *
   * note: flipX and flipY are absolute when applying the transform and does not take into
   * consideration the previous state.
   */
  private handleObjectModified(objectsToBoardMaps: Map<string | undefined, YObject[]>[]) {
    const [sessionId, frameUID] = this.getSessionData();

    if (!sessionId || !frameUID) {
      return;
    }

    const objectsToModifyMap = this.flattenMaps(objectsToBoardMaps);
    const ySession = this.ySessions.get(sessionId);

    const modifiedObjectsMap: Map<string | undefined, YObject[]> = new Map();

    // modify objects for each board
    for (const [boardUid, objects] of objectsToModifyMap) {
      if (objects.length === 0) {
        continue;
      }
      const modifiedObjects = this.modifyBoardObjects(objects);
      modifiedObjectsMap.set(boardUid, modifiedObjects);
    }

    const documentUpdate = ySession.modifyObjects(modifiedObjectsMap);
    const update = this.createYUpdate(documentUpdate);
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
  }

  /*
   * Create mapping between objects to be removed and their boards
   */
  private handleObjectRemoved(objectsToRemove: ObjectWithBoard[]) {
    const [sessionId, frameUID] = this.getSessionData();
    if (!sessionId || !frameUID) {
      return;
    }
    const removedObjectsMap: Map<string | undefined, YObject[]> = new Map();
    for (const deletedObject of objectsToRemove) {
      if (Object.keys(deletedObject.object).length === 0) {
        continue;
      }
      const target = deletedObject.object.toObject(customFabricFields);

      if (!target || target.remoteDelete) {
        continue;
      }

      if (removedObjectsMap.has(deletedObject.boardUid)) {
        removedObjectsMap.get(deletedObject.boardUid)?.push(deletedObject.object);
      } else {
        removedObjectsMap.set(deletedObject.boardUid, [deletedObject.object]);
      }
    }

    if (removedObjectsMap.size === 0) {
      return;
    }

    const ySession = this.ySessions.get(sessionId);
    const documentUpdate = ySession.removeObjects(removedObjectsMap);

    const update = this.createYUpdate(documentUpdate);
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
  }

  /*
   * When an object is added we add a couple of properties to it for the collaborative part
   * uid: a unique id for the object that is used to find it in the list of fabric objects
   * fresh: denotes if the object was created on this users canvas or if its from a remote source
   * userId: not used currently
   */
  private handleObjectAdd(objectsToAdd: ObjectWithBoard[]): void {
    const [sessionId, frameUID] = this.getSessionData();
    if (!sessionId || !frameUID || !this.user) {
      return;
    }

    const addedObjectsMap: Map<string | undefined, YObject[]> = new Map();
    objectsToAdd
      .filter((target) => Object.keys(target.object).length !== 0)
      .map((target, index) => ({ yObject: target.object, index }))
      .forEach(({ yObject, index }) => {
        // we only need to send an update to remote clients for "fresh" object
        // (i.e. the ones added locally on this client)
        this.updateObjectProperties(yObject);

        if (yObject.fresh !== true) {
          return null;
        }

        // For multiple objects (group) added, calculate the absolute position for each object on the canvas
        // by applying the transform relative to the group
        this.calculateAbsolutePosition(yObject);

        // Turn off fresh to prevent ping-ponging of updates from remote clients.
        // Do this both on the outgoing object: target, and the live fabric object: objects[index]
        yObject.fresh = false;
        objectsToAdd[index].object.fresh = false;

        this.handleUndefinedTextboxCase(yObject);

        if (yObject.toObject(customFabricFields)) {
          if (addedObjectsMap.has(objectsToAdd[index].boardUid)) {
            addedObjectsMap
              .get(objectsToAdd[index].boardUid)
              ?.push(yObject.toObject(customFabricFields));
          } else {
            addedObjectsMap.set(objectsToAdd[index].boardUid, [
              yObject.toObject(customFabricFields),
            ]);
          }
        }
      });

    if (addedObjectsMap.size === 0) {
      return;
    }

    const ySession = this.ySessions.get(sessionId);
    const documentUpdate = ySession.addObjectsToDifferentBoards(addedObjectsMap);

    const update = this.createYUpdate(documentUpdate);
    if (!update) {
      return;
    }

    this.sendUpdateToServer('update', update);
  }

  private modifyBoardObjects(objects: YObject[]): YObject[] {
    const activeObjects = this.sharedDataService.fabricCanvas?.getActiveObjects() || [];
    const activeObject = this.sharedDataService.fabricCanvas?.getActiveObject();
    const activeTransform = activeObject?.calcTransformMatrix();
    const updatedObjects: YObject[] = [];

    for (const yObject of objects) {
      if (Object.keys(yObject).length === 0) {
        continue;
      }
      const vuid = uuidv4();
      yObject['vuid'] = vuid;
      let o = fabric.util.object.clone(yObject);
      // Total number of selected objects > 1 fabric and canvas items
      // FilteredTargets are fabric items only
      if (activeTransform && this.relationships[o.uid] && activeObjects.length > 1) {
        const relationship = this.relationships[o.uid];
        o = this.applyFabricMultTransform(o, activeTransform, relationship);
      }
      updatedObjects.push((o as fabric.Object).toObject(customFabricFields));
    }
    return updatedObjects;
  }

  private updateObjectProperties(yObject: YObject) {
    if (yObject.fresh === undefined) {
      yObject.userId = this.user!._id;
      // only set uid, if not already set (in case of new objects added, not copied)
      if (!yObject.uid) {
        yObject.ScribbleType = this.sharedDataService.getScribbleType();
        yObject.uid = this.getFabricScribbleUid(yObject.ScribbleType);
      }
      // The version of this object in case it was modified
      yObject.vuid = uuidv4();
      yObject.fresh = true;
    }
  }

  /**
   * This function was added to determine what are the conditions that can cause adding fabric textbox
   * without missing text property
   * related ticket : SPAC-10090
   */
  private handleUndefinedTextboxCase(yObject: YObject) {
    if (yObject.type === 'textbox' && (yObject as YObject & { text?: string }).text === undefined) {
      (yObject as YObject & { text?: string }).text = '';
      this.telemetry.event('fabric-textbox-no-text', {
        local: true,
      });
    }
  }

  private calculateAbsolutePosition(yObject: YObject) {
    const activeObject = this.sharedDataService.fabricCanvas?.getActiveObject();
    if (activeObject) {
      const activeTransform = activeObject.calcTransformMatrix();
      // Update the version of the object
      const vuid = uuidv4();
      yObject.vuid = vuid;
      let o = fabric.util.object.clone(yObject);
      if (this.relationships[o.uid]) {
        const relationship = this.relationships[o.uid];
        o = this.applyFabricMultTransform(o, activeTransform, relationship);
      }
      yObject = o;
    }
  }

  private getRealtimeScribbleUid(state: string) {
    // For realtime drawing, create and remember the uids for each scribble object.
    // This will be used to co-relate them to their corresponding Fabric objects,
    // so the remote clients can swap out realtime scribbles with persistent ones as soon
    // as they're received and rendered for them.
    let currentScribbleUid;
    if (state === 'start') {
      currentScribbleUid = uuidv4();
      this.pendingYDocScribbleUids.push(currentScribbleUid);
    } else {
      currentScribbleUid = this.pendingYDocScribbleUids[this.pendingYDocScribbleUids.length - 1];
    }
    return currentScribbleUid;
  }

  private getFabricScribbleUid(scribbleType: ScribbleType) {
    // For draw and highlight scribbles, use the same uids that were sent for their realtime strokes
    let uid;
    if (
      [ScribbleType.DRAW, ScribbleType.HIGHLIGHT].includes(scribbleType) &&
      this.pendingYDocScribbleUids.length
    ) {
      uid = this.pendingYDocScribbleUids.shift(); // remove the first element
    } else {
      uid = uuidv4();
    }
    return uid;
  }

  private async captureInvalidFrame(frame: Frame) {
    console.log('Sending invalid Update');
    if (!frame) {
      return;
    }
    const error = {
      frame: frame,
      stackTrace: Error().stack,
    };
    const jsn = JSON.stringify(error);

    const blob = new Blob([jsn], { type: 'application/json' });
    const file = new File([blob], 'sessionLogs.json');

    this.uploadService.uploadToFireStorage(file, (url) => {
      Sentry.captureException(new Error('Invalid frame detected'), {
        extra: {
          logsUrl: url,
        },
      });
    });
  }

  /*
   * Modifies a ySession and emits an update to all other users in the session
   * callback: a function that takes in a ySession and produces a yjs update
   */
  private modifyYSession(callback: (ySession: YSession) => Uint8Array, sessionId?: string) {
    if (!sessionId) {
      sessionId = this.getSessionData()[0];
    }

    if (!sessionId) {
      return;
    }

    const ySession = this.ySessions.get(sessionId);

    const yUpdate = callback(ySession);

    const update = this.createYUpdate(yUpdate);
    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  private handleConnectionStatus(): void {
    this.networkService.wbServer.pipe(untilDestroyed(this)).subscribe((value) => {
      if (value === ConnectionStatus.DISCONNECTED) {
        this.setIsOfflineIndicator(true);
      } else if (value === ConnectionStatus.CONNECTED) {
        this.setIsOfflineIndicator(false);
      }
    });
    this.networkService.browserIndicator.pipe(untilDestroyed(this)).subscribe((value) => {
      if (value === ConnectionStatus.DISCONNECTED) {
        this.setIsOfflineIndicator(true);
      } else if (value === ConnectionStatus.CONNECTED) {
        if (this.networkService.wbServer.value) {
          this.setIsOfflineIndicator(false);
        }
      }
    });
  }

  private setIsOfflineIndicator(newValue: boolean): void {
    // Check if the state is changing before entering critical section below (to avoid hammering on the mutex)
    if (this._isOffline === newValue) {
      return;
    }

    this.changeOfflineStateMutex.runExclusive(() => {
      // check again if the state is about to change, otherwise do nothing.
      if (this._isOffline === newValue) {
        return;
      }

      if (!newValue) {
        // back online
        this._reconnectAttempt = 0;
        this.networkStatus.next(NetworkStatus.CONNECTED);
        this.showReconnectedToServerNotification();
      }

      this._isOffline = newValue;
    });
  }

  isOfflineMode(): Promise<boolean> {
    return this.changeOfflineStateMutex.runExclusive(() => this.isOffline);
  }

  /**
   * build and show the notification when there is a connection issue
   * @private
   */
  private showReconnectingToServerNotification(): void {
    this.notificationToasterService.dismissNotificationsByCode([SUCCESSES.RECONNECTED_TO_SERVER]);
    this.notificationToasterService.showLoadingNotification(
      this.translateService.instant('Connection lost. Reconnecting...'),
      ERRORS.RECONNECTING_TO_SERVER,
    );
    this.telemetry.event('Reconnecting to server notification', {});
  }
  private showReconnectedToServerNotification() {
    this.notificationToasterService.dismissLoadingNotification(ERRORS.RECONNECTING_TO_SERVER);
    const title = [
      { icon: 'check' },
      this.translateService.instant('Reconnected. You’re back online!'),
    ];
    const titleElement = new IconMessageToasterElement(...title);
    const successNotificationData = new NotificationDataBuilder(SUCCESSES.RECONNECTED_TO_SERVER)
      .style(ToasterPopupStyle.SUCCESS)
      .type(NotificationType.SUCCESS)
      .timeOut(5)
      .width(254)
      .toastClass('connected-toast')
      .topElements([titleElement])
      .dismissable(false)
      .build();
    this.notificationToasterService.showNotification(successNotificationData);
    this.telemetry.event('Connected to server notification', {});
  }

  private sendEmote(gestureName: GestureName) {
    this.emote(emojiLibraryIndex[gestureName]);
  }

  private sendUpdateToServer(event: any, update: SessionsUpdate | null) {
    // skipping ack timeout for non-update events like network
    if (update?.sessionsUpdates[0].documentUpdate) {
      this.socket?.timeout(this.WB_SERVER_ACK_TIMEOUT).emit(
        event,
        update,
        (
          err: unknown,
          response: {
            serverStateVector: SessionUpdate | ServerStateUpdate;
            err: WhiteboardServerErrors;
          },
        ) => {
          if (err) {
            Sentry.captureException(new Error('WB server update ack timeout'));
            this.wbServerAckTimeoutSubject.next(true);
          } else if (response.err === WhiteboardServerErrors.UPDATE_FAILED) {
            // Sync with the server for the active space
            this.telemetry.event('Sync With Server Failed');
            this.sendSyncWithServer();
          } else if (response.serverStateVector) {
            const { serverStateVector } = response;
            this.updateServerState(serverStateVector);
            this.wbServerAckTimeoutSubject.next(false);
          }
        },
      );
      // increment yDoc send
      this.telemetry.incrementMetric(CountedMetric.yDocTx);
    } else {
      this.socket?.emit(event, update);
    }
    const allUpdates = update?.sessionsUpdates.map(
      (sessionUpdate) => sessionUpdate.documentUpdate?.yjs_update,
    );
    if (allUpdates) {
      for (const sentUpdate of allUpdates) {
        // Check if update is more than 100 KB
        if (sentUpdate && sentUpdate.byteLength > 102400) {
          this.telemetry.event('ydoc_large_update', {
            ydoc_update_size: sentUpdate.byteLength / 1024,
            spaceId: this.spaceRepo.activeSpace?._id,
            userId: this.user?._id,
          });
        }
      }
    }
  }

  private syncYUpdates(syncFromServer: SessionsUpdate) {
    syncFromServer.sessionsUpdates.forEach((sessionUpdate) => {
      if (sessionUpdate.documentUpdate && sessionUpdate.documentUpdate.yjs_update) {
        this.syncsInProgress.remove(sessionUpdate.sessionId);
        const ySession = this.ySessions.get(sessionUpdate.sessionId);
        ySession.applyUpdate(sessionUpdate);

        this.updateServerState(sessionUpdate);
        const update = ySession.getUpdate();
        if (update) {
          this.sendUpdateToServer('update', this.createYUpdate(update));
        }
      }
      const space = this.spaceRepo.getSpace(sessionUpdate.sessionId);
      if (space && !space.hasSpaceSynced) {
        const firstSpaceSyncSize = sessionUpdate?.documentUpdate?.yjs_update?.byteLength ?? 0;
        const initialGCSSpaceSize = space.initialGCSSpaceSize ?? 0;
        this.telemetry.setSessionVars({
          activeSessionSize: firstSpaceSyncSize + initialGCSSpaceSize,
        });
        this.spaceRepo.updateSpaceUI(sessionUpdate.sessionId, { hasSpaceSynced: true });
      }
    });
    this.sharedDataService.socketYUpdate.next();
  }

  private sendSyncWithServer(
    sessionId?: string,
    reconnection?: boolean,
    isFirstSpaceSync?: boolean,
  ) {
    if (this.flagsService.isFlagEnabled(FLAGS.ENABLE_SYNC_WITH_WB_SERVER)) {
      sessionId = sessionId || this.getSessionId();
      if (
        (!sessionId ||
          this.syncsInProgress.has(sessionId) ||
          !this.spaceRepo.getSpace(sessionId)?.isInitialSpaceStateLoaded) &&
        (!reconnection || !sessionId)
      ) {
        return;
      }
      const sessionSync: SessionsUpdate = {
        userId: this.user!._id,
        timestamp: new Date(),
        sessionsUpdates: [
          {
            sessionId: sessionId!,
            documentUpdate: {
              yjs_update: undefined,
            },
            clientStateVector: this.ySessions.get(sessionId).getClientStateVector(),
            isFirstSpaceSync: isFirstSpaceSync,
          },
        ],
      };
      this.syncsInProgress.add(sessionId, this.SYNC_TIMEOUT);
      this.socket?.emit(WebSocketEventType.SYNC_WITH_SERVER, sessionSync);
    }
  }

  private async syncSessionData(spaceId: string) {
    try {
      const spaceFromServer: Session | undefined = (
        await this.spacesService.getSession(spaceId).toPromise()
      )?.session;
      const cachedSpace = this.spaceRepo.activeSpace;
      if (!spaceFromServer || !cachedSpace) {
        return;
      }
      cachedSpace.populatedUsers = spaceFromServer.populatedUsers;
      cachedSpace.title = spaceFromServer.title;
      cachedSpace.isLocked = spaceFromServer.isLocked;
      cachedSpace.visibility = spaceFromServer.visibility;
      cachedSpace.sessionPermissions = spaceFromServer.sessionPermissions;
      cachedSpace.users = spaceFromServer.users;
      cachedSpace.users
        .filter((user) => !Session.isOwnedByUser(spaceFromServer, user._id))
        .forEach((user) => (user.userPermissions = spaceFromServer.sessionPermissions));
      cachedSpace.accessRequesters = spaceFromServer.accessRequesters;
      cachedSpace.populatedAccessRequesters = spaceFromServer.populatedAccessRequesters;
      cachedSpace.settings = spaceFromServer.settings;

      this.spaceRepo.updateSpace(spaceId, cachedSpace);
    } catch (err) {
      console.log(err);
    }
  }

  public callSyncSessionData(): void {
    this._syncSessionDataSubject.next();
  }

  private updateWebSocketConnectionStatus(
    wsConnectionstatus: ConnectionStatus.CONNECTED | ConnectionStatus.DISCONNECTED,
    shouldUpdateWBStatus = true,
  ) {
    if (shouldUpdateWBStatus) {
      this.networkService.wbServer.next(wsConnectionstatus);
    }
    this.disconnected = wsConnectionstatus == ConnectionStatus.DISCONNECTED;
  }

  getSocketId() {
    return this.socketId;
  }

  /*
   * Creates a differential update for a session with the minimal amount of context needed
   */
  private createDifferentialSessionUpdate(sessionId: string): SessionUpdate | undefined {
    const ySession = this.ySessions.get(sessionId);
    const update = ySession.getUpdate();

    const documentUpdate: DocumentUpdate = {
      yjs_update: update,
    };

    const sessionUpdate: SessionUpdate = {
      sessionId,
      documentUpdate,
    };
    return sessionUpdate;
  }

  private async syncSpaces(spaceIds: string[]) {
    // syncing with server should reconnect user to room
    spaceIds
      .filter((spaceId) => this.spaceRepo.getSpace(spaceId)?.isInitialSpaceStateLoaded)
      .forEach(async (spaceId) => {
        this.sendSyncWithServer(spaceId);
        await this.syncSessionData(spaceId);
      });
  }

  private joinSpaceRooms(sessionsWithAuth: SessionAuth[]) {
    this.socket?.emit(WebSocketEventType.JOIN_SOCKET_ROOMS, {
      sessionsWithAuth,
      chunking: this.flagsService.isFlagEnabled(FLAGS.SPACE_BINARY_CHUNKING),
      breakoutRoomsEnabled: this.flagsService.isFlagEnabled(FLAGS.BREAKOUT_ROOMS),
      enableDirectClientDownloadFromGCS: this.flagsService.isFlagEnabled(
        FLAGS.ENABLE_DIRECT_CLIENT_DOWNLOAD_FROM_GCS,
      ),
      shouldSendGcsDownloadLinkImmediately: this.flagsService.isFlagEnabled(
        FLAGS.ENABLE_DIRECT_CLIENT_DOWNLOAD_FROM_GCS,
      ),
    });
  }

  public getSpaceDownloadProgressObservable(): Observable<number> {
    return this._spaceDownloadProgress$.asObservable();
  }

  public sessionToJSON(sessionId: string) {
    return this.ySessions.get(sessionId).getYDocAsJSON();
  }

  public sessionToBase64(sessionId: string) {
    return this.ySessions.get(sessionId).getYDocAsBase64();
  }

  public getSessionSize(sessionId: string) {
    const byte = this.ySessions.get(sessionId).getYDocSize();
    return {
      sizeByte: `${byte} byte`,
      sizeKB: `${(byte * 0.001).toFixed(2)} kb`,
      sizeMB: `${(byte * 0.000001).toFixed(2)} mb`,
    };
  }

  public logSize() {
    if (this.activeSessionId) {
      const size = this.getSessionSize(this.activeSessionId);
      console.log({ sessionSize: '⚖️⚖️⚖️', size });
    }
  }

  public logSpace() {
    if (this.activeSessionId) {
      const sessionJSON = this.sessionToJSON(this.activeSessionId);
      console.log({ sessionSize: '✒️✒️✒️', sessionJSON });
    }
  }

  public downloadJSON() {
    if (this.activeSessionId) {
      const size = this.getSessionSize(this.activeSessionId);
      const sessionJSON = this.sessionToJSON(this.activeSessionId);
      const sessionData = { sessionId: this.activeSessionId, size, sessionJSON };
      const sessionJsonBlob = new Blob([JSON.stringify(sessionData, null, '\t')], {
        type: 'application/json',
      });

      const saveSessionJSON = document.createElement('a');
      saveSessionJSON.href = URL.createObjectURL(sessionJsonBlob);
      saveSessionJSON.download = `${this.activeSessionId} clone.json`;
      saveSessionJSON.click();
    }
  }

  public downloadBase64String() {
    if (this.activeSessionId) {
      const base64EncodedSpace = this.sessionToBase64(this.activeSessionId);
      const sessionTextBlob = new Blob([base64EncodedSpace], { type: 'text/plain' });
      const saveSessionText = document.createElement('a');
      saveSessionText.href = URL.createObjectURL(sessionTextBlob);
      saveSessionText.download = `${this.activeSessionId} Base64 clone.txt`;
      saveSessionText.click();
    }
  }

  public downloadBinary() {
    if (!this.activeSessionId) {
      return;
    }
    const binarySpace = this.ySessions.get(this.activeSessionId).getYDocAsBinary();
    const sessionBinaryBlob = new Blob([binarySpace], { type: 'application/octet-stream' });
    const saveSessionBinary = document.createElement('a');
    saveSessionBinary.href = URL.createObjectURL(sessionBinaryBlob);
    saveSessionBinary.download = `${this.activeSessionId} Binary clone`;
    saveSessionBinary.click();
  }

  private connectWebSocket() {
    this.socket?.connect();
  }

  private onReconnectError = (error) => {
    this.telemetry.event('Socket reconnect error', { error });
    this.networkService.wbServer.next(ConnectionStatus.DISCONNECTED);
  };

  private onReconnectAttempt = async (attempt) => {
    const clientNetworkState = await this.networkService.checkInternetAndGcpConnection(attempt);
    this.logAndReportClientNetworkState(clientNetworkState);

    if (this._reconnectAttempt === 3) {
      this.showReconnectingToServerNotification();
      this.networkStatus.next(NetworkStatus.DISCONNECTED);
    }

    this._reconnectAttempt += 1;
    this.telemetry.event('Socket reconnect attempt', { attempt });
  };

  private onReconnect = (attempt) => {
    this.telemetry.event('Socket reconnect successful', { attempt });
    modifiedSetTimeout(() => {
      this.sendSyncWithServer(this.activeSessionId, true);
      this.syncUserPresence('reconnected');
      this._syncSessionDataSubject.next();
    }, 1000);
  };

  private syncUserPresence(event: 'reconnected' | 'disconnected') {
    this.updateUserPresence.next(event);
  }

  private onReconnectFailed = () => {
    this.telemetry.event('reconnect_failed', {});
  };

  private isWsWorkerEnabled(): boolean {
    // Disable Web worker if its not supported by the browser
    if (typeof Worker === 'undefined') {
      return false;
    }
    return this.flagsService.isFlagEnabled(FLAGS.WEBSOCKET_WEB_WORKER);
  }

  public logSpaceLoadingTimeToFS(userIsRefreshing = false) {
    // handle race conditions if animationStartTime is not set by setActiveSession
    if (!this.animationStartTime) {
      return;
    }
    let timeElapsed = (new Date().getTime() - this.animationStartTime) / 1000;
    timeElapsed = Math.round((timeElapsed + Number.EPSILON) * 10) / 10; // round to 1 decimal place
    const spaceIsLoading = this.sharedDataService.dataLoading.getValue();
    if (!spaceIsLoading && !userIsRefreshing) {
      this.telemetry.event('[Spaces load time]', { animationDuration: timeElapsed });
      this.telemetry.endPerfScenario('Spaces load animation');
    } else if (spaceIsLoading && userIsRefreshing) {
      this.telemetry.event('[User waiting time before refreshing]', {
        animationDuration: timeElapsed,
      });
    }
  }

  public getCurrentAnalyticsSession(): void {
    if (!this.activeSessionId) {
      return;
    }
    this.socket?.emit(WebSocketEventType.GET_CURRENT_SESSION, this.activeSessionId);
  }

  private logAndReportClientNetworkState(clientNetworkState: ClientNetworkState) {
    if (
      clientNetworkState.globalEndpointSuccess >= this.CONSISTENCY_THRESHOLD &&
      clientNetworkState.gcpEndpointSuccess >= this.CONSISTENCY_THRESHOLD
    ) {
      clientNetworkState.disconnectionReason = 'Socket server is not reachable';
    }
    if (clientNetworkState.globalEndpointFailure >= this.CONSISTENCY_THRESHOLD) {
      clientNetworkState.disconnectionReason = 'No internet connection';
    } else if (clientNetworkState.gcpEndpointFailure >= this.CONSISTENCY_THRESHOLD) {
      clientNetworkState.disconnectionReason = 'Cannot access GCP';
    }

    this.telemetry.event('Network disconnected state', clientNetworkState);
  }

  /*
   * GCS Bucket CORS Configurations.
   * [
   *   {
   *    "origin": ["https://my.pencilapp.com", "https://canary.pencilapp.com"],
   *    "method": ["GET"],
   *    "responseHeader": ["*"],
   *    "maxAgeSeconds": 3600
   *  }
   * ]
   */
  private getYjsUpdateFromGCS(gcsSpaceDownload: GcsSpaceDownload): Observable<Uint8Array | null> {
    if (!gcsSpaceDownload.exists || !gcsSpaceDownload.gcsUrl) {
      // no backup file exist for the space
      return of(null);
    }
    const headers = {
      headers: new HttpHeaders({ 'Content-Type': 'application/json' }).set(
        ErrorInterceptorSkipHeader,
        '',
      ),
    };

    return this.http
      .get(gcsSpaceDownload.gcsUrl, {
        ...headers,
        responseType: 'arraybuffer',
        reportProgress: true,
        observe: 'events',
      })
      .pipe(
        tap((event) => {
          if (event.type === HttpEventType.DownloadProgress) {
            if (event.total) {
              const percentDone = Math.round((100 * event.loaded) / event.total);
              this._spaceDownloadProgress$.next(percentDone);
            }
          } else if (event.type === HttpEventType.Response) {
            this._spaceDownloadProgress$.next(100);
          }
        }),
      )
      .pipe(
        filter(
          (event): event is HttpResponse<ArrayBuffer> => event.type === HttpEventType.Response,
        ),
      )
      .pipe(map((event) => event.body))
      .pipe(map((fileData) => (fileData ? new Uint8Array(fileData) : null)))
      .pipe(
        catchError((error) => {
          // log the error and Re-throw for further handling
          this.telemetry.event('[failed_to load_yjs_from_GCS]', {
            error,
          });
          return throwError(() => error);
        }),
      );
  }

  public addAiLessonGeneratedContent(lesson: LessonProcessor) {
    if (!this.activeSessionId) {
      return;
    }
    const documentUpdate = this.ySessions.get(this.activeSessionId).insertAiGeneratedLesson(lesson);
    const update = this.createYUpdate(documentUpdate, this.activeSessionId);

    if (!update) {
      return;
    }
    this.sendUpdateToServer('update', update);
  }

  /**
   * Converts a FrameItem to a YCanvasItemObject, item.fresh is set to false
   */
  public convertFrameItemToYCanvasItem(frameItem: FrameItem): YCanvasItemObject {
    const item = frameItem as YCanvasItemObject;
    item.userId = this.userService.userId;
    item.uid = uuidv4();
    item.vuid = uuidv4();
    item.fresh = false;
    return item;
  }

  public convertFabricObjectToYObject(fabricObject: fabric.Object): YObject {
    const yObj = fabricObject as YObject;
    yObj.userId = this.userService.userId;
    yObj.uid = uuidv4();
    yObj.fresh = false;
    yObj.vuid = uuidv4();
    return yObj.toObject(customFabricFields);
  }
}
