Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Test] Improve test flakiness #284

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 38 additions & 33 deletions spec/filewatch/rotate_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,33 @@ module FileWatch
end

context "create + rename rotation: when a new logfile is renamed to a path we have seen before and the open file is fully read, renamed outside glob" do
let(:stat_interval) { 0.04 }
let(:discover_interval) { 15 }

let(:watch_dir) { directory.join("*A.log") }
let(:file_path) { directory.join("1A.log") }
subject { described_class.new(conf) }
let(:listener1) { observer.listener_for(file1_path) }
let(:listener2) { observer.listener_for(second_file.to_path) }
let(:actions) do
RSpec::Sequencing
.run_after(0.25, "create file") do
file_path.open("wb") { |file| file.write("#{line1}\n") }
end
.then_after(0.25, "write a 'unfinished' line") do
file_path.open("ab") { |file| file.write(line2) }
end
.then_after(0.25, "rotate once") do
tmpfile = directory.join("1.logtmp")
tmpfile.open("wb") { |file| file.write("\n#{line3}\n")}
.run_after(1.25, "create file") do
file_path.open("wb") { |file| file.write("#{line1}\n") }
end
.then_after(1.25, "write a 'unfinished' line") do
file_path.open("ab") { |file| file.write("#{line2}") }
end
.then_after(1.25, "rotate once") do
tmpfile = directory.join("1.logtmp")
tmpfile.open("wb") { |file|
file.write("\n#{line3}\n")
}
file_path.rename(directory.join("1.log.1"))
FileUtils.mv(directory.join("1.logtmp").to_path, file1_path)
end
end
.then("wait for expectation") do
sleep(0.25) # if ENV['CI']
wait(2).for { listener1.calls }.to eq([:open, :accept, :accept, :accept])
sleep(1.25) # if ENV['CI']
wait(2).for { listener1.calls }.to include(:open, :accept, :accept, :accept)
end
.then("quit") do
tailing.quit
Expand Down Expand Up @@ -104,14 +109,14 @@ module FileWatch
let(:listener3) { observer.listener_for(third_file.to_path) }
let(:actions) do
RSpec::Sequencing
.run_after(0.25, "create file") do
.run_after(0.75, "create file") do
file_path.open("wb") { |file| file.write("#{line1}\n") }
end
.then_after(0.25, "rotate 1 - line1(66) is in 2B.log, line2(61) is in 1B.log") do
.then_after(0.75, "rotate 1 - line1(66) is in 2B.log, line2(61) is in 1B.log") do
file_path.rename(second_file)
file_path.open("wb") { |file| file.write("#{line2}\n") }
end
.then_after(0.25, "rotate 2 - line1(66) is in 3B.log, line2(61) is in 2B.log, line3(47) is in 1B.log") do
.then_after(0.75, "rotate 2 - line1(66) is in 3B.log, line2(61) is in 2B.log, line3(47) is in 1B.log") do
second_file.rename(third_file)
file_path.rename(second_file)
file_path.open("wb") { |file| file.write("#{line3}\n") }
Expand Down Expand Up @@ -145,19 +150,19 @@ module FileWatch
let(:listener2) { observer.listener_for(second_file.to_path) }
let(:actions) do
RSpec::Sequencing
.run_after(0.25, "create original - write line 1, 66 bytes") do
.run_after(0.75, "create original - write line 1, 66 bytes") do
file_path.open("wb") { |file| file.write("#{line1}\n") }
end
.then_after(0.25, "rename to 2.log") do
.then_after(0.75, "rename to 2.log") do
file_path.rename(second_file)
end
.then_after(0.25, "write line 2 to original, 61 bytes") do
.then_after(0.75, "write line 2 to original, 61 bytes") do
file_path.open("wb") { |file| file.write("#{line2}\n") }
end
.then_after(0.25, "rename to 2.log again") do
.then_after(0.75, "rename to 2.log again") do
file_path.rename(second_file)
end
.then_after(0.25, "write line 3 to original, 47 bytes") do
.then_after(0.75, "write line 3 to original, 47 bytes") do
file_path.open("wb") { |file| file.write("#{line3}\n") }
end
.then("wait for expectations to be met") do
Expand Down Expand Up @@ -188,19 +193,19 @@ module FileWatch
let(:listener2) { observer.listener_for(second_file.to_path) }
let(:actions) do
RSpec::Sequencing
.run_after(0.25, "create original - write line 1, 66 bytes") do
.run_after(1.5, "create original - write line 1, 66 bytes") do
file_path.open("wb") { |file| file.write("#{line1}\n") }
end
.then_after(0.25, "rename to 2.log") do
.then_after(1.5, "rename to 2.log") do
file_path.rename(second_file)
file_path.open("wb") { |file| file.write("#{line2}\n") }
end
.then_after(0.25, "rename to 2.log again") do
.then_after(1.5, "rename to 2.log again") do
file_path.rename(second_file)
file_path.open("wb") { |file| file.write("#{line3}\n") }
end
.then("wait for expectations to be met") do
wait(0.5).for{listener1.lines.size == 3 && listener2.lines.empty?}.to eq(true)
wait(1).for{listener1.lines.size == 3 && listener2.lines.empty?}.to eq(true)
end
.then("quit") do
tailing.quit
Expand Down Expand Up @@ -267,14 +272,14 @@ module FileWatch
let(:listener1) { observer.listener_for(file1_path) }
let(:actions) do
RSpec::Sequencing
.run_after(0.25, "create file") do
.run_after(0.75, "create file") do
file_path.open("wb") { |file| file.puts(line1); file.puts(line2) }
end
.then_after(0.25, "rotate") do
.then_after(0.75, "rotate") do
FileUtils.cp(file1_path, directory.join("1F.log.1").to_path)
file_path.truncate(0)
end
.then_after(0.25, "write to truncated file") do
.then_after(0.75, "write to truncated file") do
file_path.open("wb") { |file| file.puts(line3) }
end
.then("wait for expectations to be met") do
Expand Down Expand Up @@ -342,13 +347,13 @@ module FileWatch
let(:listener2) { observer.listener_for(file2.to_path) }
let(:actions) do
RSpec::Sequencing
.run_after(0.25, "create file") do
.run_after(0.75, "create file") do
file_path.open("wb") { |file| file.puts(line1); file.puts(line2) }
end
.then_after(0.25, "rename") do
.then_after(0.75, "rename") do
FileUtils.mv(file1_path, file2.to_path)
end
.then_after(0.25, "write to renamed file") do
.then_after(0.75, "write to renamed file") do
file2.open("ab") { |file| file.puts(line3) }
end
.then("wait for expectations to be met") do
Expand Down Expand Up @@ -464,14 +469,14 @@ module FileWatch
.run_after(0.75, "create file") do
file_path.open("wb") { |file| file.puts(line1); file.puts(line2) }
end
.then_after(0.5, "rename") do
.then_after(0.75, "rename") do
file_path.rename(second_file)
file_path.open("wb") { |file| file.puts("#{line3}") }
end
.then("wait for expectations to be met") do
wait(2.0).for{listener1.lines.size + listener2.lines.size}.to eq(3)
end
.then_after(0.5, "rename again") do
end
.then_after(0.75, "rename again") do
file_path.rename(second_file)
file_path.open("wb") { |file| file.puts("#{line4}") }
end
Expand Down
17 changes: 9 additions & 8 deletions spec/filewatch/tailing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ module FileWatch
ENV["FILEWATCH_MAX_FILES_WARN_INTERVAL"] = "0"
File.open(file_path, "wb") { |file| file.write("line1\nline2\n") }
File.open(file_path2, "wb") { |file| file.write("line-A\nline-B\n") }
sleep(0.25) # if ENV['CI']
end

