import { getLogger } from '../../../utils/logger';
import { getEventBus } from '../../core/event-bus/event-bus';
import { BackgroundFetchEmitStreamChunkPayload } from '../types';

import { STREAM_CHUNK_EVENT, STREAM_ERROR_EVENT } from './stream-events';

/**
 * Builds a `ReadableStream` fed by background-fetch events published on the event bus.
 *
 * Chunk events whose `meta.requestId` equals `requestId` carry their bytes as a
 * comma-separated list of numbers; each such chunk is decoded into a `Uint8Array`
 * and enqueued. The stream is closed when a chunk reports `done` or when
 * `abortSignal` aborts, and it is errored when a matching stream-error event arrives.
 *
 * As soon as the stream reaches a terminal state (closed, errored, or cancelled by
 * the consumer) the event-bus listeners and the abort listener are detached, so a
 * finished stream no longer reacts to, or is retained by, later events.
 *
 * @param requestId - Identifier used to select the events that belong to this stream.
 * @param abortSignal - Optional signal; aborting it closes the stream.
 * @returns A stream of `Uint8Array` chunks for the given request.
 */
export const createReadableStream = (requestId: string, abortSignal?: AbortSignal | null): ReadableStream => {
    const logger = getLogger('background-fetch-handle-stream');
    const eventBus = getEventBus();

    let streamController: ReadableStreamDefaultController | undefined;
    let streamClosed = false;

    /** Removes every listener this stream registered. */
    function detachListeners(): void {
        eventBus.removeListener(STREAM_CHUNK_EVENT, handleStream);
        eventBus.removeListener(STREAM_ERROR_EVENT, handleStreamError);
        abortSignal?.removeEventListener('abort', handleAbortSignal);
    }

    /**
     * Moves the stream into a terminal state exactly once: marks it closed, detaches
     * the listeners and then runs `terminate` against the controller (if there is one).
     */
    function settle(terminate: (controller: ReadableStreamDefaultController) => void): void {
        if (streamClosed) {
            return;
        }

        streamClosed = true;
        detachListeners();

        if (!streamController) {
            return;
        }

        try {
            terminate(streamController);
        } catch (error: unknown) {
            // Best effort: the controller may already have been closed from the consumer side.
            logger.log(`Failed to finalize stream handle. request id [${requestId}], error: `, error);
        }
    }

    function handleAbortSignal(): void {
        settle((controller) => controller.close());
    }

    function handleStream({ data, meta }: BackgroundFetchEmitStreamChunkPayload): void {
        // Ignore events for other requests and anything that arrives after the stream
        // has finished (enqueueing on a closed or errored controller throws).
        if (!streamController || streamClosed || meta?.requestId !== requestId) {
            return;
        }

        const { value, done } = data;

        if (done || abortSignal?.aborted) {
            settle((controller) => controller.close());

            return;
        }

        // ''.split(',') yields [''] which would decode to a bogus single 0 byte,
        // so chunks without a payload are skipped.
        if (!value) {
            return;
        }

        streamController.enqueue(Uint8Array.from(value.split(',').map(Number)));
    }

    function handleStreamError({ meta }: { meta: { requestId: string } }): void {
        if (!streamController || meta?.requestId !== requestId) {
            return;
        }

        settle((controller) => controller.error(new Error('Stream error')));
    }

    if (abortSignal) {
        abortSignal.addEventListener('abort', handleAbortSignal);
    }

    const config: UnderlyingDefaultSource = {
        start(controller) {
            streamController = controller;

            eventBus.addListener(STREAM_CHUNK_EVENT, handleStream);
            eventBus.addListener(STREAM_ERROR_EVENT, handleStreamError);

            // An already-aborted signal never fires 'abort' again, so finish right away
            // instead of waiting for a chunk that may never arrive.
            if (abortSignal?.aborted) {
                handleAbortSignal();
            }
        },
        cancel(reason: unknown) {
            streamClosed = true;
            logger.log(`Cancel stream handle. request id [${requestId}], reason: `, reason ?? 'unknown');

            detachListeners();
        },
    };

    return new ReadableStream(config);
};
