DEV Community

CoffeeBeagle
CoffeeBeagle

Posted on

A Go-like Channel in TypeScript

I wanted to use channels like the ones in Go :)

ref: queue.ts

import { Queue } from "queue";
import { sleep } from "bun";

/**
 * Tuning options for a channel. Every duration is expressed in milliseconds.
 */
type Config = {
  /**
   * @description pause between two consecutive looks at the buffer (ms)
   */
  checkInterval: number;

  /**
   * @description buffer size of queue
   */
  capacity: number;

  /**
   * @description timeout to wait for reading and writing (ms); applies to
   * whichever direction has no dedicated timeout below
   */
  timeout: number;

  /**
   * @description timeout to wait for reading (ms)
   */
  readTimeout?: number;

  /**
   * @description timeout to wait for writing (ms)
   */
  writeTimeout?: number;
};

/**
 * Companion object of the `Config` type.
 */
const Config = {
  /**
   * Completes a partial configuration with default values.
   *
   * Defaults: `checkInterval` 300, `capacity` 1, `timeout` 1000. The two
   * optional timeouts are copied through unchanged (possibly `undefined`).
   *
   * @param config the options supplied by the caller
   * @returns a configuration with every required field filled in
   */
  padding(config: Partial<Config>): Config {
    // Tolerate a nullish argument at runtime, exactly like optional chaining would.
    const given: Partial<Config> = config ?? {};
    const { checkInterval, capacity, timeout, readTimeout, writeTimeout } = given;

    return {
      checkInterval: checkInterval ?? 300,
      capacity: capacity ?? 1,
      timeout: timeout ?? 1000,
      readTimeout,
      writeTimeout,
    };
  },
};


/**
 * Write side of a channel: something that accepts values of type `T`.
 */
export interface Sender<T> {
  /**
   * Hands `v` over to the implementation.
   *
   * @param v the value to send
   * @returns a promise that settles once the implementation has finished
   * handling the send (it carries no value)
   */
  send(v: T): Promise<void>;
}

/**
 * Read side of a channel: something that produces values of type `T`.
 */
export interface Receiver<T> {
  /**
   * Asks the implementation for the next value.
   *
   * @returns a promise for the received value
   */
  receive(): Promise<T>;
}

/**
 * A buffered channel backed by a queue, loosely modeled on Go channels.
 *
 * `send` waits while the buffer holds `capacity` items or more, `receive`
 * waits while it is empty. Waiting is implemented by polling: the buffer is
 * looked at again every `checkInterval` milliseconds until the operation can
 * proceed or its timeout elapses.
 *
 * @todo support mutex
 */
export class Chan<T> implements Sender<T>, Receiver<T> {
  private readonly config: Config;
  private readonly buf: Queue<T>;

  /**
   * @param buf items handed to the underlying queue on construction
   * (presumably its initial contents; they are not checked against `capacity`)
   * @param config partial configuration; missing fields get default values
   */
  public constructor(
    buf: T[] = [],
    config: Partial<Config> = {},
  ) {
    this.buf = new Queue(buf);
    this.config = Config.padding(config);
  }

  /** Effective read timeout (ms): `readTimeout` if set, otherwise `timeout`. */
  private get readTimeout(): number {
    return this.config.readTimeout ?? this.config.timeout;
  }

  /** Effective write timeout (ms): `writeTimeout` if set, otherwise `timeout`. */
  private get writeTimeout(): number {
    return this.config.writeTimeout ?? this.config.timeout;
  }

  /** Whether more than the read timeout has passed since `from` (epoch ms). */
  private checkReadTimeout(from: number): boolean {
    return Date.now() - from > this.readTimeout;
  }

  /** Whether more than the write timeout has passed since `from` (epoch ms). */
  private checkWriteTimeout(from: number): boolean {
    return Date.now() - from > this.writeTimeout;
  }

  /**
   * Appends `v` to the buffer, waiting for free space if necessary.
   *
   * @param v the value to enqueue
   * @returns a promise that resolves once `v` has been enqueued
   * @throws {Error} `"timeout"` (as a rejection) when the buffer stayed full
   * for longer than the write timeout
   */
  public async send(v: T): Promise<void> {
    const from = Date.now();
    // The capacity check that lets us leave the loop and the enqueue below run
    // without an await in between, so two pending senders can never both claim
    // the same free slot.
    while (this.buf.size() >= this.config.capacity) {
      // Only give up while the buffer is still full: the loop condition is
      // re-evaluated after every sleep before the deadline is looked at.
      if (this.checkWriteTimeout(from)) {
        throw new Error("timeout");
      }
      await sleep(this.config.checkInterval);
    }
    this.buf.enqueue(v);
  }

  /**
   * Removes and returns the next value from the buffer, waiting for one to
   * arrive if necessary.
   *
   * @returns a promise for the dequeued value
   * @throws {Error} `"timeout"` (as a rejection) when the buffer stayed empty
   * for longer than the read timeout
   */
  public async receive(): Promise<T> {
    const from = Date.now();
    // Same shape as `send`: the emptiness check and the dequeue share one
    // synchronous segment, so concurrent receivers cannot take the same item.
    while (this.buf.size() <= 0) {
      if (this.checkReadTimeout(from)) {
        throw new Error("timeout");
      }
      await sleep(this.config.checkInterval);
    }
    // The loop above guarantees the queue is non-empty at this point.
    return this.buf.dequeue()!;
  }
}
Enter fullscreen mode Exit fullscreen mode

Gist

Top comments (0)