Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]feat: use user space buff to write binlog instead of Mmap #2848

Open
wants to merge 5 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "pstd/include/pstd_status.h"
#include "pstd/include/noncopyable.h"
#include "include/pika_define.h"
#include "net/src/dispatch_thread.h"

std::string NewFileName(const std::string& name, uint32_t current);

Expand Down Expand Up @@ -76,6 +77,7 @@ class Binlog : public pstd::noncopyable {
}

void Close();
void FlushBufferedFile();

private:
pstd::Status Put(const char* item, int len);
Expand Down Expand Up @@ -108,6 +110,8 @@ class Binlog : public pstd::noncopyable {
std::string filename_;

std::atomic<bool> binlog_io_error_;

net::TimerTaskThread timer_task_thread_;
};

#endif
1 change: 1 addition & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
class PikaServer;
/* Global Const */
constexpr int MAX_DB_NUM = 8;
constexpr int FWRITE_USER_SPACE_BUF_SIZE = 512LL << 10;//512KB

/* Port shift */
const int kPortShiftRSync = 1000;
Expand Down
32 changes: 23 additions & 9 deletions src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Binlog::Binlog(std::string binlog_path, const int file_size)
LOG(INFO) << "Binlog: Manifest file not exist, we create a new one.";

profile = NewFileName(filename_, pro_num_);
s = pstd::NewWritableFile(profile, queue_);
s = pstd::NewBufferedWritableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE);
if (!s.ok()) {
LOG(FATAL) << "Binlog: new " << filename_ << " " << s.ToString();
}
Expand Down Expand Up @@ -112,23 +112,37 @@ Binlog::Binlog(std::string binlog_path, const int file_size)

profile = NewFileName(filename_, pro_num_);
DLOG(INFO) << "Binlog: open profile " << profile;
s = pstd::AppendWritableFile(profile, queue_, version_->pro_offset_);
s = pstd::BufferedAppendableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE, version_->pro_offset_);
if (!s.ok()) {
LOG(FATAL) << "Binlog: Open file " << profile << " error " << s.ToString();
}

uint64_t filesize = queue_->Filesize();
DLOG(INFO) << "Binlog: filesize is " << filesize;
}

InitLogFile();

timer_task_thread_.AddTimerTask("flush_binlog_task", 500, true,
[this] { this->FlushBufferedFile(); });
timer_task_thread_.StartThread();
}

Binlog::~Binlog() {
std::lock_guard l(mutex_);
timer_task_thread_.StopThread();
Close();
}

void Binlog::FlushBufferedFile() {
std::lock_guard l(mutex_);
if (!opened_.load()) {
return;
}
if (queue_) {
queue_->Flush();
}
}

void Binlog::Close() {
if (!opened_.load()) {
return;
Expand Down Expand Up @@ -210,7 +224,7 @@ Status Binlog::Put(const char* item, int len) {
if (filesize > file_size_) {
std::unique_ptr<pstd::WritableFile> queue;
std::string profile = NewFileName(filename_, pro_num_ + 1);
s = pstd::NewWritableFile(profile, queue);
s = pstd::NewBufferedWritableFile(profile, queue, FWRITE_USER_SPACE_BUF_SIZE);
if (!s.ok()) {
LOG(ERROR) << "Binlog: new " << filename_ << " " << s.ToString();
return s;
Expand Down Expand Up @@ -263,9 +277,9 @@ Status Binlog::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int*
s = queue_->Append(pstd::Slice(buf, kHeaderSize));
if (s.ok()) {
s = queue_->Append(pstd::Slice(ptr, n));
if (s.ok()) {
s = queue_->Flush();
}
// if (s.ok()) {
// s = queue_->Flush();
// }
}
block_offset_ += static_cast<int32_t>(kHeaderSize + n);

Expand Down Expand Up @@ -387,7 +401,7 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t
pstd::DeleteFile(profile);
}

pstd::NewWritableFile(profile, queue_);
pstd::NewBufferedWritableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE);
Binlog::AppendPadding(queue_.get(), &pro_offset);

pro_num_ = pro_num;
Expand Down Expand Up @@ -426,7 +440,7 @@ Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index) {
version_->StableSave();
}

Status s = pstd::AppendWritableFile(profile, queue_, version_->pro_offset_);
Status s = pstd::BufferedAppendableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE, version_->pro_offset_);
if (!s.ok()) {
return s;
}
Expand Down
8 changes: 7 additions & 1 deletion src/pstd/include/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ Status NewSequentialFile(const std::string& fname, std::unique_ptr<SequentialFil

Status NewWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result);

Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes);

Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes, int64_t offset);

