import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { filter, take } from 'rxjs/operators';

/**
 * RxJS operator that caches the first value emitted by the source observable and replays it to
 * every subscriber, fetching again from the source whenever `reset$` emits.
 *
 * Behaviour:
 * - The source is not subscribed to until the first output subscriber arrives or `reset$` first
 *   emits. Each fetch subscribes to the source once and takes only its first value (`take(1)`).
 * - A subscriber receives the cached value (once one is available) followed by `complete`.
 * - If a fetch errors, the error is forwarded to the subscribers waiting on that fetch and nothing
 *   is cached; the next subscriber (or the next `reset$` emit) starts a fresh fetch.
 * - At most one fetch runs at a time; requests made while one is in flight share its outcome.
 *
 * Caveats:
 * - `null` is used internally as the "nothing cached" marker, so a source value of `null` is never
 *   emitted to subscribers and is treated as "no value cached".
 * - `reset$` is subscribed to as soon as the operator is applied to a source (not per output
 *   subscriber) and this operator never unsubscribes from it.
 *
 * @param reset$ observable subscribed to internally; every emit clears the cached value and triggers a new fetch
 * @returns an operator function that turns a source observable into its cached, resettable counterpart
 */
export function resettableCache(reset$: Observable<void>) {
  return <T>(source: Observable<T>): Observable<T> => {
    let fetchInProgress = false;

    // the result subject is a behaviour subject used to store the cached value (null = nothing cached)
    const resultSubject = new BehaviorSubject<T | null>(null);

    // the mechanism subject relays (but does not store) the error and complete events of a fetch.
    // A Subject cannot be reused once it has errored or completed, so every fetch installs a fresh
    // one here; otherwise a single failed fetch would make every later subscriber error immediately
    let mechanismSubject = new Subject<T>();

    // the internal fetch mechanism; does nothing when a fetch is already running
    function fetchFromSource(): void {
      if (fetchInProgress) {
        return;
      }
      fetchInProgress = true;

      // give this fetch its own relay, and keep a local handle so that a re-entrant fetch
      // (started from inside a subscriber callback) cannot redirect our notifications
      const fetchMechanism = new Subject<T>();
      mechanismSubject = fetchMechanism;

      // immediately wipe the output value so that any new subscribers are forced to wait for the new value
      resultSubject.next(null);

      // subscribe once to the source observable
      source.pipe(take(1)).subscribe({
        next(value) {
          // plumb the result into the resultSubject
          resultSubject.next(value);
        },
        error(err) {
          // release the in-progress flag BEFORE notifying, so that a subscriber which re-subscribes
          // synchronously from its error handler (e.g. `retry`) is able to start a new fetch
          fetchInProgress = false;
          resultSubject.next(null);
          fetchMechanism.error(err);
        },
        complete() {
          // same ordering as for errors: release the flag first, then relay the completion
          fetchInProgress = false;
          fetchMechanism.complete();
        }
      });
    }

    // whenever reset is called, clear the cached value and perform a fetch
    reset$.subscribe({
      next() {
        resultSubject.next(null);
        fetchFromSource();
      }
    });

    // handle subscribers to the output observable
    return new Observable<T | null>(subscriber => {

      // if, when a new subscriber joins, there is no cached value, trigger a fetch
      if (resultSubject.getValue() === null) {
        fetchFromSource();
      }

      // pin the relay belonging to the fetch this subscriber depends on before any value is
      // delivered, so a reset triggered from the subscriber's own `next` cannot swap it out
      const mechanism = mechanismSubject;

      // subscribe to the latest internal result, and pass it directly through to the subscriber
      const outgoingSubscription = resultSubject.subscribe({
        next(value) {
          subscriber.next(value);
        }
      });

      // also subscribe to the relay, passing through the error and complete events
      const outgoingMechanismSubscription = mechanism.subscribe({
        error(err) {
          subscriber.error(err);
        },
        complete() {
          subscriber.complete();
        }
      });

      // return the unsubscribe handler
      return () => {
        outgoingMechanismSubscription.unsubscribe();
        outgoingSubscription.unsubscribe();
      };

      // finally, ensure that the internal null marker never reaches the consumer
    }).pipe(filter((output: T | null): output is T => output !== null));
  };
}
