Deduping Messages in Kafka Streams the Right Way

I've recently discovered that most examples out in the wild of how to deduplicate identical or unchanged messages do it the wrong way. By the wrong way I mean they use a groupByKey & aggregate to compare previous/current values and then filter out the unchanged ones. It seems like a creative way of leveraging the DSL functionality. The problem with this method is that this wasn't the intent of the Kafka Streams DSL's groupBy & aggregate (which is performing aggregations), so various features need to be turned off or worked around to prevent the 'changed' events being lost to the DSL's optimisations.

The “right way” is a bit subjective, so to quell the inevitable quibbles, let’s settle for the “deduping messages in Kafka Streams: not the wrong way”.

The Streams DSL optimises aggregations by caching the output before sending it downstream, and also caching it before writing to the state store and the changelog topic it's persisted to – effectively doing the same as log compaction, but client side, before anything is written to a topic. The common de-dupe solution goes thus:

  1. group the stream by the key
  2. aggregate with an initial value of an empty wrapper (used to flag the returned value as changed or not)
  3. when the first value is received return something like new Pair(value, true)
  4. when the next value is received compare the previous response (previousPair.first) with newValue,
    • If you determine it hasn’t changed return
          new Pair(previousPair.first, false)  
    • If it has changed return
          new Pair(newValue, true)
  5. The next step in the chain is to filter out unchanged values:
    var filtered = stream.filter(pair -> pair.second);

The Aggregate & Filter Implementation

The code roughly looks like:

builder.stream("my-input-topic")
    .groupByKey()
    .aggregate(
        () -> Pair.of(null, false), // serdes for the Pair aggregate omitted for brevity
        (key, newValue, aggregate) -> {
            if (aggregate.getLeft() != null && aggregate.getLeft().equals(newValue))
                return Pair.of(aggregate.getLeft(), false); // unchanged
            else
                return Pair.of(newValue, true);             // changed
        })
    .toStream()
    .filter((key, pair) -> pair.getRight()) // only allow changed values through
    .mapValues(Pair::getLeft)               // unwrap the value
    .to("my-output-topic");

The problem with this solution is record caching. If you receive the event “A”, then after a long enough gap “B” followed by another “B” (with the two “B”s arriving within the commit interval and within the cache's buffer limit), record caching turns the output of the aggregator:

  1. Pair(“A”, true)
  2. Pair(“B”, true)
  3. Pair(“B”, false) 

into this, which gets passed onto the filter:

  1. Pair(“A”, true)
  2. Pair(“B”, false) 

Because the output of the aggregation is cached, results #2 & #3 are combined into the last event received, which is #3. This makes sense if you're doing a max, min or sum aggregation, because what you care about is the most recent total at this moment in time, not the intermediate values (in most cases, that is). To turn off the record caching and get the aggregator to behave the way this implementation expects, you can set the following properties on the Kafka Streams instance:

Aggregate & Filter Workaround

var topologyProps = new Properties();
// commit almost immediately so results aren't held back until the next commit
topologyProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
// 0 disables record caching entirely
topologyProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
var streams = new KafkaStreams(topology, topologyProps);

But these settings are global, so they apply to every topology in your streams application, turning off optimisations that could be gained elsewhere.

After searching for a better solution (and it now seems to be the only one I can find) I came across this post by Jaroslaw Kijanowski. A variant of (or perhaps the basis for) his proposed solution is also included in Confluent's examples, and they have a tutorial on the same topic: How to find distinct values in a stream of events.

The Processor API Based Deduplication

I’ve built on Jaroslaw’s solution and Confluent’s example to provide something closer to a drop-in solution, to hopefully make it easier for people newer to Kafka Streams to see how it’s applied and to give a good starting point for your own. I wrote a more real-world solution (close to what will be implemented in our system) with a few variations; it can be dropped into a checkout of Confluent’s examples repo and is available in this gist.

What they both show is that the simple solution to the aggregation problem is to use the Processor API. The DSL has easy integration hooks into the Processor API; the two I’ve included here are .transform() and .flatTransformValues(). These are stateful processors: each call to transform compares the current value with what’s stored in the StateStore using your own domain-specific comparison function (calling .equals, comparing a single attribute, etc.). The advantage of using this API is that you now have more control over the store, and this example uses Streams’ built-in punctuation timer to remove any “old” entries from the store, on the assumption that any downstream system can tolerate the occasional duplicate. The processor is used to remove unnecessary noise from feeds such as CDC from a database changelog that gets updated regularly by a batch job, where you only care about records whose fields have changed, not records that have merely had their timestamp updated.
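
For example, for that kind of CDC feed the comparison function might ignore a timestamp column that the batch job bumps on every run. A quick sketch – the CustomerRow type here is purely illustrative and not part of the examples:

// A hypothetical CDC row: the batch job bumps lastUpdated on every run,
// even when nothing meaningful has changed.
record CustomerRow(String name, String address, Instant lastUpdated) { }

// Domain-specific comparison: two rows are "the same" if the fields we care
// about match, regardless of the timestamp.
BiFunction<CustomerRow, CustomerRow, Boolean> unchanged =
    (previous, current) ->
        Objects.equals(previous.name(), current.name())
            && Objects.equals(previous.address(), current.address());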

.transform Implementations

The first implementation is just a merge of the two examples above into DeduplicationValueTransformerIntegrationTest.java – if the new value is different to the last value then propagate it and update the state store, otherwise just update the timestamp in the state store so you can keep track of which keys are still active and not expire them. The expiration in this first example uses a windowed state store. The second solution instead uses a TimestampedKeyValueStore with a customisable expiration policy – which can easily be adapted to, say, forget records that haven’t changed in X days, allowing periodic refreshes of the downstream caches/stores. They both use the DSL transform function:

var deduplicated = stream.transform(
  new DedupeValueForKey(
      (value1, value2) -> value1.equals(value2),
      retentionPeriod,
      Materialized.with(new ByteArraySerde(), new Serdes.StringSerde())
  )
);

Checking the Javadoc shows that, because this transformer can change the key, the stream is marked for repartitioning. We’re not changing the key, and repartitioning is a wasted operation when it’s not required, so if we instead use flatTransformValues the stream isn’t marked for repartitioning – which is the third solution.

.flatTransformValues Implementation

var deduplicated = stream.flatTransformValues(
    new DedupeValueWithKey(
        (value1, value2) -> value1.equals(value2),
        Materialized.with(new ByteArraySerde(), new Serdes.StringSerde())
    )
);
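
If you’d rather register the state store with the topology yourself instead of handing a Materialized to the supplier, the wiring looks roughly like this. This is only a sketch – the store name, the String serdes and the constructor arguments are illustrative, matching the simplified classes shown further down rather than the gist’s variants:

var builder = new StreamsBuilder();

// register a timestamped key-value store for the transformer to look duplicates up in
builder.addStateStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("dedupe-store"),
        Serdes.String(),
        Serdes.String()));

builder.<String, String>stream("my-input-topic")
    // attach the transformer and name the store it needs access to
    .flatTransformValues(
        new DedupeValueWithKey<String, String>("dedupe-store", (v1, v2) -> v1.equals(v2)),
        "dedupe-store")
    .to("my-output-topic");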

The simplified value transformer (ignoring the expiry timer function, and taking a store name and comparator directly rather than a Materialized) is:

private static class DedupeValueTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>> {

    private final String storeName;
    private final BiFunction<V, V, Boolean> valueComparator;
    private ProcessorContext context;
    private TimestampedKeyValueStore<K, V> eventStore;

    DedupeValueTransformer(final String storeName, final BiFunction<V, V, Boolean> valueComparator) {
        this.storeName = storeName;
        this.valueComparator = valueComparator;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.eventStore = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
        // the full version also schedules a punctuation here to expire old entries
    }

    @Override
    public Iterable<V> transform(final K key, final V value) {
        if (value == null) {
            return Collections.emptyList();
        }
        final Iterable<V> output;
        final ValueAndTimestamp<V> storedValue = eventStore.get(key);
        if (storedValue != null && context.timestamp() < storedValue.timestamp()) {
            // message is older than the one already stored for this key, ignore it
            output = Collections.emptyList();
        } else if (isDuplicate(key, value, storedValue)) {
            // unchanged value: emit nothing, but refresh the timestamp so the key stays active
            output = Collections.emptyList();
            eventStore.put(key, ValueAndTimestamp.make(storedValue.value(), context.timestamp()));
        } else {
            // new or changed value: emit it and remember it
            output = Collections.singleton(value);
            eventStore.put(key, ValueAndTimestamp.make(value, context.timestamp()));
        }
        return output;
    }

    @Override
    public void close() { }

    private boolean isDuplicate(final K key, final V value, final ValueAndTimestamp<V> storedValue) {
        return storedValue != null
                && valueComparator.apply(storedValue.value(), value) == Boolean.TRUE;
    }
}

private static class DedupeValueWithKey<K, V> implements ValueTransformerWithKeySupplier<K, V, Iterable<V>> {

    private final String storeName;
    private final BiFunction<V, V, Boolean> valueComparator;

    DedupeValueWithKey(final String storeName, final BiFunction<V, V, Boolean> valueComparator) {
        this.storeName = storeName;
        this.valueComparator = valueComparator;
    }

    @Override
    public ValueTransformerWithKey<K, V, Iterable<V>> get() {
        return new DedupeValueTransformer<>(storeName, valueComparator);
    }
}
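
For completeness, the expiry timer the simplified version leaves out can be added in init() using the Processor API's punctuation support. A minimal sketch, assuming a wall-clock punctuation and a retentionPeriod Duration field passed in via the constructor (scanning the whole store is the simple option; the gist's windowed-store variant avoids the scan):

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.eventStore = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);

        // every minute of wall-clock time, drop entries that haven't been touched
        // within the retention period (retentionPeriod is an assumed Duration field)
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, now -> {
            try (KeyValueIterator<K, ValueAndTimestamp<V>> all = eventStore.all()) {
                while (all.hasNext()) {
                    final KeyValue<K, ValueAndTimestamp<V>> entry = all.next();
                    if (now - entry.value.timestamp() > retentionPeriod.toMillis()) {
                        eventStore.delete(entry.key);
                    }
                }
            }
        });
    }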
