import { Injectable, OnDestroy } from '@angular/core';
import { Centrifuge, Subscription } from 'centrifuge';
import { environment } from 'src/environments/environment';
import { Observable, Subject, filter } from 'rxjs';
import { Actions, concatLatestFrom, ofType } from '@ngrx/effects';
import { Store } from '@ngrx/store';
import { FirmType, MessagingMessage } from '../state/types';
import { AuthActions } from '../state/auth/auth.actions';
import { UserSettingsStorage } from '../storage/user.settings.storage';
import { MessagingActions } from '../state/messaging/messaging.actions';
import { AppState } from '../state/appState';
import { FeatureSelectors } from '../state/feature/feature.selectors';
import { FirmSelectors } from '../state/firm/firm.selectors';

/**
 * Bridges a Centrifuge real-time connection to the application.
 *
 * The service watches the feature state in the store: once the features are
 * initialized and a firm is available it opens a connection and subscribes to
 * the firm channel when the `messaging` feature is enabled, and tears the
 * connection down when the feature is disabled. Connection and subscription
 * lifecycle events are forwarded to the store as `MessagingActions`; channel
 * publications are exposed through {@link MessagingService.messages$}.
 */
@Injectable({ providedIn: 'root' })
export class MessagingService implements OnDestroy {
  /** Internal stream every channel publication is pushed into. */
  private readonly messages = new Subject<MessagingMessage>();
  /** Store/actions subscriptions created in the constructor, released in `ngOnDestroy`. */
  private readonly storeSubscriptions: Array<{ unsubscribe(): void }> = [];
  /** Subscription to the firm channel; `undefined` while messaging is closed. */
  private channelSubscription?: Subscription;
  /** Active Centrifuge client; `undefined` while messaging is closed. */
  private client?: Centrifuge;

  constructor(
    actions$: Actions,
    private store: Store<AppState>,
  ) {
    this.storeSubscriptions.push(
      // Drop the connection as soon as the auth state is reset.
      actions$.pipe(ofType(AuthActions.resetAuthStateAndStorage)).subscribe(() => this.closeMessaging()),
      this.store
        .select(FeatureSelectors.selectFeaturesState)
        .pipe(
          // NOTE(review): the firm is only sampled when the feature state emits, so a firm
          // that becomes available later does not trigger initialization on its own — confirm
          // the firm is always loaded before the features are.
          concatLatestFrom(() => this.store.select(FirmSelectors.selectFirm)),
          // Only wait for the prerequisites here. The `messaging` flag is deliberately NOT
          // filtered on, otherwise the "feature disabled" branch below could never run.
          filter(([featuresState, firm]) => Boolean(firm) && Boolean(featuresState?.initialized)),
        )
        .subscribe(([featuresState, firm]) => {
          if (!featuresState.features?.messaging) {
            this.closeMessaging();
            return;
          }

          this.initMessaging(firm);
        }),
    );
  }

  /** Releases the store subscriptions, closes the connection and completes the message stream. */
  ngOnDestroy(): void {
    this.storeSubscriptions.forEach((subscription) => subscription.unsubscribe());
    this.storeSubscriptions.length = 0;
    this.closeMessaging();
    this.messages.complete();
  }

  /**
   * Opens a Centrifuge connection and subscribes to the channel of the given firm.
   *
   * Any connection opened by a previous call is closed first, so repeated calls
   * replace the connection instead of stacking clients that would each push the
   * same publication into the message stream.
   *
   * @param firm Firm whose `id` is used to build the channel name.
   */
  initMessaging = (firm: FirmType) => {
    // Never keep more than one live client around.
    this.closeMessaging();

    const token = new UserSettingsStorage().loadSetting('jwt');
    // Use WebSocket transport endpoint.
    const client = new Centrifuge(environment.messagingUrl, {
      token,
      minReconnectDelay: 5000,
      maxReconnectDelay: 30000,
      maxServerPingDelay: 60000,
    });

    // Allocate Subscription to a channel.
    const channel = `${environment.messagingNamespace}:firm-${firm.id}`;
    const channelSubscription = client.newSubscription(channel, { data: token });

    // React on channel real-time publications.
    channelSubscription.on('publication', (message: MessagingMessage) => {
      this.messages.next(message);
    });

    // Mirror the subscription lifecycle into the store.
    channelSubscription.on('error', (ctx) => this.store.dispatch(MessagingActions.error({ ctx })));
    channelSubscription.on('subscribing', () => this.store.dispatch(MessagingActions.subscribing()));
    channelSubscription.on('subscribed', () => this.store.dispatch(MessagingActions.subscribed()));
    channelSubscription.on('unsubscribed', () => this.store.dispatch(MessagingActions.unsubscribed()));

    // Mirror the connection lifecycle into the store.
    client.on('connecting', () => this.store.dispatch(MessagingActions.connecting()));
    client.on('connected', () => this.store.dispatch(MessagingActions.connected()));
    client.on('disconnected', () => this.store.dispatch(MessagingActions.disconnected()));
    client.on('error', (ctx) => this.store.dispatch(MessagingActions.error({ ctx })));

    this.client = client;
    this.channelSubscription = channelSubscription;

    // Trigger subscribe process.
    channelSubscription.subscribe();

    // Trigger actual connection establishment.
    client.connect();
  };

  /** Read-only stream of the publications received on the firm channel. */
  get messages$(): Observable<MessagingMessage> {
    return this.messages.asObservable();
  }

  /**
   * Unsubscribes from the channel and disconnects the client, if any.
   * Safe to call when messaging is not open.
   */
  private closeMessaging = () => {
    const channelSubscription = this.channelSubscription;
    const client = this.client;

    // Clear the handles before tearing down so that any handler running during
    // the teardown already observes the service as closed.
    this.channelSubscription = undefined;
    this.client = undefined;

    channelSubscription?.unsubscribe();
    client?.disconnect();
  };
}
