Skip to content

Commit

Permalink
refactor(ffi): refactor engine for streaming (#430)
Browse files Browse the repository at this point in the history
* refactor: decouple evaluator and parser to support streaming

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: use channels for message passing

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: handle errors from http

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: return response from fetcher

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* feat: impl streaming

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: rm streaming file

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: fix async tests

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: handle json parsing error

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: rm unneeded test

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: rm tokio from main deps

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: fix sending async

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: minor cleanup

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: add some logging

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: try to fix thread lifecycle management

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: put polling in loop

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: add some debug logging

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: dont reset snapshot on error state

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: dont print error on shutdown

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: handle wait for async code to finish

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: use println for debug

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: more debug info

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: perform initial fetch on engine start

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: refactor initial fetch

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: set error in snapshot

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: test stream fetch

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: error message

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: clippyyyyyyyyy

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: rm 303 handling

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: expose fetch mode

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: fix url for streaming

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: fix streaming test

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: expose fetch mode to other languages

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: fix java build

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: rename file cause java

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: handle stream chunk parsing; rm timeout

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: fmt java

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: ruby fmt

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: fmt python

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: retry requests, ensure consistency in error handling

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: assert expected retries for fetch error

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: add more tests and documentation

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

* chore: java fmt

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>

---------

Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com>
  • Loading branch information
markphelps authored Oct 17, 2024
1 parent 1f357f8 commit 40022f7
Show file tree
Hide file tree
Showing 24 changed files with 791 additions and 374 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ Cargo.lock
tmp
.envrc
.vscode
staging
staging
*.h
16 changes: 16 additions & 0 deletions flipt-client-dart/lib/src/models.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,30 @@ import 'package:json_annotation/json_annotation.dart';

part 'models.g.dart';

enum FetchMode {
@JsonValue('polling')
polling,
@JsonValue('streaming')
streaming,
}

/// Options for the Flipt client
@JsonSerializable()
class Options {
final String? url;
final String? reference;
final int? updateInterval;
final Map<String, dynamic>? authentication;

/// Note: Streaming is currently only supported when using the SDK with Flipt Cloud (https://flipt.io/cloud).
final FetchMode? fetchMode;

Options({
this.url = 'http://localhost:8080',
this.reference,
this.updateInterval = 120,
this.authentication,
this.fetchMode = FetchMode.polling,
});

factory Options.fromJson(Map<String, dynamic> json) =>
Expand All @@ -25,6 +37,7 @@ class Options {
String? url,
String? reference,
int? updateInterval,
FetchMode? fetchMode,
}) {
return Options(
url: url,
Expand All @@ -33,6 +46,7 @@ class Options {
authentication: {
'client_token': token,
},
fetchMode: fetchMode,
);
}

Expand All @@ -41,6 +55,7 @@ class Options {
String? url,
String? reference,
int? updateInterval,
FetchMode? fetchMode,
}) {
return Options(
url: url,
Expand All @@ -49,6 +64,7 @@ class Options {
authentication: {
'jwt_token': token,
},
fetchMode: fetchMode,
);
}
}
Expand Down
8 changes: 8 additions & 0 deletions flipt-client-dart/lib/src/models.g.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions flipt-client-go/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type EvaluationClient struct {
authentication any
ref string
updateInterval int
fetchMode FetchMode
}

// NewEvaluationClient constructs a Client.
Expand All @@ -46,6 +47,7 @@ func NewEvaluationClient(opts ...clientOption) (*EvaluationClient, error) {
UpdateInterval: client.updateInterval,
Authentication: &client.authentication,
Reference: client.ref,
FetchMode: client.fetchMode,
}

b, err := json.Marshal(clientOpts)
Expand Down Expand Up @@ -106,6 +108,14 @@ func WithJWTAuthentication(token string) clientOption {
}
}

// WithFetchMode allows for specifying the fetch mode for the Flipt client (e.g. polling, streaming).
// Note: Streaming is currently only supported when using the SDK with Flipt Cloud (https://flipt.io/cloud).
func WithFetchMode(fetchMode FetchMode) clientOption {
return func(c *EvaluationClient) {
c.fetchMode = fetchMode
}
}

