diff --git a/include/pika_binlog.h b/include/pika_binlog.h index 851de88746..f8d598112e 100644 --- a/include/pika_binlog.h +++ b/include/pika_binlog.h @@ -13,6 +13,7 @@ #include "pstd/include/pstd_status.h" #include "pstd/include/noncopyable.h" #include "include/pika_define.h" +#include "net/src/dispatch_thread.h" std::string NewFileName(const std::string& name, uint32_t current); @@ -76,6 +77,7 @@ class Binlog : public pstd::noncopyable { } void Close(); + void FlushBufferedFile(); private: pstd::Status Put(const char* item, int len); @@ -108,6 +110,8 @@ class Binlog : public pstd::noncopyable { std::string filename_; std::atomic binlog_io_error_; + + net::TimerTaskThread timer_task_thread_; }; #endif diff --git a/include/pika_define.h b/include/pika_define.h index 3968f9072f..91874fc3ec 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -32,6 +32,7 @@ class PikaServer; /* Global Const */ constexpr int MAX_DB_NUM = 8; +constexpr int FWRITE_USER_SPACE_BUF_SIZE = 512LL << 10;//512KB /* Port shift */ const int kPortShiftRSync = 1000; diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index 6f4ed2861d..5af3ddb64e 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -81,7 +81,7 @@ Binlog::Binlog(std::string binlog_path, const int file_size) LOG(INFO) << "Binlog: Manifest file not exist, we create a new one."; profile = NewFileName(filename_, pro_num_); - s = pstd::NewWritableFile(profile, queue_); + s = pstd::NewBufferedWritableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE); if (!s.ok()) { LOG(FATAL) << "Binlog: new " << filename_ << " " << s.ToString(); } @@ -112,7 +112,7 @@ Binlog::Binlog(std::string binlog_path, const int file_size) profile = NewFileName(filename_, pro_num_); DLOG(INFO) << "Binlog: open profile " << profile; - s = pstd::AppendWritableFile(profile, queue_, version_->pro_offset_); + s = pstd::BufferedAppendableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE, version_->pro_offset_); if (!s.ok()) { LOG(FATAL) << "Binlog: Open file " << profile << " error " << s.ToString(); } @@ -120,15 +120,29 @@ Binlog::Binlog(std::string binlog_path, const int file_size) uint64_t filesize = queue_->Filesize(); DLOG(INFO) << "Binlog: filesize is " << filesize; } - InitLogFile(); + + timer_task_thread_.AddTimerTask("flush_binlog_task", 500, true, + [this] { this->FlushBufferedFile(); }); + timer_task_thread_.StartThread(); } Binlog::~Binlog() { std::lock_guard l(mutex_); + timer_task_thread_.StopThread(); Close(); } +void Binlog::FlushBufferedFile() { + std::lock_guard l(mutex_); + if (!opened_.load()) { + return; + } + if (queue_) { + queue_->Flush(); + } +} + void Binlog::Close() { if (!opened_.load()) { return; @@ -210,7 +224,7 @@ Status Binlog::Put(const char* item, int len) { if (filesize > file_size_) { std::unique_ptr queue; std::string profile = NewFileName(filename_, pro_num_ + 1); - s = pstd::NewWritableFile(profile, queue); + s = pstd::NewBufferedWritableFile(profile, queue, FWRITE_USER_SPACE_BUF_SIZE); if (!s.ok()) { LOG(ERROR) << "Binlog: new " << filename_ << " " << s.ToString(); return s; @@ -263,9 +277,9 @@ Status Binlog::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* s = queue_->Append(pstd::Slice(buf, kHeaderSize)); if (s.ok()) { s = queue_->Append(pstd::Slice(ptr, n)); - if (s.ok()) { - s = queue_->Flush(); - } +// if (s.ok()) { +// s = queue_->Flush(); +// } } block_offset_ += static_cast(kHeaderSize + n); @@ -387,7 +401,7 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t pstd::DeleteFile(profile); } - pstd::NewWritableFile(profile, queue_); + pstd::NewBufferedWritableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE); Binlog::AppendPadding(queue_.get(), &pro_offset); pro_num_ = pro_num; @@ -426,7 +440,7 @@ Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index) { version_->StableSave(); } - Status s = pstd::AppendWritableFile(profile, queue_, version_->pro_offset_); + Status s = pstd::BufferedAppendableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE, version_->pro_offset_); if (!s.ok()) { return s; } diff --git a/src/pstd/include/env.h b/src/pstd/include/env.h index 8e8cbbaa37..ca04025ce9 100644 --- a/src/pstd/include/env.h +++ b/src/pstd/include/env.h @@ -68,6 +68,12 @@ Status NewSequentialFile(const std::string& fname, std::unique_ptr& result); +Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr& result, + int32_t user_space_buf_size_bytes); + +Status BufferedAppendableFile(const std::string& fname, std::unique_ptr& result, + int32_t user_space_buf_size_bytes, int64_t offset); + Status NewRWFile(const std::string& fname, std::unique_ptr& result); Status AppendSequentialFile(const std::string& fname, SequentialFile** result); @@ -100,7 +106,7 @@ class SequentialFile { // virtual Status Read(size_t n, char *&result, char *scratch) = 0; virtual Status Read(size_t n, Slice* result, char* scratch) = 0; virtual Status Skip(uint64_t n) = 0; - // virtual Status Close() = 0; + // virtual Status Close() = 0;F virtual char* ReadLine(char* buf, int n) = 0; }; diff --git a/src/pstd/src/env.cc b/src/pstd/src/env.cc index 7dadf924ea..00cac14009 100644 --- a/src/pstd/src/env.cc +++ b/src/pstd/src/env.cc @@ -1,15 +1,11 @@ #include "pstd/include/env.h" -#include #include #include #include -#include -#include #include #include -#include #include #include #include @@ -472,6 +468,102 @@ class PosixMmapFile : public WritableFile { uint64_t Filesize() override { return write_len_ + file_offset_ + (dst_ - base_); } }; +class BufferedWritableFile : public WritableFile { + private: + std::string filename_; + FILE* file_; + int32_t user_space_buf_size_; + uint64_t curr_file_size_; + char* buffer_; + + public: + BufferedWritableFile() = delete; + BufferedWritableFile(const BufferedWritableFile&) = delete; + BufferedWritableFile& operator=(const BufferedWritableFile&) = delete; + + // caller must ensure passing-in 'file' is not nullptr + BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf) + : filename_(std::move(file_name)), + file_(file), + user_space_buf_size_(user_space_buf_size), + curr_file_size_(curr_file_size), + buffer_(buf) { + assert(file_ && "file_ can not be nullptr"); + } + + ~BufferedWritableFile() override { + if (file_) { + BufferedWritableFile::Close(); + } + if(buffer_) { + free(buffer_); + } + } + + int32_t GetUserSpaceBufSize() const { return user_space_buf_size_; } + + Status Append(const Slice& data) override { + if (!file_) { + return IOError("fwrite target: " + filename_ + " is not opened", errno); + } + const char* src = data.data(); + size_t left = data.size(); + int32_t max_retries = 4; + int retry_count = 0; + + while (left > 0) { + size_t written = fwrite(src, sizeof(char), left, file_); + if (written == 0) { + if (ferror(file_)) { + int err_num = errno; + clearerr(file_); + return IOError("fwrite error with " + filename_, err_num); + } + if (errno == ENOSPC || ++retry_count > max_retries) { + return IOError(filename_, errno); + } + } + src += written; + left -= written; + curr_file_size_ += written; + retry_count = 0; + } + + return Status::OK(); + } + + Status Close() override { + if (fclose(file_) != 0) { + return IOError("fclose failed: " + filename_, errno); + } + file_ = nullptr; + return Status::OK(); + } + + Status Flush() override { + if (fflush(file_) != 0) { + return IOError("fflush failed: " + filename_, errno); + } + return Status::OK(); + } + + Status Sync() override { + auto s = BufferedWritableFile::Flush(); + if (!s.ok()) { + return s; + } + int32_t file_fd = fileno(file_); + if (fsync(file_fd) != 0) { + return IOError("fsync failed: " + filename_, errno); + } + return Status::OK(); + } + + Status Trim(uint64_t target) override { return Status::OK(); } + + uint64_t Filesize() override { return curr_file_size_; } +}; + RWFile::~RWFile() = default; class MmapRWFile : public RWFile { @@ -648,6 +740,89 @@ Status NewWritableFile(const std::string& fname, std::unique_ptr& return s; } +Status BufferedAppendableFile(const std::string& fname, std::unique_ptr& result, + int32_t user_space_buf_size_bytes, int64_t offset) { + const int fd = open(fname.c_str(), O_RDWR | O_CLOEXEC, 0644); + if (fd < 0) { + return IOError(fname, errno); + } + + FILE* file = fdopen(fd, "r+"); + if (file == nullptr) { + close(fd); + return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno); + } + + if (user_space_buf_size_bytes < 0) { + fclose(file); + return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedAppendableFile"); + } + char* buf = nullptr; + int32_t r = 0; + if (user_space_buf_size_bytes != 0) { + buf = (char*)malloc(user_space_buf_size_bytes); + if (!buf) { + fclose(file); + return Status::Error("Failed to allocate buffer when BufferedAppendableFile"); + } + r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes); + } else { + r = setvbuf(file, nullptr, _IONBF, 0); + } + if (r != 0) { + fclose(file); + return IOError("Failed to set user space buffer for " + fname, errno); + } + + // Move the file pointer to the specified offset + if (fseek(file, offset, SEEK_SET) != 0) { + fclose(file); + return IOError("Failed to seek to the specified offset in " + fname, errno); + } + + result = std::make_unique(fname, file, user_space_buf_size_bytes, offset, buf); + return Status::OK(); +} + +Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr& result, + int32_t user_space_buf_size_bytes) { + const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC, 0644); + if (fd < 0) { + return IOError(fname, errno); + } + + FILE* file = fdopen(fd, "w+"); + if (file == nullptr) { + close(fd); + return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno); + } + + if (user_space_buf_size_bytes < 0) { + fclose(file); + return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedWritableFile"); + } + char* buf = nullptr; + int32_t r = 0; + if (user_space_buf_size_bytes != 0) { + buf = (char*)malloc(user_space_buf_size_bytes); + if (!buf) { + fclose(file); + return Status::Error("Failed to allocate buffer when BufferedAppendableFile"); + } + r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes); + } else { + r = setvbuf(file, nullptr, _IONBF, 0); + } + if (r != 0) { + fclose(file); + return IOError("Failed to set user space buffer for " + fname, errno); + } + + // the file was trancated if it was existing for syscall open use flag "O_TRUNC", + result = std::make_unique(fname, file, user_space_buf_size_bytes, 0, buf); + return Status::OK(); +} + Status NewRWFile(const std::string& fname, std::unique_ptr& result) { Status s; const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_CLOEXEC, 0644);