DEV Community

Cover image for Handling Sharded Data in Distributed Systems: A Deep Dive into Joins, Broadcasts, and Query Optimization
Piyush Chauhan
Piyush Chauhan

Posted on

Handling Sharded Data in Distributed Systems: A Deep Dive into Joins, Broadcasts, and Query Optimization

In modern distributed databases, the need for scaling data horizontally has led to the widespread adoption of sharding. While sharding helps manage large datasets across multiple nodes, it introduces challenges, particularly when performing joins and ensuring efficient data retrieval. In this article, we explore various concepts and techniques that address these challenges, particularly focusing on broadcast joins, shard key alignment, and distributed query engines like Presto and BigQuery. Additionally, we demonstrate how to handle these problems in real-world applications using Node.js and Express.


Sharding Example in Node.js with Express.js

Here’s how you can implement sharding in PostgreSQL using Node.js and Express.js.

PostgreSQL Sharding Example

Using Citus or manual logical sharding with Node.js:

Example with Logical Sharding

  1. Setup Tables for Shards:
    Use tables for shards (user_data on shard1 and user_data on shard2).

  2. Create an Express.js API:
    Distribute queries based on a shard key (e.g., user_id).

   const express = require('express');
   const { Pool } = require('pg');

   const poolShard1 = new Pool({ connectionString: 'postgresql://localhost/shard1' });
   const poolShard2 = new Pool({ connectionString: 'postgresql://localhost/shard2' });

   const app = express();
   app.use(express.json());

   const getShardPool = (userId) => (userId % 2 === 0 ? poolShard1 : poolShard2);

   app.post('/user', async (req, res) => {
       const { userId, data } = req.body;
       const pool = getShardPool(userId);
       try {
           await pool.query('INSERT INTO user_data (user_id, data) VALUES ($1, $2)', [userId, data]);
           res.status(200).send('User added successfully');
       } catch (err) {
           console.error(err);
           res.status(500).send('Error inserting user');
       }
   });

   app.get('/user/:userId', async (req, res) => {
       const userId = parseInt(req.params.userId, 10);
       const pool = getShardPool(userId);
       try {
           const result = await pool.query('SELECT * FROM user_data WHERE user_id = $1', [userId]);
           res.status(200).json(result.rows);
       } catch (err) {
           console.error(err);
           res.status(500).send('Error retrieving user');
       }
   });

   app.listen(3000, () => console.log('Server running on port 3000'));
Enter fullscreen mode Exit fullscreen mode

1. Sharding in Distributed Databases

Sharding is the process of horizontally partitioning data across multiple database instances, or shards, to improve performance, scalability, and availability. Sharding is often necessary when a single database instance cannot handle the volume of data or traffic.

Sharding Strategies:

  • Range-based Sharding: Data is distributed across shards based on a key's range, e.g., partitioning orders by order_date.
  • Hash-based Sharding: Data is hashed by a shard key (e.g., user_id) to distribute the data evenly across shards.
  • Directory-based Sharding: A central directory keeps track of where data resides in the system.

However, when related tables are sharded on different keys, or when a table requires a join with another table across multiple shards, performance can degrade due to the need for scatter-gather operations. This is where understanding broadcast joins and shard key alignment becomes crucial.


2. Challenges with Joins in Sharded Systems

When data resides in different shards, performing joins between those shards can be complex. Here's a breakdown of the common challenges:

1. Shard Key Misalignment:

In many systems, tables are sharded on different keys. For example:

  • users table might be sharded by user_id.
  • orders table might be sharded by region.

When performing a join (e.g., orders.user_id = users.user_id), the system needs to fetch data from multiple shards because the relevant records may not reside in the same shard.

2. Scatter-Gather Joins:

In a scatter-gather join, the system must:

  • Send requests to all shards holding relevant data.
  • Aggregate results across shards. This can significantly degrade performance, especially when data is spread out over many shards.

3. Broadcast Joins:

A broadcast join occurs when one of the tables being joined is small enough to be broadcast to all shards. In this case:

  • The small table (e.g., users) is replicated across all nodes where the larger, sharded table (e.g., orders) resides.
  • Each node can then join its local data with the broadcasted data, avoiding the need for cross-shard communication.

3. Using Distributed Query Engines for Sharded Data

Distributed query engines like Presto and BigQuery are designed to handle sharded data and join queries efficiently across distributed systems.

Presto/Trino:

Presto is a distributed SQL query engine designed for querying large datasets across heterogeneous data sources (e.g., relational databases, NoSQL databases, data lakes). Presto performs joins across distributed data sources and can optimize queries by minimizing data movement between nodes.

Example Use Case: Joining Sharded Data with Presto

In a scenario where orders is sharded by region and users is sharded by user_id, Presto can perform a join across different shards using its distributed execution model.

