DEV Community

Cover image for Quine Ingest Streams
Michael Aglietti
Michael Aglietti

Posted on • Edited on • Originally published at thatdot.com

Quine Ingest Streams

Quine is optimized to process high volumes of data in motion and then stream out high-quality insights in real-time. The ingest stream is where a streaming graph starts. It connects to data producers, transforms the data, then populates a streaming graph to be analyzed by standing queries.

Quine

Let's get under the hood to understand how ingest streams work.

Quine is fundamentally a stream-oriented data processor that uses a graph data model. This provides optimal integration with streaming data producers and consumers such as Kafka and Kinesis. Quine builds on this streaming foundation to provide batch-like capabilities by converting data stored in files to streaming data to load into the graph.

Ingest Stream Concepts

What is an Ingest Stream?

An ingest stream connects a data source to Quine and prepares the emitted data for the streaming graph. Within the ingest stream, an ingest query, written in Cypher, updates the streaming graph nodes and edges as data is received.

Backpressuring Ingest Streams

Inevitably, when streaming data producers outpace consumers, the consumer will become overwhelmed. In Quine, as an ingest stream begins to get more data than it can process, it manages the dataflow to avoid becoming overwhelmed using "backpressure."

A backpressured system does not buffer, it causes producers upstream to not send data at a rate greater than it can process. The problem with buffering is that a buffer will eventually run out of space. And then what? The system must decide what to do when the buffer is full: drop new results, drop old results, crash the system, or backpressure.

Backpressure is a protocol defining how to send a logical signal UP the stream with information about the downstream consumers readiness to receive more data. That backpressure signal follows the same path as data moving downstream, but in reverse. If downstream is not ready to consume, then upstream does does not send.

Quine uses a reactive stream implementation of backpressure, Akka Streams, built on top of the actor model to ensure that the ingestion and processing of streams are resilient.

Curious about the operational challenge associated with reactive streams? Read the Reactive Manifiesto to understand the problems faced by every streaming processor in a high-volume data pipeline.

Asynchronous, non-blocking backpressure is the only method to ensure that all data from a high-volume stream is processed without data loss or processing delays.

All Nodes Exist

With a graph data model, nodes are the primary unit of data — much like a "row" is the primary unit of data in a relational database. However, unlike traditional graph data systems, a Quine user never has to create a node directly. Instead, the system functions as if all nodes exist.

Quine represents every possible node as an existing "empty node" with no interesting history. As data streams into the system, the node becomes interesting, and Quine creates a history for the node.

We added an idFrom function to Cypher that takes any number of arguments and deterministically produces a node ID from that data. This is similar to a consistent-hashing strategy, except that the ID produced from this function is always an ID that conforms to the type chosen for the ID provider.

You will use idFrom in the ingest query part of every ingest stream that you create. For example, the absolute minimum ingest query to load incoming data into the graph is simply a wrapper around the idFrom function.

MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that
Enter fullscreen mode Exit fullscreen mode

Historical Versioning

Each node in the graph records all of its historical changes over time. When a node's properties or edges are changed, the change event and timestamp are saved to an append-only log for that particular node. This historical log can be replayed up to any desired moment in time, allowing for the system to quickly answer questions using the state of the graph as it was in the past. This is a technique known as Event Sourcing, applied individually to each node.

Syntax and Structure

The first step when defining an ingest stream is to understand the overall shape of your data. This includes identifying the data elements necessary for standing queries to use in a MATCH.

An ingest query is defined by setting a type described by the API documentation. Quine supports eight types of ingest streams. Each type has a unique form and requires a specific structure to configure properly.

For example, constructing an ingest stream via the /api/v1/ingest/{name} API endpoint to read data from standard in and store each line as a node looks similar to the example below.

{
    "type": "StandardInputIngest",
    "format": {
        "type": "CypherLine",
        "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that"
    }
}
Enter fullscreen mode Exit fullscreen mode

Quine natively reads from standard-in, passing each line into a Cypher query as: $that. A unique node ID is generated using idFrom($that). Then, each line is stored as a line parameter associated with a new node in the streaming graph.

