
Subscriptions and Live Queries - Real Time with GraphQL

This article was published on Wednesday, April 21, 2021 by Laurin Quast @ The Guild Blog

Subscriptions are the go-to solution for adding real-time capabilities to a GraphQL-powered
application. At the same time, the term GraphQL Live Query floats around and can often be found in
the context of subscriptions.

While
GraphQL Subscriptions have been a part of the GraphQL Specification
for some time, GraphQL Live Queries are not part of the specification, and there is no ongoing RFC
for them either.

However,
discussion about GraphQL Live Queries started way back when GraphQL Subscriptions were designed.

So let's recap GraphQL Subscriptions, take a look at the live query solutions that exist today,
and compare the two approaches to real-time.

Note: This blog post is also available as a YouTube video.

Subscription Recap

The
GraphQL Subscription RFC was merged back in March 2017.
The first major and widely adopted transport implementation was (and probably still is)
subscriptions-transport-ws. It was
developed by Apollo, but unfortunately they seem to have abandoned it since then. Fortunately, we
now have a successor, graphql-ws.

A subscription operation looks similar to this.

subscription onPostAddedSubscription {
  onPostAdded {
    post {
      id
      author {
        id
        login
      }
    }
  }
}

In contrast to a GraphQL query operation, a subscription operation is only allowed to select a
single field on the GraphQL Subscription root type.

Furthermore, executing a subscription operation produces a stream of results, whereas executing a
query operation produces only a single result.

Let's take a quick look at the graphql-js reference implementation!

Since promises cannot represent multiple values over time, the graphql-js reference
implementation uses AsyncIterator, which
is a native structure similar to Observables (which one might already know from having dug a bit
deeper into the most widely adopted GraphQL clients).

Each subscription root field must provide a subscribe function that returns an AsyncIterator and
optionally has a resolve function for mapping the published events.

When a subscription operation is executed, the subscribe function of that field resolver is called
and the AsyncIterator returned by it will be used as the source for the stream of events returned
to the client.

Once the AsyncIterator publishes a value (the payload or event), the optional resolve function
on the selected subscription root field is called with the value. All subsequently executed
resolvers in the resolver/type tree behave like normal query resolvers.
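For illustration, here is a minimal sketch of such a subscribe/resolve pair. The pubSub and postStore context objects are hypothetical stand-ins for illustration and not part of graphql-js:

const resolvers = {
  Subscription: {
    onPostAdded: {
      // subscribe returns an AsyncIterator of raw events, e.g. { id: '1' }
      subscribe: (_, _args, context) => context.pubSub.subscribe('POST_ADDED'),
      // resolve maps each raw event to the value returned for the selected field
      resolve: async (event, _args, context) => ({
        post: await context.postStore.load(event.id)
      })
    }
  }
}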

The most basic implementation for a counter would look similar to this:

const sleep = (t = 1000) => new Promise(res => setTimeout(res, t))

const resolvers = {
  Subscription: {
    countToNumber: {
      subscribe: async function* (_, args) {
        for (let counter = 1; counter <= args.toNumber; counter++) {
          yield { countToNumber: counter }
          await sleep()
        }
      }
    }
  }
}

The above subscription will count up to the number provided via the toNumber argument (while
having a delay of one second between each message) and then complete.

Of course, in a real-world application we would like to subscribe to other event sources instead of
some static, pre-defined events.

The most commonly used (but not best maintained) library for such a PubSub engine in the GraphQL
context is graphql-subscriptions. There
are also adapters available for more distributed systems (where all GraphQL API replicas must be
notified about the event), e.g. over Redis.
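As a hedged sketch (topic name and connection details are made up), wiring the Redis adapter from graphql-redis-subscriptions into a resolver could look like this:

import { RedisPubSub } from 'graphql-redis-subscriptions'

const pubSub = new RedisPubSub({
  connection: { host: 'localhost', port: 6379 }
})

const resolvers = {
  Subscription: {
    onPostAdded: {
      // every GraphQL API replica receives the event through Redis
      subscribe: () => pubSub.asyncIterator('POST_ADDED')
    }
  }
}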

If there is no need for scaling horizontally, the graphql-subscriptions package can be omitted and
replaced with the Node.js native events module:

import { EventEmitter, on } from 'events'

export const createPubSub = <TTopicPayload extends { [key: string]: unknown }>(
  emitter: EventEmitter
) => {
  return {
    publish: <TTopic extends Extract<keyof TTopicPayload, string>>(
      topic: TTopic,
      payload: TTopicPayload[TTopic]
    ) => void emitter.emit(topic as string, payload),
    subscribe: async function* <TTopic extends Extract<keyof TTopicPayload, string>>(
      topic: TTopic
    ): AsyncIterableIterator<TTopicPayload[TTopic]> {
      const asyncIterator = on(emitter, topic)
      for await (const [value] of asyncIterator) {
        yield value
      }
    }
  }
}

const pubSub = createPubSub(new EventEmitter())

A (type-safe) PubSub implementation in 21 lines of code. We will use this for the example below.
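For instance, the topic-to-payload mapping can be made explicit via the type parameter (a hypothetical POST_ADDED topic for illustration):

type TopicPayloads = {
  POST_ADDED: { id: string }
}

const pubSub = createPubSub<TopicPayloads>(new EventEmitter())

// type-checked: the payload must match the POST_ADDED shape
pubSub.publish('POST_ADDED', { id: '1' })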

In my opinion, the different PubSub implementations should rather be based on EventEmitter instead
of graphql-subscriptions. A PubSub can be used together with GraphQL, but doesn't have to be. By
choosing the name graphql-subscriptions, it gives the impression that the logic is specific to
GraphQL, which discourages contributions from people that need a similar event abstraction elsewhere.

Therefore, I hope the next generation/iteration of Node.js PubSub implementations is less specific.

Having said that, let's take a look at a more "real-world" like example of using subscriptions with
PubSub:

const resolvers = {
  Subscription: {
    onPostAdded: {
      subscribe: async function* (_, _args, context) {
        for await (const { id } of context.pubSub.subscribe('POST_ADDED')) {
          const post = await context.postStore.load(id, context.viewer)
          if (post === null) {
            continue
          }
          yield { onPostAdded: post }
        }
      }
    }
  },
  Mutation: {
    postAdd: (_, args, context) => {
      const post = context.postStore.create(args)
      // we don't want to publish the whole object via our event emitter, the id should be sufficient
      context.pubSub.publish('POST_ADDED', { id: args.id })
      return post
    }
  }
}

Until now, we only took a look at the resolvers. Let's also quickly check the subscribe function
exported from graphql-js, which can be used for executing subscriptions.

import { subscribe } from 'graphql'

// graphql-js does not export a public helper for this check, so we define our own
const isAsyncIterable = (value: unknown): value is AsyncIterable<unknown> =>
  typeof value === 'object' && value !== null && Symbol.asyncIterator in value

const subscribeResult = await subscribe({
  schema,
  document
})

if (isAsyncIterable(subscribeResult)) {
  for await (const executionResult of subscribeResult) {
    sendToClient(executionResult)
  }
} else {
  sendToClient(subscribeResult)
}

The subscribe function returns either an AsyncIterable that publishes multiple ExecutionResults,
or a single ExecutionResult in case the setup of the subscription somehow fails.

The interesting thing is that we can use any transport for delivering the results to the client. The
most popular implementation (as mentioned before) is subscriptions-transport-ws. Unfortunately,
since it is poorly maintained, the GraphQL Working Group came up with a new implementation over
WebSockets, graphql-ws.

But we are not forced to use WebSockets at all. Server-Sent Events (SSE)
might be a more lightweight solution for both our server and client.

It is actually a shame that the default express-graphql reference HTTP transport implementation
does not come with a built-in subscription solution.

Fortunately, we now have libraries like
GraphQL Helix, which, in my humble opinion, should
replace express-graphql as the reference HTTP implementation, since GraphQL Helix is also not tied
to any web server framework.

For public GraphQL APIs, I am convinced that Server-Sent Events are the future, as there is less
work required for implementing the protocol.
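To make that concrete, here is a trimmed-down sketch of serving subscriptions over SSE with GraphQL Helix and Express, following the patterns from the GraphQL Helix examples (error handling omitted, the schema import is assumed):

import express from 'express'
import { getGraphQLParameters, processRequest } from 'graphql-helix'
import { schema } from './schema'

const app = express()
app.use(express.json())

app.use('/graphql', async (req, res) => {
  const request = { body: req.body, headers: req.headers, method: req.method, query: req.query }
  const { operationName, query, variables } = getGraphQLParameters(request)
  const result = await processRequest({ operationName, query, variables, request, schema })

  if (result.type === 'PUSH') {
    // stream each execution result to the client as a Server-Sent Event
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      Connection: 'keep-alive',
      'Cache-Control': 'no-cache'
    })
    req.on('close', () => result.unsubscribe())
    await result.subscribe(value => {
      res.write(`data: ${JSON.stringify(value)}\n\n`)
    })
  } else if (result.type === 'RESPONSE') {
    res.status(result.status).json(result.payload)
  }
})

app.listen(4000)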

I also built
my own transport over Socket.io,
which uses WebSockets by default and HTTP polling as a fallback.

As you can see, with GraphQL subscriptions, we are free to choose the best transport for our
application requirements!


Now that we have taken a look at how GraphQL Subscription resolvers are implemented on the
server-side, let's also check out how we can consume the GraphQL API on the client-side!

Usually we will have a network interface that is called by our favorite GraphQL client. Every single
client has a different name and implementation. Apollo calls them links, Relay calls them fetcher
functions, and urql calls them exchanges.

All of them have one thing in common: they work with observable-like data structures. That
basically means that, for consuming subscriptions, all major GraphQL client libraries decided to
use Observables (which, as of October 2020, are still only a TC39 proposal and not part of the
ECMAScript spec) instead of AsyncIterators.

Like an AsyncIterator, an Observable can represent multiple values. As already mentioned, each
client library and transport have slightly different interfaces. I will use graphql-ws with
relay-runtime as an example.

The example is taken straight from the graphql-ws documentation.

import { createClient } from 'graphql-ws'
import { Environment, Network, Observable, RecordSource, RequestParameters, Store, Variables } from 'relay-runtime'

const subscriptionsClient = createClient({
  url: 'wss://i.love/graphql',
  connectionParams: () => {
    const session = getSession()
    if (!session) {
      return {}
    }
    return {
      Authorization: `Bearer ${session.token}`
    }
  }
})

// yes, both fetch AND subscribe handled in one implementation
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create(sink => {
    if (!operation.text) {
      return sink.error(new Error('Operation text cannot be empty'))
    }
    return subscriptionsClient.subscribe(
      {
        operationName: operation.name,
        query: operation.text,
        variables
      },
      {
        ...sink,
        error: err => {
          if (err instanceof Error) {
            sink.error(err)
          } else if (err instanceof CloseEvent) {
            sink.error(
              new Error(
                `Socket closed with event ${err.code}` +
                  (err.reason ? `: ${err.reason}` : '') // reason will be available on clean closes
              )
            )
          } else {
            // GraphQLError[]
            sink.error(new Error(err.map(({ message }) => message).join(', ')))
          }
        }
      }
    )
  })
}

const network = Network.create(fetchOrSubscribe, fetchOrSubscribe)
const store = new Store(new RecordSource())
export const environment = new Environment({ network, store })

With this configuration, Relay can now execute subscription operations. Because the graphql-ws
protocol is way more complex than the GraphQL over HTTP protocol, we use the client exported from
the graphql-ws package instead. This results in some additional bundle size. As mentioned before,
SSE might be a better, lightweight alternative.

That aside, let's start with a basic subscription that should update one of our components.

Our PostRender already shows some content.

const PostQuery = graphql`
  query PostQuery($postId: ID!) {
    post(id: $postId) {
      id
      title
      content {
        text
      }
      totalLikeCount
    }
  }
`

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(PostQuery, /* variables */ { postId })

  return <Post {...props} />
}

As a new feature requirement, the like count of the post should get updated once someone hits the
like button.

We could choose different ways of implementing such a subscription.

  1. General Subscription for changed post
subscription PostChanged($postId: ID!) {
  onPostChanged(postId: $postId) {
    post {
      id
      totalLikeCount
    }
  }
}
  2. Specific Subscription
subscription PostLikeCountChanged($postId: ID!) {
  onPostLikeCountChanged(postId: $postId) {
    totalLikeCount
  }
}

Both solutions have different implications.

  1. General Subscription for changed post

This approach is not limited to notifying about totalLikeCount changes; in the future, we could
adjust the selection set on the post field, as it returns a Post type similar to our already
existing Query.post field. It will automatically update the post record already in the cache, since
the Relay store (similar to other clients) can identify the post object via the id field. The
drawback is that we could potentially send too much data over the wire. E.g. if we also subscribed
to title changes, all additionally selected fields are sent to the client each time the underlying
event is emitted, even if only the totalLikeCount value has changed.

const PostChangedSubscription = graphql`
  subscription PostChanged($postId: ID!) {
    onPostChanged(postId: $postId) {
      post {
        id
        totalLikeCount
      }
    }
  }
`

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(PostQuery, /* variables */ { postId })
  // that's all we have to do
  useSubscription(PostChangedSubscription, /* variables */ { postId })

  return <Post {...props} />
}
  2. Specific Subscription

This subscription is specifically designed for updates of the totalCount only. However, the
subscription result includes no Post type, so we cannot make use of the automatic cache updates
via the id. We have to additionally define a handler for updating the post in the cache.

const PostLikeCountChangedSubscription = graphql`
  subscription PostLikeCountChanged($postId: ID!) {
    onPostLikeCountChanged(postId: $postId) {
      totalCount
    }
  }
`

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(PostQuery, /* variables */ { postId })
  useSubscription(
    PostLikeCountChangedSubscription,
    /* variables */ { postId },
    {
      // we need to manually update the cache :(
      updater: (store, payload) => {
        const record = store.get(postId)
        if (record) {
          record.setValue(payload.onPostLikeCountChanged.totalCount, 'totalLikeCount')
        }
      }
    }
  )

  return <Post {...props} />
}

Obviously, for this example, no sane person would actually want to choose the second solution over
the first one.

But as our business requirements might get more complex we might need to do manual cache updates.

A very good example for this is lists. Imagine having a set of data in which a single item
changes. The "easy to implement" solution would be to just refetch the complete list every time a
single item is added/removed/changed. However, for a list containing hundreds of items, only sending
the changed item to the client might be the smarter and faster solution.

This can be implemented via a union type.

type OnUserAdded {
  user: User!
}
type OnUserRemoved {
  removedUserId: ID!
}

union OnUserListChange = OnUserAdded | OnUserRemoved

type Subscription {
  onUserListChange: OnUserListChange!
}

The corresponding code, including handling the cache updates:

const UserListQuery = graphql`
  query UserListQuery {
    users {
      id
      name
    }
  }
`

const UserListChangeSubscription = graphql`
  subscription UserListChangeSubscription {
    onUserListChange {
      __typename
      ... on OnUserAdded {
        user {
          id
          name
        }
      }
      ... on OnUserRemoved {
        removedUserId
      }
    }
  }
`

const UserListRenderer = () => {
  const { props } = useQuery(UserListQuery)
  useSubscription(
    UserListChangeSubscription,
    /* variables */ {},
    {
      // we need to manually update the cache :(
      updater: (store, payload) => {
        const root = store.getRoot()
        const users = root.getLinkedRecords('users')
        if (users) {
          switch (payload.onUserListChange.__typename) {
            case 'OnUserAdded': {
              const newUser = store.create(payload.onUserListChange.user.id, 'User')
              newUser.setValue(payload.onUserListChange.user.name, 'name')
              root.setLinkedRecords([...users, newUser], 'users')
              break
            }
            case 'OnUserRemoved': {
              root.setLinkedRecords(
                users.filter(user => user.getDataID() !== payload.onUserListChange.removedUserId),
                'users'
              )
              break
            }
          }
        }
      }
    }
  )

  return <UserList {...props} />
}

As our application grows, the manual cache update code can become so complex and confusing that in
some applications I have considered switching back to simply refetching the queries.

Fortunately, Relay contributors have
worked on some nice query directives that allow
reducing such cache update code. It won't cover all cases though.

In all of the above examples we responded (more or less implicitly) to data change events.

Subscriptions can be used to apply data changes on the client. But they are probably not the best
tool for that.

Before taking a look of what could be a better tool, let's look at another usage example for
subscriptions.

const OnPlaySound = graphql`
  subscription OnPlayChatSound {
    onPlayChatSound {
      soundUrl
    }
  }
`

const ChatRenderer = () => {
  const chat = useQuery(ChatQuery)
  useSubscription(
    OnPlaySound,
    useMemo(
      () => ({
        onNext: payload => {
          playSound(payload.onPlayChatSound.soundUrl)
        }
      }),
      []
    )
  )

  return <Chat chat={chat} />
}

The difference here is that we are not manipulating our existing data but rather executing a side
effect.

Subscriptions can also be used for side effects that should not alter or touch any data in the
cache.

Live Queries

What is a live query? Well, there is no specification for it, so the term is ambiguous. Today,
there are several solutions one could describe as live queries.

All those solutions have one thing in common: they try to keep the client state in sync with the
server. This can be paraphrased as observing data.

Before we take a look at how all of those implementations work, let's break down what we should or
can expect from a live query implementation.

Automatically update the clients

This one is pretty obvious. The live query implementation should keep the client state as close
to the server state as possible.

Efficiently update clients

If only a part of our data has changed, we don't necessarily need to send the complete execution
result over the wire. Only sending the changed parts might be more efficient. In general, the
network overhead should be minimal.

Flexibility

Ideally a solution should not be tied to a specific database or coupled with some SaaS service that
won't be around next year.

Adoptability

In case we already have some kind of GraphQL schema or server implementation, the live query
solution should be adoptable without changing everything and starting from scratch.

Polling a Query

The easiest solution for implementing live queries would be to poll a query in intervals. Most
GraphQL clients already have such a feature built in.

const PostQuery = graphql`
  query PostQuery($postId: ID!) {
    post(id: $postId) {
      id
      title
      content {
        text
      }
      likes {
        totalCount
      }
    }
  }
`

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(
    PostQuery,
    /* variables */ { postId },
    {
      pollInterval: 5000
    }
  )

  return <Post {...props} />
}
  1. Automatically updates clients. No.

Depending upon the use-case, this could be a valid solution, but for true real-time applications
that require instant feedback, this is not precise enough due to the delay caused by the poll
interval.

  2. Efficiently updates clients. No.

The whole execution result is sent to the client every single time the operation is re-executed. A
lot more data than necessary is transported over the wire, even if nothing has changed since the
last poll interval.

  3. Flexibility. High.

Straightforward, as this does not rely on any changes on the server and requires only slight
changes on our frontend.

  4. Adoptability. High.

Straightforward; again, almost no changes are required.

Live Queries over Subscriptions

We already had a "live query over subscription"-like example above.

const PostChangedSubscription = graphql`
  subscription PostChanged($postId: ID!) {
    onPostChanged(postId: $postId) {
      post {
        id
        title
        likes {
          totalCount
        }
      }
    }
  }
`

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(PostQuery, /* variables */ { postId })
  // that's all we have to do
  useSubscription(PostChangedSubscription, /* variables */ { postId })

  return <Post {...props} />
}

Let's ditch the PostQuery completely and instead use a PostSubscription that always emits an
initial event.

const PostSubscription = graphql`
  subscription PostSubscription($postId: ID!) {
    post(id: $postId) {
      id
      title
      likes {
        totalCount
      }
    }
  }
`

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useSubscription(PostSubscription, /* variables */ { postId })

  return <Post {...props} />
}

A server resolver implementation could look similar to this:

const resolvers = {
  Subscription: {
    post: {
      subscribe: async function* (_, { id }, context) {
        const loadPost = () => context.postStore.load(id, context.viewer)
        // publish the first post state immediately
        yield { post: await loadPost() }
        // publish the post again once a change event is emitted
        for await (const _event of context.pubSub.subscribe(`Post:${id}`)) {
          yield { post: await loadPost() }
        }
      }
    }
  }
}

We replace two operations with a single one!

A similar approach is used by
Hasura and also
PostGraphile.

The obvious drawback of both platforms is the lock-in to a specific database. Of course, that
might not be a problem for most people, but having a general solution that works with any data
source would be nice, as more complex schemas could fetch from different database types or other
third-party APIs.

Those implementations keep track of all the resources in the execution result and re-execute the
subscription operation once any of those resources changes.

The resolver implementation above only responds to emitted post changes. In order to keep track of
all the resources defined in an operation's selection set, we will have to come up with a smart
abstraction.

Another drawback of subscriptions for live queries is the limitation of only selecting one root
subscription field, as defined by the GraphQL specification. Furthermore, we must also redeclare
our query type fields on the subscription type.

There is a workaround we can apply: re-exporting the whole Query type via a sub-field on the
subscription type.

type Query {
  user(id: ID!): User
  post(id: ID!): Post
}

type Subscription {
  live: Query!
}

This approach would allow us to query everything on the query object type via the live field on the
subscription object type, without the limit of only being able to query one resource or having to
redeclare every resource field resolver on the subscription type. Neat!

const PostSubscription = graphql`
  subscription PostSubscription($postId: ID!, $userId: ID!) {
    live {
      post(id: $postId) {
        id
        title
        totalLikeCount
      }
      user(id: $userId) {
        id
        name
      }
    }
  }
`

Okay, now we can select everything we could have selected with our query operation!
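For completeness, a hand-rolled resolver for such a live field could look like the following minimal sketch. The INVALIDATE topic is an assumption, and yielding an empty object as the live root only works when the Query field resolvers do not depend on their parent value:

const resolvers = {
  Subscription: {
    live: {
      // emit once immediately, then re-emit on every invalidation event,
      // so the Query field resolvers below the live field are re-executed
      subscribe: async function* (_, _args, context) {
        yield { live: {} }
        for await (const _event of context.pubSub.subscribe('INVALIDATE')) {
          yield { live: {} }
        }
      }
    }
  }
}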

  1. Automatically updates clients. Yes.

When using services like PostGraphile and Hasura, that is the case. However, for any additional
resolvers that are added on top of the service schema, there is no built-in invalidation mechanism;
in user-land, we have to come up with an implementation of resource tracking ourselves.

  2. Efficiently updates clients. No.

The whole execution result is sent to the client every single time a live query is invalidated.

  3. Flexibility. Low.

Both Hasura and PostGraphile are tightly coupled to a SQL database. For any custom resolvers, we
have to come up with the mechanism for resource tracking and invalidation ourselves.

  4. Adoptability. Low.

Switching an already existing GraphQL schema to a server powered by PostGraphile or Hasura is no
easy task.

GraphQL Live Queries over Subscriptions with JSON Patch

Ideally, we only want to send a patch to the client that provides instructions on how to get from
the previous execution result to the next. The lack of these instructions has been a big flaw in the
previous two implementations.

The RFCs and implementations for the @defer and @stream directives introduced ways of sending
(delayed) partial results to clients. However, those "patch" operations are currently limited to
"replace at path" and "append to list" operations.

A format such as JSON Patch might be a better alternative for
live queries.

graphql-live-subscriptions tries to solve
that with a Subscription.live field that exposes both a Query field and a JSON patch field.

Schema Types for graphql-live-subscriptions

scalar JSON

type RFC6902Operation {
  op: String!
  path: String!
  from: String
  value: JSON
}

type LiveQuery {
  patch: [RFC6902Operation]
  query: Query
}

type Subscription {
  live: LiveQuery!
}

A live query operation can be declared similar to our PostSubscription document above.

const PostSubscription = graphql`
  subscription PostSubscription($postId: ID!, $userId: ID!) {
    live {
      query {
        post(id: $postId) {
          id
          title
          totalLikeCount
        }
        user(id: $userId) {
          id
          name
        }
      }
      patch {
        op
        path
        from
        value
      }
    }
  }
`

The difference is that the type returned by the live field has two fields instead of a single one:
the query field, which exposes the selection set of the Query type, and a patch field, which
carries JSON Patch operations. When executing the given operation against the server, the initial
result will include the data selected by the query field selection set. All following values will
have no query value (null), but instead an array of patch operations that describe how to get from
the last result to the next.

Initial result

{
  "data": {
    "live": {
      "query": {
        "post": {
          "id": "1",
          "title": "foo",
          "totalLikeCount": 10
        },
        "user": {
          "id": "10",
          "name": "Laurin"
        }
      },
      "patch": null
    }
  }
}

Patch result (increase totalLikeCount)

{
  "data": {
    "live": {
      "query": null,
      "patch": [
        {
          "op": "replace",
          "path": "post/totalLikeCount",
          "from": null,
          "value": 11
        }
      ]
    }
  }
}

The clients must then implement the logic for
applying the patch operation to their client cache
or for applying the patches on the initial result in the network/fetcher/link layer.
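As a rough sketch of that client-side half, applying the patches with the fast-json-patch package could look like this (liveResults and render are hypothetical stand-ins for the transport and the UI):

import { applyPatch } from 'fast-json-patch'

let current = null
for await (const result of liveResults) {
  if (result.data.live.query) {
    // initial result: take the full query snapshot
    current = result.data.live.query
  } else if (result.data.live.patch) {
    // subsequent results: apply the JSON patch operations to the snapshot
    current = applyPatch(current, result.data.live.patch).newDocument
  }
  render(current)
}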

The server implementation uses an event emitter and an immutable state tree for detecting changes
that must be sent to clients. The patch is automatically generated by comparing the next immutable
state against the last one that got emitted via an EventEmitter.

While the idea is quite nice, the implementation is obviously meant for backends that already use
reactive or immutable data structures. Having to rewrite our existing GraphQL layer to support live
queries is a big trade-off. Furthermore, the library is not maintained that well. I've made PRs to
make the library compatible with recent GraphQL versions, but these have yet to be merged. Using
unions and interfaces is not possible. Having to patch a library with patch-package before even
being usable is generally a bad sign.

  1. Automatically updates clients. Yes.

When our schema is implemented conforming to the library's expectations, it delivers precise
results once the immutable state has changed.

  2. Efficiently updates clients. Yes.

Initially a result tree is sent to the client. Afterwards, only JSON patches that must be applied to
the initial result are sent to the client.

  3. Flexibility. Kind of. ✳️

We don't rely on any third-party services; however, we are forced into immutability to some extent.

  4. Adoptability. It depends. ✳️

Adding an immutable layer to our existing schema might be a pretty big change. Furthermore, the
library lacks support for some GraphQL features, such as interfaces and unions.

GraphQL Live Queries via the @live Directive

There are companies out there, like Facebook, that are
already using this approach. There is also a GraphQL framework available in Go that supports live
queries out of the box! Check out thunder here.

The idea behind the @live directive is that the client uses it to declare its interest in keeping
the query's execution result as up to date as possible. The implementation, however, is up to
user-land.

query ($id: ID!) @live {
  post(id: $id) {
    id
    title
    totalLikeCount
  }
}

The idea of making any query a live query without additional overhead is very appealing from the
view of a frontend developer. From a backend perspective, however, that raises new questions. Just
adding a directive to the operation on the frontend won't make the whole backend reactive.

After having built an example app with graphql-live-subscriptions from scratch, studying the flaws
of that library and being uncomfortable with the vendor lock-in of services such as PostGraphile and
Hasura, I decided to approach the problem of live queries in a more pluggable way, by using the
@live directive.

@n1ru4l/graphql-live-query: A Common Definition and Set of Utilities for Identifying Live Queries

This module provides two things.

  1. GraphQLLiveDirective that can be added to any schema.
import { GraphQLSchema, specifiedDirectives } from 'graphql'
import { GraphQLLiveDirective } from '@n1ru4l/graphql-live-query'
import { mutation, query, subscription } from './schema'

const schema = new GraphQLSchema({
  query,
  mutation,
  subscription,
  directives: [GraphQLLiveDirective, /* keep built-in @include/@skip/@deprecated/@specifiedBy */ ...specifiedDirectives]
})
  2. isLiveQueryOperationDefinitionNode

This is a simple function that takes an OperationDefinitionNode and returns true if it is a live
query operation.
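A quick usage sketch (the operation and logging are made up for illustration):

import { parse } from 'graphql'
import { isLiveQueryOperationDefinitionNode } from '@n1ru4l/graphql-live-query'

const document = parse(/* GraphQL */ `
  query posts @live {
    posts {
      id
    }
  }
`)

// the first definition is the operation definition node of our query
const [operationNode] = document.definitions
console.log(isLiveQueryOperationDefinitionNode(operationNode)) // true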

These utility functions can be found here on Github

Those functions might not seem that helpful on their own, but they are a contract that live query
execution engines can build upon. Such as the package we are talking about next 😇.

@n1ru4l/in-memory-live-query-store: Keep Track of and Invalidate Resources Selected by a Live Query in an Efficient but Generic Way

The InMemoryLiveQueryStore.execute function is a drop-in replacement for the execute function
provided by the graphql package.

When encountering a query operation that is marked with the @live directive, it will return an
AsyncIterator instead of a Promise, which can be used for sending multiple results to the client,
similar to how subscribe (or defer/stream) works.

Internally, the store keeps track of the resources selected in the live query operation selection
set. That means all root query field coordinates (e.g. Query.post) and global resource identifiers
(e.g. Post:1). The store can then be notified via the InMemoryLiveQueryStore.invalidate method to
re-execute all live query operations that select a given root query field or resource identifier.
A resource identifier is composed of the type name and the resolved id value, separated by a colon,
but this behavior can be customized. To ensure that the store keeps track of all our query
resources, we should always select the id field on our object types. The store will only keep track
of fields with the name id and the type ID! (GraphQLNonNull(GraphQLID)).

import { parse } from 'graphql'
import { InMemoryLiveQueryStore } from '@n1ru4l/in-memory-live-query-store'
import { schema } from './schema'

// same helper as in the subscribe example above
const isAsyncIterable = (value: unknown): value is AsyncIterable<unknown> =>
  typeof value === 'object' && value !== null && Symbol.asyncIterator in value

const inMemoryLiveQueryStore = new InMemoryLiveQueryStore()

const rootValue = {
  todos: [
    {
      id: '1',
      content: 'Foo',
      isComplete: false
    }
  ]
}

inMemoryLiveQueryStore
  .execute({
    schema,
    document: parse(/* GraphQL */ `
      query todosQuery @live {
        todos {
          id
          content
          isComplete
        }
      }
    `),
    rootValue: rootValue,
    contextValue: {},
    variableValues: null,
    operationName: 'todosQuery'
  })
  .then(async result => {
    if (isAsyncIterable(result)) {
      for await (const value of result) {
        // Send to client in real-world app :)
        console.log(value)
      }
    }
  })

// Invalidate by resource identifier
rootValue.todos[0].isComplete = true
inMemoryLiveQueryStore.invalidate(`Todo:1`)

// Invalidate by root query field coordinate
rootValue.todos.push({ id: '2', content: 'Baz', isComplete: false })
inMemoryLiveQueryStore.invalidate(`Query.todos`)

When using an ORM such as Prisma, we can simply add a middleware for automatically invalidating
resources.

Use Prisma middleware for resource invalidation

// Register middleware for automatic model invalidation
prisma.$use(async (params, next) => {
  const resultPromise = next(params)

  if (params.action === 'update') {
    resultPromise.then(res => {
      if (res?.id) {
        // invalidate `Post:1` on update
        liveQueryStore.invalidate(`${params.model}:${res.id}`)
      }
    })
  }

  return resultPromise
})

In case we have multiple server replicas some PubSub implementation can be used for distributing the
events.

PubSub with Redis

import Redis from 'ioredis'

const redis = new Redis()
redis.subscribe('invalidations')
redis.on('message', (channel, message) => {
  if (channel === 'invalidations') {
    // message is "Post:1"
    liveQueryStore.invalidate(message)
  }
})

The transports graphql-ws (GraphQL over WebSocket), graphql-helix (GraphQL over SSE) and
@n1ru4l/socket-io-graphql-server (GraphQL over Socket.io) support providing a custom execute
function that is allowed to return AsyncIterables (thanks to the recent changes required for
@defer and @stream). All we have to do is pass InMemoryLiveQueryStore.execute to our
server factory!

Example with graphql-ws

import http from 'http'
import { parse, subscribe } from 'graphql'
import { useServer } from 'graphql-ws/lib/use/ws'
// yarn add ws
import ws from 'ws'
import { InMemoryLiveQueryStore } from '@n1ru4l/in-memory-live-query-store'
import { schema } from './schema'

const inMemoryLiveQueryStore = new InMemoryLiveQueryStore()

const server = http.createServer(function weServeSocketsOnly(_, res) {
  res.writeHead(404)
  res.end()
})

const wsServer = new ws.Server({
  server,
  path: '/graphql'
})

useServer(
  {
    schema,
    execute: inMemoryLiveQueryStore.execute,
    subscribe
  },
  wsServer
)

server.listen(443)

The best thing is you can start playing around with it today! The fully functional implementation is
available as
@n1ru4l/in-memory-live-query-store on Github.
Feel free to create any issues regarding missing features or documentation. Let's shape the future
for GraphQL live queries together!

@n1ru4l/graphql-live-query-patch: Optional JSON Patch Middleware for Smaller Payloads over the Wire

GraphQL execution result payloads can become quite huge, and sending those over the wire can get
expensive, especially when they are sent often for fast-updating state. JSON Patch is a handy
standard for only sending change instructions over the wire, which can potentially reduce such
huge payloads.

Instead of having JSON patches enabled by default, it is a totally optional module that can be
applied on the server for deflating (creating patches) and on the client for inflating (applying
patches) the execution results. Smaller projects might even be better off not using JSON Patch at
all, as the patch payload might be bigger than the whole query result.

The patches are created by comparing the latest execution result with the previous execution result.
That means the server will always have to store the latest execution result as long as the live
query is active.

Here are some example execution results after applying the patch generator:

Initial result

{
  "data": {
    "post": {
      "id": "1",
      "title": "foo",
      "totalLikeCount": 10
    },
    "user": {
      "id": "10",
      "name": "Laurin"
    }
  },
  "revision": 1
}

Patch result (increase totalLikeCount)

{
  "patch": [
    {
      "op": "replace",
      "path": "post/totalLikeCount",
      "value": 11
    }
  ],
  "revision": 2
}

On the server adding the patch generation middleware is easy function composition:

import { flow } from 'fp-ts/function'
import { createApplyLiveQueryPatchGenerator } from '@n1ru4l/graphql-live-query-patch'

const applyLiveQueryPatchGenerator = createApplyLiveQueryPatchGenerator()

const execute = flow(liveQueryStore.execute, applyLiveQueryPatchGenerator)
// same as
const execute0 = (...args) => applyLiveQueryPatchGenerator(liveQueryStore.execute(...args))

On the client, we now need to build an execution result out of the initial result and the patch
results, because our clients do not understand the GraphQL live query JSON patch protocol!

Applying the middleware is pretty easy as well!

import { flow } from 'fp-ts/function'
import { createApplyLiveQueryPatch } from '@n1ru4l/graphql-live-query-patch'

const applyLiveQueryPatch = createApplyLiveQueryPatch()

const execute = flow(networkInterface.execute, applyLiveQueryPatch)
// same as
const execute0 = (...args) => applyLiveQueryPatch(networkInterface.execute(...args))

The library is optimized for network interfaces that return AsyncIterables. We can easily wrap our
favorite network interface (that uses observable-style sinks) in an AsyncIterable with
@n1ru4l/push-pull-async-iterable-iterator!

Example with graphql-ws

import { pipe } from 'fp-ts/function'
import { createClient } from 'graphql-ws'
import { createApplyLiveQueryPatch } from '@n1ru4l/graphql-live-query-patch'
import { makeAsyncIterableIteratorFromSink } from '@n1ru4l/push-pull-async-iterable-iterator'

const client = createClient({
  url: 'ws://localhost:3000/graphql'
})

const applyLiveQueryPatch = createApplyLiveQueryPatch()

const asyncIterator = pipe(
  makeAsyncIterableIteratorFromSink(sink => {
    const dispose = client.subscribe(
      {
        query: 'query @live { hello }'
      },
      {
        next: sink.next,
        error: sink.error,
        complete: sink.complete
      }
    )
    return () => dispose()
  }),
  applyLiveQueryPatch
)

// stop live query after 5 seconds.
setTimeout(() => {
  asyncIterator.return()
}, 5000)

for await (const executionResult of asyncIterator) {
  console.log(executionResult)
}

The whole package can be found as
@n1ru4l/graphql-live-query-patch on GitHub.
It is already usable, and feedback is highly appreciated. It currently has its flaws with list
diffing, so further thoughts and ideas are very welcome.

Note: We have since separated the JSON patch logic into a separate
@n1ru4l/graphql-live-query-patch-json-patch package. @n1ru4l/graphql-live-query-patch now only
includes the common logic shared between patch payload implementations. The reason for this is that
there are more efficient patch formats available, such as jsondiffpatch, which generates smaller
patches for list changes. The latter is available as the separate package
@n1ru4l/graphql-live-query-patch-jsondiffpatch.

So let's take a look at this modular implementation approach regarding the aspects we used before.

  1. Automatically update the clients. Yes.

The approach of pushing the invalidation responsibility to the server might at first seem like a
drawback, but with a smart abstraction (such as an ORM middleware), it can result in pretty
responsive applications.

  2. Efficiently updates clients. Yes.

In case our execution result payloads are getting too big, we can easily enable JSON patches by
adding a middleware. Furthermore, the middleware is totally independent of the live query
implementation! That means if our project requires a different implementation of the live query
engine, the middleware can still be applied, as long as the execution result is compatible with
the live query execution result format.

  3. Flexibility. High.

Any database or third-party API can be used, as long as we can somehow invalidate the resources
(via some PubSub system, etc.). As all incorporated libraries are pretty pluggable, individual
pieces can be swapped out for something that makes more sense for a specific project.

  4. Adoptability. High.

This library can be added to any existing GraphQL.js schema without any hassle, provided you use a
transport that allows full control over the schema creation and execution phase, such as
graphql-helix or graphql-ws, which you are hopefully already using! Resource invalidation code can
be gradually added to the mutation resolvers over time to reflect the needs of the frontend. The
possibilities for resource invalidation are endless, and the logic can be added incrementally,
e.g. via an ORM middleware, in our mutation code (see the sketch below), or maybe even on our
GraphQL gateway.
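As a minimal sketch of the mutation-resolver variant (postStore and the context shape are hypothetical):

const resolvers = {
  Mutation: {
    postLike: async (_, { postId }, context) => {
      const post = await context.postStore.like(postId, context.viewer)
      // notify the store so every live query selecting this post is re-executed
      context.liveQueryStore.invalidate(`Post:${postId}`)
      return post
    }
  }
}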

What's Next?

Summed up, this could be the start of a pluggable live query ecosystem that is flexible enough to
be compatible with a wide range of databases, GraphQL transports, and schema builders, instead of
focusing too much on a niche. In general, instead of having less flexible and bulky servers that
try to be too opinionated and restrictive in their schema -> execution -> transport flow, GraphQL
tooling should become more modular. At the same time, this should not imply that there is no need
for opinionated framework-like approaches (as long as they give enough flexibility, e.g. by being
composed out of modular packages).

I hope that more people will start exploring the possibilities of GraphQL live queries and also
GraphQL in general! Eventually, we could even come up with an official live RFC for the spec!

There is a lot more to think about, such as:

  • Partial query re-execution (e.g. by building ad-hoc queries that only select affected resources)
  • Mechanisms for batching similar query operations (e.g. for preventing execution of same operation multiple times after invalidation)
  • ~~Better list diffing/format for live query patches (as JSON patch performs rather poorly on list diffs)~~ (Solved by introducing @n1ru4l/graphql-live-query-patch-jsondiffpatch)
  • Whatever is bothering you while reading this article! 😄

If you are an enthusiastic tinkerer and plan to build something with the libraries above, share it
with me and everyone else, so we can get more insights into different use-cases and requirements
for all kinds of applications!

Let's continue the discussion on Discord, Slack or
on GitHub

Here are some more resources for you to tinker around with:

GraphQL Live Query libraries and example todo app that sync across all connected clients.

Experimental GraphQL Playground that implements RFC features (defer/stream) and live queries.

More information on how the InMemoryLiveQueryStore keeps track of resources.

Further thoughts on how the Relay Spec could help backing a live query system.