Query:

SELECT o.order_id, u.user_name
FROM orders o
JOIN users u
ON o.user_id = u.user_id;
Enter fullscreen mode Exit fullscreen mode

Presto will:

  1. Use scatter-gather to fetch relevant users records.
  2. Join data across nodes.

Google BigQuery:

BigQuery is a fully-managed, serverless data warehouse that excels at running large-scale analytical queries. While BigQuery abstracts away the details of sharding, it automatically partitions and distributes data across many nodes for optimized querying. It can handle large datasets with ease and is especially effective for analytical queries where data is partitioned by time or other dimensions.

Example Use Case: Joining Sharded Tables in BigQuery
SELECT o.order_id, u.user_name
FROM `project.dataset.orders` o
JOIN `project.dataset.users` u
ON o.user_id = u.user_id
WHERE o.order_date BETWEEN '2024-01-01' AND '2024-12-31';
Enter fullscreen mode Exit fullscreen mode

BigQuery automatically handles the partitioning and distribution, minimizing the need for manual sharding.


4. Handling Shard Key Misalignment in Node.js Applications

When dealing with sharded data in Node.js applications, issues like misaligned shard keys and the need for scatter-gather joins often arise. Here’s how you can approach these challenges using Node.js and Express.

Handling Broadcast Joins in Node.js

If a join requires broadcasting a small table (e.g., users) across all shards, you can implement the join in the application layer by fetching the small table once and using it to join with data from sharded tables.

const getShardData = async (shardPool, query, params) => {
    const client = await shardPool.connect();
    const result = await client.query(query, params);
    client.release();
    return result.rows;
};

// Fetch broadcasted data (small table)
const getBroadcastJoinResults = async () => {
    const productData = await getShardData(productPool, 'SELECT * FROM products');
    const ordersData = await getShardData(ordersPool, 'SELECT * FROM orders');

    // Perform join in memory
    const result = ordersData.map(order => {
        const product = productData.find(p => p.product_id === order.product_id);
        return { ...order, product_name: product ? product.product_name : null };
    });

    return result;
};
Enter fullscreen mode Exit fullscreen mode

Handling Scatter-Gather Queries in Node.js

For queries that involve scatter-gather joins (e.g., when shard keys are misaligned), you will need to query all shards and aggregate the results in your application layer.

const getScatterGatherJoinResults = async (shardPools, userId) => {
    const userQuery = 'SELECT * FROM users WHERE user_id = $1';
    const postsQuery = 'SELECT * FROM posts WHERE user_id = $1';

    // Query all shards
    const userPromises = shardPools.users.map(pool => getShardData(pool, userQuery, [userId]));
    const postPromises = shardPools.posts.map(pool => getShardData(pool, postsQuery, [userId]));

    // Aggregate results
    const users = (await Promise.all(userPromises)).flat();
    const posts = (await Promise.all(postPromises)).flat();

    // Join data
    return posts.map(post => {
        const user = users.find(u => u.user_id === post.user_id);
        return { ...post, user_name: user ? user.user_name : null };
    });
};
Enter fullscreen mode Exit fullscreen mode

5. Best Practices for Query Optimization with Sharded Data

When dealing with sharded data and performing joins, consider the following best practices:

  1. Align Shard Keys: When possible, ensure that related tables use the same shard key. This minimizes the need for cross-shard joins and improves performance.

  2. Denormalization: In scenarios where joins are frequent, consider denormalizing your data. For instance, you can store user information directly in the posts table, reducing the need for a join.

  3. Use Broadcast Joins for Small Tables: If one of the tables is small enough, broadcast it to all nodes to avoid scatter-gather queries.

  4. Pre-Join Data: For frequently accessed data, consider pre-joining and storing the results in a materialized view or a cache.

  5. Leverage Distributed Query Engines: For complex analytical queries, use systems like Presto or BigQuery that handle distributed joins and optimizations automatically.


6. Best Practices for Cursor-Based Pagination with Sharded Data

In a distributed system with such sharding, cursor-based pagination needs to be handled carefully, especially because data is spread across multiple shards. The key is to:

  1. Split the queries: Query each shard independently for relevant data.
  2. Handle pagination in chunks: Decide how to paginate across the shard data (either on posts or users), and gather relevant results.
  3. Join at the application level: Fetch results from each shard, join the data in memory, and then apply the cursor logic for the next page.

Let's walk through how we can implement this with Node.js and Express, taking into account that data resides on different shards and requires post-fetch joins at the application level.

How to Handle Pagination and Joins with Sharded Tables

Let’s assume we have:

  • posts table sharded by user_id.
  • users table sharded by user_id.

We want to retrieve paginated posts for a given user, but since users and posts are on different shards, we'll need to split the query, handle pagination, and then perform the join at the application level.

