Class KafkaStreamsTracing


  • public final class KafkaStreamsTracing
    extends Object
    Use this class to decorate Kafka Streams topologies and enable tracing.
    • Method Detail

      • kafkaClientSupplier

        public org.apache.kafka.streams.KafkaClientSupplier kafkaClientSupplier()
        Provides a KafkaClientSupplier with tracing enabled, so Producer and Consumer operations are traced.

        This is meant for scenarios where the user does not control KafkaStreams creation: a framework (e.g. Spring Kafka Streams) creates the instance and accepts a KafkaClientSupplier.
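
        A minimal sketch of handing the supplier over, assuming a KafkaStreamsTracing instance already exists (for example from KafkaStreamsTracing.create(Tracing)). The class TracedStreamsFactory and its create method are hypothetical names used only for illustration; the three-argument KafkaStreams constructor stands in for whatever hook a framework exposes for a KafkaClientSupplier.

```java
import brave.kafka.streams.KafkaStreamsTracing;
import java.util.Properties;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

// Hypothetical helper, not part of the library.
public final class TracedStreamsFactory {
  private TracedStreamsFactory() {}

  /** Builds KafkaStreams by passing the tracing-enabled supplier to the constructor. */
  static KafkaStreams create(KafkaStreamsTracing kafkaStreamsTracing,
                             Topology topology,
                             Properties streamsConfig) {
    // The supplier creates the Producer and Consumer clients used by the streams application.
    KafkaClientSupplier supplier = kafkaStreamsTracing.kafkaClientSupplier();
    return new KafkaStreams(topology, streamsConfig, supplier);
  }
}
```

        When the application constructs KafkaStreams itself, kafkaStreams(Topology, Properties) is the shorter route; the supplier is mainly useful when something else owns the constructor call.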

      • kafkaStreams

        public org.apache.kafka.streams.KafkaStreams kafkaStreams​(org.apache.kafka.streams.Topology topology,
                                                                  Properties streamsConfig)
        Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier. All Topology Sources and Sinks (including internal Topics) will create Spans on records processed (i.e. sent or consumed).

        Use this instead of the KafkaStreams constructor.

        Simple example:

        
         // KafkaStreams with tracing-enabled KafkaClientSupplier
         KafkaStreams kafkaStreams = kafkaStreamsTracing.kafkaStreams(topology, streamsConfig);
         
        See Also:
        TracingKafkaClientSupplier
      • mark

        @Deprecated
        public <K,​V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> mark​(String spanName)
        Creates a mark transformer, similar to KStream.peek(ForeachAction), but no action is executed. Instead, only a span is created to represent an event as part of the stream process.

        A common scenario for this transformer is to mark the beginning and end of a step (or set of steps) in a stream process.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .transformValues(kafkaStreamsTracing.mark("beginning-complex-map"))
                .map(complexTransformation1)
                .filter(predicate)
                .mapValues(complexTransformation2)
                .transformValues(kafkaStreamsTracing.mark("end-complex-transformation"))
                .to(outputTopic);
         
      • markAsFiltered

        @Deprecated
        public <K,​V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> markAsFiltered​(String spanName,
                                                                                                                              org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
        Creates a markAsFiltered ValueTransformer.

        Unlike filter, which does not emit filtered values downstream, markAsFiltered creates a span and marks it as filtered or not. If the record is filtered, the returned value is null, and an additional non-null value filter is required to complete the filtering.

        This operation is offered because there is no processor that allows processing to continue conditionally without the risk of accidental re-partitioning.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
               .transformValues(kafkaStreamsTracing.markAsFiltered("myFilter", (k, v) -> ...))
               .filterNot((k, v) -> Objects.isNull(v))
               .to(outputTopic);
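
        The two-step pattern (replace the value with null, then drop nulls) can be modelled in plain Java without Kafka. This is only a sketch: it assumes markAsFiltered follows filter semantics, so a value whose predicate returns false is the one replaced by null. MarkAsFilteredModel, its methods and the sample predicate are hypothetical, and no span is created here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model; it does not use Kafka Streams or create spans.
public final class MarkAsFilteredModel {
  private MarkAsFilteredModel() {}

  /** Stand-in for the transformer: keeps the value when the predicate matches, else returns null. */
  static <K, V> V markAsFiltered(BiPredicate<K, V> predicate, K key, V value) {
    return predicate.test(key, value) ? value : null;
  }

  /** Marks every value, then drops nulls, mirroring filterNot((k, v) -> Objects.isNull(v)). */
  static List<String> run(List<String> values) {
    BiPredicate<String, String> longerThanTwo = (k, v) -> v.length() > 2;
    List<String> kept = new ArrayList<>();
    for (String value : values) {
      String marked = markAsFiltered(longerThanTwo, "key", value);
      if (!Objects.isNull(marked)) {
        kept.add(marked);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("a", "abc", "de", "wxyz"))); // prints [abc, wxyz]
  }
}
```

        Because only the value changes and the key is left untouched, the marking step does not flag the stream for re-partitioning, which is the motivation stated above.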
         
      • markAsNotFiltered

        @Deprecated
        public <K,​V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,​V,​V> markAsNotFiltered​(String spanName,
                                                                                                                                 org.apache.kafka.streams.kstream.Predicate<K,​V> predicate)
        Creates a markAsNotFiltered ValueTransformer.

        Unlike filterNot, which does not emit filtered values downstream, markAsNotFiltered creates a span and marks it as filtered or not. If the record is filtered, the returned value is null, and an additional non-null value filter is required to complete the filtering.

        This operation is offered because there is no processor that allows processing to continue conditionally without the risk of accidental re-partitioning.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
               .transformValues(kafkaStreamsTracing.markAsNotFiltered("myFilter", (k, v) -> ...))
               .filterNot((k, v) -> Objects.isNull(v))
               .to(outputTopic);
         
      • process

        public <KIn,​VIn,​KOut,​VOut> org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn,​VIn,​KOut,​VOut> process​(String spanName,
                                                                                                                                                           org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn,​VIn,​KOut,​VOut> processorSupplier)
        Creates a tracing-decorated ProcessorSupplier.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .process(kafkaStreamsTracing.process("my-processor", myProcessorSupplier));
         
        See Also:
        TracingKafkaClientSupplier
      • processValues

        public <KIn,​VIn,​VOut> org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn,​VIn,​VOut> processValues​(String spanName,
                                                                                                                                                   org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn,​VIn,​VOut> processorSupplier)
        Creates a tracing-decorated FixedKeyProcessorSupplier.

        Simple example using Kafka Streams DSL:

        
         StreamsBuilder builder = new StreamsBuilder();
         builder.stream(inputTopic)
                .processValues(kafkaStreamsTracing.processValues("my-processor", myFixedKeyProcessorSupplier));
         
        See Also:
        TracingKafkaClientSupplier