forked from facebook/wdt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FileWriter.cpp
138 lines (132 loc) · 4.4 KB
/
FileWriter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#include "FileWriter.h"
#include "WdtOptions.h"
#include "Reporting.h"
#include <fcntl.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <sys/stat.h>
#include <sys/types.h>
namespace facebook {
namespace wdt {
ErrorCode FileWriter::open() {
auto &options = WdtOptions::get();
if (options.skip_writes) {
return OK;
}
if (blockDetails_->fileSize == blockDetails_->dataSize) {
// single block file
WDT_CHECK(blockDetails_->offset == 0);
fd_ = fileCreator_->openAndSetSize(blockDetails_);
} else {
// multi block file
fd_ = fileCreator_->openForBlocks(threadIndex_, blockDetails_);
if (fd_ >= 0 && blockDetails_->offset > 0) {
START_PERF_TIMER
if (lseek(fd_, blockDetails_->offset, SEEK_SET) < 0) {
PLOG(ERROR) << "Unable to seek " << blockDetails_->fileName;
close();
} else {
RECORD_PERF_RESULT(PerfStatReport::FILE_SEEK)
}
}
}
if (fd_ == -1) {
LOG(ERROR) << "File open/seek failed for " << blockDetails_->fileName;
return FILE_WRITE_ERROR;
}
return OK;
}
void FileWriter::close() {
if (fd_ >= 0) {
START_PERF_TIMER
if (::close(fd_) != 0) {
PLOG(ERROR) << "Unable to close fd " << fd_;
}
RECORD_PERF_RESULT(PerfStatReport::FILE_CLOSE)
fd_ = -1;
}
}
ErrorCode FileWriter::write(char *buf, int64_t size) {
auto &options = WdtOptions::get();
if (!options.skip_writes) {
int64_t count = 0;
while (count < size) {
START_PERF_TIMER
int64_t written = ::write(fd_, buf + count, size - count);
if (written == -1) {
if (errno == EINTR) {
VLOG(1) << "Disk write interrupted, retrying "
<< blockDetails_->fileName;
continue;
}
PLOG(ERROR) << "File write failed for " << blockDetails_->fileName
<< "fd : " << fd_ << " " << written << " " << count << " "
<< size;
return FILE_WRITE_ERROR;
}
RECORD_PERF_RESULT(PerfStatReport::FILE_WRITE)
count += written;
}
VLOG(1) << "Successfully written " << count << " bytes to fd " << fd_
<< " for file " << blockDetails_->fileName;
bool finished = ((totalWritten_ + size) == blockDetails_->dataSize);
if (options.enable_download_resumption && finished) {
if (fsync(fd_) != 0) {
PLOG(ERROR) << "fsync failed for " << blockDetails_->fileName
<< " offset " << blockDetails_->offset << " file-size "
<< blockDetails_->fileSize
<< " data-size << blockDetails_->dataSize";
return FILE_WRITE_ERROR;
}
} else {
syncFileRange(count, finished);
}
}
totalWritten_ += size;
return OK;
}
void FileWriter::syncFileRange(int64_t written, bool forced) {
#ifdef HAS_SYNC_FILE_RANGE
auto &options = WdtOptions::get();
if (options.disk_sync_interval_mb < 0) {
return;
}
const int64_t syncIntervalBytes = options.disk_sync_interval_mb * 1024 * 1024;
writtenSinceLastSync_ += written;
if (writtenSinceLastSync_ == 0) {
// no need to sync
VLOG(1) << "skipping syncFileRange for " << blockDetails_->fileName
<< ". Data written " << written
<< " sync forced = " << std::boolalpha << forced;
return;
}
if (forced || writtenSinceLastSync_ > syncIntervalBytes) {
// sync_file_range with flag SYNC_FILE_RANGE_WRITE is an asynchronous
// operation. So, this is not that costly. Source :
// http://yoshinorimatsunobu.blogspot.com/2014/03/how-syncfilerange-really-works.html
START_PERF_TIMER
auto status = sync_file_range(fd_, nextSyncOffset_, writtenSinceLastSync_,
SYNC_FILE_RANGE_WRITE);
if (status != 0) {
PLOG(ERROR) << "sync_file_range() failed for " << blockDetails_->fileName
<< "fd " << fd_;
return;
}
RECORD_PERF_RESULT(PerfStatReport::SYNC_FILE_RANGE)
VLOG(1) << "file range [" << nextSyncOffset_ << " " << writtenSinceLastSync_
<< "] synced for file " << blockDetails_->fileName;
nextSyncOffset_ += writtenSinceLastSync_;
writtenSinceLastSync_ = 0;
}
#endif
}
}
}