Package brave.kafka.streams
Class KafkaStreamsTracing
java.lang.Object
brave.kafka.streams.KafkaStreamsTracing
public final class KafkaStreamsTracing extends Object
Use this class to decorate Kafka Streams topologies and enable tracing.
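For example, a minimal end-to-end sketch; the Tracing instance, topic names, and streamsConfig are assumed to be configured elsewhere:
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);

StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.peek("input-received", (k, v) -> System.out.println(k + "=" + v)))
       .to(outputTopic);

// builds KafkaStreams with a tracing-enabled KafkaClientSupplier
KafkaStreams kafkaStreams = kafkaStreamsTracing.kafkaStreams(builder.build(), streamsConfig);
kafkaStreams.start();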
-
Nested Class Summary
Nested Classes
Modifier and Type    Class
static class         KafkaStreamsTracing.Builder
-
Method Summary
Modifier and Type    Method    Description

static KafkaStreamsTracing
  create(KafkaTracing kafkaTracing)

static KafkaStreamsTracing
  create(MessagingTracing messagingTracing)

static KafkaStreamsTracing
  create(Tracing tracing)

<K, V> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<K,V>>
  filter(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
  Create a filter transformer.

<K, V> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<K,V>>
  filterNot(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
  Create a filterNot transformer.

<K, V, KR, VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,Iterable<org.apache.kafka.streams.KeyValue<KR,VR>>>
  flatMap(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,V,Iterable<org.apache.kafka.streams.KeyValue<KR,VR>>> mapper)
  Create a flatMap transformer, similar to KStream.flatMap(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.

<K, V> org.apache.kafka.streams.processor.ProcessorSupplier<K,V>
  foreach(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,V> action)
  Create a foreach processor, similar to KStream.foreach(ForeachAction), where its action will be recorded in a new span with the indicated name.

org.apache.kafka.streams.KafkaClientSupplier
  kafkaClientSupplier()
  Provides a KafkaClientSupplier with tracing enabled, so Producer and Consumer operations will be traced.

org.apache.kafka.streams.KafkaStreams
  kafkaStreams(org.apache.kafka.streams.Topology topology, Properties streamsConfig)
  Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier.

<K, V, KR, VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<KR,VR>>
  map(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,V,org.apache.kafka.streams.KeyValue<KR,VR>> mapper)
  Create a map transformer, similar to KStream.map(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.

<V, VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR>
  mapValues(String spanName, org.apache.kafka.streams.kstream.ValueMapper<V,VR> mapper)
  Create a mapValues transformer, similar to KStream.mapValues(ValueMapper), where its mapper action will be recorded in a new span with the indicated name.

<K, V, VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR>
  mapValues(String spanName, org.apache.kafka.streams.kstream.ValueMapperWithKey<K,V,VR> mapper)
  Create a mapValues transformer, similar to KStream.mapValues(ValueMapperWithKey), where its mapper action will be recorded in a new span with the indicated name.

<K, V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V>
  mark(String spanName)
  Create a mark transformer, similar to KStream.peek(ForeachAction), but no action is executed.

<K, V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V>
  markAsFiltered(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
  Create a markAsFiltered valueTransformer.

<K, V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V>
  markAsNotFiltered(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
  Create a markAsNotFiltered valueTransformer.

static KafkaStreamsTracing.Builder
  newBuilder(MessagingTracing messagingTracing)

static KafkaStreamsTracing.Builder
  newBuilder(Tracing tracing)

<K, V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V>
  peek(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,V> action)
  Create a peek transformer, similar to KStream.peek(ForeachAction), where its action will be recorded in a new span with the indicated name.

<K, V> org.apache.kafka.streams.processor.ProcessorSupplier<K,V>
  processor(String spanName, org.apache.kafka.streams.processor.ProcessorSupplier<K,V> processorSupplier)
  Create a tracing-decorated ProcessorSupplier.

<K, V, R> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,R>
  transformer(String spanName, org.apache.kafka.streams.kstream.TransformerSupplier<K,V,R> transformerSupplier)
  Create a tracing-decorated TransformerSupplier.

<V, VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR>
  valueTransformer(String spanName, org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR> valueTransformerSupplier)
  Create a tracing-decorated ValueTransformerSupplier.

<K, V, VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR>
  valueTransformerWithKey(String spanName, org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR> valueTransformerWithKeySupplier)
  Create a tracing-decorated ValueTransformerWithKeySupplier.
-
Method Details
-
create
-
create
- Since:
- 5.10
-
create
- Since:
- 5.9
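A minimal sketch of the factory overloads; each builds a KafkaStreamsTracing from an already-configured tracing component (tracing, messagingTracing, and kafkaTracing are assumed to exist elsewhere):
KafkaStreamsTracing fromTracing          = KafkaStreamsTracing.create(tracing);
KafkaStreamsTracing fromMessagingTracing = KafkaStreamsTracing.create(messagingTracing);
KafkaStreamsTracing fromKafkaTracing     = KafkaStreamsTracing.create(kafkaTracing);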
-
newBuilder
- Since:
- 5.10
-
newBuilder
- Since:
- 5.10
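A minimal sketch, assuming the nested KafkaStreamsTracing.Builder follows the usual Brave builder pattern and exposes a build() method; customization options are documented on the Builder itself:
KafkaStreamsTracing kafkaStreamsTracing =
    KafkaStreamsTracing.newBuilder(messagingTracing).build();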
-
kafkaClientSupplier
public org.apache.kafka.streams.KafkaClientSupplier kafkaClientSupplier()
Provides a KafkaClientSupplier with tracing enabled, so Producer and Consumer operations will be traced. This is meant to be used in scenarios where KafkaStreams creation is not controlled by the user but by a framework (e.g. Spring Kafka Streams) that accepts a KafkaClientSupplier.
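A minimal sketch, assuming some framework hook accepts a KafkaClientSupplier; the plain KafkaStreams constructor stands in for that hook here:
KafkaClientSupplier tracingClientSupplier = kafkaStreamsTracing.kafkaClientSupplier();
// hand the supplier to whatever builds KafkaStreams, e.g.:
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig, tracingClientSupplier);
-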
kafkaStreams
public org.apache.kafka.streams.KafkaStreams kafkaStreams(org.apache.kafka.streams.Topology topology, Properties streamsConfig)
Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier. All Topology sources and sinks (including internal topics) will create spans on records processed (i.e. sent or consumed). Use this instead of the KafkaStreams constructor.
Simple example:
// KafkaStreams with tracing-enabled KafkaClientSupplier
KafkaStreams kafkaStreams = kafkaStreamsTracing.kafkaStreams(topology, streamsConfig);
- See Also:
TracingKafkaClientSupplier
-
processor
public <K, V> org.apache.kafka.streams.processor.ProcessorSupplier<K,V> processor(String spanName, org.apache.kafka.streams.processor.ProcessorSupplier<K,V> processorSupplier)
Create a tracing-decorated ProcessorSupplier.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .process(kafkaStreamsTracing.processor("my-processor", myProcessorSupplier));
- See Also:
TracingKafkaClientSupplier
-
transformer
public <K, V, R> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,R> transformer(String spanName, org.apache.kafka.streams.kstream.TransformerSupplier<K,V,R> transformerSupplier)
Create a tracing-decorated TransformerSupplier.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transform(kafkaStreamsTracing.transformer("my-transformer", myTransformerSupplier))
       .to(outputTopic);
-
valueTransformer
public <V, VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR> valueTransformer(String spanName, org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR> valueTransformerSupplier)
Create a tracing-decorated ValueTransformerSupplier.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.valueTransformer("my-transformer", myTransformerSupplier))
       .to(outputTopic);
-
valueTransformerWithKey
public <K, V, VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR> valueTransformerWithKey(String spanName, org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR> valueTransformerWithKeySupplier)
Create a tracing-decorated ValueTransformerWithKeySupplier.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.valueTransformerWithKey("my-transformer", myTransformerSupplier))
       .to(outputTopic);
-
foreach
public <K, V> org.apache.kafka.streams.processor.ProcessorSupplier<K,V> foreach(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,V> action)
Create a foreach processor, similar to KStream.foreach(ForeachAction), where its action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...));
-
peek
public <K, V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V> peek(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,V> action)
Create a peek transformer, similar to KStream.peek(ForeachAction), where its action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.peek("myPeek", (k, v) -> ...))
       .to(outputTopic);
-
mark
public <K, V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V> mark(String spanName)
Create a mark transformer, similar to KStream.peek(ForeachAction), but no action is executed. Instead, only a span is created to represent an event as part of the stream process. A common scenario for this transformer is to mark the beginning and end of a step (or set of steps) in a stream process.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.mark("beginning-complex-map"))
       .map(complexTransformation1)
       .filter(predicate)
       .mapValues(complexTransformation2)
       .transformValues(kafkaStreamsTracing.mark("end-complex-transformation"))
       .to(outputTopic);
-
map
public <K, V, KR, VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<KR,VR>> map(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,V,org.apache.kafka.streams.KeyValue<KR,VR>> mapper)
Create a map transformer, similar to KStream.map(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transform(kafkaStreamsTracing.map("myMap", (k, v) -> ...))
       .to(outputTopic);
-
flatMap
public <K, V, KR, VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,Iterable<org.apache.kafka.streams.KeyValue<KR,VR>>> flatMap(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,V,Iterable<org.apache.kafka.streams.KeyValue<KR,VR>>> mapper)
Create a flatMap transformer, similar to KStream.flatMap(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .flatTransform(kafkaStreamsTracing.flatMap("myflatMap", (k, v) -> ...))
       .to(outputTopic);
-
filter
public <K, V> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<K,V>> filter(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
Create a filter transformer. WARNING: this filter implementation uses the Streams transform API, meaning that re-partitioning can occur if a key-modifying operation, such as grouping or joining, is applied after this filter. In that case, consider using markAsFiltered(String, Predicate), which uses the ValueTransformerWithKey API instead.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transform(kafkaStreamsTracing.filter("myFilter", (k, v) -> ...))
       .to(outputTopic);
-
filterNot
public <K, V> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<K,V>> filterNot(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
Create a filterNot transformer. WARNING: this filter implementation uses the Streams transform API, meaning that re-partitioning can occur if a key-modifying operation, such as grouping or joining, is applied after this filter. In that case, consider using markAsNotFiltered(String, Predicate), which uses the ValueTransformerWithKey API instead.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transform(kafkaStreamsTracing.filterNot("myFilter", (k, v) -> ...))
       .to(outputTopic);
-
markAsFiltered
public <K, V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V> markAsFiltered(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
Create a markAsFiltered valueTransformer. Instead of filtering and not emitting values downstream, as filter does, markAsFiltered creates a span and marks it as filtered or not. If filtered, the returned value will be null and an additional non-null value filter is required to complete the filtering. This operation is offered because there is no processor that allows processing to continue conditionally without the risk of accidental re-partitioning.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.markAsFiltered("myFilter", (k, v) -> ...))
       .filterNot((k, v) -> Objects.isNull(v))
       .to(outputTopic);
-
markAsNotFiltered
public <K, V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V> markAsNotFiltered(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
Create a markAsNotFiltered valueTransformer. Instead of filtering and not emitting values downstream, as filterNot does, markAsNotFiltered creates a span and marks it as filtered or not. If filtered, the returned value will be null and an additional non-null value filter is required to complete the filtering. This operation is offered because there is no processor that allows processing to continue conditionally without the risk of accidental re-partitioning.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.markAsNotFiltered("myFilter", (k, v) -> ...))
       .filterNot((k, v) -> Objects.isNull(v))
       .to(outputTopic);
-
mapValues
public <K, V, VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR> mapValues(String spanName, org.apache.kafka.streams.kstream.ValueMapperWithKey<K,V,VR> mapper)
Create a mapValues transformer, similar to KStream.mapValues(ValueMapperWithKey), where its mapper action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.mapValues("myMapValues", (k, v) -> ...))
       .to(outputTopic);
-
mapValues
public <V, VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR> mapValues(String spanName, org.apache.kafka.streams.kstream.ValueMapper<V,VR> mapper)
Create a mapValues transformer, similar to KStream.mapValues(ValueMapper), where its mapper action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic)
       .transformValues(kafkaStreamsTracing.mapValues("myMapValues", v -> ...))
       .to(outputTopic);
-