import { Injectable, NgZone } from '@angular/core';
import { ReconnectingWebSocket } from './reconnecting.websocket';
import { RSIConfig } from './rsi.config';
import { BehaviorSubject, Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';

/**
 * Subscription request as consumed by {@link RSISocket.subscribe}.
 * Only `destination` is required; the optional fields are forwarded to the
 * server unchanged when they are present on the request object.
 */
interface RSISubscriptionRequest {
    destination: string;
    interval?: unknown;
    updatelimit?: unknown;
}

/**
 * Angular service that owns a single {@link ReconnectingWebSocket} connection.
 *
 * - Incoming frames are JSON-parsed and published on an internal message stream
 *   (see {@link getMessages} / {@link getDataMessages}).
 * - Connection life-cycle events are published on {@link status} as
 *   `{ type: 'open' | 'close' | 'error', data: <event> }`.
 * - Active subscriptions are remembered and re-sent every time the socket
 *   (re)opens.
 */
@Injectable()
export class RSISocket {
    private ws: ReconnectingWebSocket;
    private messages: BehaviorSubject<any>;
    /** Connection status stream; starts with an empty object until the first event. */
    public status: BehaviorSubject<any>;
    private connectPromise: Promise<ReconnectingWebSocket>;
    /** Requests that must be re-sent whenever the socket opens again. */
    private subscriptions: RSISubscriptionRequest[] = [];

    constructor(private zone: NgZone, private config: RSIConfig) {
        this.messages = new BehaviorSubject<any>({});
        this.status = new BehaviorSubject<any>({});
        this.connect();
    }

    /** Closes the underlying socket. */
    public close() {
        this.ws.close();
    }

    /**
     * Opens a new socket to the path supplied by the configuration, closing any
     * socket that already exists.
     *
     * @returns A promise that resolves with the socket the first time it opens.
     *          The promise is never rejected; errors are reported on `status`.
     */
    public connect() {
        if (this.ws) {
            this.ws.close();
        }

        return (this.connectPromise = new Promise<ReconnectingWebSocket>(resolve => {
            this.ws = new ReconnectingWebSocket(this.config.getSocketPath());

            this.ws.onopen = event => {
                this.zone.run(() => {
                    // Re-establish server-side subscriptions before announcing "open".
                    this.reconnectListeners();
                    this.status.next({ type: 'open', data: event });
                    resolve(this.ws);
                });
            };

            // All status events are emitted inside the Angular zone so that
            // subscribers are handled consistently, regardless of event type.
            this.ws.onclose = (ev: CloseEvent) => {
                this.zone.run(() => {
                    this.status.next({ type: 'close', data: ev });
                });
            };

            this.ws.onmessage = event => {
                this.zone.run(() => {
                    this.messages.next(JSON.parse(event.data));
                });
            };

            this.ws.onerror = errorEvent => {
                this.zone.run(() => {
                    this.status.next({ type: 'error', data: errorEvent });
                });
            };
        }));
    }

    /** Re-sends a subscribe message for every remembered subscription. */
    reconnectListeners() {
        for (const request of this.subscriptions) {
            this.ws.send(this.buildSubscribeMessage(request));
        }
    }

    /**
     * Subscribes to an event once the socket has opened and remembers the
     * request so it can be replayed after a reconnect.
     *
     * @param data Request object; `destination` is sent as the event name and
     *             `interval` / `updatelimit` are forwarded when present.
     */
    public subscribe(data: any) {
        // Serialise eagerly so later mutation of `data` by the caller does not
        // change what is sent for this call.
        const payload = this.buildSubscribeMessage(data);

        this.connectPromise.then(() => {
            // Record first: if send() throws, the subscription is still replayed
            // by reconnectListeners() the next time the socket opens.
            this.subscriptions.push(data);
            this.ws.send(payload);
        });
    }

    /**
     * Stream of the `data` payloads of all messages of type `'data'` whose event
     * matches the destination.
     *
     * @param destination Event path to listen for (see {@link getMessages}).
     */
    getDataMessages(destination: string) {
        return this.getMessages(destination).pipe(
            filter(data => data.type === 'data'),
            map(message => message.data)
        );
    }

    /**
     * Stream of all messages whose `event` ends with the given destination.
     * Trailing `/` and `#` characters are ignored on both sides, because the
     * event property is not consistent about which terminator it uses.
     *
     * @param destination Event path to listen for.
     */
    getMessages(destination: string): Observable<any> {
        // Normalise once, without reassigning the parameter.
        const wanted = RSISocket.stripTrailingSeparators(destination);

        const filterByDestination = (message: any): boolean => {
            if (message && message.event) {
                const event: string = message.event;
                return RSISocket.stripTrailingSeparators(event).endsWith(wanted);
            }
            return false;
        };

        return this.messages.pipe(
            filter(filterByDestination)
        );
    }

    /**
     * Sends an unsubscribe message for the event and forgets every remembered
     * subscription with that exact destination, so that it is not re-created on
     * the next reconnect.
     *
     * @param destination Event name that was used as `destination` in `subscribe`.
     */
    public unsubscribe(destination: string) {
        // Forget first so a failing send() cannot leave a stale entry behind.
        this.subscriptions = this.subscriptions.filter(
            request => request.destination !== destination
        );
        this.ws.send(JSON.stringify({ type: 'unsubscribe', event: destination }));
    }

    /** Builds the JSON text of a subscribe message for the given request. */
    private buildSubscribeMessage(data: any): string {
        const subscribeConfig: any = {
            type: 'subscribe',
            event: data.destination
        };

        // Only own properties are forwarded; absent options are left out entirely.
        for (const option of ['interval', 'updatelimit']) {
            if (Object.prototype.hasOwnProperty.call(data, option)) {
                subscribeConfig[option] = data[option];
            }
        }

        return JSON.stringify(subscribeConfig);
    }

    /** Removes every trailing `/` and `#` character from a path. */
    private static stripTrailingSeparators(path: string): string {
        return path.replace(/[\/#]+$/, '');
    }
}
