import {from, of, defer, Observable as RxjsObservable} from 'rxjs';
import { map, first, concatMap, skipWhile, switchMap } from 'rxjs/operators';
import {syncStacktrace} from 'auto-trace';
import Pusher from 'pusher-js/dist/web/pusher.min.js';
import { fetchAsObservable } from './fetcher.js';
import canopyUrls from 'canopy-urls!sofe';
import auth from 'cp-client-auth!sofe';
import { rxjsOnlineObservable } from './online-listeners.js'

const PUSHER_KEY =
  canopyUrls.getEnvironment() === canopyUrls.PRODUCTION_ENVIRONMENT
    ? '98ba77c3dc4178f50c4b'
    : 'b41431fba10735c5209f';

let error,
  pusher,
  channels = {};

export function onPusher(type, channelName) {
  return rxjsOnlineObservable.pipe(
    skipWhile(online => !online),
    switchMap(() => {
      return from([channelName])
        .pipe(
          concatMap(channelName =>
            channelName
            ? of(channelName)
            : auth
            .getLoggedInUserAsObservable()
            .pipe(first())
            .pipe(map(loggedInUser => `private-${loggedInUser.id}`)),
          ),
        )
        .pipe(
          concatMap(channelName => {
            if (!pusher) {
              pusher = new Pusher(PUSHER_KEY, {
                authorizer: function(channel, options) {
                  return {
                    authorize: function(socketId, callback) {
                      fetchAsObservable(
                        `${canopyUrls.getAPIUrl()}/pusher-authenticate?socketId=${socketId}&channel=${
                      channel.name
                    }`,
                      ).subscribe(
                        authData => {
                          try {
                            callback(null, authData);
                          } catch (error) {
                            callback(true, error);
                          }
                        },
                        e => {
                          callback(true, e);
                        },
                      );
                    },
                  };
                },
              });
            }

            let channel;
            if (channels[channelName]) {
              channel = channels[channelName];
            } else {
              channel = pusher.subscribe(channelName);
              channels[channelName] = channel;
            }

            return defer(() =>
              RxjsObservable.create(observer => {
                let retry = true;

                if (error) {
                  processError(error);
                }

                channel.bind(type, processPush);
                channel.bind('pusher:subscription_error', processError);

                function processError(e) {
                  error = e;
                  setTimeout(() => observer.error(error), 1000);
                }

                function processPush(data) {
                  observer.next(data);
                }

                return () => {
                  channel.unbind(type, processPush);
                  channel.unbind('pusher:subscription_error', processError);
                };
              }),
            );
          }),
        );
    })
  )
}
