If you are starting with KafkaStreams, or with streaming applications in general, sometimes is hard to come up with appropriate solutions to applications that you would previously consider trivial to implement. But it is just a matter of getting used to the new APIs and concepts, and seeing a bunch of examples.
I recently got this email inquiry (feel free to send me others!) about how KafkaStreams could be used:
I've a sensor data coming out of device and it has latitude/longitude along with other information. The device serial number is the key.
My requirement is to calculate distance between 2 consecutive messages for the device.
I've a kafka topic and each message in the topic has lat/lon and event timestamp. For example:
Device | GpsDateTime | Latitude | Longitude |
---|---|---|---|
xyz | 2016-11-30 22:38:36 | 32.685757 | -96.735942 |
xyz | 2016-11-30 22:39:07 | 32.687347 | -96.732841 |
xyz | 2016-11-30 22:39:37 | 32.68805 | -96.729726 |
I would like to create a new KStream on the above topic and enrich it with distance.
Device | GpsDateTime | Latitude | Longitude | Distance |
---|---|---|---|---|
xyz | 2016-11-30 22:38:36 | 32.685757 | -96.735942 | 0 |
xyz | 2016-11-30 22:39:07 | 32.687347 | -96.732841 | 0.340 |
xyz | 2016-11-30 22:39:37 | 32.68805 | -96.729726 | 0.302 |
What would be the best approach to refer the previous message lat/lon for a device?
As we are talking about keeping some state, the first thing that pops in our minds is that we must use a KTable, because we have drilled in our heads that state requires a DB. As we have always read that a KafkaStreams KTable is the streaming equivalent to a DB table, it seems natural to reach for a KTable for any problem in our streaming applications that requires some state to be maintained.
A possible solution for the above application would be:
(defn create-kafka-stream-topology-ktable []
(let [^StreamsBuilder builder (StreamsBuilder.)
_ (-> builder
(.stream device-data-topic)
(.groupByKey)
(.aggregate (initializer [])
(aggregator [k new-point [previous _]]
[new-point previous]))
(.toStream)
(.mapValues (value-mapper [current previous]
(if previous
(assoc current :dist (distance current previous))
(assoc current :dist 0))))
(.to "points-with-distance"))]
builder))
So we use a KTable to generate pairs of and then we just transform those two values into one, adding the distance between both values to the current-value.
Running this streaming application seems to work:
device-gsp-coords : xyz , {:lat 32.685757, :lon -96.735942, :time 2016-11-30 22:38:36 }
points-with-distance : xyz , {:lat 32.685757, :lon -96.735942, :time 2016-11-30 22:38:36, :dist 0}
device-gsp-coords : xyz , {:lat 32.687347, :lon -96.732841, :time 2016-11-30 22:39:07 }
points-with-distance : xyz , {:lat 32.687347, :lon -96.732841, :time 2016-11-30 22:39:07, :dist 0.340}
device-gsp-coords : xyz , {:lat 32.68805, :lon -96.729726, :time 2016-11-30 22:39:37 }
points-with-distance : xyz , {:lat 32.68805, :lon -96.729726, :time 2016-11-30 22:39:37, :dist 0.302}
But what happens if we get a lot of messages for a given device in a short period of time?
device-gsp-coords : xyz1 , {:lat 32.685757, :lon -96.735942, :time 2016-11-30 22:38:36, :ts 1538755525616}
device-gsp-coords : xyz1 , {:lat 32.687347, :lon -96.732841, :time 2016-11-30 22:39:07, :ts 1538755525618}
device-gsp-coords : xyz1 , {:lat 32.68805, :lon -96.729726, :time 2016-11-30 22:39:37, :ts 1538755525620}
points-with-distance : xyz1 , {:lat 32.685757, :lon -96.735942, :time 2016-11-30 22:38:36, :ts 1538755525616, :dist 0}
points-with-distance : xyz1 , {:lat 32.68805, :lon -96.729726, :time 2016-11-30 22:39:37, :ts 1538755525620, :dist 0.302}
It looks like that the middle value (the one with distance 0.340) has disappeared, but notice that the distance calculation of the last message is exactly the same previously.
Reading the documentation of the KStream#aggregate method it becomes clear what happens:
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the same key. The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of parallel running Kafka Streams instances, and the configuration parameters for cache size, and commit interval.
Note that this scenario can happen not just then device sends a lot of information in a short time, but will also happen if your application has a lot of catch up work to do, like when starting for the very first time.
But Why?
If you were to query a row in a traditional DB table at two different times, would you know how many times the row had changed between those two times? Would you be able to retrieve all those intermediate values? No.
KTables are again equivalent to DB tables, and as in these, using a KTable means that you just care about the latest state of the row/entity, which means that any previous states can be safely thrown away. Not in vain a KTable is backed up by a compacted topic.
So this becomes an excellent test to know if it is appropriate to use a KTable:
If you deleted all states but the last, would your application still be correct?
In the above example, we see that we actually care about each position. If the requirement was to know the total distance traveled since the start of time, then a KTable would be appropriate.
It is important to note that being able to throw away intermediate state is also an optimization, as thousands of input messages can end up producing just a handful of output messages, improving the processing time, and avoiding a lot of IO and compaction work.
A stateful KStream solution
In KafkaStreams, stateful transformations are not exclusive of KTables, we also found them in KStreams and in the Processor API (remember that KTables and KStreams are build on top of the Processor API).
Using the KStream#transformValues method we end up with:
(defn create-kafka-stream-topology-kstream []
(let [^StreamsBuilder builder (StreamsBuilder.)
state-name "last-device-state"
store (Stores/keyValueStoreBuilder
(Stores/persistentKeyValueStore state-name)
(EdnSerde.)
(EdnSerde.))
_ (-> builder
(.addStateStore store)
(.stream device-data-topic)
(.transformValues (value-transformer-with-store
state-name
(fn [store key current]
(let [previous (.get store key)]
(.put store key current)
(if-not previous
(assoc current :dist 0)
(assoc current :dist (distance current previous))))))
(into-array [state-name]))
(.to "points-with-distance"))]
builder))
So we manually create a state store and then we use it to store/retrieve the previous value when doing the computation.
Pretty simple and neat.
All the code can be found here, including
a Docker Compose file that will run Kafka, Zookeeper plus three instances of this service, so you can play around with it.
The details of how to build and run it are in the repository.
Top comments (0)