DEV Community

Cover image for Scaling Zensearch's capabilities to query the whole database
FDiaz
FDiaz

Posted on

Scaling Zensearch's capabilities to query the whole database

Previously I've been able to crawl and index web pages for my search engine without a problem, until my database grew more than what RabbitMQ's message queue was capable of holding. If a message in a message queue exceeds its default size, RabbitMQ will throw an error and panic, I could change the default size but that would not scale if my database grows, so in order for users to crawl web pages without having to worry about the message broker crashing.

Creating Segments

I've implemented a function to create segments with a maximum segment size or MSS from the same principles from TCP when creating segments, the segment contains an 8 byte header where each 4 byte of the 8 byte header is the sequence number and the total segment count, and the rest of the body is the payload of the segmented database.

// MSS is number in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from database
  MSS: number,
): Array<ArrayBufferLike> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  let currentIndex = 0;
  let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder
  let segments: Array<ArrayBufferLike> = [];
  let pointerPosition = MSS;

  for (let i = 0; i < segmentCount; i++) {
    let currentDataLength = Math.abs(currentIndex - data_length);

    let slicedArray = encoded_text.slice(currentIndex, pointerPosition);

    currentIndex += slicedArray.byteLength;
    // Add to offset MSS to point to the next segment in the array
    // manipulate pointerPosition to adjust to lower values using Math.min()

    // Is current data length enough to fit MSS?
    // if so add from current position + MSS
    // else get remaining of the currentDataLength
    pointerPosition += Math.min(MSS, currentDataLength);
    const payload = new Uint8Array(slicedArray.length);
    payload.set(slicedArray);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): ArrayBufferLike {
  // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount
  const sequenceNumBuffer = convertIntToBuffer(sequenceNum);
  const segmentCountBuffer = convertIntToBuffer(segmentCount);
  const headerBuffer = new ArrayBuffer(8);
  const header = new Uint8Array(headerBuffer);
  header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer]));
  return Buffer.concat([header, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  bytes.writeIntLE(int, 0, 4);
  console.log(bytes);
  return bytes;
}

Enter fullscreen mode Exit fullscreen mode

Parsing incoming segments

This method of creating small segments of a large dataset would help scale the database query even if the database grows.

Now how does the search engine parse the buffer and transform each segments into a web page array?

Reading from segment buffers

First extract the segment header, since the header contains 2 properties namely Sequence number and Total Segments,

func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
    byteReader := bytes.NewBuffer(buf)
    headerOffsets := []int{0, 4}
    newSegmentHeader := SegmentHeader{}

    for i := range headerOffsets {
        buffer := make([]byte, 4)
        _, err := byteReader.Read(buffer)
        if err != nil {
            return &SegmentHeader{}, err
        }
        value := binary.LittleEndian.Uint32(buffer)

        // this feels disgusting but i dont feel like bothering with this
        if i == 0 {
            newSegmentHeader.SequenceNum = value
            continue
        }
        newSegmentHeader.TotalSegments = value
    }
    return &newSegmentHeader, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
    headerOffset := 8
    byteReader := bytes.NewBuffer(buf[headerOffset:])
    return byteReader.Bytes(), nil

}

Enter fullscreen mode Exit fullscreen mode

Handling retransmission and requeuing of segments

we'll use the Former for retransmission/requeuing of the segments, so if the expected sequence number is not what was received then re-queue every segment starting from the current one.

    // for retransmission/requeuing
        if segmentHeader.SequenceNum != expectedSequenceNum {
            ch.Nack(data.DeliveryTag, true, true)
            log.Printf("Expected Sequence number %d, got %d\n",
                expectedSequenceNum, segmentHeader.SequenceNum)
            continue
        }
Enter fullscreen mode Exit fullscreen mode

Appending segment payloads

The Latter will be used for breaking out of listening to the producer (database service) if the total number of segments received by the search engine is equal to the length of the total segments that is to be sent by the database service then break out and parse the aggregated segment buffer, if not the keep listening and append the segment payload buffer to a web page buffer to hold bytes from all of the incoming segments.

        segmentCounter++
        fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
        fmt.Printf("current segments : %d\n", segmentCounter)
        expectedSequenceNum++
        ch.Ack(data.DeliveryTag, false)
        webpageBytes = append(webpageBytes, segmentPayload...)
        fmt.Printf("Byte Length: %d\n", len(webpageBytes))

        if segmentCounter == segmentHeader.TotalSegments {
            log.Printf("Got all segments from Database %d", segmentCounter)
            break
        }
Enter fullscreen mode Exit fullscreen mode

I use vim btw

Thank you for coming to my ted talk, I will be implementing more features and fixes for zensearch.

Top comments (0)