Class KafkaStreamsTracing


  • public final class KafkaStreamsTracing
    extends Object
    Use this class to decorate Kafka Stream Topologies and enable Tracing.
    • Method Summary

      All Methods Static Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      static KafkaStreamsTracing create​(KafkaTracing kafkaTracing)  
      static KafkaStreamsTracing create​(MessagingTracing messagingTracing)  
      static KafkaStreamsTracing create​(Tracing tracing)  
      <K,​V>
      org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<K,​V>>
      filter​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
      Create a filter transformer.
      <K,​V>
      org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<K,​V>>
      filterNot​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
      Create a filterNot transformer.
      <K,​V,​KR,​VR>
      org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​Iterable<org.apache.kafka.streams.KeyValue<KR,​VR>>>
      flatMap​(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,​V,​Iterable<org.apache.kafka.streams.KeyValue<KR,​VR>>> mapper)
      Create a flatMap transformer, similar to KStream.flatMap(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.
      <K,​V>
      org.apache.kafka.streams.processor.ProcessorSupplier<K,​V>
      foreach​(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,​V> action)
      Create a foreach processor, similar to KStream.foreach(ForeachAction), where its action will be recorded in a new span with the indicated name.
      org.apache.kafka.streams.KafkaClientSupplier kafkaClientSupplier()
      Provides a KafkaClientSupplier with tracing enabled, hence Producer and Consumer operations will be traced.
      org.apache.kafka.streams.KafkaStreams kafkaStreams​(org.apache.kafka.streams.Topology topology, Properties streamsConfig)
      Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier.
      <K,​V,​KR,​VR>
      org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<KR,​VR>>
      map​(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,​V,​org.apache.kafka.streams.KeyValue<KR,​VR>> mapper)
      Create a map transformer, similar to KStream.map(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.
      <V,​VR>
      org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR>
      mapValues​(String spanName, org.apache.kafka.streams.kstream.ValueMapper<V,​VR> mapper)
      Create a mapValues transformer, similar to KStream.mapValues(ValueMapper), where its mapper action will be recorded in a new span with the indicated name.
      <K,​V,​VR>
      org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR>
      mapValues​(String spanName, org.apache.kafka.streams.kstream.ValueMapperWithKey<K,​V,​VR> mapper)
      Create a mapValues transformer, similar to KStream.mapValues(ValueMapperWithKey), where its mapper action will be recorded in a new span with the indicated name.
      <K,​V>
      org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V>
      mark​(String spanName)
      Create a mark transformer, similar to KStream.peek(ForeachAction), but no action is executed.
      <K,​V>
      org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V>
      markAsFiltered​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
      Create a markAsFiltered valueTransformer.
      <K,​V>
      org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V>
      markAsNotFiltered​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
      Create a markAsNotFiltered valueTransformer.
      static KafkaStreamsTracing.Builder newBuilder​(MessagingTracing messagingTracing)  
      static KafkaStreamsTracing.Builder newBuilder​(Tracing tracing)  
      <K,​V>
      org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V>
      peek​(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,​V> action)
      Create a peek transformer, similar to KStream.peek(ForeachAction), where its action will be recorded in a new span with the indicated name.
      <K,​V>
      org.apache.kafka.streams.processor.ProcessorSupplier<K,​V>
      processor​(String spanName, org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> processorSupplier)
      Create a tracing-decorated ProcessorSupplier
      <K,​V,​R>
      org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​R>
      transformer​(String spanName, org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​R> transformerSupplier)
      Create a tracing-decorated TransformerSupplier
      <V,​VR>
      org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR>
      valueTransformer​(String spanName, org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> valueTransformerSupplier)
      Create a tracing-decorated ValueTransformerSupplier
      <K,​V,​VR>
      org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR>
      valueTransformerWithKey​(String spanName, org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> valueTransformerWithKeySupplier)
      Create a tracing-decorated ValueTransformerWithKeySupplier
    • Method Detail

      • kafkaClientSupplier

        public org.apache.kafka.streams.KafkaClientSupplier kafkaClientSupplier()
        Provides a KafkaClientSupplier with tracing enabled, hence Producer and Consumer operations will be traced.

        This is mean to be used in scenarios KafkaStreams creation is not controlled by the user but framework (e.g. Spring Kafka Streams) creates it, and KafkaClientSupplier is accepted.

      • kafkaStreams

        public org.apache.kafka.streams.KafkaStreams kafkaStreams​(org.apache.kafka.streams.Topology topology,
                                                                  Properties streamsConfig)
        Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier. All Topology Sources and Sinks (including internal Topics) will create Spans on records processed (i.e. send or consumed).

        Use this instead of KafkaStreams constructor.

        Simple example:

        
         // KafkaStreams with tracing-enabled KafkaClientSupplier
         KafkaStreams kafkaStreams = kafkaStreamsTracing.kafkaStreams(topology, streamsConfig);
         
        See Also:
        TracingKafkaClientSupplier
      • processor

        public <K,​V> org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> processor​(String spanName,
                                                                                                     org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> processorSupplier)
        Create a tracing-decorated ProcessorSupplier

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .process(kafkaStreamsTracing.processor("my-processor", myProcessorSupplier);
         
        See Also:
        TracingKafkaClientSupplier
      • transformer

        public <K,​V,​R> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​R> transformer​(String spanName,
                                                                                                                       org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​R> transformerSupplier)
        Create a tracing-decorated TransformerSupplier

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transform(kafkaStreamsTracing.transformer("my-transformer", myTransformerSupplier)
                .to(outputTopic);
         
      • valueTransformer

        public <V,​VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> valueTransformer​(String spanName,
                                                                                                                   org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> valueTransformerSupplier)
        Create a tracing-decorated ValueTransformerSupplier

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transformValues(kafkaStreamsTracing.valueTransformer("my-transformer", myTransformerSupplier)
                .to(outputTopic);
         
      • valueTransformerWithKey

        public <K,​V,​VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> valueTransformerWithKey​(String spanName,
                                                                                                                                                 org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> valueTransformerWithKeySupplier)
        Create a tracing-decorated ValueTransformerWithKeySupplier

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transformValues(kafkaStreamsTracing.valueTransformerWithKey("my-transformer", myTransformerSupplier)
                .to(outputTopic);
         
      • foreach

        public <K,​V> org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> foreach​(String spanName,
                                                                                                   org.apache.kafka.streams.kstream.ForeachAction<K,​V> action)
        Create a foreach processor, similar to KStream.foreach(ForeachAction), where its action will be recorded in a new span with the indicated name.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...);
         
      • peek

        public <K,​V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> peek​(String spanName,
                                                                                                                    org.apache.kafka.streams.kstream.ForeachAction<K,​V> action)
        Create a peek transformer, similar to KStream.peek(ForeachAction), where its action will be recorded in a new span with the indicated name.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transformValues(kafkaStreamsTracing.peek("myPeek", (k, v) -> ...)
                .to(outputTopic);
         
      • mark

        public <K,​V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> mark​(String spanName)
        Create a mark transformer, similar to KStream.peek(ForeachAction), but no action is executed. Instead, only a span is created to represent an event as part of the stream process.

        A common scenario for this transformer is to mark the beginning and end of a step (or set of steps) in a stream process.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transformValues(kafkaStreamsTracing.mark("beginning-complex-map")
                .map(complexTransformation1)
                .filter(predicate)
                .mapValues(complexTransformation2)
                .transform(kafkaStreamsTracing.mark("end-complex-transformation")
                .to(outputTopic);
         
      • map

        public <K,​V,​KR,​VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<KR,​VR>> map​(String spanName,
                                                                                                                                                                      org.apache.kafka.streams.kstream.KeyValueMapper<K,​V,​org.apache.kafka.streams.KeyValue<KR,​VR>> mapper)
        Create a map transformer, similar to KStream.map(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transform(kafkaStreamsTracing.map("myMap", (k, v) -> ...)
                .to(outputTopic);
         
      • flatMap

        public <K,​V,​KR,​VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​Iterable<org.apache.kafka.streams.KeyValue<KR,​VR>>> flatMap​(String spanName,
                                                                                                                                                                                    org.apache.kafka.streams.kstream.KeyValueMapper<K,​V,​Iterable<org.apache.kafka.streams.KeyValue<KR,​VR>>> mapper)
        Create a flatMap transformer, similar to KStream.flatMap(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .flatTransform(kafkaStreamsTracing.flatMap("myflatMap", (k, v) -> ...)
                .to(outputTopic);
         
      • filter

        public <K,​V> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<K,​V>> filter​(String spanName,
                                                                                                                                                     org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
        Create a filter transformer.

        WARNING: this filter implementation uses the Streams transform API, meaning that re-partitioning can occur if a key modifying operation like grouping or joining operation is applied after this filter.

        In that case, consider using markAsFiltered(String, Predicate) instead which uses ValueTransformerWithKey API instead.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
               .transform(kafkaStreamsTracing.filter("myFilter", (k, v) -> ...)
               .to(outputTopic);
         
      • filterNot

        public <K,​V> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<K,​V>> filterNot​(String spanName,
                                                                                                                                                        org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
        Create a filterNot transformer.

        WARNING: this filter implementation uses the Streams transform API, meaning that re-partitioning can occur if a key modifying operation like grouping or joining operation is applied after this filter. In that case, consider using markAsNotFiltered(String, Predicate) instead which uses ValueTransformerWithKey API instead.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
               .transform(kafkaStreamsTracing.filterNot("myFilter", (k, v) -> ...)
               .to(outputTopic);
         
      • markAsFiltered

        public <K,​V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> markAsFiltered​(String spanName,
                                                                                                                              org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
        Create a markAsFiltered valueTransformer.

        Instead of filtering, and not emitting values downstream as filter does; markAsFiltered creates a span, marking it as filtered or not. If filtered, value returned will be null and will require an additional non-null value filter to complete the filtering.

        This operation is offered as lack of a processor that allows to continue conditionally with the processing without risk of accidental re-partitioning.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
               .transformValues(kafkaStreamsTracing.markAsFiltered("myFilter", (k, v) -> ...)
               .filterNot((k, v) -> Objects.isNull(v))
               .to(outputTopic);
         
      • markAsNotFiltered

        public <K,​V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> markAsNotFiltered​(String spanName,
                                                                                                                                 org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
        Create a markAsNotFiltered valueTransformer.

        Instead of filtering, and not emitting values downstream as filterNot does; markAsNotFiltered creates a span, marking it as filtered or not. If filtered, value returned will be null and will require an additional non-null value filter to complete the filtering.

        This operation is offered as lack of a processor that allows to continue conditionally with the processing without risk of accidental re-partitioning.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
               .transformValues(kafkaStreamsTracing.markAsNotFiltered("myFilter", (k, v) -> ...)
               .filterNot((k, v) -> Objects.isNull(v))
               .to(outputTopic);
         
      • mapValues

        public <K,​V,​VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> mapValues​(String spanName,
                                                                                                                                   org.apache.kafka.streams.kstream.ValueMapperWithKey<K,​V,​VR> mapper)
        Create a mapValues transformer, similar to KStream.mapValues(ValueMapperWithKey), where its mapper action will be recorded in a new span with the indicated name.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transformValues(kafkaStreamsTracing.mapValues("myMapValues", (k, v) -> ...)
                .to(outputTopic);
         
      • mapValues

        public <V,​VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> mapValues​(String spanName,
                                                                                                            org.apache.kafka.streams.kstream.ValueMapper<V,​VR> mapper)
        Create a mapValues transformer, similar to KStream.mapValues(ValueMapper), where its mapper action will be recorded in a new span with the indicated name.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transformValues(kafkaStreamsTracing.mapValues("myMapValues", v -> ...)
                .to(outputTopic);