Class KafkaStreamsTracing

java.lang.Object
brave.kafka.streams.KafkaStreamsTracing

public final class KafkaStreamsTracing
extends Object
Use this class to decorate Kafka Stream Topologies and enable Tracing.
  • Nested Class Summary

    Nested Classes 
    Modifier and Type Class Description
    static class  KafkaStreamsTracing.Builder  
  • Method Summary

    Modifier and Type Method Description
    static KafkaStreamsTracing create​(KafkaTracing kafkaTracing)  
    static KafkaStreamsTracing create​(MessagingTracing messagingTracing)  
    static KafkaStreamsTracing create​(Tracing tracing)  
    <K,​ V> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<K,​V>> filter​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
    Create a filter transformer.
    <K,​ V> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<K,​V>> filterNot​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
    Create a filterNot transformer.
    <K,​ V,​ KR,​ VR>
    org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​Iterable<org.apache.kafka.streams.KeyValue<KR,​VR>>>
    flatMap​(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,​V,​Iterable<org.apache.kafka.streams.KeyValue<KR,​VR>>> mapper)
    Create a flatMap transformer, similar to KStream.flatMap(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.
    <K,​ V> org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> foreach​(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,​V> action)
    Create a foreach processor, similar to KStream.foreach(ForeachAction), where its action will be recorded in a new span with the indicated name.
    org.apache.kafka.streams.KafkaClientSupplier kafkaClientSupplier()
    Provides a KafkaClientSupplier with tracing enabled, hence Producer and Consumer operations will be traced.
    org.apache.kafka.streams.KafkaStreams kafkaStreams​(org.apache.kafka.streams.Topology topology, Properties streamsConfig)
    Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier.
    <K,​ V,​ KR,​ VR>
    org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<KR,​VR>>
    map​(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,​V,​org.apache.kafka.streams.KeyValue<KR,​VR>> mapper)
    Create a map transformer, similar to KStream.map(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.
    <V,​ VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> mapValues​(String spanName, org.apache.kafka.streams.kstream.ValueMapper<V,​VR> mapper)
    Create a mapValues transformer, similar to KStream.mapValues(ValueMapper), where its mapper action will be recorded in a new span with the indicated name.
    <K,​ V,​ VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> mapValues​(String spanName, org.apache.kafka.streams.kstream.ValueMapperWithKey<K,​V,​VR> mapper)
    Create a mapValues transformer, similar to KStream.mapValues(ValueMapperWithKey), where its mapper action will be recorded in a new span with the indicated name.
    <K,​ V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> mark​(String spanName)
    Create a mark transformer, similar to KStream.peek(ForeachAction), but no action is executed.
    <K,​ V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> markAsFiltered​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
    Create a markAsFiltered valueTransformer.
    <K,​ V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> markAsNotFiltered​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
    Create a markAsNotFiltered valueTransformer.
    static KafkaStreamsTracing.Builder newBuilder​(MessagingTracing messagingTracing)  
    static KafkaStreamsTracing.Builder newBuilder​(Tracing tracing)  
    <K,​ V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> peek​(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,​V> action)
    Create a peek transformer, similar to KStream.peek(ForeachAction), where its action will be recorded in a new span with the indicated name.
    <K,​ V> org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> processor​(String spanName, org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> processorSupplier)
    Create a tracing-decorated ProcessorSupplier
    <K,​ V,​ R> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​R> transformer​(String spanName, org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​R> transformerSupplier)
    Create a tracing-decorated TransformerSupplier
    <V,​ VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> valueTransformer​(String spanName, org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> valueTransformerSupplier)
    Create a tracing-decorated ValueTransformerSupplier
    <K,​ V,​ VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> valueTransformerWithKey​(String spanName, org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> valueTransformerWithKeySupplier)
    Create a tracing-decorated ValueTransformerWithKeySupplier

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Method Details

    • create

      public static KafkaStreamsTracing create​(Tracing tracing)
    • create

      public static KafkaStreamsTracing create​(MessagingTracing messagingTracing)
      Since:
      5.10
    • create

      public static KafkaStreamsTracing create​(KafkaTracing kafkaTracing)
      Since:
      5.9
    • newBuilder

      public static KafkaStreamsTracing.Builder newBuilder​(Tracing tracing)
      Since:
      5.10
    • newBuilder

      public static KafkaStreamsTracing.Builder newBuilder​(MessagingTracing messagingTracing)
      Since:
      5.10
    • kafkaClientSupplier

      public org.apache.kafka.streams.KafkaClientSupplier kafkaClientSupplier()
      Provides a KafkaClientSupplier with tracing enabled, hence Producer and Consumer operations will be traced. This is mean to be used in scenarios KafkaStreams creation is not controlled by the user but framework (e.g. Spring Kafka Streams) creates it, and KafkaClientSupplier is accepted.
    • kafkaStreams

      public org.apache.kafka.streams.KafkaStreams kafkaStreams​(org.apache.kafka.streams.Topology topology, Properties streamsConfig)
      Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier. All Topology Sources and Sinks (including internal Topics) will create Spans on records processed (i.e. send or consumed). Use this instead of KafkaStreams constructor.

      Simple example:

      
       // KafkaStreams with tracing-enabled KafkaClientSupplier
       KafkaStreams kafkaStreams = kafkaStreamsTracing.kafkaStreams(topology, streamsConfig);
       
      See Also:
      TracingKafkaClientSupplier
    • processor

      public <K,​ V> org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> processor​(String spanName, org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> processorSupplier)
      Create a tracing-decorated ProcessorSupplier

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .process(kafkaStreamsTracing.processor("my-processor", myProcessorSupplier);
       
      See Also:
      TracingKafkaClientSupplier
    • transformer

      public <K,​ V,​ R> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​R> transformer​(String spanName, org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​R> transformerSupplier)
      Create a tracing-decorated TransformerSupplier

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .transform(kafkaStreamsTracing.transformer("my-transformer", myTransformerSupplier)
              .to(outputTopic);
       
    • valueTransformer

      public <V,​ VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> valueTransformer​(String spanName, org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> valueTransformerSupplier)
      Create a tracing-decorated ValueTransformerSupplier

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .transformValues(kafkaStreamsTracing.valueTransformer("my-transformer", myTransformerSupplier)
              .to(outputTopic);
       
    • valueTransformerWithKey

      public <K,​ V,​ VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> valueTransformerWithKey​(String spanName, org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> valueTransformerWithKeySupplier)
      Create a tracing-decorated ValueTransformerWithKeySupplier

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .transformValues(kafkaStreamsTracing.valueTransformerWithKey("my-transformer", myTransformerSupplier)
              .to(outputTopic);
       
    • foreach

      public <K,​ V> org.apache.kafka.streams.processor.ProcessorSupplier<K,​V> foreach​(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,​V> action)
      Create a foreach processor, similar to KStream.foreach(ForeachAction), where its action will be recorded in a new span with the indicated name.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...);
       
    • peek

      public <K,​ V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> peek​(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,​V> action)
      Create a peek transformer, similar to KStream.peek(ForeachAction), where its action will be recorded in a new span with the indicated name.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .transformValues(kafkaStreamsTracing.peek("myPeek", (k, v) -> ...)
              .to(outputTopic);
       
    • mark

      public <K,​ V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> mark​(String spanName)
      Create a mark transformer, similar to KStream.peek(ForeachAction), but no action is executed. Instead, only a span is created to represent an event as part of the stream process. A common scenario for this transformer is to mark the beginning and end of a step (or set of steps) in a stream process.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .transformValues(kafkaStreamsTracing.mark("beginning-complex-map")
              .map(complexTransformation1)
              .filter(predicate)
              .mapValues(complexTransformation2)
              .transform(kafkaStreamsTracing.mark("end-complex-transformation")
              .to(outputTopic);
       
    • map

      public <K,​ V,​ KR,​ VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<KR,​VR>> map​(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,​V,​org.apache.kafka.streams.KeyValue<KR,​VR>> mapper)
      Create a map transformer, similar to KStream.map(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .transform(kafkaStreamsTracing.map("myMap", (k, v) -> ...)
              .to(outputTopic);
       
    • flatMap

      public <K,​ V,​ KR,​ VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​Iterable<org.apache.kafka.streams.KeyValue<KR,​VR>>> flatMap​(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,​V,​Iterable<org.apache.kafka.streams.KeyValue<KR,​VR>>> mapper)
      Create a flatMap transformer, similar to KStream.flatMap(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .flatTransform(kafkaStreamsTracing.flatMap("myflatMap", (k, v) -> ...)
              .to(outputTopic);
       
    • filter

      public <K,​ V> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<K,​V>> filter​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
      Create a filter transformer. WARNING: this filter implementation uses the Streams transform API, meaning that re-partitioning can occur if a key modifying operation like grouping or joining operation is applied after this filter. In that case, consider using markAsFiltered(String, Predicate) instead which uses ValueTransformerWithKey API instead.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
             .transform(kafkaStreamsTracing.filter("myFilter", (k, v) -> ...)
             .to(outputTopic);
       
    • filterNot

      public <K,​ V> org.apache.kafka.streams.kstream.TransformerSupplier<K,​V,​org.apache.kafka.streams.KeyValue<K,​V>> filterNot​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
      Create a filterNot transformer. WARNING: this filter implementation uses the Streams transform API, meaning that re-partitioning can occur if a key modifying operation like grouping or joining operation is applied after this filter. In that case, consider using markAsNotFiltered(String, Predicate) instead which uses ValueTransformerWithKey API instead.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
             .transform(kafkaStreamsTracing.filterNot("myFilter", (k, v) -> ...)
             .to(outputTopic);
       
    • markAsFiltered

      public <K,​ V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> markAsFiltered​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
      Create a markAsFiltered valueTransformer. Instead of filtering, and not emitting values downstream as filter does; markAsFiltered creates a span, marking it as filtered or not. If filtered, value returned will be null and will require an additional non-null value filter to complete the filtering. This operation is offered as lack of a processor that allows to continue conditionally with the processing without risk of accidental re-partitioning.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
             .transformValues(kafkaStreamsTracing.markAsFiltered("myFilter", (k, v) -> ...)
             .filterNot((k, v) -> Objects.isNull(v))
             .to(outputTopic);
       
    • markAsNotFiltered

      public <K,​ V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> markAsNotFiltered​(String spanName, org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
      Create a markAsNotFiltered valueTransformer. Instead of filtering, and not emitting values downstream as filterNot does; markAsNotFiltered creates a span, marking it as filtered or not. If filtered, value returned will be null and will require an additional non-null value filter to complete the filtering. This operation is offered as lack of a processor that allows to continue conditionally with the processing without risk of accidental re-partitioning.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
             .transformValues(kafkaStreamsTracing.markAsNotFiltered("myFilter", (k, v) -> ...)
             .filterNot((k, v) -> Objects.isNull(v))
             .to(outputTopic);
       
    • mapValues

      public <K,​ V,​ VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​VR> mapValues​(String spanName, org.apache.kafka.streams.kstream.ValueMapperWithKey<K,​V,​VR> mapper)
      Create a mapValues transformer, similar to KStream.mapValues(ValueMapperWithKey), where its mapper action will be recorded in a new span with the indicated name.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .transformValues(kafkaStreamsTracing.mapValues("myMapValues", (k, v) -> ...)
              .to(outputTopic);
       
    • mapValues

      public <V,​ VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,​VR> mapValues​(String spanName, org.apache.kafka.streams.kstream.ValueMapper<V,​VR> mapper)
      Create a mapValues transformer, similar to KStream.mapValues(ValueMapper), where its mapper action will be recorded in a new span with the indicated name.

      Simple example using Kafka Streams DSL:

      
       StreamsBuilder builder = new StreamsBuilder();
       builder.stream(inputTopic)
              .transformValues(kafkaStreamsTracing.mapValues("myMapValues", v -> ...)
              .to(outputTopic);