Joseph Sutton

Polyglot Microservices: Federated Subscriptions in Golang, Rust, and Node.js, Pt. 2

Series Navigation

  1. Golang Microservices (spells)
  2. Rust Microservice (messages)
  3. Node.js Microservice (players)
  4. Gateway

Message Subgraph (Rust)

Now let's create the Rust subgraph by adding a Rust service to our monorepo. First we install the Nx plugin, then we generate the project:

nx add @monodon/rust
nx generate @monodon/rust:binary message-service

We'll use async-graphql for code-first GraphQL, axum to serve the schema over HTTP and WebSocket, and tokio as the async runtime. Since we're not using Redis, we'll also need a few futures utility crates to build a simple in-memory broker. Let's add all of the dependencies for the service to Cargo.toml:

[package]
name = "message-service"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.90"
async-graphql = { version = "7.0.6", default-features = false, features = ["tracing", "chrono", "playground", "graphiql"] }
async-graphql-axum = "7.0.3"
async-stream = "0.3.5"
axum = { version = "0.7.5", features = ["macros"] }
axum-extra = { version = "0.9.4", default-features = false, features = ["tracing", "typed-header"] }
futures-util = "0.3.30"
futures-channel = "0.3.30"
futures-timer = "3.0.3"
handlebars = "5.1.2"
once_cell = "1.19"
slab = "0.4.9"
tokio = { version = "1.38", features = ["full"] }
tokio-stream = "0.1.17"

I have a specific template I like to use for Rust projects, because I like main.rs to be as small as possible. Let's add some files to bootstrap ourselves:

// main.rs
// The library crate name comes from the `message-service` package in Cargo.toml.
use message_service::cli;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    cli::run().await
}
// lib.rs
pub mod cli;
pub mod graphql;

Let's add some code for our cli module, in src/cli.rs:

// src/cli.rs
use anyhow::Context;

pub async fn run() -> anyhow::Result<()> {
  // Each spawned service reports its exit result over this channel.
  let (send, mut recv) = tokio::sync::mpsc::channel(1);
  let mut handlers = vec![];

  println!("Starting messages graphql subgraph");
  let subgraph_send = send.clone();
  handlers.push(tokio::spawn(async move {
    let _ = subgraph_send.try_send(
      crate::graphql::server::run_server()
        .await
        .context("Failed to start messages graphql server"),
    );
  }));

  // Wait for the first service to exit, then stop any that are still running.
  let reason = recv.recv().await.expect("Didn't receive msg");
  for handle in handlers {
    handle.abort();
  }

  reason
}
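
The shape of run is: start each long-running service as its own task, have every task report its exit result over a shared channel, and return whichever result arrives first. Here is a minimal std-only sketch of that idea, using threads and std::sync::mpsc in place of tokio tasks and tokio::sync::mpsc. The run_first_to_finish name, the Service alias, and the String error type are illustrative assumptions, not part of the service:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

type ServiceResult = Result<(), String>;
// Illustrative alias: a service is any one-shot job that can run on a thread.
type Service = Box<dyn FnOnce() -> ServiceResult + Send>;

/// Runs every service on its own thread and returns the result of
/// whichever one exits first.
fn run_first_to_finish(services: Vec<Service>) -> ServiceResult {
    let (send, recv) = mpsc::channel();

    for service in services {
        let send = send.clone();
        thread::spawn(move || {
            // Ignore send errors: the receiver may already be gone.
            let _ = send.send(service());
        });
    }
    // Drop the original sender so `recv` fails if no service ever reports.
    drop(send);

    recv.recv()
        .map_err(|_| "no service reported a result".to_string())?
}

fn main() {
    let mut services: Vec<Service> = Vec::new();
    services.push(Box::new(|| {
        thread::sleep(Duration::from_millis(300));
        Ok(())
    }));
    services.push(Box::new(|| Err("failed to bind port".to_string())));

    let result = run_first_to_finish(services);
    println!("{result:?}");
}
```

Unlike tokio tasks, plain threads cannot be aborted, so this sketch simply leaves the slower thread behind when the first result comes in.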

Ok, now for our GraphQL server. Let's create a Message type:

// src/graphql/types.rs
use async_graphql::SimpleObject;

#[derive(SimpleObject, Clone)]
pub(super) struct Message {
  pub id: String,
  pub text: String,
  pub to_player_id: String,
  pub from_player_id: String,
}

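We'll also create a broker.rs file in src/graphql, based on the simple_broker.rs example from async-graphql. We need one addition to it: a method on SimpleBroker<T> that returns a filtered Stream, so that each subscriber only receives the messages that match a predicate. It relies on filter from futures_util::StreamExt and on futures_util::future, so both need to be in scope in broker.rs: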

    pub fn subscribe_filtered<F>(filter: F) -> impl Stream<Item = T>
    where
        F: Fn(&T) -> future::Ready<bool> + Send + 'static,
        T: Sync + Send + Clone + 'static,
    {
        Self::subscribe().filter(filter)
    }
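
For readers who have not opened simple_broker.rs: the broker is a publish/subscribe fan-out in which every subscriber receives a clone of each published value, and subscribe_filtered drops the values that fail a predicate. Below is a minimal synchronous sketch of that idea, using std::sync::mpsc channels instead of async streams. The Broker type, its methods, and drain_filtered are illustrative and are not the async-graphql example's API:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Fan-out broker: every subscriber gets a clone of each published value.
struct Broker<T> {
    subscribers: Mutex<Vec<Sender<T>>>,
}

impl<T: Clone> Broker<T> {
    fn new() -> Self {
        Broker { subscribers: Mutex::new(Vec::new()) }
    }

    /// Registers a new subscriber and returns its receiving end.
    fn subscribe(&self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Sends a clone of `msg` to every subscriber, pruning closed ones.
    fn publish(&self, msg: T) {
        self.subscribers
            .lock()
            .unwrap()
            .retain(|tx| tx.send(msg.clone()).is_ok());
    }
}

/// Drains what is currently queued on `rx`, keeping only matching values.
/// This plays the role of `.filter(...)` on the async stream.
fn drain_filtered<T>(rx: &Receiver<T>, keep: impl Fn(&T) -> bool) -> Vec<T> {
    rx.try_iter().filter(|msg| keep(msg)).collect()
}

fn main() {
    let broker = Broker::new();
    let inbox = broker.subscribe();

    // Messages are (to_player_id, text) pairs in this sketch.
    broker.publish(("jojo".to_string(), "hello".to_string()));
    broker.publish(("dio".to_string(), "not for jojo".to_string()));

    let for_jojo = drain_filtered(&inbox, |(to, _)| to == "jojo");
    println!("{for_jojo:?}");
}
```

The filter runs on the subscriber side: the broker still clones every message to every subscriber, which is fine for a demo but worth keeping in mind for busy channels.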

Ok, now for the schema:

// src/graphql/schema.rs
use async_graphql::{*, futures_util::Stream, futures_util::future};
use super::{types::*, broker::SimpleBroker};

pub struct Query;

#[Object]
impl Query {
  async fn message(&self, id: String) -> async_graphql::Result<Message> {
    // Placeholder: a GraphQL schema needs at least one query field
    Ok(Message {
      id,
      text: "Hello, world!".to_string(),
      to_player_id: "1".to_string(),
      from_player_id: "2".to_string(),
    })
  }
}

pub struct Mutation;

#[derive(SimpleObject)]
pub struct MessagePayload {
  message: Message,
}

#[derive(InputObject)]
struct SendMessageInput {
  text: String,
  to_player_id: String,
  from_player_id: String,
}

#[Object]
impl Mutation {
  async fn send_message(&self, input: SendMessageInput) -> async_graphql::Result<MessagePayload> {
    let message = Message {
      id: "1".to_string(),
      text: input.text,
      to_player_id: input.to_player_id,
      from_player_id: input.from_player_id,
    };

    SimpleBroker::<Message>::publish(message.clone());

    Ok(MessagePayload { message })
  }
}

pub struct Subscription;

#[Subscription]
impl Subscription {
  async fn messages(&self, player_id: String) -> impl Stream<Item = Message> {
    // Only forward messages addressed to the subscribing player.
    SimpleBroker::<Message>::subscribe_filtered(move |message| {
      future::ready(message.to_player_id == player_id)
    })
  }
}
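
Note that send_message gives every message the id "1". If clients need distinct ids, one option is a process-local counter. The sketch below is an assumption about how that could look, not something the original service does: next_message_id is a hypothetical helper, and its ids start over whenever the process restarts.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter; starts at 1 and is safe to bump from any thread.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// Hypothetical helper: returns a fresh id as a String, matching `Message::id`.
fn next_message_id() -> String {
    NEXT_ID.fetch_add(1, Ordering::Relaxed).to_string()
}

fn main() {
    let first = next_message_id();
    let second = next_message_id();
    println!("{first} {second}");
}
```

In the mutation, id: "1".to_string() would then become id: next_message_id().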

And finally, the server:

// src/graphql/server/mod.rs

use async_graphql::{http::GraphiQLSource, Schema};
use async_graphql_axum::{GraphQL, GraphQLSubscription};
use axum::{routing::get, Router, response::{IntoResponse, self}};
use tokio::net::TcpListener;

use crate::graphql;

pub async fn run_server() -> anyhow::Result<()> {
  let schema = Schema::build(graphql::Query, graphql::Mutation, graphql::Subscription)
    .finish();

  let app = Router::new()
    // GET / serves the GraphiQL UI; POST / executes queries and mutations.
    .route(
      "/",
      get(graphiql).post_service(GraphQL::new(schema.clone())),
    )
    // Subscriptions are served over WebSocket at /ws.
    .route_service("/ws", GraphQLSubscription::new(schema));

  println!("Starting graphql server on port {}", 8081);
  // Propagate bind and serve errors to the caller instead of panicking.
  let listener = TcpListener::bind("127.0.0.1:8081").await?;
  axum::serve(listener, app).await?;
  Ok(())
}

async fn graphiql() -> impl IntoResponse {
  response::Html(
    GraphiQLSource::build()
      .endpoint("/")
      .subscription_endpoint("/ws")
      .finish(),
  )
}
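
Binding the listener is the step most likely to fail at startup, typically because the port is already taken. Because run_server returns anyhow::Result, such a failure can be handed to the caller as an Err rather than a panic. Here is a small std-only sketch of that behavior. It uses std::net::TcpListener rather than tokio's, purely so it runs without dependencies, and bind_listener is an illustrative helper:

```rust
use std::io;
use std::net::{SocketAddr, TcpListener};

/// Illustrative helper: binds `addr` and hands any failure back to the caller.
fn bind_listener(addr: SocketAddr) -> io::Result<TcpListener> {
    let listener = TcpListener::bind(addr)?;
    Ok(listener)
}

fn main() -> io::Result<()> {
    // Port 0 asks the OS for any free port, so the sketch runs anywhere.
    let first = bind_listener("127.0.0.1:0".parse().unwrap())?;
    let addr = first.local_addr()?;

    // A second bind to the same address fails while `first` is still open.
    match bind_listener(addr) {
        Ok(_) => println!("unexpectedly bound {addr} twice"),
        Err(err) => println!("could not bind {addr}: {err}"),
    }
    Ok(())
}
```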

We can run this with:

nx run message-service:run

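With the server running, open GraphiQL at http://127.0.0.1:8081, start the subscription in one tab, and then send the mutation from another: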

subscription {
  messages(playerId: "jojo") {
    text
  }
}
mutation {
  sendMessage(input: { text: "hello", toPlayerId: "jojo", fromPlayerId: "jojo"}) {
    message {
      text
    }
  }
}

As with the Golang service for spells, we can play around with this. Because this service also uses GraphiQL, we again need multiple browser tabs to test the subscription filtering; see this GitHub issue for details.
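The GraphQL operations above use playerId, sendMessage, and toPlayerId even though the Rust code declares player_id, send_message, and to_player_id. That is because async-graphql renames fields and arguments to camelCase by default. The conversion can be sketched as follows, where to_camel_case is an illustrative helper and not async-graphql's internal function:

```rust
/// Converts a snake_case identifier to camelCase.
fn to_camel_case(name: &str) -> String {
    let mut out = String::with_capacity(name.len());
    let mut upper_next = false;
    for ch in name.chars() {
        if ch == '_' {
            // Skip the underscore and capitalize the character after it.
            upper_next = true;
        } else if upper_next {
            out.extend(ch.to_uppercase());
            upper_next = false;
        } else {
            out.push(ch);
        }
    }
    out
}

fn main() {
    for name in ["player_id", "send_message", "to_player_id"] {
        println!("{name} -> {}", to_camel_case(name));
    }
}
```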

Now, let's move on to the Node.js microservice in Part 3!
