Skip to content

Commit

Permalink
Merge remote-tracking branch 'grafana/main' into karsten/enable-query…
Browse files Browse the repository at this point in the history
…-protos
  • Loading branch information
jeschkies committed Nov 10, 2023
2 parents d1ac371 + 4b20f95 commit 9b9bb06
Show file tree
Hide file tree
Showing 49 changed files with 269 additions and 282 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
* [10341](https://github.com/grafana/loki/pull/10341) **ashwanthgoli** Deprecate older index types and non-object stores - `aws-dynamo, gcp, gcp-columnkey, bigtable, bigtable-hashed, cassandra, grpc`
* [10344](https://github.com/grafana/loki/pull/10344) **ashwanthgoli** Compactor: deprecate `-boltdb.shipper.compactor.` prefix in favor of `-compactor.`.
* [10073](https://github.com/grafana/loki/pull/10073) **sandeepsukhani,salvacorts,vlad-diachenko** Support attaching structured metadata to log lines.
* [11151](https://github.com/grafana/loki/pull/11151) **ashwanthgoli**: Removes already deprecated configs: `ruler.evaluation-delay-duration`, `boltdb.shipper.compactor.deletion-mode`, `validation.enforce-metric-name` and flags with prefix `-boltdb.shipper.compactor.*`.

##### Fixes

Expand Down
13 changes: 8 additions & 5 deletions clients/cmd/logstash/lib/logstash/outputs/loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
## 'An array of fields to map to labels, if defined only fields in this list will be mapped.'
config :include_fields, :validate => :array, :default => [], :required => false

## 'An array of fields to map to structured metadata, if defined only fields in this list will be mapped.'
config :metadata_fields, :validate => :array, :default => [], :required => false

## 'Backoff configuration. Maximum backoff time between retries. Default 300s'
config :max_delay, :validate => :number, :default => 300, :required => false

Expand All @@ -71,7 +74,7 @@ def register
@logger.info("Loki output plugin", :class => self.class.name)

# initialize Queue and Mutex
@entries = Queue.new
@entries = Queue.new
@mutex = Mutex.new
@stop = false

Expand All @@ -94,7 +97,7 @@ def max_batch_size
@mutex.synchronize do
return if @stop
end

e = @entries.deq
return if e.nil?

Expand Down Expand Up @@ -201,13 +204,13 @@ def is_batch_expired
## Receives logstash events
public
def receive(event)
@entries << Entry.new(event, @message_field, @include_fields)
@entries << Entry.new(event, @message_field, @include_fields, @metadata_fields)
end

def close
@entries.close
@mutex.synchronize do
@stop = true
@mutex.synchronize do
@stop = true
end
@batch_wait_thread.join
@batch_size_thread.join
Expand Down
14 changes: 13 additions & 1 deletion clients/cmd/logstash/lib/logstash/outputs/loki/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,19 @@ def to_json
def build_stream(stream)
values = []
stream['entries'].each { |entry|
values.append([entry['ts'].to_s, entry['line']])
if entry.key?('metadata')
sorted_metadata = entry['metadata'].sort.to_h
values.append([
entry['ts'].to_s,
entry['line'],
sorted_metadata
])
else
values.append([
entry['ts'].to_s,
entry['line']
])
end
}
return {
'stream'=>stream['labels'],
Expand Down
18 changes: 17 additions & 1 deletion clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def to_ns(s)
class Entry
include Loki
attr_reader :labels, :entry
def initialize(event,message_field,include_fields)
def initialize(event,message_field,include_fields,metadata_fields)
@entry = {
"ts" => to_ns(event.get("@timestamp")),
"line" => event.get(message_field).to_s
Expand All @@ -21,6 +21,22 @@ def initialize(event,message_field,include_fields)
next if include_fields.length() > 0 and not include_fields.include?(key)
@labels[key] = value.to_s
}

# Unlike include_fields, skip metadata extraction entirely when no metadata_fields are configured
if metadata_fields.length() > 0
@metadata = {}
event.to_hash.each { |key,value|
next if key.start_with?('@')
next if value.is_a?(Hash)
next if metadata_fields.length() > 0 and not metadata_fields.include?(key)
@metadata[key] = value.to_s
}

# Add @metadata to @entry if there was a match
if @metadata.size > 0
@entry.merge!('metadata' => @metadata)
end
end
end
end
end
2 changes: 1 addition & 1 deletion clients/cmd/logstash/logstash-output-loki.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-loki'
s.version = '1.1.0'
s.version = '1.2.0'
s.authors = ['Aditya C S','Cyril Tovena']
s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com']

Expand Down
15 changes: 15 additions & 0 deletions clients/cmd/logstash/loki-metadata.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Example pipeline: ship generated events to Loki with structured metadata.
input {
generator {
message => "Hello world!"
count => 10
# Extra event fields; see the output section for how each is routed.
add_field => {cluster=> "foo" namespace=>"bar" trace_id=> "trace_001"}
}
}

output {
loki {
url => "http://localhost:3100"
# Only "cluster" is sent as a stream label ("namespace" is dropped).
include_fields => ["cluster"]
# "trace_id" is attached to each log line as structured metadata.
metadata_fields => ["trace_id"]
}
}
3 changes: 3 additions & 0 deletions clients/cmd/logstash/loki.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ output {
# If include_fields is set, only fields in this list will be sent to Loki as labels.
#include_fields => ["service","host","app","env"] #default empty array, all labels included.

# If metadata_fields is set, fields in this list will be sent to Loki as structured metadata for the associated log.
#metadata_fields => ["trace_id"] #default empty array, no structured metadata will be included

#batch_wait => 1 ## in seconds #default 1 second

#batch_size => 102400 #bytes #default 102400 bytes
Expand Down
21 changes: 15 additions & 6 deletions clients/cmd/logstash/spec/outputs/loki/entry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,40 @@
{'@path' => '/path/to/file.log'},
},
'host' => '172.0.0.1',
'trace_id' => 'trace_001',
'@timestamp' => Time.now
}
)
}

it 'labels extracted should not contains object and metadata or timestamp' do
entry = Entry.new(event,"message", [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5'})
entry = Entry.new(event,"message", [], [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5', 'trace_id'=>'trace_001'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end

it 'labels extracted should only contain allowlisted labels' do
entry = Entry.new(event, "message", %w[agent foo])
entry = Entry.new(event, "message", %w[agent foo], [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end

# The fourth Entry.new argument (metadata_fields) populates entry['metadata'];
# labels still honor the include_fields allowlist, so trace_id appears only
# as structured metadata, not as a label.
it 'labels and structured metadata extracted should only contain allow listed labels and metadata' do
entry = Entry.new(event, "message", %w[agent foo], %w[trace_id])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
expect(entry.entry['metadata']).to eql({'trace_id' => 'trace_001'})
end
end

context 'test batch generation with label order' do
let (:entries) {[
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", []),
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", [], []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", [], []),

]}
let (:expected) {
Expand Down
55 changes: 49 additions & 6 deletions clients/cmd/logstash/spec/outputs/loki_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@

context 'when adding en entry to the batch' do
let (:simple_loki_config) {{'url' => 'http://localhost:3100'}}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}
let (:lbs) {{"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"])}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])}
let (:include_lbs) {{"cluster"=>"us-central1"}.sort.to_h}

it 'should not add empty line' do
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config)
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [])
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [], [])
expect(plugin.add_entry_to_batch(emptyEntry)).to eql true
expect(plugin.batch).to eql nil
end
Expand Down Expand Up @@ -83,8 +83,51 @@
end
end