Status NewRWFile(const std::string& fname, std::unique_ptr<RWFile>& result);

Status AppendSequentialFile(const std::string& fname, SequentialFile** result);
Expand Down Expand Up @@ -100,7 +106,7 @@ class SequentialFile {
// virtual Status Read(size_t n, char *&result, char *scratch) = 0;
virtual Status Read(size_t n, Slice* result, char* scratch) = 0;
virtual Status Skip(uint64_t n) = 0;
// virtual Status Close() = 0;
// virtual Status Close() = 0;F
virtual char* ReadLine(char* buf, int n) = 0;
};

Expand Down
183 changes: 179 additions & 4 deletions src/pstd/src/env.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
#include "pstd/include/env.h"

#include <dirent.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <cassert>

#include <cstdio>
#include <fstream>
#include <sstream>
#include <utility>
#include <thread>
Expand Down Expand Up @@ -472,6 +468,102 @@ class PosixMmapFile : public WritableFile {
uint64_t Filesize() override { return write_len_ + file_offset_ + (dst_ - base_); }
};

class BufferedWritableFile : public WritableFile {
private:
std::string filename_;
FILE* file_;
int32_t user_space_buf_size_;
uint64_t curr_file_size_;
char* buffer_;

public:
BufferedWritableFile() = delete;
BufferedWritableFile(const BufferedWritableFile&) = delete;
BufferedWritableFile& operator=(const BufferedWritableFile&) = delete;

// caller must ensure passing-in 'file' is not nullptr
BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
: filename_(std::move(file_name)),
file_(file),
user_space_buf_size_(user_space_buf_size),
curr_file_size_(curr_file_size),
buffer_(buf) {
assert(file_ && "file_ can not be nullptr");
}

~BufferedWritableFile() override {
if (file_) {
BufferedWritableFile::Close();
}
if(buffer_) {
free(buffer_);
}
}
Comment on lines +494 to +501
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle potential errors from fclose in the destructor.

The destructor should handle potential errors from fclose to ensure that any issues during file closing are logged or managed.

-  if (file_) {
-    BufferedWritableFile::Close();
-  }
+  if (file_) {
+    auto s = BufferedWritableFile::Close();
+    if (!s.ok()) {
+      LOG(WARNING) << "Error closing file in destructor: " << s.ToString();
+    }
+  }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
~BufferedWritableFile() override {
if (file_) {
BufferedWritableFile::Close();
}
if(buffer_) {
free(buffer_);
}
}
~BufferedWritableFile() override {
if (file_) {
auto s = BufferedWritableFile::Close();
if (!s.ok()) {
LOG(WARNING) << "Error closing file in destructor: " << s.ToString();
}
}
if(buffer_) {
free(buffer_);
}
}


int32_t GetUserSpaceBufSize() const { return user_space_buf_size_; }

Status Append(const Slice& data) override {
if (!file_) {
return IOError("fwrite target: " + filename_ + " is not opened", errno);
}
const char* src = data.data();
size_t left = data.size();
int32_t max_retries = 4;
int retry_count = 0;

while (left > 0) {
size_t written = fwrite(src, sizeof(char), left, file_);
if (written == 0) {
if (ferror(file_)) {
int err_num = errno;
clearerr(file_);
return IOError("fwrite error with " + filename_, err_num);
}
if (errno == ENOSPC || ++retry_count > max_retries) {
return IOError(filename_, errno);
}
}
src += written;
left -= written;
curr_file_size_ += written;
retry_count = 0;
}

return Status::OK();
}
Comment on lines +505 to +533
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve handling of ENOSPC and retry logic in Append method.

The Append method should handle ENOSPC more gracefully and ensure that retries are limited to avoid infinite loops.

-  if (errno == ENOSPC || ++retry_count > max_retries) {
+  if (errno == ENOSPC) {
+    return Status::Error("No space left on device: " + filename_);
+  }
+  if (++retry_count > max_retries) {
    return IOError(filename_, errno);
  }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status Append(const Slice& data) override {
if (!file_) {
return IOError("fwrite target: " + filename_ + " is not opened", errno);
}
const char* src = data.data();
size_t left = data.size();
int32_t max_retries = 4;
int retry_count = 0;
while (left > 0) {
size_t written = fwrite(src, sizeof(char), left, file_);
if (written == 0) {
if (ferror(file_)) {
int err_num = errno;
clearerr(file_);
return IOError("fwrite error with " + filename_, err_num);
}
if (errno == ENOSPC || ++retry_count > max_retries) {
return IOError(filename_, errno);
}
}
src += written;
left -= written;
curr_file_size_ += written;
retry_count = 0;
}
return Status::OK();
}
Status Append(const Slice& data) override {
if (!file_) {
return IOError("fwrite target: " + filename_ + " is not opened", errno);
}
const char* src = data.data();
size_t left = data.size();
int32_t max_retries = 4;
int retry_count = 0;
while (left > 0) {
size_t written = fwrite(src, sizeof(char), left, file_);
if (written == 0) {
if (ferror(file_)) {
int err_num = errno;
clearerr(file_);
return IOError("fwrite error with " + filename_, err_num);
}
if (errno == ENOSPC) {
return Status::Error("No space left on device: " + filename_);
}
if (++retry_count > max_retries) {
return IOError(filename_, errno);
}
}
src += written;
left -= written;
curr_file_size_ += written;
retry_count = 0;
}
return Status::OK();
}


Status Close() override {
if (fclose(file_) != 0) {
return IOError("fclose failed: " + filename_, errno);
}
file_ = nullptr;
return Status::OK();
}
Comment on lines +535 to +541
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle potential errors from fclose in Close method.

The Close method should handle potential errors from fclose to ensure that any issues during file closing are logged or managed.

-  if (fclose(file_) != 0) {
-    return IOError("fclose failed: " + filename_, errno);
-  }
+  if (fclose(file_) != 0) {
+    auto err = errno;
+    file_ = nullptr;
+    return IOError("fclose failed: " + filename_, err);
  }
  file_ = nullptr;
  return Status::OK();
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status Close() override {
if (fclose(file_) != 0) {
return IOError("fclose failed: " + filename_, errno);
}
file_ = nullptr;
return Status::OK();
}
Status Close() override {
if (fclose(file_) != 0) {
auto err = errno;
file_ = nullptr;
return IOError("fclose failed: " + filename_, err);
}
file_ = nullptr;
return Status::OK();
}


Status Flush() override {
if (fflush(file_) != 0) {
return IOError("fflush failed: " + filename_, errno);
}
return Status::OK();
}

Status Sync() override {
auto s = BufferedWritableFile::Flush();
if (!s.ok()) {
return s;
}
int32_t file_fd = fileno(file_);
if (fsync(file_fd) != 0) {
return IOError("fsync failed: " + filename_, errno);
}
return Status::OK();
}

Status Trim(uint64_t target) override { return Status::OK(); }

uint64_t Filesize() override { return curr_file_size_; }
};

RWFile::~RWFile() = default;

class MmapRWFile : public RWFile {
Expand Down Expand Up @@ -648,6 +740,89 @@ Status NewWritableFile(const std::string& fname, std::unique_ptr<WritableFile>&
return s;
}

Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes, int64_t offset) {
const int fd = open(fname.c_str(), O_RDWR | O_CLOEXEC, 0644);
if (fd < 0) {
return IOError(fname, errno);
}

FILE* file = fdopen(fd, "r+");
if (file == nullptr) {
close(fd);
return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
}

if (user_space_buf_size_bytes < 0) {
fclose(file);
return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedAppendableFile");
}
char* buf = nullptr;
int32_t r = 0;
if (user_space_buf_size_bytes != 0) {
buf = (char*)malloc(user_space_buf_size_bytes);
if (!buf) {
fclose(file);
return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
}
r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
} else {
r = setvbuf(file, nullptr, _IONBF, 0);
}
if (r != 0) {
fclose(file);
return IOError("Failed to set user space buffer for " + fname, errno);
}

// Move the file pointer to the specified offset
if (fseek(file, offset, SEEK_SET) != 0) {
fclose(file);
return IOError("Failed to seek to the specified offset in " + fname, errno);
}

result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, offset, buf);
return Status::OK();
}
Comment on lines +743 to +785
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error handling in BufferedAppendableFile function.