Approach:

  1. Query the Relevant Shards:

    • First, you need to query the posts table across the shards to fetch the posts.
    • After fetching the relevant posts, use the user_id from the posts to query the users table (again, across shards).
  2. Pagination Strategy:

    • Pagination on posts: You can use created_at, post_id, or another unique field to paginate the posts table.
    • Pagination on users: You may need to fetch user data separately or use the user_id as a cursor to paginate through users.
  3. Application-Level Join:

    • After retrieving data from the relevant shards (for both posts and users), join them at the application level.
  4. Handling the Cursor:

    • After fetching the first page, use the last created_at or post_id (from the posts) as the cursor for the next query.

Example Implementation

1. Query Posts Across Shards

Here we will execute queries across different posts shards, filtering by a cursor (e.g., created_at or post_id).

2. Query Users Across Shards Using Post Data

Once we have the relevant post_id and user_id from the first query, we will fetch user data from the relevant shards.

const { Pool } = require('pg');  // PostgreSQL pool

// Assume pools for different shards
const shardPools = {
    posts: [
        new Pool({ connectionString: 'postgres://user@shard1' }), 
        new Pool({ connectionString: 'postgres://user@shard2' })
    ],
    users: [
        new Pool({ connectionString: 'postgres://user@shard1' }),
        new Pool({ connectionString: 'postgres://user@shard2' })
    ]
};

// Function to fetch posts from multiple shards
async function getPostsFromShards(cursor, limit) {
    const postQuery = `
        SELECT post_id, user_id, content, created_at
        FROM posts
        WHERE created_at > $1
        ORDER BY created_at
        LIMIT $2
    `;

    // Query each shard for posts
    const postPromises = shardPools.posts.map(pool => {
        return pool.query(postQuery, [cursor || '1970-01-01', limit]);
    });

    const postResults = await Promise.all(postPromises);

    // Flatten results and sort by created_at
    let posts = postResults.flatMap(res => res.rows);
    posts.sort((a, b) => new Date(a.created_at) - new Date(b.created_at));

    // Get the cursor for the next page
    const nextCursor = posts.length > 0 ? posts[posts.length - 1].created_at : null;

    return { posts, nextCursor };
}

// Function to fetch users from relevant shards based on post data
async function getUsersForPosts(posts) {
    // Get unique user_ids from posts
    const userIds = [...new Set(posts.map(post => post.user_id))];

    // Query each shard for relevant user data
    const userPromises = shardPools.users.map(pool => {
        const userQuery = 'SELECT user_id, user_name FROM users WHERE user_id = ANY($1)';
        return pool.query(userQuery, [userIds]);
    });

    const userResults = await Promise.all(userPromises);

    // Flatten the user data
    let users = userResults.flatMap(res => res.rows);

    return users;
}

// Combine posts and user data at the application level
async function getPaginatedPostsWithUsers(cursor = null, limit = 10) {
    // Step 1: Fetch posts from shards
    const { posts, nextCursor } = await getPostsFromShards(cursor, limit);

    // Step 2: Fetch users for the posts
    const users = await getUsersForPosts(posts);

    // Step 3: Join posts and users by user_id
    const postsWithUsers = posts.map(post => {
        const user = users.find(u => u.user_id === post.user_id);
        return { ...post, user_name: user ? user.user_name : null };
    });

    return { posts: postsWithUsers, nextCursor };
}

// Express route to serve paginated posts with users
app.get('/posts', async (req, res) => {
    const { cursor, limit = 10 } = req.query;
    const { posts, nextCursor } = await getPaginatedPostsWithUsers(cursor, limit);

    res.json({
        posts,
        nextCursor  // The cursor to be used for the next page
    });
});
Enter fullscreen mode Exit fullscreen mode

Key Details:

  1. Pagination on posts: The cursor is based on the created_at field or another unique field in posts, which is used to paginate through results.
  2. Query Shards Independently: Since posts and users are sharded on different keys, we query each shard independently, gathering data from all shards before performing the join at the application level.
  3. Cursor Handling: After retrieving the results, we use the last created_at (or post_id) from the posts to generate the cursor for the next page.
  4. Join at Application Level: After fetching data from the relevant shards, we join the posts with user data based on user_id in memory.

Conclusion

Managing sharded data in distributed systems presents unique challenges, particularly when it comes to performing efficient joins. Understanding techniques like broadcast joins, scatter-gather joins, and leveraging distributed query engines can significantly improve query performance. Additionally, in application-level queries, it's essential to consider shard key alignment, denormalization, and optimized query strategies. By following these best practices and utilizing the right tools, developers can ensure that their applications handle sharded data effectively and maintain performance at scale.

Top comments (0)