// EvaluateVariant performs evaluation for a variant flag.
func (e *EvaluationClient) EvaluateVariant(_ context.Context, flagKey, entityID string, evalContext map[string]string) (*VariantEvaluationResponse, error) {
ereq, err := json.Marshal(evaluationRequest{
Expand Down
16 changes: 12 additions & 4 deletions flipt-client-go/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ type jwtAuthentication struct {
Token string `json:"jwt_token"`
}

type FetchMode string

const (
FetchModeStreaming FetchMode = "streaming"
FetchModePolling FetchMode = "polling"
)

type clientOptions[T any] struct {
URL string `json:"url,omitempty"`
Authentication *T `json:"authentication,omitempty"`
UpdateInterval int `json:"update_interval,omitempty"`
Reference string `json:"reference,omitempty"`
URL string `json:"url,omitempty"`
Authentication *T `json:"authentication,omitempty"`
UpdateInterval int `json:"update_interval,omitempty"`
Reference string `json:"reference,omitempty"`
FetchMode FetchMode `json:"fetch_mode,omitempty"`
}

type Flag struct {
Expand Down
2 changes: 1 addition & 1 deletion flipt-client-java/.idea/gradle.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flipt-client-java/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ public interface CLibrary extends Library {
void destroy_string(Pointer str);
}

private FliptEvaluationClient(String namespace, ClientOptions clientOpts)
private FliptEvaluationClient(String namespace, ClientOptions clientOptions)
throws EvaluationException {

ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());

String clientOptionsSerialized;
try {
clientOptionsSerialized = objectMapper.writeValueAsString(clientOpts);
clientOptionsSerialized = objectMapper.writeValueAsString(clientOptions);
} catch (JsonProcessingException e) {
throw new EvaluationException(e);
}
Expand All @@ -61,48 +61,99 @@ public static FliptEvaluationClientBuilder builder() {
return new FliptEvaluationClientBuilder();
}

/** FliptEvaluationClientBuilder is a builder for creating a FliptEvaluationClient. */
public static final class FliptEvaluationClientBuilder {
private String namespace = "default";
private String url = "http://localhost:8080";
private AuthenticationStrategy authentication;
private String reference;
private Duration updateInterval;
private FetchMode fetchMode = FetchMode.POLLING;

public FliptEvaluationClientBuilder() {}

/**
* url sets the URL for the Flipt server.
*
* @param url the URL for the Flipt server
* @return the FliptEvaluationClientBuilder
*/
public FliptEvaluationClientBuilder url(String url) {
this.url = url;
return this;
}

/**
* namespace sets the namespace for the Flipt server.
*
* @param namespace the namespace for the Flipt server
* @return the FliptEvaluationClientBuilder
*/
public FliptEvaluationClientBuilder namespace(String namespace) {
this.namespace = namespace;
return this;
}

/**
* authentication sets the authentication strategy for the Flipt server.
*
* @param authentication the authentication strategy for the Flipt server
* @return the FliptEvaluationClientBuilder
*/
public FliptEvaluationClientBuilder authentication(AuthenticationStrategy authentication) {
this.authentication = authentication;
return this;
}

/**
* updateInterval sets the update interval for the Flipt server.
*
* @param updateInterval the update interval for the Flipt server
* @return the FliptEvaluationClientBuilder
*/
public FliptEvaluationClientBuilder updateInterval(Duration updateInterval) {
this.updateInterval = updateInterval;
return this;
}

/**
* reference sets the reference for the Flipt server.
*
* @param reference the reference for the Flipt server
* @return the FliptEvaluationClientBuilder
*/
public FliptEvaluationClientBuilder reference(String reference) {
this.reference = reference;
return this;
}

/**
* fetchMode sets the fetch mode for the Flipt server. Note: Streaming is currently only
* supported when using the SDK with Flipt Cloud (https://flipt.io/cloud).
*
* @param fetchMode the fetch mode for the Flipt server
* @return the FliptEvaluationClientBuilder
*/
public FliptEvaluationClientBuilder fetchMode(FetchMode fetchMode) {
this.fetchMode = fetchMode;
return this;
}

/**
* build builds a new FliptEvaluationClient.
*
* @return the FliptEvaluationClient
* @throws EvaluationException if the FliptEvaluationClient could not be built
*/
public FliptEvaluationClient build() throws EvaluationException {
return new FliptEvaluationClient(
namespace,
new ClientOptions(
Optional.of(url),
Optional.ofNullable(updateInterval),
Optional.ofNullable(authentication),
Optional.ofNullable(reference)));
Optional.ofNullable(reference),
Optional.ofNullable(fetchMode)));
}
}

Expand Down Expand Up @@ -136,6 +187,15 @@ public Map<String, String> getContext() {
}
}

/**
* evaluateVariant evaluates a variant flag.
*
* @param flagKey the key for the flag to evaluate
* @param entityId the ID for the entity to evaluate
* @param context the context for the evaluation
* @return the evaluation response
* @throws EvaluationException if the evaluation failed
*/
public VariantEvaluationResponse evaluateVariant(
String flagKey, String entityId, Map<String, String> context) throws EvaluationException {
InternalEvaluationRequest evaluationRequest =
Expand All @@ -160,6 +220,15 @@ public VariantEvaluationResponse evaluateVariant(
return resp.getResult().get();
}

/**
* evaluateBoolean evaluates a boolean flag.
*
* @param flagKey the key for the flag to evaluate
* @param entityId the ID for the entity to evaluate
* @param context the context for the evaluation
* @return the evaluation response
* @throws EvaluationException if the evaluation failed
*/
public BooleanEvaluationResponse evaluateBoolean(
String flagKey, String entityId, Map<String, String> context) throws EvaluationException {
InternalEvaluationRequest evaluationRequest =
Expand All @@ -183,6 +252,13 @@ public BooleanEvaluationResponse evaluateBoolean(
return resp.getResult().get();
}

/**
* evaluateBatch evaluates a batch of flags.
*
* @param batchEvaluationRequests the batch of flags to evaluate
* @return the evaluation response
* @throws EvaluationException if the evaluation failed
*/
public BatchEvaluationResponse evaluateBatch(EvaluationRequest[] batchEvaluationRequests)
throws EvaluationException {
ArrayList<InternalEvaluationRequest> evaluationRequests =
Expand Down Expand Up @@ -216,6 +292,12 @@ public BatchEvaluationResponse evaluateBatch(EvaluationRequest[] batchEvaluation
return resp.getResult().get();
}

/**
* listFlags lists all flags in the namespace.
*
* @return the list of flags
* @throws EvaluationException if the list of flags could not be retrieved
*/
public ArrayList<Flag> listFlags() throws EvaluationException {
Pointer value = CLibrary.INSTANCE.list_flags(this.engine);

Expand All @@ -229,6 +311,7 @@ public ArrayList<Flag> listFlags() throws EvaluationException {
return resp.getResult().get();
}

/** close closes the FliptEvaluationClient. */
public void close() {
CLibrary.INSTANCE.destroy_engine(this.engine);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ public class ClientOptions {
private final Optional<Long> updateInterval;
private final Optional<AuthenticationStrategy> authentication;
private final Optional<String> reference;
private final Optional<FetchMode> fetchMode;

public ClientOptions(
Optional<String> url,
Optional<Duration> updateInterval,
Optional<AuthenticationStrategy> authentication,
Optional<String> reference) {
Optional<String> reference,
Optional<FetchMode> fetchMode) {
this.url = url;
this.authentication = authentication;
this.reference = reference;
Expand All @@ -27,6 +29,7 @@ public ClientOptions(
}

this.updateInterval = setUpdateInterval;
this.fetchMode = fetchMode;
}

@JsonProperty("url")
Expand All @@ -48,4 +51,9 @@ public Optional<AuthenticationStrategy> getAuthentication() {
public Optional<String> getReference() {
return reference;
}

@JsonProperty("fetch_mode")
public Optional<FetchMode> getFetchMode() {
return fetchMode;
}
}
Loading

0 comments on commit 40022f7

Please sign in to comment.