import { Observable, Subject, Subscription, BehaviorSubject } from "rxjs";
import { filter, take, switchMap, delayWhen, mapTo } from "rxjs/operators";
import { AuthService } from "projects/portal-modules/src/lib/findex-auth";
import { Inject, Injectable } from "@angular/core";
import { EnvironmentSpecificConfig } from "../../environment/environment.common";
import { ENVIRONMENT } from "src/app/injection-token";

/**
 * Maintains a single WebSocket connection to the endpoint configured in
 * `environment.threadsWebsockets`.
 *
 * - The connection is authenticated by appending the auth service's verified
 *   HTTP headers to the URL as query-string parameters.
 * - Incoming string frames are JSON-parsed and published through {@link getEvents}.
 * - Outgoing messages passed to {@link send} are held until the socket reports
 *   itself open, then serialised as JSON.
 * - Whenever the socket closes it is re-created after `reconnectDelay`
 *   milliseconds, until {@link close} is called.
 */
@Injectable({ providedIn: "root" })
export class WebsocketService {
    /** Delay, in milliseconds, before attempting to re-create the socket. */
    private reconnectDelay = 1000;
    /** `true` until `close()` is called; doubles as the "service is still alive" flag. */
    private reconnect = true;
    /** Handle of the pending reconnect timer, or `null` when none is scheduled. */
    private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    private socket: WebSocket;
    /** Emits `true` when the socket opens and `false` when it closes. */
    private socketState = new BehaviorSubject<boolean>(false);
    /** Stream of parsed incoming messages. */
    private subject = new Subject<unknown>();
    /** Outgoing messages waiting for an open socket. */
    private sendQueue = new Subject<unknown>();
    private sendSubscription: Subscription;
    private websocketEndpoint: string;

    constructor(private authService: AuthService, @Inject(ENVIRONMENT) environment: EnvironmentSpecificConfig) {
        this.websocketEndpoint = environment.threadsWebsockets;
        this.connect();

        // Each queued message is delayed until `socketState` reports `true`,
        // then handed to the socket.
        this.sendSubscription = this.sendQueue
            .pipe(delayWhen(() => this.waitForSocket()))
            .subscribe((message) => this.socketSend(message));
    }

    /**
     * @returns the stream of JSON-parsed messages received from the socket.
     */
    getEvents(): Observable<unknown> {
        return this.subject;
    }

    /**
     * Queues `{ action, data }` for sending once the socket is open.
     *
     * @param action message action name; the call is ignored when this is falsy.
     * @param data payload sent alongside the action.
     */
    send(action: string, data: unknown): void {
        if (!action) {
            return;
        }
        this.sendQueue.next({ action, data });
    }

    /**
     * Permanently shuts the service down: disables reconnection, cancels any
     * pending reconnect attempt, completes all subjects and closes the socket.
     */
    close(): void {
        // Disable reconnection first so that neither the socket's close handler
        // nor an in-flight createSocket() opens a new connection afterwards.
        this.reconnect = false;

        if (this.reconnectTimer !== null) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = null;
        }

        this.subject.complete();
        this.socketState.complete();
        this.sendQueue.complete();

        if (this.sendSubscription) {
            this.sendSubscription.unsubscribe();
        }

        if (this.socket) {
            this.socket.close();
        }
    }

    /**
     * @returns an observable that emits each time `socketState` holds or
     * receives `true`; used as the release signal for queued messages.
     */
    private waitForSocket(): Observable<void> {
        return this.socketState.pipe(
            filter((state) => state === true),
            mapTo<boolean, void>(undefined),
        );
    }

    /**
     * Waits for the first value from `authService.getValidUser()` and then
     * resolves with the result of `authService.getVerifiedHttpHeaders()`.
     */
    private async waitForAuthHeaders(): Promise<Record<string, string>> {
        const headers = await this.authService
            .getValidUser()
            .pipe(
                take(1),
                switchMap(() => this.authService.getVerifiedHttpHeaders()),
            )
            .toPromise();

        return headers;
    }

    /**
     * Starts a connection attempt without awaiting it. A failed attempt is
     * logged and retried after `reconnectDelay` instead of surfacing as an
     * unhandled promise rejection.
     */
    private connect(): void {
        this.createSocket().catch((err: unknown) => {
            console.error("Error creating websocket", err);
            this.scheduleReconnect();
        });
    }

    /**
     * Schedules a single reconnect attempt after `reconnectDelay` milliseconds.
     * Does nothing once `close()` has been called.
     */
    private scheduleReconnect(): void {
        if (!this.reconnect) {
            return;
        }
        // Never keep more than one reconnect timer alive.
        if (this.reconnectTimer !== null) {
            clearTimeout(this.reconnectTimer);
        }
        this.reconnectTimer = setTimeout(() => {
            this.reconnectTimer = null;
            this.connect();
        }, this.reconnectDelay);
    }

    /**
     * Fetches auth headers, builds the authenticated URL and opens the socket,
     * wiring up the open/message/error/close handlers.
     */
    private async createSocket(): Promise<void> {
        const headers = await this.waitForAuthHeaders();
        if (!headers) {
            // Nothing to authenticate with; no retry is scheduled in this case.
            return;
        }
        if (!this.reconnect) {
            // close() was called while the headers were being fetched.
            return;
        }
        const queryString = Object.entries(headers)
            .map(([key, val]) => `${encodeURIComponent(key)}=${encodeURIComponent(val)}`)
            .join("&");

        const url = `${this.websocketEndpoint}?${queryString}`;
        this.socket = new WebSocket(url);

        this.socket.onopen = this.socketOpened;
        this.socket.onmessage = this.socketMessage;
        this.socket.onerror = this.socketError;
        this.socket.onclose = this.socketClosed;
    }

    /**
     * Serialises `message` as JSON and writes it to the socket. Failures are
     * caught and logged so that one bad message (e.g. a value that cannot be
     * serialised) does not propagate out of the send-queue subscriber.
     */
    private socketSend(message: unknown): void {
        console.info("Sending websocket message", message);
        try {
            this.socket.send(JSON.stringify(message));
        } catch (err) {
            console.error("Error sending websocket message", message, err);
        }
    }

    /** Marks the socket as unavailable and schedules a reconnect unless closed. */
    private socketClosed = (event: CloseEvent): void => {
        console.info("Socket closed", event.code);
        this.socketState.next(false);

        this.scheduleReconnect();
    };

    /** Marks the socket as available, releasing any queued messages. */
    private socketOpened = (_event: Event): void => {
        console.info("Socket opened");
        this.socketState.next(true);
    };

    /**
     * Parses non-empty string frames as JSON and publishes the result.
     * Other frames are ignored; parse failures are logged.
     */
    private socketMessage = (event: MessageEvent): void => {
        try {
            if (event.data && typeof event.data === "string") {
                const data = JSON.parse(event.data);
                this.subject.next(data);
            }
        } catch (err) {
            console.error("Error processing event", event, err);
        }
    };

    /** Logs the error and closes the socket; the close handler then deals with reconnection. */
    private socketError = (event: Event): void => {
        console.error("socket error", event);
        this.socket.close();
    };
}