context "when max_active is 1" do
Expand All @@ -76,7 +77,7 @@ module FileWatch
end

context "when close_older is set" do
let(:wait_before_quit) { 0.8 }
let(:wait_before_quit) { 1.5 }
let(:opts) { super().merge(:close_older => 0.1, :max_open_files => 1, :stat_interval => 0.1) }
let(:suffix) { "B" }
it "opens both files" do
Expand Down Expand Up @@ -134,7 +135,7 @@ module FileWatch
File.open(file_path, "wb") { |file| file.write("line1\nline2\n") }
end
.then("wait") do
wait(0.75).for { listener1.lines }.to_not be_empty
wait(1.5).for { listener1.lines }.to_not be_empty
end
.then("quit") do
tailing.quit
Expand All @@ -155,7 +156,7 @@ module FileWatch
# it simulates that the user deleted the file
# so when a stat is taken on the file an error is raised
let(:suffix) { "E" }
let(:quit_after) { 0.2 }
let(:quit_after) { 1 }
let(:stat) { double("stat", :size => 100, :modified_at => Time.now.to_f, :inode => 234567, :inode_struct => InodeStruct.new("234567", 1, 5)) }
let(:watched_file) { WatchedFile.new(file_path, stat, tailing.settings) }
before do
Expand Down Expand Up @@ -253,10 +254,10 @@ module FileWatch
# create file after first discovery, will be read from the beginning
File.open(file_path, "wb") { |file| file.write("line1\nline2\n") }
end
.then_after(0.55, "rename file") do
.then_after(0.75, "rename file") do
FileUtils.mv(file_path, new_file_path)
end
.then_after(0.55, "then write to renamed file") do
.then_after(0.75, "then write to renamed file") do
File.open(new_file_path, "ab") { |file| file.write("line3\nline4\n") }
wait(0.5).for{listener1.lines.size}.to eq(2), "listener1.lines.size not eq(2)"
end
Expand Down Expand Up @@ -354,7 +355,7 @@ module FileWatch
end
.then("watch and wait") do
tailing.watch_this(watch_dir)
wait(1.25).for{listener1.calls}.to eq([:open, :timed_out])
wait(2).for{listener1.calls}.to eq([:open, :timed_out])
end
.then("quit") do
tailing.quit
Expand Down Expand Up @@ -450,7 +451,7 @@ module FileWatch
FileUtils.mv(file_path2, file_path3)
end
.then("wait") do
wait(4).for do
wait(8).for do
listener1.lines.size == 32 && listener2.calls == [:delete] && listener3.calls == [:open, :accept, :timed_out]
end.to eq(true), "listener1.lines != 32 or listener2.calls != [:delete] or listener3.calls != [:open, :accept, :timed_out]"
end
Expand Down Expand Up @@ -481,7 +482,7 @@ module FileWatch
tailing.watch_this(watch_dir)
end
.then("wait for lines") do
wait(1.5).for{listener1.calls}.to eq([:open, :accept, :accept, :timed_out])
wait(2.5).for{listener1.calls}.to eq([:open, :accept, :accept, :timed_out])
end
.then("quit") do
tailing.quit
Expand Down
50 changes: 36 additions & 14 deletions spec/inputs/file_read_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
end

