import { useGitpodAPI } from "@/hooks/use-gitpod-api";
import { handleAutomationEvents } from "@/queries/automations-queries";
import { handleEnvironmentClassEvent, handleEnvironmentEvent } from "@/queries/environment-queries";
import { handleProjectEvent } from "@/queries/project-queries";
import { handleRunnerEvent } from "@/queries/runner-queries";
import { handleUserPreferenceEvent } from "@/queries/user-preferences-queries";
import { useAuthenticatedUser } from "@/queries/user-queries";
import { Code, ConnectError } from "@connectrpc/connect";
import { useQueryClient } from "@tanstack/react-query";
import { ResourceType } from "gitpod-next-api/gitpod/v1/event_pb";
import { useEffect, useRef } from "react";

// TODO(at) we need proper mocs for api.eventService.watchEvents
const enableEvents = process.env.NODE_ENV !== "test";

type Scope =
    | {
          case: "environmentId";
          value: string;
      }
    | {
          case: "organization";
          value: true;
      };

export const useStreamEvents = (scope: Scope = { case: "organization", value: true }) => {
    const api = useGitpodAPI();
    const { data: user } = useAuthenticatedUser();
    const client = useQueryClient();

    const abortControllerRef = useRef(new AbortController());
    useEffect(() => {
        if (!enableEvents) {
            return;
        }
        if (!client || !user) {
            return;
        }
        if (scope.case === "environmentId" && !scope.value) {
            return;
        }

        const abortController = new AbortController();
        abortControllerRef.current = abortController;
        runUntilAborted(abortController, async () => {
            console.log("watching events for " + (scope.case === "environmentId" ? "environment" : "organization"));
            for await (const response of api.eventService.watchEvents(
                {
                    scope,
                },
                {
                    signal: abortController.signal,
                    timeoutMs: -1, // disable default timeout
                },
            )) {
                if (abortController.signal.aborted) {
                    return;
                }

                switch (response.resourceType) {
                    case ResourceType.ENVIRONMENT:
                        await handleEnvironmentEvent(api, client, response);
                        break;
                    case ResourceType.RUNNER:
                        await handleRunnerEvent(api, client, response);
                        break;
                    case ResourceType.ENVIRONMENT_CLASS:
                        await handleEnvironmentClassEvent(client);
                        break;
                    case ResourceType.PROJECT:
                        await handleProjectEvent(api, client, response);
                        break;
                    case ResourceType.TASK:
                    case ResourceType.SERVICE:
                    case ResourceType.TASK_EXECUTION: {
                        const environmentId = scope.case === "environmentId" ? scope.value : undefined;
                        if (!environmentId) {
                            console.error("Received automation event outside of environment scope", response);
                            break;
                        }
                        await handleAutomationEvents(api, client, response, environmentId);
                        break;
                    }
                    case ResourceType.USER_PREFERENCE: {
                        await handleUserPreferenceEvent(api, client, response);
                        break;
                    }
                }
            }
        }).catch((error) => {
            if (abortController.signal.aborted) {
                return;
            }
            console.error("Error while streaming events", error);
        });

        return () => {
            abortController.abort();
        };
    }, [api, client, scope, user]);

    return {
        unsubscribe: () => {
            if (abortControllerRef.current) {
                abortControllerRef.current.abort();
            }
        },
    };
};

async function runUntilAborted(abortcontroller: AbortController, asyncFunction: () => Promise<void>) {
    while (!abortcontroller.signal.aborted) {
        try {
            await asyncFunction();
        } catch (error) {
            if (error instanceof ConnectError) {
                if (
                    error.code === Code.NotFound ||
                    error.code === Code.PermissionDenied ||
                    error.code === Code.Unauthenticated ||
                    error.code === Code.Internal
                ) {
                    abortcontroller.abort();
                    return;
                }
            }
            if (abortcontroller.signal.aborted) {
                return;
            }
            // let's wait a sec before retrying
            await new Promise((resolve) => setTimeout(resolve, 1000));
        }
    }
}
