chore: Update Cohort SDK Client readme. (#105)
fmarek-kindred authored Aug 12, 2024
1 parent c080256 commit 8c58c8d
144 changes: 117 additions & 27 deletions cohort_sdk_client/README.md
@@ -258,34 +258,124 @@ export const enum SdkErrorKind {

## The Initiator API Configuration

See the structure defined in `JsInitiatorConfig`.

| Parameter | Description | Example, default |
| ---------- | ----------- | ---------------- |
| `retryAttemptsMax` | This parameter comes into play during the retry logic implemented in the SDK. It controls how many times to retry the certification request. | 10 |
| `retryBackoff.minMs` | The retry logic implemented in the SDK uses an exponential backoff strategy. The delay grows anywhere between the min and max values. | 20 |
| `retryBackoff.maxMs` | | 1500 |
| `retryOoAttemptsMax` | This parameter comes into play during the retry logic implemented in the SDK. It controls how many times to retry installing the statemap. The difference between this parameter and `retryAttemptsMax` is that this retry is implemented for each certification attempt. | 10 |
| `retryOoBackoff.minMs` | The retry logic implemented in the SDK uses an exponential backoff strategy. The delay grows anywhere between the min and max values. | 20 |
| `retryOoBackoff.maxMs` | | 1000 |
| `backoffOnConflict.minMs` | The retry logic implemented in the SDK uses an exponential backoff strategy. The delay grows anywhere between the min and max values. | 1 |
| `backoffOnConflict.maxMs` | | 1500 |
| `snapshotWaitTimeoutMs` | This parameter comes into play during the retry logic implemented in the SDK. When Talos Certifier aborts the transaction (if a conflict is detected), you should wait until your global state reaches the safe point. This is the number of milliseconds to wait until `OutOfOrderSnapshotTimeout` is raised. | 10000 |
| `agent` | Cohort agent name; this must be unique per Kafka consumer instance. Passed in the Kafka message header `certAgent`. Used to correlate the response message with the certification request message issued by the same process. | |
| `cohort` | Cohort name. This parameter and the `agent` parameter are passed to Talos in the candidate message payload and returned in the decision message. | |
| `bufferSize` | The size of the internal buffer, in certification messages. | 10000 |
| `timeoutMs` | The number of milliseconds the SDK should wait for a response from Talos before giving up. | |
| `kafka.brokers` | The array of Kafka brokers. | `["127.0.0.1:9092"]` |
| `kafka.topic` | The certification topic name. The same as used by Talos Certifier. | |
| `kafka.clientId` | The client id of the connecting cohort. | |
| `kafka.groupId` | TODO: Explain why it should not be set for a cohort. | |
| `kafka.username` | Kafka auth. | |
| `kafka.password` | Kafka auth. | |
| `kafka.producerConfigOverrides` | This is a `Map<string, string>`. All keys and values are passed directly to the underlying Kafka library when configuring the producer. | `{ "enable.auto.commit": "false" }` |
| `kafka.consumerConfigOverrides` | This is a `Map<string, string>`. All keys and values are passed directly to the underlying Kafka library when configuring the consumer. | |
| `kafka.producerSendTimeoutMs` | The `queue_timeout` parameter controls how long to retry if the librdkafka producer queue is full. | 10 |
| `kafka.logLevel` | The verbosity level of the Kafka library. One of "alert", "critical", "debug", "emerg", "error", "info", "notice", "warning". | "info" |
This section documents all configuration parameters used by the Talos client libraries. The information is presented as a table listing all tunable configurations, followed by a few paragraphs explaining some parameters in more detail.

### Initiator Configuration

Rust data structure: `packages/cohort_sdk_js/initiator/JsInitiatorConfig`

| Parameter JS | Type JS | Parameter Rust | Type Rust | Default value | Description|
|--------------|---------|----------------|-----------|---------------|-------------|
| `agent`| `string` | `agent` | `String` | | Used in the header `certAgent` of `CandidateMessage` |
| `backoffOnConflict`| `JsBackoffConfig` | `backoff_on_conflict` | `cohort_sdk_js/models/JsBackoffConfig` | | Used by exponential "delay controller" when preparing certification request. (Read more below) |
| `backoffOnConflict.minMs`| `number` | `backoff_on_conflict.min_ms` | `u32` | | The lower limit of time range used by delay controller (read more below) |
| `backoffOnConflict.maxMs`| `number` | `backoff_on_conflict.max_ms` | `u32` | | The upper limit of time range used by delay controller (read more below) |
| `bufferSize`| `number` | `buffer_size` | `u32` | | This setting is used in multiple places (read below) |
| `cohort`| `string` | `cohort` | `String` | | Used in `CandidateMessage` payload |
| `kafka`| `JsKafkaConfig` | `kafka` | `talos_rdkafka_utils/kafka_configs/KafkaConfig` | | |
| `kafka.brokers` | `Array<string>` | | | | |
| `kafka.topic` | `string` | | | | |
| `kafka.clientId` | `string` | | | | |
| `kafka.groupId` | `string` | | | | |
| `kafka.username` | `string` | | | | |
| `kafka.password` | `string` | | | | |
| `kafka.producerConfigOverrides` | `Record<string, string>` | `kafka.producer_config_overrides` | `HashMap<String, String>` | | The set of options for overriding any producer configs supported by [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html). Recommended settings for clients: `{ message.timeout.ms: <read below>, queue.buffering.max.messages: ??, topic.metadata.refresh.interval.ms: ??, socket.keepalive.enable: true, acks: 0 }` |
| `kafka.consumerConfigOverrides` | `Record<string, string>` | `kafka.consumer_config_overrides` | `HashMap<String, String>` | | The set of options for overriding any consumer configs supported by [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html). Recommended settings for clients: `{ enable.auto.commit: false, auto.offset.reset: latest, socket.keepalive.enable: true }` |
| `kafka.producerSendTimeoutMs` | `number` | `kafka.producer_send_timeout_ms` | `Option<u32>` | 10 | This value is passed to the librdkafka producer for every message. It controls how long to retry for if the librdkafka producer queue is full. |
| `kafka.logLevel` | `string` | `kafka.log_level` | `String` | info | One of: alert, critical, debug, emerg, error, info, notice, warning. See [event severity](https://docs.confluent.io/platform/current/clients/librdkafka/html/classRdKafka_1_1Event.html#a97aab314006c6c1a82dcd341e5c0e6d3) |
| `retryBackoff`| `JsBackoffConfig` | `retry_backoff` | `cohort_sdk_js/models/JsBackoffConfig` | | Used by exponential "delay controller" when attempting to get response from Talos. |
| `retryBackoff.minMs`| `number` | `retry_backoff.min_ms` | `u32` | | The lower limit of time range used by delay controller (read more below) |
| `retryBackoff.maxMs`| `number` | `retry_backoff.max_ms` | `u32` | | The upper limit of time range used by delay controller (read more below) |
| `retryAttemptsMax`| `number` | | | | Controls how many times the same "logical" request will be sent to Talos Certifier |
| `retryOoBackoff`| `JsBackoffConfig` | `retry_oo_backoff` | `cohort_sdk_js/models/JsBackoffConfig` | | Used by exponential "delay controller" during out of order install process |
| `retryOoBackoff.minMs`| `number` | `retry_oo_backoff.min_ms` | `u32` | | The lower limit of time range used by delay controller (read more below) |
| `retryOoBackoff.maxMs`| `number` | `retry_oo_backoff.max_ms` | `u32` | | The upper limit of time range used by delay controller (read more below) |
| `retryOoAttemptsMax`| `number` | `retry_attempts_max` | `u32` | | Controls how many times the out of order installs will be attempted |
| `snapshotWaitTimeoutMs`| `number` | `snapshot_wait_timeout_ms` | `u32` | | Controls the duration of polling for more recent snapshot during conflict management. This setting is applied for each certification attempt. |
| `timeoutMs`| `number` | `timeout_ms` | `u32` | | Controls the max duration of a single attempt to obtain a decision from the Talos certifier (read below) |
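
For orientation, below is a minimal configuration sketch. The shape follows the table above; the `agent`, `cohort`, and `topic` values are hypothetical placeholders, and the numeric values echo the defaults listed in this README rather than recommendations.

```typescript
// Illustrative JsInitiatorConfig-shaped object. Field names follow the table
// above; agent, cohort, and topic values are hypothetical placeholders.
const initiatorConfig = {
  agent: "cohort-banking-agent-1",
  cohort: "cohort-banking",
  bufferSize: 10000,
  timeoutMs: 30000, // no documented default; pick a value suitable for your SLA
  retryAttemptsMax: 10,
  retryBackoff: { minMs: 20, maxMs: 1500 },
  retryOoAttemptsMax: 10,
  retryOoBackoff: { minMs: 20, maxMs: 1000 },
  backoffOnConflict: { minMs: 1, maxMs: 1500 },
  snapshotWaitTimeoutMs: 10000,
  kafka: {
    brokers: ["127.0.0.1:9092"],
    topic: "dev.ksp.certification", // hypothetical topic name
    clientId: "cohort-banking",
    groupId: "", // should not be set for a cohort (see the groupId note)
    username: "",
    password: "",
    producerConfigOverrides: { "message.timeout.ms": "15000" },
    consumerConfigOverrides: { "enable.auto.commit": "false" },
    producerSendTimeoutMs: 10,
    logLevel: "info",
  },
};
```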

#### Configuration Descriptions

**Config `JsInitiatorConfig.bufferSize`**

- Used as the size of the internal data channel between the **SDK client** and the internal **Kafka producer**. If the producer is slower than the SDK client, this channel will fill up and cause the `certify()` call to block until a free slot becomes available in the buffer. The parameter `JsInitiatorConfig.timeoutMs` controls the max duration of a single attempt to obtain a decision from the Talos certifier. The timeout error will be intercepted by the main retry logic and the counter `JsInitiatorConfig.retryAttemptsMax` will be decreased by 1.

- Used as the size of the internal data channel between the internal **timeout watcher** and the **in-flight candidates** cache.

- Used as the size of the internal data channel between the **Talos decision consumer** and the **SDK client**.

**Config `JsInitiatorConfig.kafka.producerConfigOverrides`**

It is recommended to pick an optimal value for the Kafka producer setting `JsInitiatorConfig.kafka.producerConfigOverrides[message.timeout.ms]`. This setting controls how quickly the sender sees a client-level error when delivery is not successful.
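
For example, overrides are passed verbatim to the Kafka library; a sketch with illustrative values (not recommendations):

```typescript
// Keys must be valid librdkafka configuration properties; the values below
// are illustrative, not recommended defaults.
const producerConfigOverrides: Record<string, string> = {
  "message.timeout.ms": "15000", // how long delivery may take before the sender sees an error
  "socket.keepalive.enable": "true",
  "acks": "0",
};
```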

#### The certification logic

This section describes the certification algorithm implemented by cohort_sdk_client. In addition, the following parameters will be explained in more detail:

- `JsInitiatorConfig.backoffOnConflict`
- `JsInitiatorConfig.retryBackoff`
- `JsInitiatorConfig.snapshotWaitTimeoutMs`
- `JsInitiatorConfig.retryOoBackoff`
- `JsInitiatorConfig.retryOoAttemptsMax`

Invoking `Initiator.certify(callback1, callback2)` triggers the certification process, which invokes both callback functions, one after another, in two phases.
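
As a rough usage sketch (the callback shapes below are simplified and hypothetical; consult the SDK typings for the exact signatures):

```typescript
// Hypothetical helpers standing in for your application code.
declare function loadCertificationRequestFromDb(): Promise<unknown>;
declare function applyChangesAtomically(request: unknown, statemap: unknown): Promise<void>;
// Assumes `initiator` is an already-created Initiator instance.
declare const initiator: {
  certify(
    makeRequest: () => Promise<unknown>,
    ooInstall: (request: unknown, statemap: unknown) => Promise<void>,
  ): Promise<unknown>;
};

// The first callback loads fresh state for certification (phase one; it is
// re-invoked on every retry). The second applies the "out of order install"
// (phase two) once Talos has certified the request.
const response = await initiator.certify(
  () => loadCertificationRequestFromDb(),
  (request, statemap) => applyChangesAtomically(request, statemap),
);
```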

**Phase one**

The first phase is about obtaining the certification decision from Talos. During this phase the data for certification is fetched from the cohort DB. Once the data is loaded, the _handler_ passes it to Talos Certifier and waits for the response. The response is then analysed, and if the decision is "committed" the response is returned to the caller. Here "handler" refers to the piece of code implemented inside the library.

If the decision received from Talos is to "abort" the transaction, the Talos response is analysed further. In particular, the handler is interested in the reason why Talos decided to abort. The handler will attempt to extract the version of the **conflicting transaction** from the response. Such a response simply means that the caller is not the first to attempt to use the data. Some other process has changed the data already, and the caller risks overwriting those changes. As such, the expected behaviour is either to cancel the certification attempt or to pause for a while until the cohort database "catches up" with the most recent changes done to the data. Here, the term "to catch up" means that the caller should **wait** until the **snapshot version** updates to a more recent one. The waiting is controlled by `JsInitiatorConfig.retryAttemptsMax` and `JsInitiatorConfig.retryBackoff`. The waiting will stop in three cases:

1. we achieved what we were waiting for
1. we used up all attempts
1. there was some error.
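
The waiting delay itself can be pictured with a small sketch (hypothetical helper names; the SDK's internal delay controller may differ in details such as jitter):

```typescript
// Exponential backoff between a configured minimum and maximum, as used by
// the conceptual sketches below. Purely illustrative.
function backoffDelayMs(attempt: number, minMs: number, maxMs: number): number {
  // Double the minimum delay on every attempt, capping at the maximum:
  // e.g. { minMs: 20, maxMs: 1500 } yields 20, 40, 80, ... 1280, 1500, 1500.
  return Math.min(minMs * 2 ** attempt, maxMs);
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
```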

The parameter `JsInitiatorConfig.retryBackoff`, a time range in milliseconds, controls what happens between retries. The handler uses an exponential back-off algorithm, starting from `retryBackoff.minMs` and increasing up to `retryBackoff.maxMs`. If we simply wait and retry, Talos will keep responding with the same "abort" decision. To avoid that, we have **to re-load the data** before sending it to Talos again. Re-loading the data is the job of the first callback of the certify entry point: `Initiator.certify(this one, )`. The handler could have been implemented like this (a code sketch follows the list):

- get data from db,
- send to Talos,
- if decision is abort, then wait,
- reload data,
- send to Talos and repeat if needed...
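
Sketched in code, such a naive loop could look like this (illustrative only, reusing the `backoffDelayMs` and `sleep` helpers from above; `loadData` and `sendToTalos` stand in for SDK internals):

```typescript
// Naive certification loop, shown only to illustrate the flow described in
// the list above; the SDK's actual internals differ.
type Decision = { kind: "committed" | "aborted" };

async function certifyNaive(
  loadData: () => Promise<unknown>,
  sendToTalos: (data: unknown) => Promise<Decision>,
  cfg: { retryAttemptsMax: number; retryBackoff: { minMs: number; maxMs: number } },
): Promise<Decision> {
  for (let attempt = 0; attempt < cfg.retryAttemptsMax; attempt++) {
    const data = await loadData();            // get data from DB (reloaded on retry)
    const decision = await sendToTalos(data); // send to Talos
    if (decision.kind === "committed") return decision;
    // Aborted: wait with exponential backoff, then reload and retry.
    await sleep(backoffDelayMs(attempt, cfg.retryBackoff.minMs, cfg.retryBackoff.maxMs));
  }
  throw new Error("all certification attempts used up");
}
```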

This would work, but it is not efficient, because every attempt to send data to Talos is "blind" and relies purely on Talos to decide whether it will be successful. To optimise this sequence we make use of a hint included in the Talos response: the **conflicting transaction**. When the Talos response contains the conflicting transaction, which is a number, we know that it is pointless to attempt certification until our snapshot has reached that number. So, we implemented slightly different logic:

- get data from DB,
- send to Talos; if decision is abort, then wait,
- get data from DB again,
- if the snapshot is still behind the **conflicting transaction**, then poll the DB: (wait, get data from DB again, check, wait ...)
- if the snapshot is now more recent and has passed the conflicting transaction, then
- send to Talos and repeat if needed...

In general, it is the same algorithm as above but with a small optimization involving polling the DB for the current snapshot. The polling process is controlled by `JsInitiatorConfig.backoffOnConflict` and `JsInitiatorConfig.snapshotWaitTimeoutMs`; a code sketch follows the stopping conditions below. The polling will stop in three cases:

1. the snapshot is more recent and has good chances to pass Talos certification,
1. the re-loading of data was not successful for technical reasons, or
1. in the current attempt we waited longer than `JsInitiatorConfig.snapshotWaitTimeoutMs`.
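
A rough sketch of this optimized loop, under the same assumptions and hypothetical helpers as the naive sketch above:

```typescript
// Optimized loop with conflict polling; illustrative only, not SDK internals.
type ConflictDecision = { kind: "committed" | "aborted"; conflictVersion?: number };

async function certifyOptimized(
  loadData: () => Promise<{ data: unknown; snapshotVersion: number }>,
  sendToTalos: (data: unknown) => Promise<ConflictDecision>,
  cfg: {
    retryAttemptsMax: number;
    retryBackoff: { minMs: number; maxMs: number };
    backoffOnConflict: { minMs: number; maxMs: number };
    snapshotWaitTimeoutMs: number;
  },
): Promise<ConflictDecision> {
  for (let attempt = 0; attempt < cfg.retryAttemptsMax; attempt++) {
    let loaded = await loadData();
    const decision = await sendToTalos(loaded.data);
    if (decision.kind === "committed") return decision;

    if (decision.conflictVersion !== undefined) {
      // Poll the DB until our snapshot passes the conflicting transaction,
      // bounded per attempt by snapshotWaitTimeoutMs.
      const deadline = Date.now() + cfg.snapshotWaitTimeoutMs;
      let poll = 0;
      while (loaded.snapshotVersion < decision.conflictVersion && Date.now() < deadline) {
        await sleep(backoffDelayMs(poll++, cfg.backoffOnConflict.minMs, cfg.backoffOnConflict.maxMs));
        loaded = await loadData();
      }
    } else {
      // No conflict hint: plain exponential backoff before retrying.
      await sleep(backoffDelayMs(attempt, cfg.retryBackoff.minMs, cfg.retryBackoff.maxMs));
    }
  }
  throw new Error("all certification attempts used up");
}
```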

Phase one ends with either an "abort" decision, when it is not eligible to be automatically retried, or a successful certification decision.

When the decision is "commit", which means Talos has certified the request, the second phase kicks in.

**Phase two**

The second phase is about applying the Talos decision to the data. In other words, if we want to update an entity's status, we consult Talos (in the first phase) and then, in the second phase, we make the changes to the data entity. The second phase is therefore about working with the DB and making changes. This can be done in two places:

- synchronously, by _initiator_ (this is what we are discussing here);
- asynchronously, by _replicator_ (we will discuss it later in this document).

A synchronous update to the data is useful when there is a person or system waiting for an immediate response. For example, we are handling the HTTP call `POST /students {...}` and we need to return a definite response, success or not, because the client is going to call `GET /students` right after it. In this case the database change is performed in the second phase, which is called the "out of order update": `Initiator.certify(..., here)`. As per the Talos algorithm, the cohort should do the following:

1. update the business data, for example, to move the entity status from A to B, and
2. update the version number of the entity.

This must be done atomically (e.g. in a single DB transaction).
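
A minimal sketch of such an atomic out of order install, assuming a generic SQL client and an illustrative `students` table (neither is prescribed by Talos):

```typescript
// Hypothetical out of order install: both the status change and the version
// bump happen in one DB transaction, so they are applied atomically.
interface Db {
  query(sql: string, params: unknown[]): Promise<unknown>;
}

async function ooInstall(db: Db, id: string, newStatus: string, newVersion: number): Promise<void> {
  await db.query("BEGIN", []);
  try {
    await db.query(
      // Guard on version so an older install never overwrites a newer state.
      "UPDATE students SET status = $1, version = $2 WHERE id = $3 AND version < $2",
      [newStatus, newVersion, id],
    );
    await db.query("COMMIT", []);
  } catch (e) {
    await db.query("ROLLBACK", []);
    throw e; // the SDK retries up to retryOoAttemptsMax times
  }
}
```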

If an attempt to run the out of order install fails, it will be retried a few times, as per the `JsInitiatorConfig.retryOoAttemptsMax` configuration. The handler uses an exponential back-off algorithm, starting from `JsInitiatorConfig.retryOoBackoff.minMs` and increasing up to `JsInitiatorConfig.retryOoBackoff.maxMs`.

In case the SDK client decides not to implement the "out of order install", or it unexpectedly crashes, restarts, etc., the out of order install will be performed "in order" by the replicator process. The concepts "out of order" and "in order" refer to the order of incoming candidates from the Talos Certifier's point of view.

The next section explains replicator usage and configs.

# The Replicator API
The role of the Replicator in the Talos ecosystem is to listen to certification decisions and apply transactions in the same order as they were certified.
