import { Injectable, NgZone } from '@angular/core';
import {
  Observable,
  throttleTime,
  bufferTime,
  bufferWhen,
  filter,
  EMPTY,
  ObservableInput,
  observeOn,
  ThrottleConfig,
  asyncScheduler,
  MonoTypeOperatorFunction,
  debounceTime,
  tap,
  Subject,
  Subscription,
  finalize,
  defer,
} from 'rxjs';
import { inZoneScheduler, modifiedTimer, outsideZoneScheduler } from './ZoneUtils';

class BufferTime {
  private source$: Observable<any>;
  private timeoutDuration: number;
  private skipEmpty: boolean;

  private buffer: any[] = [];
  private valuesSubject = new Subject<any[]>();

  private timeoutStarted = false;
  private sourceSubscription?: Subscription;

  constructor(source$: Observable<any>, timeoutDuration: number, skipEmpty = true) {
    this.source$ = source$;
    this.timeoutDuration = timeoutDuration;
    this.skipEmpty = skipEmpty;
    this.setupBuffer();
  }

  private setupBuffer() {
    this.sourceSubscription = this.source$
      .pipe(
        tap((value) => {
          this.buffer.push(value);
          if (!this.timeoutStarted) {
            this.startTimeout();
          }
        }),
      )
      .subscribe();
  }

  private startTimeout() {
    if (this.timeoutStarted) {
      return;
    }
    this.timeoutStarted = true;
    modifiedTimer(this.timeoutDuration, true).subscribe(() => {
      this.emitBuffer();
    });
  }

  private emitBuffer() {
    if (!this.skipEmpty || this.buffer.length > 0) {
      this.valuesSubject.next(this.buffer);
      this.buffer = [];
    }
    this.timeoutStarted = false;
  }

  get bufferedValues$(): Observable<any[]> {
    return this.valuesSubject.pipe(
      finalize(() => {
        this.sourceSubscription?.unsubscribe();
      }),
    );
  }
}

@Injectable()
export class ObservableUtils {
  private readonly inZoneScheduler = inZoneScheduler(this.zone);
  private readonly outsideZoneScheduler = outsideZoneScheduler(this.zone);

  constructor(private zone: NgZone) {}

  // Returns a new observable that emits a value from the source observable one every `time` ms.
  // See: https://rxjs.dev/api/operators/throttleTime (with default throttle config)
  public throttledObservableOf = <T>(
    source: Observable<T> | undefined,
    time: number,
    config?: ThrottleConfig,
  ): Observable<T> => {
    if (!source) {
      return EMPTY;
    }
    let throttled: Observable<T>;
    if (NgZone.isInAngularZone()) {
      // Timeouts used by `throttleTime` should be run "outside" zone
      throttled = source.pipe(throttleTime(time, this.outsideZoneScheduler, config));
      // Notifications to subscribers should be run "inside" zone
      throttled = throttled.pipe(observeOn(this.inZoneScheduler));
    } else {
      throttled = source.pipe(throttleTime(time, asyncScheduler, config));
    }
    return throttled;
  };

  // Returns a new observable that collects all events from from the source observable over `time` ms, and emits them together.
  // The process then repeats.
  // See: https://rxjs.dev/api/operators/bufferTime
  public bufferedObservableOf = <T>(
    source: Observable<T> | undefined,
    time: number,
    skipEmpty = true,
  ): Observable<T[]> => {
    if (!source) {
      return EMPTY;
    }
    return defer(() => new BufferTime(source, time, skipEmpty).bufferedValues$);
  };

  // Returns a new observable that collects all events from from the source observable over `time` ms, or until
  // `maxBufferSize` is reached (whichever is first), and emits them together.
  // The process then repeats.
  // See: https://rxjs.dev/api/operators/bufferTime
  public bufferedObservableMaxBufferSizeOf = <T>(
    source: Observable<T> | undefined,
    time: number,
    maxBufferSize: number,
    skipEmpty = true,
  ): Observable<T[]> => {
    if (!source) {
      return EMPTY;
    }
    let buffered: Observable<T[]>;
    if (NgZone.isInAngularZone()) {
      // Timeouts used by `bufferTime` should be run "outside" zone
      buffered = source.pipe(bufferTime(time, time, maxBufferSize, this.outsideZoneScheduler));
    } else {
      buffered = source.pipe(bufferTime(time, time, maxBufferSize));
    }
    if (skipEmpty) {
      buffered = buffered.pipe(filter((value) => value.length > 0));
    }
    if (NgZone.isInAngularZone()) {
      // Notifications to subscribers should be run "inside" zone
      buffered = buffered.pipe(observeOn(this.inZoneScheduler));
    }
    return buffered;
  };

  // Returns a new observable that collects all events from from the source observable until the `when` observable emits,
  // and emits them together. The process then repeats.
  // See: https://rxjs.dev/api/operators/bufferTime
  public bufferedObservableWhenOf = <T>(
    source: Observable<T> | undefined,
    when: () => ObservableInput<any>, // specificies when to close, emit, and reset the buffer
    skipEmpty = true,
  ): Observable<T[]> => {
    if (!source) {
      return EMPTY;
    }
    let buffered = source.pipe(bufferWhen(when));
    if (skipEmpty) {
      buffered = buffered.pipe(filter((value) => value.length > 0));
    }
    return buffered;
  };

  public debounceTime<T>(dueTime: number): MonoTypeOperatorFunction<T> {
    return (source) => {
      if (NgZone.isInAngularZone()) {
        return source.pipe(debounceTime(dueTime, this.outsideZoneScheduler));
      } else {
        return source.pipe(debounceTime(dueTime));
      }
    };
  }
}