events = input(conf) do |pipeline, queue|
wait(0.5).for{File.exist?(tmpfile_path)}.to be_falsey
wait(2).for{File.exist?(tmpfile_path)}.to be_falsey
2.times.collect { queue.pop }
end

Expand Down Expand Up @@ -102,7 +102,7 @@
end

events = input(conf) do |pipeline, queue|
wait(0.5).for{File.exist?(tmpfile_path)}.to be_falsey
wait(1).for{File.exist?(tmpfile_path)}.to be_falsey
3.times.collect { queue.pop }
end

Expand Down Expand Up @@ -255,7 +255,7 @@
'mode' => "read",
'path' => "#{temp_directory}/*",
'stat_interval' => interval,
'discover_interval' => interval,
'discover_interval' => 1,
'sincedb_path' => "#{temp_directory}/.sincedb",
'sincedb_write_interval' => interval
}
Expand Down Expand Up @@ -284,7 +284,10 @@
wait_for_start_processing(@run_thread)
end

after { plugin.stop }
after {
plugin.stop
@run_thread.join
}

it 'processes a file' do
wait_for_file_removal(sample_file) # watched discovery
Expand All @@ -296,7 +299,7 @@

it 'removes watched file from collection' do
wait_for_file_removal(sample_file) # watched discovery
sleep(0.25) # give CI some space to execute the removal
sleep(1) # give CI some space to execute the removal
# TODO shouldn't be necessary once WatchedFileCollection does proper locking
watched_files = plugin.watcher.watch.watched_files_collection
expect( watched_files ).to be_empty
Expand Down Expand Up @@ -330,15 +333,18 @@
wait_for_start_processing(@run_thread)
end