The function should handle errors from buffer allocation and fseek more gracefully.

-  if (user_space_buf_size_bytes != 0) {
-    buf = (char*)malloc(user_space_buf_size_bytes);
-    if (!buf) {
-      fclose(file);
-      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
-    }
-    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
-  } else {
-    r = setvbuf(file, nullptr, _IONBF, 0);
-  }
-  if (r != 0) {
-    fclose(file);
-    return IOError("Failed to set user space buffer for " + fname, errno);
-  }
+  if (user_space_buf_size_bytes != 0) {
+    buf = (char*)malloc(user_space_buf_size_bytes);
+    if (!buf) {
+      fclose(file);
+      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
+    }
+    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
+  } else {
+    r = setvbuf(file, nullptr, _IONBF, 0);
+  }
+  if (r != 0) {
+    free(buf);
+    fclose(file);
+    return IOError("Failed to set user space buffer for " + fname, errno);
  }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes, int64_t offset) {
const int fd = open(fname.c_str(), O_RDWR | O_CLOEXEC, 0644);
if (fd < 0) {
return IOError(fname, errno);
}
FILE* file = fdopen(fd, "r+");
if (file == nullptr) {
close(fd);
return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
}
if (user_space_buf_size_bytes < 0) {
fclose(file);
return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedAppendableFile");
}
char* buf = nullptr;
int32_t r = 0;
if (user_space_buf_size_bytes != 0) {
buf = (char*)malloc(user_space_buf_size_bytes);
if (!buf) {
fclose(file);
return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
}
r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
} else {
r = setvbuf(file, nullptr, _IONBF, 0);
}
if (r != 0) {
fclose(file);
return IOError("Failed to set user space buffer for " + fname, errno);
}
// Move the file pointer to the specified offset
if (fseek(file, offset, SEEK_SET) != 0) {
fclose(file);
return IOError("Failed to seek to the specified offset in " + fname, errno);
}
result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, offset, buf);
return Status::OK();
}
Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes, int64_t offset) {
const int fd = open(fname.c_str(), O_RDWR | O_CLOEXEC, 0644);
if (fd < 0) {
return IOError(fname, errno);
}
FILE* file = fdopen(fd, "r+");
if (file == nullptr) {
close(fd);
return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
}
if (user_space_buf_size_bytes < 0) {
fclose(file);
return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedAppendableFile");
}
char* buf = nullptr;
int32_t r = 0;
if (user_space_buf_size_bytes != 0) {
buf = (char*)malloc(user_space_buf_size_bytes);
if (!buf) {
fclose(file);
return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
}
r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
} else {
r = setvbuf(file, nullptr, _IONBF, 0);
}
if (r != 0) {
free(buf);
fclose(file);
return IOError("Failed to set user space buffer for " + fname, errno);
}
// Move the file pointer to the specified offset
if (fseek(file, offset, SEEK_SET) != 0) {
fclose(file);
return IOError("Failed to seek to the specified offset in " + fname, errno);
}
result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, offset, buf);
return Status::OK();
}


Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes) {
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC, 0644);
if (fd < 0) {
return IOError(fname, errno);
}

