Skip to content

Commit

Permalink
feat: bring in a copy of awsfirehosereceiver from opentelemetry-colle…
Browse files Browse the repository at this point in the history
…ctor-contrib
  • Loading branch information
raj-k-singh committed Nov 27, 2024
1 parent b4fcdae commit bb1edc0
Show file tree
Hide file tree
Showing 51 changed files with 3,293 additions and 0 deletions.
1 change: 1 addition & 0 deletions receiver/signozawsfirehosereceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
88 changes: 88 additions & 0 deletions receiver/signozawsfirehosereceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# AWS Kinesis Data Firehose Receiver

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [alpha]: metrics, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fawsfirehose%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fawsfirehose) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fawsfirehose%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fawsfirehose) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@Aneurysm9](https://www.github.com/Aneurysm9) |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

Receiver for ingesting AWS Kinesis Data Firehose delivery stream messages and parsing the records received based on the configured record type.

## Configuration

Example:

```yaml
receivers:
awsfirehose:
endpoint: 0.0.0.0:4433
record_type: cwmetrics
access_key: "some_access_key"
tls:
cert_file: server.crt
key_file: server.key
```
The configuration includes the Opentelemetry collector's server [confighttp](https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp#server-configuration),
which allows for a variety of settings. Only the most relevant ones will be discussed here, but all are available.
The AWS Kinesis Data Firehose Delivery Streams currently only support HTTPS endpoints using port 443. This can be potentially circumvented
using a Load Balancer.
### endpoint:
The address:port to bind the listener to.
default: `localhost:4433`

You can temporarily disable the `component.UseLocalHostAsDefaultHost` feature gate to change this to `0.0.0.0:4433`. This feature gate will be removed in a future release.

### tls:
See [documentation](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#server-configuration) for more details.

A `cert_file` and `key_file` are required.

### record_type:
The type of record being received from the delivery stream. Each unmarshaler handles a specific type, so the field allows the receiver to use the correct one.

default: `cwmetrics`

See the [Record Types](#record-types) section for all available options.

### access_key (Optional):
The access key to be checked on each request received. This can be set when creating or updating the delivery stream.
See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http) for details.

## Record Types

### cwmetrics
The record type for the CloudWatch metric stream. Expects the format for the records to be JSON.
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details.

### cwlogs
The record type for the CloudWatch log stream. Expects the format for the records to be JSON.
For example:

```json
{
"messageType": "DATA_MESSAGE",
"owner": "111122223333",
"logGroup": "my-log-group",
"logStream": "my-log-stream",
"subscriptionFilters": ["my-subscription-filter"],
"logEvents": [
{
"id": "123",
"timestamp": 1725544035523,
"message": "My log message."
}
]
}
```

### otlp_v1
The OTLP v1 format as produced by CloudWatch metric streams.
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details.
39 changes: 39 additions & 0 deletions receiver/signozawsfirehosereceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver"

import (
"errors"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
)

type Config struct {
// ServerConfig is used to set up the Firehose delivery
// endpoint. The Firehose delivery stream expects an HTTPS
// endpoint, so TLSSettings must be used to enable that.
confighttp.ServerConfig `mapstructure:",squash"`
// RecordType is the key used to determine which unmarshaler to use
// when receiving the requests.
RecordType string `mapstructure:"record_type"`
// AccessKey is checked against the one received with each request.
// This can be set when creating or updating the Firehose delivery
// stream.
AccessKey configopaque.String `mapstructure:"access_key"`
}

// Validate checks that the endpoint and record type exist and
// are valid.
func (c *Config) Validate() error {
if c.Endpoint == "" {
return errors.New("must specify endpoint")
}
// If a record type is specified, it must be valid.
// An empty string is acceptable, however, because it will use a telemetry-type-specific default.
if c.RecordType != "" {
return validateRecordType(c.RecordType)
}
return nil
}
57 changes: 57 additions & 0 deletions receiver/signozawsfirehosereceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awsfirehosereceiver

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
)

func TestLoadConfig(t *testing.T) {
for _, configType := range []string{
"cwmetrics", "cwlogs", "otlp_v1", "invalid",
} {
t.Run(configType, func(t *testing.T) {
fileName := configType + "_config.yaml"
cm, err := confmaptest.LoadConf(filepath.Join("testdata", fileName))
require.NoError(t, err)

factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

err = component.ValidateConfig(cfg)
if configType == "invalid" {
assert.Error(t, err)
} else {
assert.NoError(t, err)
require.Equal(t, &Config{
RecordType: configType,
AccessKey: "some_access_key",
ServerConfig: confighttp.ServerConfig{
Endpoint: "0.0.0.0:4433",
TLSSetting: &configtls.ServerConfig{
Config: configtls.Config{
CertFile: "server.crt",
KeyFile: "server.key",
},
},
},
}, cfg)
}
})
}
}
16 changes: 16 additions & 0 deletions receiver/signozawsfirehosereceiver/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package awsfirehosereceiver implements a receiver that can be used to
// receive requests from the AWS Kinesis Data Firehose and transform them
// into formats usable by the Opentelemetry collector. The configuration
// determines which unmarshaler to use. Each unmarshaler is responsible for
// processing a Firehose record format that can be sent through the delivery
// stream.
//
// More details can be found at:
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html

//go:generate mdatagen metadata.yaml

package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver"
104 changes: 104 additions & 0 deletions receiver/signozawsfirehosereceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver"

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
)