after { plugin.stop }
after {
plugin.stop
@run_thread.join
}

it 'cleans up sincedb entry' do
wait_for_file_removal(sample_file) # watched discovery

sincedb_content = File.read(sincedb_path).strip
expect( sincedb_content ).to_not be_empty

Stud.try(3.times) do
Stud.try(3.times, RSpec::Expectations::ExpectationNotMetError) do
sleep(1.5) # > sincedb_clean_after

sincedb_content = File.read(sincedb_path).strip
Expand All @@ -350,20 +356,36 @@

private

def wait_for_start_processing(run_thread, timeout: 1.0)
def wait_for_start_processing(run_thread, timeout: 10.0)
begin
Timeout.timeout(timeout) do
sleep(0.01) while run_thread.status != 'sleep'
sleep(timeout) unless plugin.queue
end
rescue Timeout::Error
# sleep(0.01) while run_thread.status != 'sleep'
while run_thread.status != 'sleep'
puts "not sleep"
sleep(0.01)
end
# sleep(0.1) while !plugin.queue
while !plugin.queue
sleep(0.1)
puts "no queue"
end
puts "the queue size is #{plugin.queue.size}"
# sleep(0.1) while !plugin.queue
while plugin.queue.size == 0
sleep(0.1)
puts "no item on queue"
end
end
rescue Timeout::Error => e
puts "plugin timed out #{e}"
raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue
else
puts "plugin did not time out"
raise "plugin did not start processing" unless plugin.queue
end
end

def wait_for_file_removal(path, timeout: 3 * interval)
wait(timeout).for { File.exist?(path) }.to be_falsey
def wait_for_file_removal(path)
wait(5).for { File.exist?(path) }.to be_falsey
end
end
10 changes: 5 additions & 5 deletions spec/inputs/file_tail_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@
.run("create file") do
File.open(tmpfile_path, "wb") { |file| file.puts(line) }
end
.then_after(0.1, "identity is mapped") do
.then_after(0.25, "identity is mapped") do
wait(0.75).for{subject.codec.identity_map[tmpfile_path]}.not_to be_nil, "identity is not mapped"
end
.then("wait for auto_flush") do
Expand Down Expand Up @@ -395,7 +395,7 @@
let(:suffix) { "M" }
it "an event is generated via auto_flush" do
actions = RSpec::Sequencing
.run_after(0.1, "create files") do
.run_after(0.25, "create files") do
File.open(tmpfile_path, "wb") do |fd|
fd.puts("line1.1-of-a")
fd.puts(" line1.2-of-a")
Expand Down Expand Up @@ -478,7 +478,7 @@
"sincedb_path" => sincedb_path,
"stat_interval" => 0.1,
"max_open_files" => 1,
"close_older" => 0.5,
"close_older" => 1,
"start_position" => "beginning",
"file_sort_by" => "path",
"delimiter" => TEST_FILE_DELIMITER)
Expand All @@ -491,9 +491,9 @@
wait(0.4).for{subject.codec.identity_count == 1 && events.size == 2}.to eq(true), "both identities are not mapped and the first two events are not built"
end
.then("wait for close to flush last event of each identity") do
wait(0.8).for{events.size}.to eq(4), "close does not flush last event of each identity"
wait(1.6).for{events.size}.to eq(4), "close does not flush last event of each identity"
end
.then_after(0.1, "stop") do
.then_after(0.2, "stop") do
subject.stop
end
subject.run(events)
Expand Down