# Verifies the exact JSON payload produced by the batch for each combination
# of include_fields / metadata_fields configuration. Expected strings pin the
# Loki push format: values are [ts, line] or [ts, line, metadata_hash].
context 'when building json from batch to send' do
let (:basic_loki_config) {{'url' => 'http://localhost:3100'}}
let (:basic_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", [], [])}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])}
let (:metadata_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id"] }}
let (:metadata_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id"])}
let (:metadata_multi_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id", "user_id"] }}
let (:metadata_multi_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","user_id"=>"user_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id", "user_id"])}

# With neither include_fields nor metadata_fields configured, every non-@
# field (including trace_id) becomes a stream label and no metadata element
# is emitted in the values array.
it 'should not include labels or metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(basic_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(basic_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"buzz":"bar","cluster":"us-central1","trace_id":"trace_001"},"values":[["1000000000","foobuzz"]]}]}'
end

# metadata_fields routes trace_id into the third element of each value tuple.
it 'should include metadata with no labels' do
plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(metadata_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001"}]]}]}'
end

# include_fields alone: labels restricted to "cluster", values stay 2-tuples.
it 'should include labels with no metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(include_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(include_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz"]]}]}'
end

# Multiple metadata_fields are serialized together in one metadata hash
# (key order matches the sorted-metadata behavior in batch.rb).
it 'should include labels with multiple metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_multi_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(metadata_multi_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001","user_id":"user_001"}]]}]}'
end
end

context 'batch expiration' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}

it 'should not expire if empty' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5}))
Expand Down Expand Up @@ -147,13 +190,13 @@
loki.receive(event)
sent.deq
sleep(0.01) # Adding a minimal sleep. In few cases @batch=nil might happen after evaluating for nil
expect(loki.batch).to be_nil
expect(loki.batch).to be_nil
loki.close
end
end

context 'http requests' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}

it 'should send credentials' do
conf = {
Expand Down
2 changes: 1 addition & 1 deletion clients/cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func main() {
exit(1)
}
serverCfg := &config.Config.ServerConfig.Config
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.DefaultRegisterer, true, false)
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.DefaultRegisterer, false)

