Kinesis client built upon Amazon's KCL (Kinesis Client Library) & KPL (Kinesis Producer Library).
It's worth familiarising yourself with Sequence numbers and Sub sequence numbers.
- Dependency Resolution
- Considerations When Using Kinesis in a Distributed Environment
- Usage
- Running the reliability test
- FAQ
- Contributor Guide
- Contribution policy
- License
SBT
"com.weightwatchers" %% "reactive-kinesis" % "0.5.5"
Maven 2.11
<dependency>
<groupId>com.weightwatchers</groupId>
<artifactId>reactive-kinesis_2.11</artifactId>
<version>0.5.5</version>
<type>pom</type>
</dependency>
Maven 2.12
<dependency>
<groupId>com.weightwatchers</groupId>
<artifactId>reactive-kinesis_2.12</artifactId>
<version>0.5.5</version>
<type>pom</type>
</dependency>
Snapshots are published to the JFrog OSS snapshot repository.
You will need the following resolver for snapshots:
https://oss.jfrog.org/artifactory/oss-snapshot-local
From http://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-scaling.html:
Typically, when you use the KCL, you should ensure that the number of instances does not exceed the number of shards (except for failure standby purposes). Each shard is processed by exactly one KCL worker and has exactly one corresponding record processor, so you never need multiple instances to process one shard. However, one worker can process any number of shards, so it's fine if the number of shards exceeds the number of instances.
For our purposes, this means any service reading from Kinesis should expect to have one shard per instance, as a minimum. Note that this is specifically for consuming events. Producers don't have the same shard restrictions.
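As a rough illustration of the guidance above, the sketch below works out how many instances would be active and how many would sit idle as standbys for a given shard count. It assumes shards are spread as evenly as possible across instances, which is an idealisation; the `ShardAllocation` object and its method names are hypothetical and not part of this library or the KCL.

```scala
object ShardAllocation {
  // Each shard is processed by exactly one worker, so at most `shards` instances can be busy.
  def activeInstances(shards: Int, instances: Int): Int =
    math.min(shards, instances)

  // Any instances beyond the shard count have nothing to read (failure standby only).
  def standbyInstances(shards: Int, instances: Int): Int =
    math.max(0, instances - shards)

  // Shards held by each active instance, assuming an even spread (an idealisation).
  def shardsPerInstance(shards: Int, instances: Int): Seq[Int] = {
    val active = activeInstances(shards, instances)
    if (active == 0) Seq.empty
    else {
      val base  = shards / active
      val extra = shards % active
      Seq.tabulate(active)(i => if (i < extra) base + 1 else base)
    }
  }
}
```

For example, 2 shards and 3 instances leaves one instance as a standby, while 5 shards and 2 instances gives one instance 3 shards and the other 2.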
Amazon's KCL uses DynamoDB to checkpoint progress through reading the stream. When DynamoDB tables are provisioned automatically, for this purpose, they may have a relatively high write-throughput, which can incur additional cost.
You should make sure that the DynamoDB table used for checkpointing your stream:
- Has a reasonable write throughput defined
- Is cleaned up when you're done with it -- KCL will not automatically delete it for you
The checkpointer will automatically throttle if the write throughput is not sufficient. Look out for the following info log:
Throttled by DynamoDB on checkpointing -- backing off...
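The back-off behaviour described above can be pictured with a minimal sketch like the following. This is not the library's checkpointer: the `CheckpointBackoff` object, the `Outcome` types and the attempt limit are all assumptions made for illustration, and the wait is a plain fixed delay.

```scala
import scala.annotation.tailrec

object CheckpointBackoff {
  sealed trait Outcome
  case object Checkpointed extends Outcome
  case object Throttled extends Outcome

  // Retries `attempt` after a fixed delay while it reports throttling.
  // `remainingAttempts` is a hypothetical safety limit, not a library setting.
  @tailrec
  def checkpointWithBackoff(attempt: () => Outcome,
                            backoffMillis: Long,
                            remainingAttempts: Int): Outcome =
    attempt() match {
      case Checkpointed => Checkpointed
      case Throttled if remainingAttempts > 1 =>
        println("Throttled by DynamoDB on checkpointing -- backing off...")
        Thread.sleep(backoffMillis)
        checkpointWithBackoff(attempt, backoffMillis, remainingAttempts - 1)
      case Throttled => Throttled // gave up after the last allowed attempt
    }
}
```

If this log line appears frequently, raising the table's write throughput or checkpointing less often are the usual remedies.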
The stream you have configured must already exist in AWS, e.g.
aws kinesis create-stream --stream-name core-test-foo --shard-count 1
For local development, it's expected that you already have a file called ~/.aws/credentials, which contains an AWS access key and secret e.g.
[default]
aws_access_key_id=AKIAXXXXXXXXX999999X
aws_secret_access_key=AAAAAAAAAAAA000000000+AAAAAAAAAAAAAAAAAA
Both the producer and consumer use the DefaultAWSCredentialsProviderChain.
The application that uses this library must provide some configuration values. As a minimum you'll need:
kinesis {
application-name = "SampleService"
# The name of this producer, we can have many producers per application.
# MUST contain the stream-name as a minimum. Any additional settings defined will override
# defaults in the kinesis.producer reference.conf for this producer only.
some-producer {
# The name of the producer stream
stream-name = "sample-producer"
kpl {
Region = us-east-1
}
}
# The name of the consumer, we can have many consumers per application
some-consumer {
# The name of the consumer stream, MUST be specified per consumer and MUST exist
stream-name = "sample-consumer"
}
some-other-consumer {
stream-name = "another-sample-consumer"
}
}
These values will override the default reference.conf.
See src/main/resources/reference.conf for a complete reference configuration and src/it/resources/application.conf for a more detailed override example.
The name of the producer/consumer configuration value MUST match what is specified when instantiating the library.
This will be merged with the default-consumer/default-producer configuration from reference.conf at runtime.
Once these are defined, you can pass them into the Kinesis producer and consumer using a config object (see code examples below).
Note that the application-name is combined with the stream-name for each consumer to define the DynamoDB table for checkpointing.
For example: SampleService-sample-consumer.
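A minimal sketch of that naming rule, assuming the two values are simply joined with a hyphen as the example suggests. `CheckpointTable.tableName` is a hypothetical helper, not a function exposed by the library.

```scala
object CheckpointTable {
  // Hypothetical helper: joins the configured application-name and stream-name
  // with a hyphen, matching the "SampleService-sample-consumer" example.
  def tableName(applicationName: String, streamName: String): String =
    s"$applicationName-$streamName"
}
```

This can be handy when scripting the clean-up or throughput configuration of the checkpoint tables for each consumer.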
- kinesis.<consumer-name>.akka.dispatcher - Sets the dispatcher for the consumer, defaults to kinesis.akka.default-dispatcher
- kinesis.<consumer-name>.worker.batchTimeoutSeconds - The timeout for processing a batch. Note that any messages not processed within this time will be retried (according to the configuration). After retrying, any unconfirmed messages will be considered failed.
- kinesis.<consumer-name>.worker.failedMessageRetries - The number of times to retry failed messages within a batch, after the batch timeout.
- kinesis.<consumer-name>.worker.failureTolerancePercentage - If messages are still unconfirmed after retrying, we will either continue processing the next batch or shut down processing completely, depending on this tolerance percentage.
- kinesis.<consumer-name>.checkpointer.backoffMillis - When DynamoDB throttles us (due to hitting the write threshold) we wait for this amount of time.
- kinesis.<consumer-name>.checkpointer.intervalMillis - The interval between checkpoints. Setting this too high will cause lots of messages to be duplicated in the event of a failed node. Setting it too low will result in throttling from DynamoDB.
- kinesis.<consumer-name>.kcl.initialPositionInStream - Controls our strategy for pulling from Kinesis (LATEST, TRIM_HORIZON, ..)
- kinesis.<consumer-name>.kcl.maxRecords - The maximum batch size.
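To make the role of worker.failureTolerancePercentage more concrete, here is a minimal sketch of the decision it drives once retries are exhausted. The `FailureTolerance` object is hypothetical, and two details are assumptions rather than documented behaviour: the tolerance is treated as a value on a 0 to 100 scale, and processing stops only when the failed share is strictly greater than the tolerance. Check reference.conf for the actual scale and default.

```scala
object FailureTolerance {
  sealed trait Decision
  case object ContinueToNextBatch extends Decision
  case object ShutdownProcessing extends Decision

  // Compares the share of still-unconfirmed messages in a batch with the tolerance.
  // Assumes a 0-100 percentage scale and a strict "greater than" comparison.
  def decide(batchSize: Int, unconfirmed: Int, tolerancePercentage: Double): Decision = {
    require(batchSize > 0, "batchSize must be positive")
    require(unconfirmed >= 0 && unconfirmed <= batchSize, "unconfirmed must be within the batch")
    val failedPercentage = unconfirmed * 100.0 / batchSize
    if (failedPercentage > tolerancePercentage) ShutdownProcessing
    else ContinueToNextBatch
  }
}
```

With a tolerance of 5, a batch of 100 with 1 unconfirmed message would continue, while 10 unconfirmed messages would stop processing.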
- kinesis.<producer-name>.akka.dispatcher - Sets the dispatcher for the producer, defaults to kinesis.akka.default-dispatcher
- kinesis.<producer-name>.akka.max-outstanding-requests - Enables artificial throttling within the Producer. This limits the number of futures in play at any one time. Each message creates a new future (internally in the KPL), which allows us to track the progress of sent messages when they go with the next batch.
- kinesis.<producer-name>.akka.throttling-retry-millis - How soon to retry after hitting the above throttling cap.
- kinesis.<producer-name>.kpl.AggregationEnabled - Enables aggregation of messages.
- kinesis.<producer-name>.kpl.Region - The AWS Region to use.
- kinesis.<producer-name>.kpl.RateLimit - Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.
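The idea behind akka.max-outstanding-requests can be sketched as a simple counter of in-flight sends. This is only an illustration of the concept, not the producer actor's implementation; `OutstandingLimiter` and its methods are hypothetical names.

```scala
// Hypothetical in-flight limiter: a send is admitted only while the number of
// unfinished sends is below the cap; otherwise the caller should retry later
// (the role played by throttling-retry-millis).
final class OutstandingLimiter(maxOutstanding: Int) {
  private var inFlight = 0

  // Returns true and reserves a slot if below the cap, false otherwise.
  def tryAcquire(): Boolean = synchronized {
    if (inFlight < maxOutstanding) { inFlight += 1; true } else false
  }

  // Frees a slot once a send has completed (successfully or not).
  def release(): Unit = synchronized {
    if (inFlight > 0) inFlight -= 1
  }

  def outstanding: Int = synchronized(inFlight)
}
```

A caller would invoke tryAcquire() before each send, call release() when the corresponding future completes, and schedule a retry whenever tryAcquire() returns false.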
Implementing the consumer requires a simple actor which is responsible for processing messages sent to it by the library.
We call this the Event Processor.
Upon creating an instance of the KinesisConsumer, internally one ConsumerWorker (this is different from the KCL Worker) is created per shard (shards are distributed amongst consumers automatically). This consumer worker is what sends messages to the Event Processor. Note that the Event Processor is shared amongst ALL shards, so it is important not to cache the sender of previous messages.
It is perfectly valid to use a router to spread the work amongst many Event Processor actors.
import akka.actor.{Actor, Props}
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{ConsumerShutdown, ConsumerWorkerFailure, EventProcessed, ProcessEvent}
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
class TestEventProcessor() extends Actor with LazyLogging {
import scala.concurrent.duration._
implicit val timeout = akka.util.Timeout(5.minutes)
// scalastyle:off method.length
override def receive: Receive = {
case ProcessEvent(event) => {
//Do some processing here...
sender ! EventProcessed(event.sequenceNumber)
}
case ConsumerWorkerFailure(failedEvents, shardId) =>
// Consumer failure, no more messages will be consumed!! Depending on the purpose of this service this may be critical.
case ConsumerShutdown(shardId) =>
// The Consumer has shutdown all shards on this instance (gracefully, or as a result of a failure).
}
}
object Consumer extends App {
val system = akka.actor.ActorSystem.create("test-system")
val config = ConfigFactory.load()
val eventProcessor = system.actorOf(Props[TestEventProcessor], "test-processor")
val consumer = KinesisConsumer(ConsumerConf(config.getConfig("kinesis"), "some-consumer"),
eventProcessor, system)
consumer.start()
}
The following types must be handled:
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{ConsumerShutdown, ConsumerWorkerFailure, EventProcessed, ProcessEvent}
/**
* Sent to the eventProcessor for each message in the batch.
*/
case class ProcessEvent(consumerEvent: ConsumerEvent)
/**
* Expected in response to a [[ProcessEvent]] message after processing is complete
*
* @param compoundSeqNo This is a combination of the sequence and subsequence numbers
* @param successful Set this to false to skip this message.
*/
case class EventProcessed(compoundSeqNo: CompoundSequenceNumber, successful: Boolean = true)
/**
* Sent to the eventProcessor if batch processing fails (above the tolerance after retrying)
* before shutting down processing on this shard.
* @param failedEvents The events that failed processing within the time.
* @param shardId The shardId of the worker causing the failure.
*/
case class ConsumerWorkerFailure(failedEvents: Seq[ConsumerEvent], shardId: String)
/**
* Sent to the eventProcessor upon shutdown of this worker.
*/
case class ConsumerShutdown(shardId: String)
- The Event Processor MUST handle [[ProcessEvent]] messages (for each message)
- The Event Processor MUST respond with [[EventProcessed]] after processing of the [[ProcessEvent]]
- The Event Processor may set successful to false to indicate the message can be skipped
- The Event Processor SHOULD handle [[ConsumerWorkerFailure]] messages which signal a critical failure in the Consumer.
- The Event Processor SHOULD handle [[ConsumerShutdown]] messages which signal a graceful shutdown of the Consumer.
The client will handle checkpointing asynchronously PER SHARD according to the configuration using a separate actor.
Currently the KinesisConsumer Shutdown works as follows:
- Shutdown is called on the KinesisConsumer (either explicitly or via the jvm shutdown hook)
- This then calls requestShutdown on the KCL Worker, blocking until completion.
- The KCL Worker propagates this down to the ConsumerProcessingManager (which is the IRecordProcessor), calling shutdownRequested on each instance (one per shard).
- When shutdownRequested is called, this sends a GracefulShutdown message to the ConsumerWorker Actor, blocking until a response is received (Ask + Await).
- On receipt of this message, the ConsumerWorker switches context to ignore all future messages. If a batch is currently being processed, it responds to the sender of that batch (the manager), which will currently be blocking awaiting confirmation of the batch (this is by design, the KCL requires that we don't complete the processRecords function until we have finished the batch, otherwise the next batch is immediately sent)
- The ConsumerWorker then forces a final checkpoint, responding to the manager once completed (or failed), which allows shutdown to continue and the KinesisConsumer to shutdown.
- The shutdown timeout is configured by: kinesis.<consumer-name>.worker.shutdownTimeoutSeconds
- The shutdown hooks can be disabled using: kinesis.<consumer-name>.worker.gracefulShutdownHook
The KPL Client sends messages in batches. Each message creates a Future which completes upon successful send or failure.
See Amazon's documentation for more information: https://github.com/awslabs/amazon-kinesis-producer
This implementation will optionally throttle the number of futures currently in play, according to the max-outstanding-requests property.
Each future is handled within the actor and a message will be returned to the sender upon completion.
The following messages are supported:
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{SendFailed, SendSuccessful, SendWithCallback}
/**
* Send a message to Kinesis, registering a callback response of [[SendSuccessful]] or [[SendFailed]] accordingly.
*/
case class SendWithCallback(producerEvent: ProducerEvent, messageId: String = UUID_GENERATOR.generate().toString)
/**
* Send a message to Kinesis without any callbacks. Fire and forget.
*/
case class Send(producerEvent: ProducerEvent)
/**
* Sent to the sender in event of a successful completion.
*
* @param messageId The id of the event that was sent.
* @param userRecordResult The Kinesis data regarding the send.
*/
case class SendSuccessful(messageId: String, userRecordResult: UserRecordResult)
/**
* Sent to the sender in event of a failed completion.
*
* @param messageId The id of the event that failed.
* @param reason The exception causing the failure. Likely to be of type [[UserRecordFailedException]]
*/
case class SendFailed(messageId: String, reason: Throwable)
import java.util.UUID
import akka.actor.Actor
import com.typesafe.config.Config
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{SendFailed, SendSuccessful, SendWithCallback}
import samples.SomeActor.DoSomething
object SomeActor {
case object DoSomething
}
class SomeActor(kinesisConfig: Config) extends Actor {
val kpa = context.actorOf(
KinesisProducerActor.props(kinesisConfig, "some-producer"))
override def receive: Receive = {
case DoSomething =>
//Do something exciting!
val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")
kpa ! SendWithCallback(producerEvent)
//Callbacks from the KinesisProducerActor
case SendSuccessful(messageId, _) =>
println(s"Successfully sent $messageId")
case SendFailed(messageId, reason) =>
println(s"Failed to send $messageId, cause: ${reason.getMessage}")
}
}
import java.util.UUID
import com.typesafe.config._
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.Send
implicit val system = akka.actor.ActorSystem.create()
val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")
// where some-producer is the name in the configuration
val kpa = system.actorOf(KinesisProducerActor.props(kinesisConfig, "some-producer"))
val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")
kpa ! Send(producerEvent) //Send without a callback confirmation
Note that future throttling will be unavailable using this method.
import java.util.UUID
import com.amazonaws.services.kinesis.producer.{UserRecordFailedException, UserRecordResult}
import com.weightwatchers.reactive.kinesis.producer.KinesisProducer
import com.weightwatchers.reactive.kinesis.producer.ProducerConf
import com.typesafe.config._
import com.weightwatchers.reactive.kinesis.models._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global //Not for production
val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")
val kpl = KinesisProducer(ProducerConf(kinesisConfig, "some-producer"))
val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")
val callback: Future[UserRecordResult] = kpl.addUserRecord(producerEvent)
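import scala.util.{Failure, Success}
// onComplete is used here because onSuccess/onFailure are deprecated as of Scala 2.12
callback.onComplete {
  case Success(_) =>
    println("Success!!")
  case Failure(ex: UserRecordFailedException) =>
    println(s"Failure! ${ex.getMessage}")
  case Failure(ex) =>
    println(s"Critical Failure! ${ex.getMessage}")
}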
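Execute the following command in a shell. You'll need it if you don't have access to WW AWS resources, as it deletes any existing test stream and checkpoint table, waits 90 seconds, then creates the test stream with 2 shards: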
aws kinesis delete-stream --stream-name test-kinesis-reliability && aws dynamodb delete-table --table-name KinesisReliabilitySpec && sleep 90 && aws kinesis create-stream --stream-name test-kinesis-reliability --shard-count 2
Run the SimpleKinesisProducer using the App object.
Wait for the producer to publish all messages.
At the end the producer will print the number of unconfirmed and failed messages (both should be 0!).
Run the SimpleKinesisConsumer using the App object.
Now, wait for two messages that look like this to appear in the consumer window:
2016-06-14 20:09:24,938 c.w.c.e.KinesisRecordProcessingManager - Initializing record processor for shard: shardId-000000000001
2016-06-14 20:09:24,963 c.w.c.e.KinesisRecordProcessingManager - Initializing record processor for shard: shardId-000000000000
As the test progresses, watch the consumer window for a message of this format:
**** PIT STOP OK
You'll see some stats logged regarding messages/sec processed, near that line.
- How is DynamoDB used in relation to our checkpointing?
- DynamoDB tables will be automatically created, however the write throughput must be configured appropriately using the AWS console or CLI. There is a cost associated with this, but note that setting it too low will cause checkpoint throttling. Configure kinesis.<consumer-name>.checkpointer.intervalMillis accordingly.
- How is data sharded?
- Sharding relates to the distribution of messages across the shards for a given stream. Ideally you want an even distribution amongst our shards. However ordering is only guaranteed within a given shard, it is therefore important to group related messages by shard. For example if a specific user performs several operations, in which the order of execution matters, then ensuring they land on the same shard will guarantee the order is maintained.
- For this, the partition key is used. Messages with the same partition key will land on the same shard. In the example above a userId may be a good partition key.
- Note that if the number of partition keys exceeds the number of shards, some shards necessarily contain records with different partition keys. From a design standpoint, to ensure that all your shards are well utilized, the number of shards should be substantially less than the number of unique partition keys.
- How long do we keep data?
- By default Kinesis stores data for 24 hours, however this can be extended to 7 days for a fee.
- What data is already in the Stream and from where is it consumed?
- Kinesis persists data up until the retention period; consuming data does not remove it from the stream. Rather, it moves your checkpoint to the appropriate position.
- There are a number of options for configuring at which position in the stream a consumer will start, configured by the initialPositionInStream setting.
- This library defaults to TRIM_HORIZON. Note that if a checkpoint exists in DynamoDB, the application will always continue from the last checkpoint.
  - AT_SEQUENCE_NUMBER - Start reading from the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.
  - AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.
  - AT_TIMESTAMP - Start reading from the position denoted by a specific timestamp, provided in the value Timestamp.
  - TRIM_HORIZON - Start reading at the last untrimmed record in the shard in the system, which is the oldest data record in the shard.
  - LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
- How do sequence numbers work?
- Sequence numbers for the same partition key generally increase over time, but NOT necessarily in a continuous sequence. The longer the time period between records, the bigger the gap between the sequence numbers.
- To uniquely identify a record on a shard, you need to use BOTH the sequence number and the sub-sequence number. This is because messages that are aggregated together have the same sequence number (they are treated as one message by Kinesis). Therefore it is important to also use the sub-sequence number to distinguish between them.
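The point about needing both numbers can be illustrated with a small sketch that orders records by sequence number first and sub-sequence number second. `SeqNo` is a hypothetical stand-in used only for this example (the library has its own CompoundSequenceNumber type); the sequence number is parsed as a BigInt because Kinesis sequence numbers are long decimal strings.

```scala
object SequenceOrdering {
  // Hypothetical stand-in for a compound sequence number.
  final case class SeqNo(sequenceNumber: String, subSequenceNumber: Long)

  // Order by the numeric sequence number, then by the sub-sequence number,
  // so records aggregated under one sequence number stay distinguishable.
  implicit val ordering: Ordering[SeqNo] =
    Ordering.by((s: SeqNo) => (BigInt(s.sequenceNumber), s.subSequenceNumber))
}
```

Two records aggregated together share a sequence number but differ in sub-sequence number, so they compare as distinct and keep their relative order when sorted.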
This project uses scalafmt and will automatically fail the build if any files do not match the expected formatting.
Please run sbt scalafmt before committing and pushing changes.
As part of the travis build, integration tests will run against a Kinesis localstack instance. You can run these locally as follows:
docker-compose -f localstack/docker-compose.yml up
sbt it:test
Uses tags and sbt-git to determine the current version.
- Each merge into master will automatically build a snapshot and publish to bintray OSS artifactory.
- Tagging the master branch will automatically build and publish both Scala 2.11 & Scala 2.12 artifacts (to bintray and maven central).
- Tags are in the format vX.X.X
- IF the current commit is tagged with "vX.Y.Z" (ie semantic-versioning), the version is "X.Y.Z"
- ELSE IF the current commit is tagged with "vX.Y.Z-Mx", the version is "X.Y.Z-Mx"
- ELSE IF the latest found tag is "vX.Y.Z", the version is "X.Y.Z-commitsSinceVersion-gCommitHash-SNAPSHOT"
- ELSE the version is "0.0.0-commitHash-SNAPSHOT"
Valid release tag examples: v1.2.3 (version=1.2.3), v1.2.3-M1 (version=1.2.3-M1)
Invalid release tag examples: v1.2.3-SNAPSHOT, v1.2.3-M1-SNAPSHOT, v1.2.3-X1, 1.2.3
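The version rules above can be expressed as a short function. This is a sketch of the rules as written, not the build's actual sbt-git configuration; the `Versioning` object, its parameter names, and the assumption that "Mx" means "M" followed by digits are all illustrative.

```scala
object Versioning {
  // Tags must match the whole pattern: "vX.Y.Z" or "vX.Y.Z-Mx" (x assumed numeric).
  private val Release   = """v(\d+\.\d+\.\d+)""".r
  private val Milestone = """v(\d+\.\d+\.\d+-M\d+)""".r

  def versionFor(currentTag: Option[String],
                 latestTag: Option[String],
                 commitsSinceLatestTag: Int,
                 commitHash: String): String =
    currentTag match {
      case Some(Release(v))   => v
      case Some(Milestone(v)) => v
      case _ =>
        latestTag match {
          // Snapshot derived from the latest release tag found in history
          case Some(Release(v)) => s"$v-$commitsSinceLatestTag-g$commitHash-SNAPSHOT"
          // No usable tag at all
          case _ => s"0.0.0-$commitHash-SNAPSHOT"
        }
    }
}
```

Under these assumptions a commit tagged v1.2.3 yields 1.2.3, while a commit four commits after that tag with hash abc1234 yields 1.2.3-4-gabc1234-SNAPSHOT.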
If the current version on master is a snapshot (release tag + x commits), then the artifact will be deployed to the JFrog OSS repository.
Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.
This code is open source software licensed under the Apache 2.0 license.