FILE* file = fdopen(fd, "w+");
if (file == nullptr) {
close(fd);
return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
}

if (user_space_buf_size_bytes < 0) {
fclose(file);
return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedWritableFile");
}
char* buf = nullptr;
int32_t r = 0;
if (user_space_buf_size_bytes != 0) {
buf = (char*)malloc(user_space_buf_size_bytes);
if (!buf) {
fclose(file);
return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
}
r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
} else {
r = setvbuf(file, nullptr, _IONBF, 0);
}
if (r != 0) {
fclose(file);
return IOError("Failed to set user space buffer for " + fname, errno);
}

// the file was trancated if it was existing for syscall open use flag "O_TRUNC",
result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, 0, buf);
return Status::OK();
}
Comment on lines +787 to +824
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error handling in NewBufferedWritableFile function.

The function should handle errors from buffer allocation more gracefully.

-  if (user_space_buf_size_bytes != 0) {
-    buf = (char*)malloc(user_space_buf_size_bytes);
-    if (!buf) {
-      fclose(file);
-      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
-    }
-    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
-  } else {
-    r = setvbuf(file, nullptr, _IONBF, 0);
-  }
-  if (r != 0) {
-    fclose(file);
-    return IOError("Failed to set user space buffer for " + fname, errno);
-  }
+  if (user_space_buf_size_bytes != 0) {
+    buf = (char*)malloc(user_space_buf_size_bytes);
+    if (!buf) {
+      fclose(file);
+      return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
+    }
+    r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
+  } else {
+    r = setvbuf(file, nullptr, _IONBF, 0);
+  }
+  if (r != 0) {
+    free(buf);
+    fclose(file);
+    return IOError("Failed to set user space buffer for " + fname, errno);
  }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes) {
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC, 0644);
if (fd < 0) {
return IOError(fname, errno);
}
FILE* file = fdopen(fd, "w+");
if (file == nullptr) {
close(fd);
return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
}
if (user_space_buf_size_bytes < 0) {
fclose(file);
return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedWritableFile");
}
char* buf = nullptr;
int32_t r = 0;
if (user_space_buf_size_bytes != 0) {
buf = (char*)malloc(user_space_buf_size_bytes);
if (!buf) {
fclose(file);
return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
}
r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
} else {
r = setvbuf(file, nullptr, _IONBF, 0);
}
if (r != 0) {
fclose(file);
return IOError("Failed to set user space buffer for " + fname, errno);
}
// the file was trancated if it was existing for syscall open use flag "O_TRUNC",
result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, 0, buf);
return Status::OK();
}
Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes) {
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC, 0644);
if (fd < 0) {
return IOError(fname, errno);
}
FILE* file = fdopen(fd, "w+");
if (file == nullptr) {
close(fd);
return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
}
if (user_space_buf_size_bytes < 0) {
fclose(file);
return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedWritableFile");
}
char* buf = nullptr;
int32_t r = 0;
if (user_space_buf_size_bytes != 0) {
buf = (char*)malloc(user_space_buf_size_bytes);
if (!buf) {
fclose(file);
return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
}
r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
} else {
r = setvbuf(file, nullptr, _IONBF, 0);
}
if (r != 0) {
free(buf);
fclose(file);
return IOError("Failed to set user space buffer for " + fname, errno);
}
// the file was trancated if it was existing for syscall open use flag "O_TRUNC",
result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, 0, buf);
return Status::OK();
}


Status NewRWFile(const std::string& fname, std::unique_ptr<RWFile>& result) {
Status s;
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_CLOEXEC, 0644);
Expand Down
Loading