I wanted to use channel like Golang:)
ref: queue.ts
import { Queue } from "queue";
import { sleep } from "bun";
type Config = {
/**
* @description (ms)
*/
checkInterval: number
/**
* @description buffer size of queue
*/
capacity: number
/**
* @description timeout to wait for reading and writing(ms)
*/
timeout: number
/**
* @description timeout to wait for reading(ms)
*/
readTimeout?: number
/**
* @description timeout to wait for writing(ms)
*/
writeTimeout?: number
}
const Config = {
padding(config: Partial<Config>): Config {
return {
checkInterval: config?.checkInterval ?? 300,
capacity: config?.capacity ?? 1,
timeout: config?.timeout ?? 1000,
readTimeout: config?.readTimeout,
writeTimeout: config?.writeTimeout,
}
},
}
export interface Sender<T> {
send(v: T): Promise<void>;
}
export interface Receiver<T> {
receive(): Promise<T>;
}
/***
* @todo support mutex
*/
export class Chan<T> implements Sender<T>, Receiver<T> {
private readonly config: Config;
private readonly buf: Queue<T>;
public constructor(
buf: T[] = [],
config: Partial<Config> = {},
) {
this.buf = new Queue(buf);
this.config = Config.padding(config);
}
private get readTimeout(): number {
return this.config.readTimeout ?? this.config.timeout;
}
private get writeTimeout(): number {
return this.config.writeTimeout ?? this.config.timeout;
}
private checkReadTimeout(from: number): boolean {
return Date.now() - from > this.readTimeout;
}
private checkWriteTimeout(from: number): boolean {
return Date.now() - from > this.writeTimeout;
}
public send(v: T): Promise<void> {
return new Promise(async (resolve, reject) => {
const from = Date.now()
while (this.buf.size() >= this.config.capacity) {
await sleep(this.config.checkInterval);
if (this.checkWriteTimeout(from)) {
return reject(new Error("timeout"));
}
}
this.buf.enqueue(v);
resolve();
})
}
public async receive(): Promise<T> {
return new Promise<T>(async (resolve, reject) => {
const from = Date.now();
while (this.buf.size() <= 0) {
await sleep(this.config.checkInterval);
if (this.checkReadTimeout(from)) {
return reject(new Error("timeout"));
}
}
const v = this.buf.dequeue()!;
resolve(v);
})
}
}
Top comments (0)