Skip to content

Commit

Permalink
add vendor
Browse files Browse the repository at this point in the history
  • Loading branch information
deathowl committed Sep 12, 2016
1 parent a56b6d9 commit 661e0dc
Show file tree
Hide file tree
Showing 417 changed files with 150,846 additions and 23 deletions.
13 changes: 9 additions & 4 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/Shopify/sarama"
)
"github.com/deathowl/go-metrics-prometheus"
"time"

)
var saramaConfig *sarama.Config

type kafkaCollector struct {
upIndicator *prometheus.Desc
Expand All @@ -17,7 +20,9 @@ type kafkaCollector struct {
}
func init() {
prometheus.MustRegister(NewKafkaCollector())

saramaConfig = sarama.NewConfig()
pClient := prometheusmetrics.NewPrometheusProvider(saramaConfig.MetricRegistry, "kafka", "broker", prometheus.DefaultRegisterer, 1*time.Second)
go pClient.UpdatePrometheusMetrics()
}
func parseFloatOrZero(s string) float64 {
res, err := strconv.ParseFloat(s, 64)
Expand Down Expand Up @@ -46,6 +51,7 @@ func (c *kafkaCollector) Describe(ch chan<- *prometheus.Desc) {
func (c *kafkaCollector) Collect(ch chan<- prometheus.Metric) {
log.Info("Fetching metrics from Kafka")

kClient, _ := sarama.NewClient(strings.Split(*kafkaAddr, ","), saramaConfig)
kafkaConsumer, err := sarama.NewConsumer(strings.Split(*kafkaAddr, ","), nil)
if err != nil {
log.Error("Failed to start Sarama consumer:", err)
Expand All @@ -67,7 +73,6 @@ func (c *kafkaCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(c.partitionsCount, prometheus.GaugeValue,float64(len(partitions)), topic)

for _, partition := range partitions {
kClient, _ := sarama.NewClient(strings.Split(*kafkaAddr, ","), nil)
offsetManager, _ :=sarama.NewOffsetManagerFromClient("zkexporter.om", kClient)
pom, _ := offsetManager.ManagePartition(topic, partition)
oldOffset, _ := pom.NextOffset()
Expand All @@ -76,8 +81,8 @@ func (c *kafkaCollector) Collect(ch chan<- prometheus.Metric) {
pom.MarkOffset(newOffset, "already reported")
pom.Close()
offsetManager.Close()
kClient.Close()
}
}
kafkaConsumer.Close()
kClient.Close()
}
323 changes: 323 additions & 0 deletions vendor/github.com/Shopify/sarama/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions vendor/github.com/Shopify/sarama/MIT-LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 661e0dc

Please sign in to comment.