Series Navigation
Message Subgraph (Rust)
Now, let's create the Rust subgraph. Let's add a Rust service to our monorepo -- first we'll need the Nx plugin, and then we'll need to generate the project:
nx add @monodon/rust
nx generate @monodon/rust message-service
We'll be using async-graphql
for our code-first GraphQL, axum
to serve our schema as a websever, and tokio
for an async runtime. We'll need some utility libraries for futures (among other things) for a simple broker (since we're not using Redis), as well. Let's add all of the dependencies we'll need for our service to Cargo.toml
:
[package]
name = "message-service"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.90"
async-graphql = { version = "7.0.6", default-features = false, features = ["tracing", "chrono", "playground", "graphiql"] }
async-graphql-axum = "7.0.3"
async-stream = "0.3.5"
axum = { version = "0.7.5", features = ["macros"] }
axum-extra = { version = "0.9.4", default-features = false, features = ["tracing", "typed-header"] }
futures-util = "0.3.30"
futures-channel = "0.3.30"
futures-timer = "3.0.3"
handlebars = "5.1.2"
once_cell = "1.19"
slab = "0.4.9"
tokio = { version = "1.38", features = ["full"] }
tokio-stream = "0.1.17"
I have a specific template I like to use for Rust projects, because I like main.rs
to be as small as possible. Let's add some files to bootstrap ourselves:
// main.rs
use rust_service::cli;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
cli::run().await
}
// lib.rs
pub mod cli;
pub mod graphql;
Let's add some code for our CLI crate:
use anyhow::Context;
pub async fn run() -> anyhow::Result<()> {
let (send, mut recv) = tokio::sync::mpsc::channel(1);
let mut handlers = vec![];
println!("Starting messages graphql subgraph");
let subgraph_send = send.clone();
handlers.push(tokio::spawn(async move {
let _ = subgraph_send.try_send(
crate::graphql::server::run_server().await.context("Failed to start messages graphql server"),
);
}));
let reason = recv.recv().await.expect("Didn't receive msg");
for handle in handlers {
handle.abort();
}
reason
}
Ok, now for our GraphQL server. Let's create a Message type:
// src/graphql/types.rs
use async_graphql::SimpleObject;
#[derive(SimpleObject, Clone)]
pub(super) struct Message {
pub id: String,
pub text: String,
pub to_player_id: String,
pub from_player_id: String,
}
We'll create a broker.rs
file in src/graphql
as well inspired by this example one from async-graphql
: simple_broker.rs
. We do need to modify it to add a filter for Stream, however:
pub fn subscribe_filtered<F>(filter: F) -> impl Stream<Item = T>
where
F: Fn(&T) -> future::Ready<bool> + Send + 'static,
T: Sync + Send + Clone + 'static,
{
Self::subscribe().filter(filter)
}
Ok, now for the schema:
// src/graphql/schema.rs
use async_graphql::{*, futures_util::Stream, futures_util::future};
use super::{types::*, broker::SimpleBroker};
pub struct Query;
#[Object]
impl Query {
async fn message(&self, id: String) -> async_graphql::Result<Message> {
// this is here because async-graphql needs one query at least
Ok(Message {
id,
text: "Hello, world!".to_string(),
to_player_id: "1".to_string(),
from_player_id: "2".to_string(),
})
}
}
pub struct Mutation;
#[derive(SimpleObject)]
pub struct MessagePayload {
message: Message,
}
#[derive(InputObject)]
struct SendMessageInput {
text: String,
to_player_id: String,
from_player_id: String,
}
#[Object]
impl Mutation {
async fn send_message(&self, input: SendMessageInput) -> async_graphql::Result<MessagePayload> {
let message = Message {
id: "1".to_string(),
text: input.text,
to_player_id: input.to_player_id,
from_player_id: input.from_player_id,
};
SimpleBroker::<Message>::publish(message.clone());
Ok(MessagePayload { message })
}
}
pub struct Subscription;
#[Subscription]
impl Subscription {
async fn messages(&self, player_id: String) -> impl Stream<Item = Message> {
SimpleBroker::<Message>::subscribe_filtered(move |message| future::ready(message.to_player_id == player_id))
}
}
And finally, the server:
// src/graphql/server/mod.rs
use async_graphql::{http::GraphiQLSource, Schema};
use async_graphql_axum::{GraphQL, GraphQLSubscription};
use axum::{routing::get, Router, response::{IntoResponse, self}};
use tokio::net::TcpListener;
use crate::graphql;
pub async fn run_server() -> anyhow::Result<()> {
let schema = Schema::build(graphql::Query, graphql::Mutation, graphql::Subscription)
.finish();
let app = Router::new()
.route(
"/",
get(graphiql).post_service(GraphQL::new(schema.clone())),
)
.route_service("/ws", GraphQLSubscription::new(schema));
println!("Starting graphql server on port {}", 8081);
axum::serve(TcpListener::bind("127.0.0.1:8081").await.unwrap(), app)
.await
.unwrap();
Ok(())
}
async fn graphiql() -> impl IntoResponse {
response::Html(
GraphiQLSource::build()
.endpoint("/")
.subscription_endpoint("/ws")
.finish(),
)
}
We can run this with:
nx run message-service:run
And then run the following GraphQL:
subscription {
messages(playerId: "jojo") {
text
}
}
mutation {
sendMessage(input: { text: "hello", toPlayerId: "jojo", fromPlayerId: "jojo"}) {
message {
text
}
}
}
As with the Golang service for spells, we can play around with this. We need to use multiple browser tabs again to test the subscription filtering, because this also uses GraphiQL - see this GitHub issue for clarity.
Now, let's move onto the Node.js microservice in Part 3!
Top comments (0)