import { AnyMessage, Message, MethodInfo, PartialMessage, ServiceType } from '@bufbuild/protobuf';
import {
  ContextValues,
  PromiseClient,
  StreamResponse,
  Transport,
  UnaryResponse,
  createPromiseClient,
} from '@connectrpc/connect';
import { MethodDescriptor, Rpc } from './Rpc';

/**
 * Constructor type of a service client class: it is instantiated with an
 * {@link Rpc} channel and produces a `PromiseClient` for the service `T`.
 */
export type ClientClass<T extends ServiceType> = new (channel: Rpc) => PromiseClient<T>;

/**
 * Builds a client class for a generated service definition.
 *
 * Each instance of the returned class wraps the given channel in a
 * `TransportAdaptor`, creates a promise client on top of it, and copies that
 * client's own enumerable properties onto itself.
 *
 * @param generatedImplementation The codegenerated service client implementation.
 * @returns A class that is constructed with an `Rpc` channel and exposes the
 *   promise client's members for the service.
 */
export function clientFactory<T extends ServiceType>(generatedImplementation: T): ClientClass<T> {
  const clientClass = class {
    constructor(channel: Rpc) {
      // Route every call made by the promise client through the channel.
      const transport = new TransportAdaptor(channel);
      const promiseClient = createPromiseClient(generatedImplementation, transport);

      // Expose the generated service methods directly on this instance.
      Object.assign(this, promiseClient);
    }
  };

  return clientClass as ClientClass<T>;
}

/**
 * Adapts an `Rpc` channel to the Connect `Transport` interface so that a
 * client created with `createPromiseClient` sends its calls over the channel.
 *
 * Only unary calls are supported; `stream` always throws.
 */
class TransportAdaptor implements Transport {
  constructor(private readonly channel: Rpc) {}

  /**
   * Performs a unary call over the channel and wraps the reply as a
   * `UnaryResponse`.
   *
   * @param service The service the method belongs to.
   * @param method The method being invoked.
   * @param signal Optional caller-provided signal; it is combined with an
   *   internal signal and forwarded to the channel.
   * @param _timeoutMs Accepted for interface compatibility; not used.
   * @param header Request headers, forwarded to the channel as `Headers`.
   * @param input The request message.
   * @param _contextValues Accepted for interface compatibility; not used.
   * @returns The response message. `header` and `trailer` are always empty
   *   `Headers` instances, since this adaptor does not obtain response
   *   metadata from the channel.
   */
  async unary<I extends Message<I> = AnyMessage, O extends Message<O> = AnyMessage>(
    service: ServiceType,
    method: MethodInfo<I, O>,
    signal: AbortSignal | undefined,
    _timeoutMs: number | undefined,
    header: HeadersInit | undefined,
    input: PartialMessage<I>,
    _contextValues?: ContextValues | undefined
  ): Promise<UnaryResponse<I, O>> {
    const methodDescriptor: MethodDescriptor = { service, method };
    const metadata = new Headers(header);
    const abortController = new AbortController();
    try {
      const responseMessage = await this.channel.unary(
        methodDescriptor,
        input,
        metadata,
        anySignal([abortController.signal, signal])
      );

      // UnaryResponse declares header and trailer as Headers, and consumers
      // (e.g. onHeader/onTrailer call options) receive them as-is, so return
      // empty Headers rather than undefined.
      const response: UnaryResponse<I, O> = {
        service,
        method,
        stream: false,
        message: responseMessage as O,
        header: new Headers(),
        trailer: new Headers(),
      };

      return response;
    } finally {
      // Must call abort() to clean up, otherwise there will be a memory leak:
      // aborting the local controller aborts the combined signal created by
      // anySignal, which in turn removes the 'abort' listener that anySignal
      // registered on the caller's signal. This does not actually abort
      // anything; the unary request has already settled by this point.
      abortController.abort();
    }
  }

  /**
   * Streaming calls are not supported by this adaptor.
   *
   * @throws Error Always, synchronously, with the message
   *   'Method not implemented.'.
   */
  stream<I extends Message<I> = AnyMessage, O extends Message<O> = AnyMessage>(
    _service: ServiceType,
    _method: MethodInfo<I, O>,
    _signal: AbortSignal | undefined,
    _timeoutMs: number | undefined,
    _header: HeadersInit | undefined,
    _input: AsyncIterable<PartialMessage<I>>,
    _contextValues?: ContextValues | undefined
  ): Promise<StreamResponse<I, O>> {
    throw new Error('Method not implemented.');
  }
}

/**
 * Combines several optional abort signals into one.
 *
 * If any provided signal is already aborted, that signal itself is returned.
 * Otherwise the returned signal aborts, with the same reason, as soon as any
 * of the provided signals aborts. Missing (falsy) entries are skipped.
 *
 * @param signals The signals to combine; entries may be `undefined`.
 * @returns An already-aborted input signal, or a new combined signal.
 */
function anySignal(signals: Iterable<AbortSignal | undefined>): AbortSignal {
  const combined = new AbortController();

  for (const source of signals) {
    if (!source) {
      continue;
    }

    if (!source.aborted) {
      // Tying the listener to the combined signal means it is removed
      // automatically once the combined controller aborts.
      const forwardAbort = (): void => combined.abort(source.reason);
      source.addEventListener('abort', forwardAbort, { signal: combined.signal });
      continue;
    }

    // Already aborted: abort the combined controller (which also releases any
    // listeners attached so far) and hand back the aborted signal directly.
    combined.abort(source.reason);
    return source;
  }

  return combined.signal;
}