const (
defaultEndpoint = "0.0.0.0:4433"
defaultPort = 4433
)

var (
errUnrecognizedRecordType = errors.New("unrecognized record type")
availableRecordTypes = map[string]bool{
cwmetricstream.TypeStr: true,
cwlog.TypeStr: true,
otlpmetricstream.TypeStr: true,
}
)

// NewFactory creates a receiver factory for awsfirehose. Currently, only
// available in metrics pipelines.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
}

// validateRecordType checks the available record types for the
// passed in one and returns an error if not found.
func validateRecordType(recordType string) error {
if _, ok := availableRecordTypes[recordType]; !ok {
return errUnrecognizedRecordType
}
return nil
}

// defaultMetricsUnmarshalers creates a map of the available metrics
// unmarshalers.
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler {
cwmsu := cwmetricstream.NewUnmarshaler(logger)
otlpv1msu := otlpmetricstream.NewUnmarshaler(logger)
return map[string]unmarshaler.MetricsUnmarshaler{
cwmsu.Type(): cwmsu,
otlpv1msu.Type(): otlpv1msu,
}
}

// defaultLogsUnmarshalers creates a map of the available logs unmarshalers.
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler {
u := cwlog.NewUnmarshaler(logger)
return map[string]unmarshaler.LogsUnmarshaler{
u.Type(): u,
}
}

// createDefaultConfig creates a default config with the endpoint set
// to port 8443 and the record type set to the CloudWatch metric stream.
func createDefaultConfig() component.Config {
return &Config{
ServerConfig: confighttp.ServerConfig{
Endpoint: testutil.EndpointForPort(defaultPort),
},
}
}

// createMetricsReceiver implements the CreateMetrics function type.
func createMetricsReceiver(
_ context.Context,
set receiver.Settings,
cfg component.Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
return newMetricsReceiver(cfg.(*Config), set, defaultMetricsUnmarshalers(set.Logger), nextConsumer)
}

// createMetricsReceiver implements the CreateMetricsReceiver function type.
func createLogsReceiver(
_ context.Context,
set receiver.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (receiver.Logs, error) {
return newLogsReceiver(cfg.(*Config), set, defaultLogsUnmarshalers(set.Logger), nextConsumer)
}
50 changes: 50 additions & 0 deletions receiver/signozawsfirehosereceiver/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awsfirehosereceiver

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
)

func TestValidConfig(t *testing.T) {
err := componenttest.CheckConfigStruct(createDefaultConfig())
require.NoError(t, err)
}

func TestCreateMetrics(t *testing.T) {
r, err := createMetricsReceiver(
context.Background(),
receivertest.NewNopSettings(),
createDefaultConfig(),
consumertest.NewNop(),
)
require.NoError(t, err)
require.NotNil(t, r)
}

func TestCreateLogsReceiver(t *testing.T) {
r, err := createLogsReceiver(
context.Background(),
receivertest.NewNopSettings(),
createDefaultConfig(),
consumertest.NewNop(),
)
require.NoError(t, err)
require.NotNil(t, r)
}

func TestValidateRecordType(t *testing.T) {
require.NoError(t, validateRecordType(defaultMetricsRecordType))
require.NoError(t, validateRecordType(defaultLogsRecordType))
require.NoError(t, validateRecordType(otlpmetricstream.TypeStr))
require.Error(t, validateRecordType("nop"))
}
Loading

0 comments on commit bb1edc0

Please sign in to comment.