import { Observable, ReplaySubject, Subject } from 'rxjs';
import { take, takeUntil } from 'rxjs/operators';
import { Type } from './type';

/**
 * Collection of custom pipeable RxJS operators.
 *
 * Every operator returned here propagates unsubscription: when the downstream
 * subscriber unsubscribes, the subscriptions the operator opened on `source`
 * (and on `signal`, where applicable) are released as well.
 */
export class RXJSUtils {
  /**
   * Operator that forwards only the values for which
   * `Type.isDefined_NotNull` returns true. Errors and completion pass through.
   *
   * @returns a pipeable operator that keeps the element type of its source.
   */
  static filterUndefinedAndNull() {
    return function<T>(source: Observable<T>): Observable<T> {
      return RXJSUtils.filterWith(source, value => Type.isDefined_NotNull(value));
    };
  }

  /**
   * Operator that forwards only the values for which `Type.isDefined`
   * returns true. Errors and completion pass through.
   *
   * @returns a pipeable operator that keeps the element type of its source.
   */
  static filterUndefined() {
    return function<T>(source: Observable<T>): Observable<T> {
      return RXJSUtils.filterWith(source, value => Type.isDefined(value));
    };
  }

  /**
   * Operator that forwards only the values for which `Type.notNull`
   * returns true. Errors and completion pass through.
   *
   * @returns a pipeable operator that keeps the element type of its source.
   */
  static filterNull() {
    return function<T>(source: Observable<T>): Observable<T> {
      return RXJSUtils.filterWith(source, value => Type.notNull(value));
    };
  }

  /**
   * Operator that drops every value strictly equal to `false` and forwards
   * everything else. Errors and completion pass through.
   *
   * @returns a pipeable operator for boolean streams.
   */
  static filterFalse() {
    return function(source: Observable<boolean>): Observable<boolean> {
      return RXJSUtils.filterWith(source, value => value !== false);
    };
  }

  /**
   * Operator that drops every value strictly equal (`===`) to `val` and
   * forwards everything else. Errors and completion pass through.
   *
   * @param val the value to remove from the stream.
   * @returns a pipeable operator that keeps the element type of its source.
   */
  static filterValue(val: unknown) {
    return function<T>(source: Observable<T>): Observable<T> {
      return RXJSUtils.filterWith(source, value => value !== val);
    };
  }

  /**
   * Operator that mirrors `source` until `signal` fires, then completes.
   *
   * `signal` is considered fired on its first value that passes
   * `filterUndefinedAndNull` and is not `false`. If `signal` has already fired
   * synchronously at subscription time, the result completes without emitting.
   * Errors from either `source` or `signal` are forwarded to the subscriber.
   *
   * @param signal stream whose first accepted value ends the output.
   * @returns a pipeable operator.
   */
  static takeUntilObsTrue(signal: Observable<boolean>) {
    return function(source: Observable<any>): Observable<any> {
      // takeUntil subscribes to the notifier before the source and releases
      // both subscriptions when either side finishes or downstream unsubscribes.
      return source.pipe(takeUntil(RXJSUtils.whenTrue(signal)));
    };
  }

  /**
   * Operator that collects values from `source` until `signal` fires, then
   * replays the collected values to each subscriber and continues with live
   * values from `source`.
   *
   * Behaviour worth knowing:
   * - Collecting starts as soon as the returned operator is applied to a
   *   source (pipe time), not when the resulting observable is subscribed.
   * - `source` is subscribed once for collecting and once more per subscriber
   *   after the signal fired, so it is expected to be a shared (multicast)
   *   stream.
   * - Nothing is emitted before `signal` fires; if `signal` completes without
   *   firing, the output neither emits nor completes.
   * - Errors raised by `signal` are forwarded to the subscriber.
   *
   * @param signal stream whose first accepted value (see `takeUntilObsTrue`)
   *   releases the buffer.
   * @param options optional limits passed to the internal `ReplaySubject`:
   *   `bufferSize` (max number of retained values) and `bufferedTime`
   *   (retention window); both default to `Infinity`.
   * @returns a pipeable operator.
   */
  static bufferUntilObsTrue(signal: Observable<boolean>, options?: { bufferSize?: number, bufferedTime?: number }) {
    const bufferSize = (Type.isDefined(options) && Type.isDefined(options.bufferSize)) ?
      options.bufferSize :
      Infinity;
    const bufferedTime = (Type.isDefined(options) && Type.isDefined(options.bufferedTime)) ?
      options.bufferedTime :
      Infinity;

    return function(source: Observable<any>): Observable<any> {
      // One buffer per application of the operator, so reusing the same
      // operator instance on several sources does not mix their values.
      const buffer = new ReplaySubject<any>(bufferSize, bufferedTime);

      // Buffer items until the signal fires. This subscription is opened
      // eagerly on purpose so values emitted before anyone subscribes are kept.
      source.pipe(RXJSUtils.takeUntilObsTrue(signal)).subscribe((value: any) => {
        buffer.next(value);
      });

      return new Observable(subscriber => {
        // Everything opened on behalf of this subscriber, released on teardown.
        const opened: Array<{ unsubscribe(): void }> = [];

        opened.push(
          RXJSUtils.whenTrue(signal).subscribe({
            next() {
              // Got a signal: transfer all buffered items, close the buffer
              // and subscribe to the source for live values.
              opened.push(buffer.subscribe((value: any) => {
                subscriber.next(value);
              }));
              buffer.complete();

              // Downstream may have unsubscribed while the buffer was replayed;
              // do not open a live subscription nobody listens to.
              if (subscriber.closed) {
                return;
              }

              opened.push(source.subscribe({
                next(value) {
                  subscriber.next(value);
                },
                error(error) {
                  subscriber.error(error);
                },
                complete() {
                  subscriber.complete();
                }
              }));
            },
            error(error) {
              subscriber.error(error);
            }
          })
        );

        return () => {
          opened.forEach(subscription => subscription.unsubscribe());
        };
      });
    };
  }

  /**
   * Shared implementation of the filter operators: forwards the values of
   * `source` accepted by `keep`, plus errors and completion.
   */
  private static filterWith<T>(source: Observable<T>, keep: (value: T) => boolean): Observable<T> {
    return new Observable<T>(subscriber =>
      // Returning the inner subscription makes it the teardown of the outer
      // observable, so unsubscribing downstream also unsubscribes from source.
      source.subscribe({
        next(value) {
          if (keep(value)) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      })
    );
  }

  /**
   * Emits the first value of `signal` that passes `filterUndefinedAndNull`
   * and is not `false`, then completes.
   */
  private static whenTrue(signal: Observable<boolean>): Observable<boolean> {
    return signal.pipe(RXJSUtils.filterUndefinedAndNull(), RXJSUtils.filterFalse(), take(1));
  }
}
