import { Observable, of } from 'rxjs';
import { catchError, filter, take, toArray } from 'rxjs/operators';
import { fromFetch } from 'rxjs/fetch';
// import { tag } from "rxjs-spy/operators/tag";
import ndjson from 'ndjson';

/**
 * Turn a fetch `Response` carrying newline-delimited JSON into an Observable
 * that emits one value per parsed row.
 *
 * The response body is read chunk by chunk, decoded as UTF-8 and written to
 * an `ndjson` parser; everything the parser emits is forwarded to the
 * subscriber.
 *
 * @param  {Response} resp response whose `body` is a readable byte stream
 * @return {Observable} emits each parsed row, completes once the body has
 *   been fully parsed, and errors if parsing or reading the body fails
 */
function selector (resp) {
  return new Observable(observer => {
    const decoder = new TextDecoder();
    const reader = resp.body.getReader();
    const parse$ = ndjson.parse();
    let subscribed = true;

    // Map the parser's stream events onto the Rx observer. 'end' (rather than
    // 'finish') is used for completion because it is emitted only after every
    // parsed row has been delivered through 'data'.
    parse$.on('data', row => observer.next(row));
    parse$.on('end', () => observer.complete());
    parse$.on('error', err => observer.error(err));

    // Read chunks until the body is exhausted or the subscriber goes away.
    function pump () {
      return reader.read().then(({ value, done }) => {
        if (done || !subscribed) {
          if (subscribed) {
            // flush any bytes the streaming decoder is still holding on to
            const tail = decoder.decode();
            if (tail) parse$.write(tail);
          }
          parse$.end();
          return undefined;
        }

        // `stream: true` keeps a multi-byte character that straddles two
        // chunks intact instead of decoding each half as U+FFFD.
        const text = decoder.decode(value, { stream: true });
        if (text) parse$.write(text);

        return pump();
      });
    }

    // A failed read must reach the subscriber, otherwise the Observable would
    // neither complete nor error. This is a no-op once unsubscribed.
    pump().catch(err => observer.error(err));

    // Teardown runs on unsubscribe, completion and error alike.
    return () => {
      subscribed = false;
      // Release the body right away instead of waiting for the next chunk.
      // Cancelling an already errored stream rejects; nothing is left to do then.
      reader.cancel().catch(() => {});
    };
  });
}

/**
 * Request `url` through `fromFetch` and collect the emitted records into a
 * single array.
 *
 * The response is handled by `selector` unless `fetchOptions` provides its own
 * `selector` key. Errors are logged with `console.error` and swallowed, so the
 * resulting array holds whatever was emitted before the failure. Falsy
 * records are dropped.
 *
 * @param  {*} url passed as the first argument to `fromFetch`
 * @param  {Object} [fetchOptions={}] extra options spread into the `fromFetch` init object
 * @param  {number} [limit=Infinity] maximum number of records to keep
 * @return {Observable} emits one array of records, then completes
 */
export default function streamMovieList (url, fetchOptions = {}, limit = Infinity) {
  const requestInit = { selector, ...fetchOptions };

  // log the failure, then hand back a placeholder that the filter below removes
  const logAndRecover = (err) => {
    console.error(err);
    return of(null);
  };

  const record$ = fromFetch(url, requestInit);

  return record$.pipe(
    catchError(logAndRecover),
    filter(Boolean),
    take(limit),
    toArray(),
  );
}