// Use Stderr instead of files for the klog.
klog.SetOutput(os.Stderr)
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Test_dropStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/labelallow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_addLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/labeldrop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_dropLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions clients/pkg/logentry/stages/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Test_multilineStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
Expand Down Expand Up @@ -52,7 +52,7 @@ func Test_multilineStage_MultiStreams(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
Expand Down Expand Up @@ -97,7 +97,7 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

maxWait := 2 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/pack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func Test_packStage_Run(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
Debug = true

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (t *PushTarget) run() error {
// The logger registers a metric which will cause a duplicate registry panic unless we provide an empty registry
// The metric created is for counting log lines and isn't likely to be missed.
serverCfg := &t.config.Server
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.NewRegistry(), true, false)
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.NewRegistry(), false)

// Set new registry for upcoming metric server
// If not, it'll likely panic when the tool gets reloaded.
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/windows/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {
// Enable debug logging
cfg := &server.Config{}
_ = cfg.LogLevel.Set("debug")
util_log.InitLogger(cfg, nil, true, false)
util_log.InitLogger(cfg, nil, false)
}

// Test that you can use to generate event logs locally.
Expand Down
2 changes: 1 addition & 1 deletion cmd/logql-analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func main() {
cfg := getConfig()
util_log.InitLogger(&server.Config{
LogLevel: cfg.LogLevel,
}, prometheus.DefaultRegisterer, true, false)
}, prometheus.DefaultRegisterer, false)
s, err := createServer(cfg, util_log.Logger)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error while creating the server", "err", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func main() {
exit(1)
}
serverCfg := &config.Server
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.DefaultRegisterer, config.UseBufferedLogger, config.UseSyncLogger)
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.DefaultRegisterer, false)

// Validate the config once both the config file has been loaded
// and CLI flags parsed.
Expand Down
2 changes: 1 addition & 1 deletion cmd/querytee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func main() {

util_log.InitLogger(&server.Config{
LogLevel: cfg.LogLevel,
}, prometheus.DefaultRegisterer, true, false)
}, prometheus.DefaultRegisterer, false)

// Run the instrumentation server.
registry := prometheus.NewRegistry()
Expand Down
Loading

0 comments on commit 9b9bb06

Please sign in to comment.