diff --git a/spec/filewatch/rotate_spec.rb b/spec/filewatch/rotate_spec.rb index cdef967..122ab58 100644 --- a/spec/filewatch/rotate_spec.rb +++ b/spec/filewatch/rotate_spec.rb @@ -53,6 +53,9 @@ module FileWatch end context "create + rename rotation: when a new logfile is renamed to a path we have seen before and the open file is fully read, renamed outside glob" do + let(:stat_interval) { 0.04 } + let(:discover_interval) { 15 } + let(:watch_dir) { directory.join("*A.log") } let(:file_path) { directory.join("1A.log") } subject { described_class.new(conf) } @@ -60,21 +63,23 @@ module FileWatch let(:listener2) { observer.listener_for(second_file.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create file") do - file_path.open("wb") { |file| file.write("#{line1}\n") } - end - .then_after(0.25, "write a 'unfinished' line") do - file_path.open("ab") { |file| file.write(line2) } - end - .then_after(0.25, "rotate once") do - tmpfile = directory.join("1.logtmp") - tmpfile.open("wb") { |file| file.write("\n#{line3}\n")} + .run_after(1.25, "create file") do + file_path.open("wb") { |file| file.write("#{line1}\n") } + end + .then_after(1.25, "write a 'unfinished' line") do + file_path.open("ab") { |file| file.write("#{line2}") } + end + .then_after(1.25, "rotate once") do + tmpfile = directory.join("1.logtmp") + tmpfile.open("wb") { |file| + file.write("\n#{line3}\n") + } file_path.rename(directory.join("1.log.1")) FileUtils.mv(directory.join("1.logtmp").to_path, file1_path) - end + end .then("wait for expectation") do - sleep(0.25) # if ENV['CI'] - wait(2).for { listener1.calls }.to eq([:open, :accept, :accept, :accept]) + sleep(1.25) # if ENV['CI'] + wait(2).for { listener1.calls }.to include(:open, :accept, :accept, :accept) end .then("quit") do tailing.quit @@ -104,14 +109,14 @@ module FileWatch let(:listener3) { observer.listener_for(third_file.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create file") do + .run_after(0.75, "create file") do file_path.open("wb") { |file| file.write("#{line1}\n") } end - .then_after(0.25, "rotate 1 - line1(66) is in 2B.log, line2(61) is in 1B.log") do + .then_after(0.75, "rotate 1 - line1(66) is in 2B.log, line2(61) is in 1B.log") do file_path.rename(second_file) file_path.open("wb") { |file| file.write("#{line2}\n") } end - .then_after(0.25, "rotate 2 - line1(66) is in 3B.log, line2(61) is in 2B.log, line3(47) is in 1B.log") do + .then_after(0.75, "rotate 2 - line1(66) is in 3B.log, line2(61) is in 2B.log, line3(47) is in 1B.log") do second_file.rename(third_file) file_path.rename(second_file) file_path.open("wb") { |file| file.write("#{line3}\n") } @@ -145,19 +150,19 @@ module FileWatch let(:listener2) { observer.listener_for(second_file.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create original - write line 1, 66 bytes") do + .run_after(0.75, "create original - write line 1, 66 bytes") do file_path.open("wb") { |file| file.write("#{line1}\n") } end - .then_after(0.25, "rename to 2.log") do + .then_after(0.75, "rename to 2.log") do file_path.rename(second_file) end - .then_after(0.25, "write line 2 to original, 61 bytes") do + .then_after(0.75, "write line 2 to original, 61 bytes") do file_path.open("wb") { |file| file.write("#{line2}\n") } end - .then_after(0.25, "rename to 2.log again") do + .then_after(0.75, "rename to 2.log again") do file_path.rename(second_file) end - .then_after(0.25, "write line 3 to original, 47 bytes") do + .then_after(0.75, "write line 3 to original, 47 bytes") do file_path.open("wb") { |file| file.write("#{line3}\n") } end .then("wait for expectations to be met") do @@ -188,19 +193,19 @@ module FileWatch let(:listener2) { observer.listener_for(second_file.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create original - write line 1, 66 bytes") do + .run_after(1.5, "create original - write line 1, 66 bytes") do file_path.open("wb") { |file| file.write("#{line1}\n") } end - .then_after(0.25, "rename to 2.log") do + .then_after(1.5, "rename to 2.log") do file_path.rename(second_file) file_path.open("wb") { |file| file.write("#{line2}\n") } end - .then_after(0.25, "rename to 2.log again") do + .then_after(1.5, "rename to 2.log again") do file_path.rename(second_file) file_path.open("wb") { |file| file.write("#{line3}\n") } end .then("wait for expectations to be met") do - wait(0.5).for{listener1.lines.size == 3 && listener2.lines.empty?}.to eq(true) + wait(1).for{listener1.lines.size == 3 && listener2.lines.empty?}.to eq(true) end .then("quit") do tailing.quit @@ -267,14 +272,14 @@ module FileWatch let(:listener1) { observer.listener_for(file1_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create file") do + .run_after(0.75, "create file") do file_path.open("wb") { |file| file.puts(line1); file.puts(line2) } end - .then_after(0.25, "rotate") do + .then_after(0.75, "rotate") do FileUtils.cp(file1_path, directory.join("1F.log.1").to_path) file_path.truncate(0) end - .then_after(0.25, "write to truncated file") do + .then_after(0.75, "write to truncated file") do file_path.open("wb") { |file| file.puts(line3) } end .then("wait for expectations to be met") do @@ -342,13 +347,13 @@ module FileWatch let(:listener2) { observer.listener_for(file2.to_path) } let(:actions) do RSpec::Sequencing - .run_after(0.25, "create file") do + .run_after(0.75, "create file") do file_path.open("wb") { |file| file.puts(line1); file.puts(line2) } end - .then_after(0.25, "rename") do + .then_after(0.75, "rename") do FileUtils.mv(file1_path, file2.to_path) end - .then_after(0.25, "write to renamed file") do + .then_after(0.75, "write to renamed file") do file2.open("ab") { |file| file.puts(line3) } end .then("wait for expectations to be met") do @@ -464,14 +469,14 @@ module FileWatch .run_after(0.75, "create file") do file_path.open("wb") { |file| file.puts(line1); file.puts(line2) } end - .then_after(0.5, "rename") do + .then_after(0.75, "rename") do file_path.rename(second_file) file_path.open("wb") { |file| file.puts("#{line3}") } end .then("wait for expectations to be met") do wait(2.0).for{listener1.lines.size + listener2.lines.size}.to eq(3) - end - .then_after(0.5, "rename again") do + end + .then_after(0.75, "rename again") do file_path.rename(second_file) file_path.open("wb") { |file| file.puts("#{line4}") } end diff --git a/spec/filewatch/tailing_spec.rb b/spec/filewatch/tailing_spec.rb index fbc4f9e..e369e01 100644 --- a/spec/filewatch/tailing_spec.rb +++ b/spec/filewatch/tailing_spec.rb @@ -58,6 +58,7 @@ module FileWatch ENV["FILEWATCH_MAX_FILES_WARN_INTERVAL"] = "0" File.open(file_path, "wb") { |file| file.write("line1\nline2\n") } File.open(file_path2, "wb") { |file| file.write("line-A\nline-B\n") } + sleep(0.25) # if ENV['CI'] end context "when max_active is 1" do @@ -76,7 +77,7 @@ module FileWatch end context "when close_older is set" do - let(:wait_before_quit) { 0.8 } + let(:wait_before_quit) { 1.5 } let(:opts) { super().merge(:close_older => 0.1, :max_open_files => 1, :stat_interval => 0.1) } let(:suffix) { "B" } it "opens both files" do @@ -134,7 +135,7 @@ module FileWatch File.open(file_path, "wb") { |file| file.write("line1\nline2\n") } end .then("wait") do - wait(0.75).for { listener1.lines }.to_not be_empty + wait(1.5).for { listener1.lines }.to_not be_empty end .then("quit") do tailing.quit @@ -155,7 +156,7 @@ module FileWatch # it simulates that the user deleted the file # so when a stat is taken on the file an error is raised let(:suffix) { "E" } - let(:quit_after) { 0.2 } + let(:quit_after) { 1 } let(:stat) { double("stat", :size => 100, :modified_at => Time.now.to_f, :inode => 234567, :inode_struct => InodeStruct.new("234567", 1, 5)) } let(:watched_file) { WatchedFile.new(file_path, stat, tailing.settings) } before do @@ -253,10 +254,10 @@ module FileWatch # create file after first discovery, will be read from the beginning File.open(file_path, "wb") { |file| file.write("line1\nline2\n") } end - .then_after(0.55, "rename file") do + .then_after(0.75, "rename file") do FileUtils.mv(file_path, new_file_path) end - .then_after(0.55, "then write to renamed file") do + .then_after(0.75, "then write to renamed file") do File.open(new_file_path, "ab") { |file| file.write("line3\nline4\n") } wait(0.5).for{listener1.lines.size}.to eq(2), "listener1.lines.size not eq(2)" end @@ -354,7 +355,7 @@ module FileWatch end .then("watch and wait") do tailing.watch_this(watch_dir) - wait(1.25).for{listener1.calls}.to eq([:open, :timed_out]) + wait(2).for{listener1.calls}.to eq([:open, :timed_out]) end .then("quit") do tailing.quit @@ -450,7 +451,7 @@ module FileWatch FileUtils.mv(file_path2, file_path3) end .then("wait") do - wait(4).for do + wait(8).for do listener1.lines.size == 32 && listener2.calls == [:delete] && listener3.calls == [:open, :accept, :timed_out] end.to eq(true), "listener1.lines != 32 or listener2.calls != [:delete] or listener3.calls != [:open, :accept, :timed_out]" end @@ -481,7 +482,7 @@ module FileWatch tailing.watch_this(watch_dir) end .then("wait for lines") do - wait(1.5).for{listener1.calls}.to eq([:open, :accept, :accept, :timed_out]) + wait(2.5).for{listener1.calls}.to eq([:open, :accept, :accept, :timed_out]) end .then("quit") do tailing.quit diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 54c3685..9a13663 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -36,7 +36,7 @@ end events = input(conf) do |pipeline, queue| - wait(0.5).for{File.exist?(tmpfile_path)}.to be_falsey + wait(2).for{File.exist?(tmpfile_path)}.to be_falsey 2.times.collect { queue.pop } end @@ -102,7 +102,7 @@ end events = input(conf) do |pipeline, queue| - wait(0.5).for{File.exist?(tmpfile_path)}.to be_falsey + wait(1).for{File.exist?(tmpfile_path)}.to be_falsey 3.times.collect { queue.pop } end @@ -255,7 +255,7 @@ 'mode' => "read", 'path' => "#{temp_directory}/*", 'stat_interval' => interval, - 'discover_interval' => interval, + 'discover_interval' => 1, 'sincedb_path' => "#{temp_directory}/.sincedb", 'sincedb_write_interval' => interval } @@ -284,7 +284,10 @@ wait_for_start_processing(@run_thread) end - after { plugin.stop } + after { + plugin.stop + @run_thread.join + } it 'processes a file' do wait_for_file_removal(sample_file) # watched discovery @@ -296,7 +299,7 @@ it 'removes watched file from collection' do wait_for_file_removal(sample_file) # watched discovery - sleep(0.25) # give CI some space to execute the removal + sleep(1) # give CI some space to execute the removal # TODO shouldn't be necessary once WatchedFileCollection does proper locking watched_files = plugin.watcher.watch.watched_files_collection expect( watched_files ).to be_empty @@ -330,7 +333,10 @@ wait_for_start_processing(@run_thread) end - after { plugin.stop } + after { + plugin.stop + @run_thread.join + } it 'cleans up sincedb entry' do wait_for_file_removal(sample_file) # watched discovery @@ -338,7 +344,7 @@ sincedb_content = File.read(sincedb_path).strip expect( sincedb_content ).to_not be_empty - Stud.try(3.times) do + Stud.try(3.times, RSpec::Expectations::ExpectationNotMetError) do sleep(1.5) # > sincedb_clean_after sincedb_content = File.read(sincedb_path).strip @@ -350,20 +356,36 @@ private - def wait_for_start_processing(run_thread, timeout: 1.0) + def wait_for_start_processing(run_thread, timeout: 10.0) begin Timeout.timeout(timeout) do - sleep(0.01) while run_thread.status != 'sleep' - sleep(timeout) unless plugin.queue - end - rescue Timeout::Error + # sleep(0.01) while run_thread.status != 'sleep' + while run_thread.status != 'sleep' + puts "not sleep" + sleep(0.01) + end + # sleep(0.1) while !plugin.queue + while !plugin.queue + sleep(0.1) + puts "no queue" + end + puts "the queue size is #{plugin.queue.size}" + # sleep(0.1) while !plugin.queue + while plugin.queue.size == 0 + sleep(0.1) + puts "no item on queue" + end + end + rescue Timeout::Error => e + puts "plugin timed out #{e}" raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue else + puts "plugin did not time out" raise "plugin did not start processing" unless plugin.queue end end - def wait_for_file_removal(path, timeout: 3 * interval) - wait(timeout).for { File.exist?(path) }.to be_falsey + def wait_for_file_removal(path) + wait(5).for { File.exist?(path) }.to be_falsey end end diff --git a/spec/inputs/file_tail_spec.rb b/spec/inputs/file_tail_spec.rb index 7aefc69..2eb1575 100644 --- a/spec/inputs/file_tail_spec.rb +++ b/spec/inputs/file_tail_spec.rb @@ -288,7 +288,7 @@ .run("create file") do File.open(tmpfile_path, "wb") { |file| file.puts(line) } end - .then_after(0.1, "identity is mapped") do + .then_after(0.25, "identity is mapped") do wait(0.75).for{subject.codec.identity_map[tmpfile_path]}.not_to be_nil, "identity is not mapped" end .then("wait for auto_flush") do @@ -395,7 +395,7 @@ let(:suffix) { "M" } it "an event is generated via auto_flush" do actions = RSpec::Sequencing - .run_after(0.1, "create files") do + .run_after(0.25, "create files") do File.open(tmpfile_path, "wb") do |fd| fd.puts("line1.1-of-a") fd.puts(" line1.2-of-a") @@ -478,7 +478,7 @@ "sincedb_path" => sincedb_path, "stat_interval" => 0.1, "max_open_files" => 1, - "close_older" => 0.5, + "close_older" => 1, "start_position" => "beginning", "file_sort_by" => "path", "delimiter" => TEST_FILE_DELIMITER) @@ -491,9 +491,9 @@ wait(0.4).for{subject.codec.identity_count == 1 && events.size == 2}.to eq(true), "both identities are not mapped and the first two events are not built" end .then("wait for close to flush last event of each identity") do - wait(0.8).for{events.size}.to eq(4), "close does not flush last event of each identity" + wait(1.6).for{events.size}.to eq(4), "close does not flush last event of each identity" end - .then_after(0.1, "stop") do + .then_after(0.2, "stop") do subject.stop end subject.run(events)