When creating an ingest stream via the API, you are given the opportunity to name the stream with a name that has meaning. For example, you can name the above ingest stream standardIn to make it easier to reference in your application.

Alternatively, creating an ingest stream via a recipe, Quine automatically assigns a name to each stream using the format INGEST-# where the first ingest stream defined in the recipe is INGEST-1 and subsequent ingest streams are named in order with # counting up.

Here is the same ingest stream defined in a Quine Recipe.

ingestStreams:
  - type: StandardInputIngest
    format:
      type: CypherLine
      query: |-
        MATCH (n)
        WHERE id(n) = idFrom($that)
        SET n.line = $that
Enter fullscreen mode Exit fullscreen mode

Ingest Stream Reporting

Inspecting Ingest Streams via the API

Quine exposes a series of API endpoints that enable you to monitor and manage ingest streams while in operation. The complete endpoint definitions are available in the API documentation.

Let's take a look at the information available from the INGEST-1 ingest stream from the Ethereum Tag Propagation Recipe.

Start the recipe.

> java -jar quine-x.x.x.jar -r ethereum
Enter fullscreen mode Exit fullscreen mode

List the ingest streams started by the ethereum recipe using the /api/v1/ingest endpoint.

❯ curl -s "http://localhost:8080/api/v1/ingest" | jq '. | keys'
[
  "INGEST-1",
  "INGEST-2"
]
Enter fullscreen mode Exit fullscreen mode

The ethereum recipe creates two ingest streams; INGEST-1 and INGEST-2.

Now, view the ingest stream stats using the /api/v1/ingest/INGEST-1 endpoint.

❯ curl -s "http://localhost:8080/api/v1/ingest/INGEST-1" | jq
{
  "name": "INGEST-1",
  "status": "Running",
  "settings": {
    "format": {
      "query": "MATCH (BA), (minerAcc), (blk), (parentBlk)\nWHERE\n  id(blk) = idFrom('block', $that.hash)\n  AND id(parentBlk) = idFrom('block', $that.parentHash)\n  AND id(BA) = idFrom('block_assoc', $that.hash)\n  AND id(minerAcc) = idFrom('account', $that.miner)\nCREATE\n  (minerAcc)<-[:mined_by]-(blk)-[:header_for]->(BA),\n  (blk)-[:preceded_by]->(parentBlk)\nSET\n  BA:block_assoc,\n  BA.number = $that.number,\n  BA.hash = $that.hash,\n  blk:block,\n  blk = $that,\n  minerAcc:account,\n  minerAcc.address = $that.miner",
      "parameter": "that",
      "type": "CypherJson"
    },
    "url": "https://ethereum.demo.thatdot.com/blocks_head",
    "parallelism": 16,
    "type": "ServerSentEventsIngest"
  },
  "stats": {
    "ingestedCount": 57,
    "rates": {
      "count": 57,
      "oneMinute": 0.045556443551085735,
      "fiveMinute": 0.06175571100053622,
      "fifteenMinute": 0.04159128290271318,
      "overall": 0.07659077758191643
    },
    "byteRates": {
      "count": 78451,
      "oneMinute": 62.49789862393008,
      "fiveMinute": 84.92629746711795,
      "fifteenMinute": 57.22987512826503,
      "overall": 105.41446006900763
    },
    "startTime": "2022-05-17T18:56:08.161500Z",
    "totalRuntime": 744041
  }
}
Enter fullscreen mode Exit fullscreen mode

Reporting on Ingest Stream progress using a Status Query

When creating an ingest query via a recipe, you can add a status query that runs continuously. For example, the status query below prints the information for each graph node, and a link to the visualization in the web UI.

statusQuery:
  cypherQuery: MATCH (n) RETURN count(n)
Enter fullscreen mode Exit fullscreen mode

Ingest Stream Blog Series

Over the next few weeks, we will cover the most common ingest streams in separate blog posts.

  • Ingesting data from an Internet Source
  • Ingesting CSV data
  • Ingesting Log Files
  • Ingesting Data from Kafaka
  • Ingesting AWS Service Queues
  • Quine in a Data Pipeline

Top comments (0)