forked from facebook/wdt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DirectorySourceQueue.h
326 lines (271 loc) · 10.5 KB
/
DirectorySourceQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#pragma once
#include <algorithm>
#include <condition_variable>
#include <dirent.h>
#include <glog/logging.h>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <unordered_map>
#include "SourceQueue.h"
#include "WdtOptions.h"
#include "FileByteSource.h"
#include "Protocol.h"
namespace facebook {
namespace wdt {
/**
 * A (file name, file size) pair describing one file to transfer.
 * A negative size denotes the entire file.
 */
using FileInfo = std::pair<std::string, int64_t>;
/**
 * SourceQueue that returns all the regular files under a given directory
 * (recursively) as individual FileByteSource objects. Sources are handed out
 * in priority order: fewest failed attempts first, then largest size first
 * (see SourceComparator for the full ordering).
 *
 * TODO: The actual building of the queue is specific to this implementation
 * which may or may not make it easy to plug a different implementation
 * (as shown by the current implementation of Sender.cpp)
 */
class DirectorySourceQueue : public SourceQueue {
 public:
  /**
   * Create a DirectorySourceQueue.
   * Call buildQueueSynchronously() or buildQueueAsynchronously() separately
   * to actually recurse over the root directory and gather files and sizes.
   *
   * @param rootDir root directory to recurse on
   */
  explicit DirectorySourceQueue(const std::string &rootDir);

  /**
   * Recurse over given root directory, gather data about regular files and
   * initialize internal data structures. getNextSource() will return sources
   * as this call discovers them.
   *
   * This should only be called once. Subsequent calls will do nothing and
   * return false. In case it is called from multiple threads, one of them
   * will do initialization while the other calls will fail.
   *
   * This is synchronous in the succeeding thread - it will block until
   * the directory is completely discovered. Use buildQueueAsynchronously()
   * for async fetch from parallel thread.
   *
   * @return true iff initialization was successful and hasn't
   *         been done before
   */
  bool buildQueueSynchronously();

  /**
   * Starts a new thread to build the queue @see buildQueueSynchronously()
   * @return the created thread (to be joined if needed)
   */
  std::thread buildQueueAsynchronously();

  /// @return true iff all regular files under root dir have been consumed
  bool finished() const override;

  /// @return true if all the files have been discovered, false otherwise
  bool fileDiscoveryFinished() const;

  /**
   * @param status this variable is set to the status of the transfer
   *
   * @return next FileByteSource to consume or nullptr when finished
   */
  virtual std::unique_ptr<ByteSource> getNextSource(ErrorCode &status) override;

  /// @return total number of files processed/enqueued
  virtual int64_t getCount() const override;

  /// @return total size of files processed/enqueued
  virtual int64_t getTotalSize() const override;

  /// @return total number of blocks and status of the transfer
  std::pair<int64_t, ErrorCode> getNumBlocksAndStatus() const;

  /**
   * Sets regex representing files to include for transfer
   *
   * @param includePattern file inclusion regex
   */
  void setIncludePattern(const std::string &includePattern);

  /**
   * Sets regex representing files to exclude for transfer
   *
   * @param excludePattern file exclusion regex
   */
  void setExcludePattern(const std::string &excludePattern);

  /**
   * Sets regex representing directories to exclude for transfer
   *
   * @param pruneDirPattern directory exclusion regex
   */
  void setPruneDirPattern(const std::string &pruneDirPattern);

  /**
   * Sets buffer size to use during creating individual FileByteSource object
   *
   * @param fileSourceBufferSize buffers size
   */
  void setFileSourceBufferSize(const int64_t fileSourceBufferSize);

  /**
   * Stat the FileInfo input files (if their sizes aren't already specified)
   * and insert them in the queue
   *
   * @param fileInfo files to be transferred
   */
  void setFileInfo(const std::vector<FileInfo> &fileInfo);

  /// Get the file info in this directory queue
  const std::vector<FileInfo> &getFileInfo() const;

  /**
   * Sets whether to follow symlink or not
   *
   * @param followSymlinks whether to follow symlink or not
   */
  void setFollowSymlinks(const bool followSymlinks);

  /**
   * Sets chunks which were sent in some previous transfer
   *
   * @param previouslyTransferredChunks previously sent chunk info
   */
  void setPreviouslyReceivedChunks(
      std::vector<FileChunksInfo> &previouslyTransferredChunks);

  /**
   * Returns sources to the queue, checks for fail/retries, doesn't increment
   * numEntries_
   *
   * @param sources sources to be returned to the queue
   */
  void returnToQueue(std::vector<std::unique_ptr<ByteSource>> &sources);

  /**
   * Returns a source to the queue, checks for fail/retries, doesn't increment
   * numEntries_
   *
   * @param source source to be returned to the queue
   */
  void returnToQueue(std::unique_ptr<ByteSource> &source);

  /**
   * Returns list of files which were not transferred. It empties the queue and
   * adds queue entries to the failed file list. This function should be called
   * after all the sending threads have finished execution
   *
   * @return stats for failed sources
   */
  std::vector<TransferStats> &getFailedSourceStats();

  /// @return returns list of directories which could not be opened
  std::vector<std::string> &getFailedDirectories();

