How to propagate OpenTelemetry trace headers over AWS Kinesis: Part 3
Continuing our series on tracing with OpenTelemetry in AWS Kinesis, we now explore a refined approach to overcome the challenges faced in our initial attempt with the PartitionKey parameter.
Second attempt and solution: Propagate trace context through “PartitionKey” but partition with the “ExplicitHashKey” parameter
In our initial attempt, we were able to propagate the traceparent with the PartitionKey parameter, but while doing this, we also disrupted the correct partition assignment of the events. So, when propagating the traceparent with the PartitionKey parameter, how can we ensure that the events are assigned to the correct shard in accordance with the original PartitionKey value given by the user? The answer here is the ExplicitHashKey parameter. But before getting into the solution, let's take a look at key space partitioning in the AWS Kinesis service.
Here is a statement about how PartitionKey is used while partitioning records into shards from the AWS Kinesis API Reference documentation:
The partition key is used by Kinesis Data Streams to distribute data across shards. Kinesis Data Streams segregates the data records that belong to a stream into multiple shards, using the partition key associated with each data record to determine the shard to which a given data record belongs.
Partition keys are Unicode strings, with a maximum length limit of 256 characters for each key. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards. You can override hashing the partition key to determine the shard by explicitly specifying a hash value using the ExplicitHashKey parameter.
So, the hash key to be used while partitioning is calculated from PartitionKey as follows:
HashKey = Int128(MD5(<PartitionKey>))
This means the range of the hash key (and thus the Kinesis key space) is:
[0, 2^128) => [0, 340282366920938463463374607431768211455]
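As a quick sanity check, that upper bound is simply 2^128 - 1, which can be verified with a few lines of Java (a standalone sketch, not part of the producer code):

import java.math.BigInteger;

public class KeySpaceBounds {
    public static void main(String[] args) {
        // The Kinesis key space is [0, 2^128), so the largest possible hash key is 2^128 - 1
        BigInteger maxHashKey = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        // Prints 340282366920938463463374607431768211455
        System.out.println(maxHashKey);
    }
}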
How creating, splitting, and merging Kinesis shards affects key ranges
Let’s play a little bit with Kinesis shards by creating, splitting, and merging them to understand how key ranges of shards are affected:
#1 - Create the events stream:
> aws kinesis create-stream --stream-name events --shard-count 1
#2 - Verify that the events stream has been created:
> aws kinesis describe-stream --stream-name events
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646421264737784557324905820411013094940000525450477570"
}
}
],
"StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events",
"StreamName": "events",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": "2023-11-14T12:31:26+03:00"
}
}
As you can see, the whole key space [0, 340282366920938463463374607431768211455] is assigned to the single shard shardId-000000000000.
#3 - Let’s increase shard count to 2:
> aws kinesis update-shard-count --stream-name events --target-shard-count 2 --scaling-type UNIFORM_SCALING
{
"StreamName": "events",
"CurrentShardCount": 1,
"TargetShardCount": 2,
"StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events"
}
Increasing shard count triggers splitting shard shardId-000000000000 into child shards (shardId-000000000001 and shardId-000000000002):
> aws kinesis describe-stream --stream-name events
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646421264737784557324905820411013094940000525450477570",
"EndingSequenceNumber": "49646421264748934929924171131980572028256847859579617282"
}
},
{
"ShardId": "shardId-000000000001",
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "170141183460469231731687303715884105727"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646421980524803194562316794282962492134597634174222354"
}
},
{
"ShardId": "shardId-000000000002",
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "170141183460469231731687303715884105728",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646421980547103939760847417424498210407245995680202786"
}
}
],
"StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events",
"StreamName": "events",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": "2023-11-14T12:31:26+03:00"
}
}
As seen from the response, the hash key range of a shard is immutable. When a shard is split, the parent shard (shardId-000000000000 here) is still available but has switched to the CLOSED state, and its entire hash key range has been taken over (equally divided in half) by its child shards shardId-000000000001 ([0, 170141183460469231731687303715884105727]) and shardId-000000000002 ([170141183460469231731687303715884105728, 340282366920938463463374607431768211455]).
The CLOSED state of a shard can be identified by the presence of an EndingSequenceNumber property in the description of that shard.
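For example, here is a minimal sketch of how the shard states could be checked programmatically, assuming the AWS SDK for Java v2 and an existing KinesisClient instance named kinesisClient (pagination of DescribeStream is ignored for brevity):

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.Shard;

private void printShardStates(KinesisClient kinesisClient) {
    DescribeStreamRequest request = DescribeStreamRequest.builder()
            .streamName("events")
            .build();
    for (Shard shard : kinesisClient.describeStream(request).streamDescription().shards()) {
        // A shard is CLOSED if and only if it has an EndingSequenceNumber
        boolean closed = shard.sequenceNumberRange().endingSequenceNumber() != null;
        System.out.printf("%s [%s, %s] %s%n",
                shard.shardId(),
                shard.hashKeyRange().startingHashKey(),
                shard.hashKeyRange().endingHashKey(),
                closed ? "CLOSED" : "OPEN");
    }
}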
#4 - Now, let’s see what will happen when we decrease shard count back to 1:
> aws kinesis update-shard-count --stream-name events --target-shard-count 1 --scaling-type UNIFORM_SCALING
{
"StreamName": "events",
"CurrentShardCount": 2,
"TargetShardCount": 1,
"StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events"
}
Decreasing shard count triggers merging shards shardId-000000000001 and shardId-000000000002 into a new child shard (shardId-000000000003):
> aws kinesis describe-stream --stream-name events
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646421264737784557324905820411013094940000525450477570",
"EndingSequenceNumber": "49646421264748934929924171131980572028256847859579617282"
}
},
{
"ShardId": "shardId-000000000001",
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "170141183460469231731687303715884105727"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646421980524803194562316794282962492134597634174222354",
"EndingSequenceNumber": "49646421980535953567161582105852521425451370064073719826"
}
},
{
"ShardId": "shardId-000000000002",
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "170141183460469231731687303715884105728",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646421980547103939760847417424498210407245995680202786",
"EndingSequenceNumber": "49646421980558254312360112728994057143724018425579700258"
}
},
{
"ShardId": "shardId-000000000003",
"ParentShardId": "shardId-000000000001",
"AdjacentParentShardId": "shardId-000000000002",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646422307409126314624190802913520932614343535875850290"
}
}
],
"StreamARN": "arn:aws:kinesis:us-west-2:************:stream/events",
"StreamName": "events",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": "2023-11-14T12:31:26+03:00"
}
}
As with shard splitting, the parent shards shardId-000000000001 and shardId-000000000002 still exist, but they are now in the CLOSED state, as indicated by the presence of the EndingSequenceNumber property.
And the newly created shard shardId-000000000003 has its history inherited from its parent shards shardId-000000000001 and shardId-000000000002. It points back to shardId-000000000001 as its ParentShardId, as well as to shardId-000000000002 as its AdjacentParentShardId, which also helped derive it.
In contrast to splitting, after merging, the newly created shard gets a hash key range of [0, 340282366920938463463374607431768211455], which is the union of its parents’ (shardId-000000000001 and shardId-000000000002) hash key ranges ([0, 170141183460469231731687303715884105727] and [170141183460469231731687303715884105728, 340282366920938463463374607431768211455]).
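To make the relationship between hash keys and shard ranges concrete, here is a minimal sketch (again assuming the AWS SDK for Java v2 and ignoring DescribeStream pagination) that finds the open shard whose hash key range contains a given hash key:

private Shard findOwningShard(KinesisClient kinesisClient, String streamName, BigInteger hashKey) {
    DescribeStreamRequest request = DescribeStreamRequest.builder()
            .streamName(streamName)
            .build();
    for (Shard shard : kinesisClient.describeStream(request).streamDescription().shards()) {
        // Skip CLOSED shards; only OPEN shards receive new records
        if (shard.sequenceNumberRange().endingSequenceNumber() != null) {
            continue;
        }
        BigInteger start = new BigInteger(shard.hashKeyRange().startingHashKey());
        BigInteger end = new BigInteger(shard.hashKeyRange().endingHashKey());
        // Hash key ranges are inclusive on both ends
        if (hashKey.compareTo(start) >= 0 && hashKey.compareTo(end) <= 0) {
            return shard;
        }
    }
    return null;
}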
How to use the ExplicitHashKey parameter
Ok, let's get back to the original problem.
We are using the PartitionKey parameter to propagate the traceparent header, so now we need a way to specify the hash key directly, based on the original PartitionKey value given by the user, so that AWS will still put the event into the correct shard.
As we mentioned at the beginning of this section, we can use the ExplicitHashKey parameter to specify the hash key explicitly.
Let’s remember the hash key calculation based on the PartitionKey:
HashKey = Int128(MD5(<PartitionKey>))
And here is its implementation:
private String toHashKey(String partitionKey) throws Exception {
    // Same hashing that Kinesis applies to the PartitionKey: HashKey = Int128(MD5(<PartitionKey>))
    MessageDigest md = MessageDigest.getInstance("MD5");
    // Hash the UTF-8 bytes of the partition key (charset made explicit to avoid platform defaults)
    md.update(partitionKey.getBytes(StandardCharsets.UTF_8));
    byte[] digest = md.digest();
    // Interpret the 16-byte digest as an unsigned 128-bit integer
    BigInteger bi = new BigInteger(1, digest);
    // Return the decimal string expected by the ExplicitHashKey parameter
    return bi.toString();
}
Ok, let’s test our custom hash key calculation and verify that AWS Kinesis puts the records into the correct shards when the hash key is specified explicitly through the ExplicitHashKey parameter.
#1 - First, we will create a Kinesis stream with 5 shards:
> aws kinesis create-stream --stream-name events --shard-count 5
#2 - Then, let’s see how the hash keys are split across shards:
> aws kinesis describe-stream --stream-name events
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "68056473384187692692674921486353642290"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646447835808079664180011482520906336196071965037953026"
}
},
{
"ShardId": "shardId-000000000001",
"HashKeyRange": {
"StartingHashKey": "68056473384187692692674921486353642291",
"EndingHashKey": "136112946768375385385349842972707284581"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646447835830380409378542105662442054468720326543933458"
}
},
{
"ShardId": "shardId-000000000002",
"HashKeyRange": {
"StartingHashKey": "136112946768375385385349842972707284582",
"EndingHashKey": "204169420152563078078024764459060926872"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646447835852681154577072728803977772741368688049913890"
}
},
{
"ShardId": "shardId-000000000003",
"HashKeyRange": {
"StartingHashKey": "204169420152563078078024764459060926873",
"EndingHashKey": "272225893536750770770699685945414569163"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646447835874981899775603351945513491014017049555894322"
}
},
{
"ShardId": "shardId-000000000004",
"HashKeyRange": {
"StartingHashKey": "272225893536750770770699685945414569164",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49646447835897282644974133975087049209286665411061874754"
}
}
],
"StreamARN": "arn:aws:kinesis:us-west-2:************:stream/test",
"StreamName": "test",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": "2023-11-15T09:12:34+03:00"
}
}
Here are the hash key ranges for all 5 shards (Table 1):
#3 - Then, send 20 events from the producer application with different event group IDs (group-1, …, group-20), using the group ID as the partition key without any trace context propagation.
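The producer side of this step looks roughly like the following sketch (simplified; it assumes a KinesisClient instance named kinesisClient, and the real producer application sends an actual event payload):

for (int i = 1; i <= 20; i++) {
    String groupId = "group-" + i;
    PutRecordRequest request = PutRecordRequest.builder()
            .streamName("events")
            // The event group ID is used directly as the partition key; no trace context yet
            .partitionKey(groupId)
            .data(SdkBytes.fromUtf8String("{\"groupId\": \"" + groupId + "\"}"))
            .build();
    kinesisClient.putRecord(request);
}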
Here are the partition assignment results we collected from the consumer function logs:
#4 - Then, to verify our hash key generation algorithm based on the partition key (the event group ID here), run our hash key generator for each of the event group IDs (group-1, …, group-20) to generate the hash keys:
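A minimal sketch of this step, reusing the toHashKey method shown above:

private void printHashKeys() throws Exception {
    for (int i = 1; i <= 20; i++) {
        String groupId = "group-" + i;
        // Prints the 128-bit decimal hash key that Kinesis derives from this partition key
        System.out.println(groupId + " -> " + toHashKey(groupId));
    }
}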
According to the hash key ranges of the 5 shards shown in Table 1, and with these custom-generated hash keys shown in Table 3, the events will be assigned to the following shards:
As you can see, the Kinesis shard assignments of events based on group IDs are the same for both the standard PartitionKey method and the custom-generated hash key method.
It now seems safe to conclude that by generating a custom hash key for the ExplicitHashKey parameter, we can maintain accurate shard assignments. This allows us to use the PartitionKey parameter effectively to propagate the trace context.
Propagating trace context through “PartitionKey” but partitioning with the “ExplicitHashKey” parameter
Let’s see our new approach in action with the following implementation steps:
private PutRecordRequest injectTraceHeader(PutRecordRequest request) throws Exception {
    if (!TRACE_CONTEXT_PROPAGATION_ENABLED) {
        return request;
    }
    Span currentSpan = Span.current();
    if (currentSpan == null) {
        return request;
    }
    SpanContext currentSpanContext = currentSpan.getSpanContext();
    // Nothing to propagate if there is no valid span context (e.g. no active trace)
    if (currentSpanContext == null || !currentSpanContext.isValid()) {
        return request;
    }
    String partitionKey = request.partitionKey();
    PutRecordRequest.Builder requestBuilder = request.toBuilder();
    // Build the W3C traceparent header value: version-traceId-spanId-traceFlags
    String traceParent = String.format("00-%s-%s-%s",
            currentSpanContext.getTraceId(),
            currentSpanContext.getSpanId(),
            currentSpanContext.getTraceFlags().asHex());
    // Carry the trace context in the PartitionKey parameter
    requestBuilder.partitionKey(traceParent);
    if (partitionKey != null) {
        // Preserve the original shard assignment by hashing the original
        // partition key ourselves and passing it as the ExplicitHashKey
        requestBuilder.explicitHashKey(toHashKey(partitionKey));
    }
    return requestBuilder.build();
}

private String toHashKey(String partitionKey) throws Exception {
    // Same hashing that Kinesis applies to the PartitionKey: HashKey = Int128(MD5(<PartitionKey>))
    MessageDigest md = MessageDigest.getInstance("MD5");
    md.update(partitionKey.getBytes(StandardCharsets.UTF_8));
    byte[] digest = md.digest();
    BigInteger bi = new BigInteger(1, digest);
    return bi.toString();
}
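And here is roughly how the helper is wired into the producer before the record is sent (a simplified sketch; groupId, eventJson, and kinesisClient are placeholders for the producer application’s own state):

PutRecordRequest originalRequest = PutRecordRequest.builder()
        .streamName("events")
        .partitionKey(groupId)
        .data(SdkBytes.fromUtf8String(eventJson))
        .build();
// Swap the partition key for the traceparent header while preserving the
// original shard assignment through the explicit hash key
PutRecordRequest tracedRequest = injectTraceHeader(originalRequest);
kinesisClient.putRecord(tracedRequest);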
When running our test scenario again, we can see that the event-producer and event-consumer applications are now included in the same trace, because the trace ID is propagated via the PartitionKey parameter in the PutRecord request, and we didn’t break the events’ shard assignment because we set the hash key via the ExplicitHashKey parameter to put them into the correct shards.

You can check from the logs that the events are partitioned properly by event group ID, not randomly based on the trace ID as in our first attempt with the PartitionKey parameter.

Conclusion
This 3-part series has demonstrated a practical approach to propagating the OTEL trace context through AWS Kinesis, leveraging the PartitionKey and ExplicitHashKey parameters without the need to modify the record data. By integrating custom code within the application, we have navigated the complexities of trace context propagation in a cloud-native environment.
Go back and read part 1 and part 2 of this series if you missed them.