Class KafkaStreamsTracing
java.lang.Object
  brave.kafka.streams.KafkaStreamsTracing
public final class KafkaStreamsTracing extends Object
Use this class to decorate Kafka Stream Topologies and enable Tracing.
-
-
Nested Class Summary
Modifier and Type / Class / Description
static class
KafkaStreamsTracing.Builder
-
Method Summary
Modifier and Type / Method / Description
static KafkaStreamsTracing
create(KafkaTracing kafkaTracing)
static KafkaStreamsTracing
create(MessagingTracing messagingTracing)
static KafkaStreamsTracing
create(brave.Tracing tracing)
<K,V> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<K,V>>
filter(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
<K,V> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<K,V>>
filterNot(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
<K,V,KR,VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,Iterable<org.apache.kafka.streams.KeyValue<KR,VR>>>
flatMap(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,V,Iterable<org.apache.kafka.streams.KeyValue<KR,VR>>> mapper)
<K,V> org.apache.kafka.streams.processor.ProcessorSupplier<K,V>
foreach(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,V> action)
org.apache.kafka.streams.KafkaClientSupplier
kafkaClientSupplier()
Provides a KafkaClientSupplier with tracing enabled, so Producer and Consumer operations will be traced.
org.apache.kafka.streams.KafkaStreams
kafkaStreams(org.apache.kafka.streams.Topology topology, Properties streamsConfig)
Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier.
<K,V,KR,VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<KR,VR>>
map(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,V,org.apache.kafka.streams.KeyValue<KR,VR>> mapper)
<V,VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR>
mapValues(String spanName, org.apache.kafka.streams.kstream.ValueMapper<V,VR> mapper)
<K,V,VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR>
mapValues(String spanName, org.apache.kafka.streams.kstream.ValueMapperWithKey<K,V,VR> mapper)
<K,V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V>
mark(String spanName)
<K,V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V>
markAsFiltered(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
<K,V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V>
markAsNotFiltered(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
static KafkaStreamsTracing.Builder
newBuilder(MessagingTracing messagingTracing)
static KafkaStreamsTracing.Builder
newBuilder(brave.Tracing tracing)
<K,V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V>
peek(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,V> action)
<KIn,VIn,KOut,VOut> org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn,VIn,KOut,VOut>
process(String spanName, org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn,VIn,KOut,VOut> processorSupplier)
Create a tracing-decorated ProcessorSupplier.
<K,V> org.apache.kafka.streams.processor.ProcessorSupplier<K,V>
processor(String spanName, org.apache.kafka.streams.processor.ProcessorSupplier<K,V> processorSupplier)
<KIn,VIn,VOut> org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn,VIn,VOut>
processValues(String spanName, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn,VIn,VOut> processorSupplier)
Create a tracing-decorated FixedKeyProcessorSupplier.
<K,V,R> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,R>
transformer(String spanName, org.apache.kafka.streams.kstream.TransformerSupplier<K,V,R> transformerSupplier)
<V,VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR>
valueTransformer(String spanName, org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR> valueTransformerSupplier)
<K,V,VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR>
valueTransformerWithKey(String spanName, org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR> valueTransformerWithKeySupplier)
-
-
-
Method Detail
-
create
public static KafkaStreamsTracing create(brave.Tracing tracing)
-
create
public static KafkaStreamsTracing create(MessagingTracing messagingTracing)
- Since:
- 5.10
-
create
public static KafkaStreamsTracing create(KafkaTracing kafkaTracing)
- Since:
- 5.9
-
newBuilder
public static KafkaStreamsTracing.Builder newBuilder(brave.Tracing tracing)
- Since:
- 5.10
-
newBuilder
public static KafkaStreamsTracing.Builder newBuilder(MessagingTracing messagingTracing)
- Since:
- 5.10
-
kafkaClientSupplier
public org.apache.kafka.streams.KafkaClientSupplier kafkaClientSupplier()
Provides a KafkaClientSupplier with tracing enabled, so Producer and Consumer operations will be traced.
This is meant to be used in scenarios where KafkaStreams creation is not controlled by the user but by a framework (e.g. Spring Kafka Streams) that accepts a KafkaClientSupplier.
-
kafkaStreams
public org.apache.kafka.streams.KafkaStreams kafkaStreams(org.apache.kafka.streams.Topology topology, Properties streamsConfig)
Creates a KafkaStreams instance with a tracing-enabled KafkaClientSupplier. All Topology Sources and Sinks (including internal Topics) will create Spans on records processed (i.e. sent or consumed).
Use this instead of the KafkaStreams constructor.
Simple example:
  // KafkaStreams with tracing-enabled KafkaClientSupplier
  KafkaStreams kafkaStreams = kafkaStreamsTracing.kafkaStreams(topology, streamsConfig);
- See Also:
TracingKafkaClientSupplier
-
processor
@Deprecated public <K,V> org.apache.kafka.streams.processor.ProcessorSupplier<K,V> processor(String spanName, org.apache.kafka.streams.processor.ProcessorSupplier<K,V> processorSupplier)
Deprecated.
Create a tracing-decorated ProcessorSupplier.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .process(kafkaStreamsTracing.processor("my-processor", myProcessorSupplier));
- See Also:
TracingKafkaClientSupplier
-
transformer
@Deprecated public <K,V,R> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,R> transformer(String spanName, org.apache.kafka.streams.kstream.TransformerSupplier<K,V,R> transformerSupplier)
Deprecated.
Create a tracing-decorated TransformerSupplier.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transform(kafkaStreamsTracing.transformer("my-transformer", myTransformerSupplier))
      .to(outputTopic);
-
valueTransformer
@Deprecated public <V,VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR> valueTransformer(String spanName, org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR> valueTransformerSupplier)
Deprecated.
Create a tracing-decorated ValueTransformerSupplier.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transformValues(kafkaStreamsTracing.valueTransformer("my-transformer", myTransformerSupplier))
      .to(outputTopic);
-
valueTransformerWithKey
@Deprecated public <K,V,VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR> valueTransformerWithKey(String spanName, org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR> valueTransformerWithKeySupplier)
Deprecated.
Create a tracing-decorated ValueTransformerWithKeySupplier.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transformValues(kafkaStreamsTracing.valueTransformerWithKey("my-transformer", myTransformerSupplier))
      .to(outputTopic);
-
foreach
@Deprecated public <K,V> org.apache.kafka.streams.processor.ProcessorSupplier<K,V> foreach(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,V> action)
Deprecated.
Create a foreach processor, similar to KStream.foreach(ForeachAction), where its action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...));
-
peek
@Deprecated public <K,V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V> peek(String spanName, org.apache.kafka.streams.kstream.ForeachAction<K,V> action)
Deprecated.
Create a peek transformer, similar to KStream.peek(ForeachAction), where its action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transformValues(kafkaStreamsTracing.peek("myPeek", (k, v) -> ...))
      .to(outputTopic);
-
mark
@Deprecated public <K,V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V> mark(String spanName)
Deprecated.
Create a mark transformer, similar to KStream.peek(ForeachAction), but no action is executed. Instead, only a span is created to represent an event as part of the stream process.
A common scenario for this transformer is to mark the beginning and end of a step (or set of steps) in a stream process.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transformValues(kafkaStreamsTracing.mark("beginning-complex-map"))
      .map(complexTransformation1)
      .filter(predicate)
      .mapValues(complexTransformation2)
      .transformValues(kafkaStreamsTracing.mark("end-complex-transformation"))
      .to(outputTopic);
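The pass-through behaviour described above can be sketched without any Kafka or Brave dependency. This is only a JDK-only model, not the library's implementation: `MarkSketch`, `EVENTS`, and the use of `UnaryOperator` as a stand-in for a value transformer are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** JDK-only model of a "mark" step; names here are hypothetical, not Brave API. */
public class MarkSketch {
  /** Recorded event names; a real tracer would create and finish spans instead. */
  public static final List<String> EVENTS = new ArrayList<>();

  /** Returns a pass-through step that only records an event under spanName. */
  public static <V> UnaryOperator<V> mark(String spanName) {
    return value -> {
      EVENTS.add(spanName); // the only side effect: one event per record
      return value;         // the value is never modified
    };
  }

  public static void main(String[] args) {
    UnaryOperator<String> begin = mark("beginning-complex-map");
    UnaryOperator<String> end = mark("end-complex-transformation");
    // The work between the two marks stands in for the complex steps.
    String result = end.apply(begin.apply("payload").toUpperCase());
    System.out.println(result + " " + EVENTS);
  }
}
```

Because the step returns its input unchanged, it can be placed on either side of any sequence of operations without affecting the records themselves.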
-
map
@Deprecated public <K,V,KR,VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<KR,VR>> map(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,V,org.apache.kafka.streams.KeyValue<KR,VR>> mapper)
Deprecated.
Create a map transformer, similar to KStream.map(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transform(kafkaStreamsTracing.map("myMap", (k, v) -> ...))
      .to(outputTopic);
-
flatMap
@Deprecated public <K,V,KR,VR> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,Iterable<org.apache.kafka.streams.KeyValue<KR,VR>>> flatMap(String spanName, org.apache.kafka.streams.kstream.KeyValueMapper<K,V,Iterable<org.apache.kafka.streams.KeyValue<KR,VR>>> mapper)
Deprecated.
Create a flatMap transformer, similar to KStream.flatMap(KeyValueMapper), where its mapper action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .flatTransform(kafkaStreamsTracing.flatMap("myflatMap", (k, v) -> ...))
      .to(outputTopic);
-
filter
@Deprecated public <K,V> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<K,V>> filter(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
Deprecated.
Create a filter transformer.
WARNING: this filter implementation uses the Streams transform API, meaning that re-partitioning can occur if an operation such as grouping or joining is applied after this filter. In that case, consider using markAsFiltered(String, Predicate), which uses the ValueTransformerWithKey API instead.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transform(kafkaStreamsTracing.filter("myFilter", (k, v) -> ...))
      .to(outputTopic);
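The re-partitioning warning above can be illustrated with a small model. It is a sketch under stated assumptions, not Kafka Streams code: `RepartitionFlagSketch` and `StreamModel` are hypothetical names, and the model tracks only one thing, whether an upstream operation could have changed the record keys.

```java
/** JDK-only model of why transform-based steps can trigger re-partitioning. */
public class RepartitionFlagSketch {
  /** Minimal stream model that tracks only whether keys may have changed. */
  public static final class StreamModel {
    private final boolean keyMayHaveChanged;

    public StreamModel(boolean keyMayHaveChanged) {
      this.keyMayHaveChanged = keyMayHaveChanged;
    }

    /** Models transform(): the operation may emit new keys, so the flag is set. */
    public StreamModel transform() {
      return new StreamModel(true);
    }

    /** Models transformValues(): keys are untouched, so the flag is carried over. */
    public StreamModel transformValues() {
      return new StreamModel(keyMayHaveChanged);
    }

    /** Models groupByKey(): re-partitioning is needed only if keys may have changed. */
    public boolean groupByKeyNeedsRepartition() {
      return keyMayHaveChanged;
    }
  }

  public static void main(String[] args) {
    StreamModel source = new StreamModel(false);
    System.out.println("after transform: "
        + source.transform().groupByKeyNeedsRepartition());
    System.out.println("after transformValues: "
        + source.transformValues().groupByKeyNeedsRepartition());
  }
}
```

In this model a filter built on transform sets the flag even though it never changes a key, which is why a value-only alternative avoids the extra re-partitioning.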
-
filterNot
@Deprecated public <K,V> org.apache.kafka.streams.kstream.TransformerSupplier<K,V,org.apache.kafka.streams.KeyValue<K,V>> filterNot(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
Deprecated.
Create a filterNot transformer.
WARNING: this filter implementation uses the Streams transform API, meaning that re-partitioning can occur if an operation such as grouping or joining is applied after this filter. In that case, consider using markAsNotFiltered(String, Predicate), which uses the ValueTransformerWithKey API instead.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transform(kafkaStreamsTracing.filterNot("myFilter", (k, v) -> ...))
      .to(outputTopic);
-
markAsFiltered
@Deprecated public <K,V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V> markAsFiltered(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
Deprecated.
Create a markAsFiltered valueTransformer.
Instead of filtering and not emitting values downstream as filter does, markAsFiltered creates a span and marks it as filtered or not. If filtered, the value returned will be null, and an additional non-null value filter is required to complete the filtering.
This operation is offered for lack of a processor that allows processing to continue conditionally without risk of accidental re-partitioning.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transformValues(kafkaStreamsTracing.markAsFiltered("myFilter", (k, v) -> ...))
      .filterNot((k, v) -> Objects.isNull(v))
      .to(outputTopic);
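The two-step pattern described above (mark with null, then drop nulls) can be sketched with plain JDK types. This is an illustrative model only: `MarkAsFilteredSketch`, `FILTERED_TAG`, and the `"true"`/`"false"` tag values are hypothetical, and it assumes that records satisfying the predicate are kept, as with KStream.filter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiPredicate;

/** JDK-only model of markAsFiltered followed by a non-null filter. */
public class MarkAsFilteredSketch {
  /** Tag recorded per key; a hypothetical stand-in for tagging a real span. */
  public static final Map<String, String> FILTERED_TAG = new LinkedHashMap<>();

  /** Step 1: the key is untouched; the value becomes null when the record is filtered out. */
  public static String markAsFiltered(
      String key, String value, BiPredicate<String, String> predicate) {
    boolean keep = predicate.test(key, value);
    FILTERED_TAG.put(key, keep ? "false" : "true"); // assumed tag values
    return keep ? value : null;
  }

  /** Runs both steps: mark every record, then drop the ones whose value is null. */
  public static List<String> run(
      Map<String, String> records, BiPredicate<String, String> predicate) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, String> e : records.entrySet()) {
      String marked = markAsFiltered(e.getKey(), e.getValue(), predicate);
      if (!Objects.isNull(marked)) { // mirrors .filterNot((k, v) -> Objects.isNull(v))
        out.add(e.getKey() + "=" + marked);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> records = new LinkedHashMap<>();
    records.put("a", "keep-me");
    records.put("b", "drop-me");
    System.out.println(run(records, (k, v) -> v.startsWith("keep")));
    System.out.println(FILTERED_TAG);
  }
}
```

Note that every record gets a tag, including the ones dropped in the second step, so the trace still shows that a record was seen and filtered.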
-
markAsNotFiltered
@Deprecated public <K,V> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,V> markAsNotFiltered(String spanName, org.apache.kafka.streams.kstream.Predicate<K,V> predicate)
Deprecated.
Create a markAsNotFiltered valueTransformer.
Instead of filtering and not emitting values downstream as filterNot does, markAsNotFiltered creates a span and marks it as filtered or not. If filtered, the value returned will be null, and an additional non-null value filter is required to complete the filtering.
This operation is offered for lack of a processor that allows processing to continue conditionally without risk of accidental re-partitioning.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transformValues(kafkaStreamsTracing.markAsNotFiltered("myFilter", (k, v) -> ...))
      .filterNot((k, v) -> Objects.isNull(v))
      .to(outputTopic);
-
mapValues
@Deprecated public <K,V,VR> org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<K,V,VR> mapValues(String spanName, org.apache.kafka.streams.kstream.ValueMapperWithKey<K,V,VR> mapper)
Deprecated.
Create a mapValues transformer, similar to KStream.mapValues(ValueMapperWithKey), where its mapper action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transformValues(kafkaStreamsTracing.mapValues("myMapValues", (k, v) -> ...))
      .to(outputTopic);
-
mapValues
@Deprecated public <V,VR> org.apache.kafka.streams.kstream.ValueTransformerSupplier<V,VR> mapValues(String spanName, org.apache.kafka.streams.kstream.ValueMapper<V,VR> mapper)
Deprecated.
Create a mapValues transformer, similar to KStream.mapValues(ValueMapper), where its mapper action will be recorded in a new span with the indicated name.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .transformValues(kafkaStreamsTracing.mapValues("myMapValues", v -> ...))
      .to(outputTopic);
-
process
public <KIn,VIn,KOut,VOut> org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn,VIn,KOut,VOut> process(String spanName, org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn,VIn,KOut,VOut> processorSupplier)
Create a tracing-decorated ProcessorSupplier.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .process(kafkaStreamsTracing.process("my-processor", myProcessorSupplier));
- See Also:
TracingKafkaClientSupplier
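What "tracing-decorated" means here can be sketched as a plain decorator over a supplier. The following is a JDK-only model, not Brave's implementation: `TracingSupplierSketch`, `SPANS`, and `traced` are hypothetical names, and `Supplier<BiConsumer<K, V>>` stands in for a Kafka Streams ProcessorSupplier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** JDK-only model of a tracing-decorated processor supplier. */
public class TracingSupplierSketch {
  /** Names of finished spans; a real tracer would report them to a backend. */
  public static final List<String> SPANS = new ArrayList<>();

  /** Wraps a supplier so every processor it creates records one span per record. */
  public static <K, V> Supplier<BiConsumer<K, V>> traced(
      String spanName, Supplier<BiConsumer<K, V>> delegate) {
    return () -> {
      // Ask the delegate for a fresh processor each time, as the wrapped supplier would.
      BiConsumer<K, V> processor = delegate.get();
      return (key, value) -> {
        try {
          processor.accept(key, value);
        } finally {
          SPANS.add(spanName); // recorded even if the processor throws
        }
      };
    };
  }

  public static void main(String[] args) {
    List<String> seen = new ArrayList<>();
    Supplier<BiConsumer<String, String>> supplier =
        TracingSupplierSketch.<String, String>traced(
            "my-processor", () -> (k, v) -> seen.add(k + "=" + v));
    BiConsumer<String, String> processor = supplier.get();
    processor.accept("a", "1");
    processor.accept("b", "2");
    System.out.println(seen + " spans=" + SPANS.size());
  }
}
```

The decorator leaves the wrapped processor's logic untouched, which is why the decorated supplier can be passed wherever the original one was accepted.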
-
processValues
public <KIn,VIn,VOut> org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn,VIn,VOut> processValues(String spanName, org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier<KIn,VIn,VOut> processorSupplier)
Create a tracing-decorated FixedKeyProcessorSupplier.
Simple example using Kafka Streams DSL:
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream(inputTopic)
      .processValues(kafkaStreamsTracing.processValues("my-processor", myFixedKeyProcessorSupplier));
- See Also:
TracingKafkaClientSupplier
-
-