  virtual ~DirectorySourceQueue();

  /// Returns the time it took to traverse the directory tree.
  /// NOTE(review): reads directoryTime_ without holding mutex_; presumably
  /// only meaningful once discovery has finished - confirm with callers.
  double getDirectoryTime() const {
    return directoryTime_;
  }

 private:
  /**
   * Traverse rootDir_ to gather files and sizes to enqueue
   *
   * @return true on success, false on error
   */
  bool explore();

  /**
   * Stat the input files and populate queue
   * @return true on success, false on error
   */
  bool enqueueFiles();

  /**
   * Initial creation from either explore or enqueue files - always increment
   * numEntries_ inside the lock, doesn't check for fail retries
   *
   * @param fullPath      full path of the file to be added
   * @param relPath       file path relative to root dir
   * @param fileSize      size of the file
   * @param alreadyLocked whether lock has already been acquired by the
   *                      calling method
   */
  void createIntoQueue(const std::string &fullPath, const std::string &relPath,
                       const int64_t fileSize, bool alreadyLocked);

  /**
   * When adding multiple files, we have the option of using notify_one
   * multiple times or notify_all once. Depending on the number of added
   * sources, this function uses either notify_one or notify_all
   *
   * @param addedSource number of sources added
   */
  void smartNotify(int32_t addedSource);

  /// root directory to recurse on if fileInfo_ is empty
  std::string rootDir_;

  /// regex representing directories to prune
  std::string pruneDirPattern_;

  /// regex representing files to include
  std::string includePattern_;

  /// regex representing files to exclude
  std::string excludePattern_;

  /**
   * buffer size to use when creating individual FileByteSource objects
   * (returned by getNextSource). Defaults to 0 so the value is never
   * indeterminate if the constructor does not set it.
   */
  int64_t fileSourceBufferSize_{0};

  /// List of files to enqueue instead of recursing over rootDir_.
  std::vector<FileInfo> fileInfo_;

  /// protects initCalled_/initFinished_/sourceQueue_
  mutable std::mutex mutex_;

  /// condition variable indicating sourceQueue_ is not empty
  mutable std::condition_variable conditionNotEmpty_;

  /// Indicates whether queue building (buildQueueSynchronously()) has been
  /// started, to prevent multiple builds
  bool initCalled_{false};

  /// Indicates whether queue building has finished
  bool initFinished_{false};

  /**
   * Ordering used by sourceQueue_. std::priority_queue yields the element that
   * compares greatest, so operator() returns true when source1 should be
   * dequeued *after* source2. Priority, highest first:
   *   1. fewer failed attempts
   *   2. larger size
   *   3. smaller offset
   *   4. identifier that compares smaller
   */
  struct SourceComparator {
    // const: a Compare object must be invocable without mutating itself.
    bool operator()(const std::unique_ptr<ByteSource> &source1,
                    const std::unique_ptr<ByteSource> &source2) const {
      auto retryCount1 = source1->getTransferStats().getFailedAttempts();
      auto retryCount2 = source2->getTransferStats().getFailedAttempts();
      if (retryCount1 != retryCount2) {
        return retryCount1 > retryCount2;
      }
      if (source1->getSize() != source2->getSize()) {
        return source1->getSize() < source2->getSize();
      }
      if (source1->getOffset() != source2->getOffset()) {
        return source1->getOffset() > source2->getOffset();
      }
      return source1->getIdentifier() > source2->getIdentifier();
    }
  };

  /**
   * Priority queue of sources. Sources are first ordered by increasing
   * failedAttempts, then by decreasing size. If sizes are equal (always for
   * blocks), sources are ordered by increasing offset, and finally by
   * identifier. This way, we ensure that all the threads in the receiver side
   * are not writing to the same file at the same time.
   */
  std::priority_queue<std::unique_ptr<ByteSource>,
                      std::vector<std::unique_ptr<ByteSource>>,
                      SourceComparator> sourceQueue_;

  /// Transfer stats for sources which are not transferred
  std::vector<TransferStats> failedSourceStats_;

  /// directories which could not be opened
  std::vector<std::string> failedDirectories_;

  /// Total number of files that have passed through the queue
  int64_t numEntries_{0};

  /// Seq-id of the next file to be inserted into the queue
  int64_t nextSeqId_{0};

  /// total number of blocks that have passed through the queue. Even when
  /// blocks are actually disabled, our code internally treats files like
  /// single blocks. So, numBlocks_ >= numEntries_.
  int64_t numBlocks_{0};

  /// Total size of entries/files that have passed through the queue
  int64_t totalFileSize_{0};

  /// Number of blocks dequeued
  int64_t numBlocksDequeued_{0};

  /// Whether to follow symlinks or not
  bool followSymlinks_{false};

  /// shared file data. These are used during transfer to add blocks
  /// contribution
  std::vector<SourceMetaData *> sharedFileData_;

  /// A map from relative file name to previously received chunks
  std::unordered_map<std::string, FileChunksInfo> previouslyTransferredChunks_;

  /// Options this queue reads its configuration from (reference member, so
  /// it must be bound in the constructor)
  const WdtOptions &options_;

  /// Stores the time difference between the start and the end of the
  /// traversal of directory
  double directoryTime_{0};
};
}
}