import logging
import random

from heapq import heappush, heapreplace

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol
from mrjob.util import log_to_stream


LOG = logging.getLogger('SampleCDXJob')

log_to_stream(format="%(asctime)s %(levelname)s %(name)s: %(message)s",
              name='SampleCDXJob')

#=============================================================================
class SampleCDXJob(MRJob):
    """Sample the CDX key space using reservoir sampling.

    MR algorithm adapted from:
    https://had00b.blogspot.com/2013/07/random-subset-in-mapreduce.html
    """

    HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.CombineTextInputFormat'

    INPUT_PROTOCOL = RawValueProtocol
    OUTPUT_PROTOCOL = RawValueProtocol

    JOBCONF = {'mapreduce.task.timeout': '9600000',
               'mapreduce.input.fileinputformat.split.maxsize': '50000000',
               'mapreduce.map.speculative': 'false',
               'mapreduce.reduce.speculative': 'false',
               'mapreduce.job.jvm.numtasks': '-1',
               # the output must not be compressed, even if the default is
               # to compress output; otherwise reading from
               # MRJobRunner.cat_output() would need decompression on the fly
               'mapreduce.output.fileoutputformat.compress': 'false',
               # a single reducer collects all per-mapper samples
               'mapreduce.job.reduces': '1'
               }

    def configure_args(self):
        """Custom command line options for indexing"""
        super(SampleCDXJob, self).configure_args()

        self.add_passthru_arg('--shards', dest='shards',
                              type=int,
                              default=300,
                              help='Number of shards in output '
                                   '(creates shards-1 splits)')

        self.add_passthru_arg('--scaler', dest='scaler',
                              type=int,
                              default=100,
                              help='Scaler for sample size: '
                                   'sample size = shards * scaler')

        self.add_passthru_arg('--splitfile', dest='splitfile',
                              help='Split file output destination, '
                                   'will contain shards-1 splits')

    def mapper_init(self):
        # reservoir size N and the reservoir itself: a min-heap keyed on
        # the random value, so H[0] always holds the smallest key kept
        self.N = self.options.shards * self.options.scaler
        self.H = []

    def mapper(self, _, line):
        # take the last tab-separated field (the raw CDX line)
        line = line.split('\t')[-1]

        # skip the CDX header line
        if line.startswith(' CDX'):
            return

        r = random.random()

        if len(self.H) < self.N:
            heappush(self.H, (r, line))
        elif r > self.H[0][0]:
            # evict the smallest key in the reservoir
            heapreplace(self.H, (r, line))

    def mapper_final(self):
        for (r, x) in self.H:
            # by negating the random key, the reducer receives
            # the elements from highest to lowest
            yield -r, x

    def reducer_init(self):
        self.N = self.options.shards * self.options.scaler
        self.output_list = []

    def reducer(self, key, values):
        # keys arrive in sorted order, so the first N values seen carry
        # the N largest random keys across all mapper reservoirs
        for x in values:
            if len(self.output_list) >= self.N:
                return
            self.output_list.append(x)

    def reducer_final(self):
        # sort the sample, take every scaler-th element, and skip the
        # first to get shards-1 evenly spaced split points from the
        # shards * scaler sample
        self.output_list = sorted(self.output_list)
        self.output_list = self.output_list[0::self.options.scaler][1:]

        for x in self.output_list:
            yield '', x
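

# Standalone sketch of the same reservoir step outside MapReduce (a
# hypothetical helper, not used by the job): keeping the n items that draw
# the largest uniform random keys is a uniform random sample of size n.
def _reservoir_sample_sketch(lines, n):
    heap = []
    for line in lines:
        r = random.random()
        if len(heap) < n:
            heappush(heap, (r, line))
        elif r > heap[0][0]:
            heapreplace(heap, (r, line))
    return sorted(x for (_, x) in heap)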


if __name__ == "__main__":
    SampleCDXJob().run()
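
# A minimal sketch of a local run (hypothetical file names; assumes mrjob is
# installed and part-00000.cdx is an uncompressed CDX file):
#
#   python samplecdxjob.py --shards 300 --scaler 100 part-00000.cdx > splits.txt
#
# On a cluster the job would be launched with mrjob's `-r hadoop` runner.
# Note that --splitfile is declared above but not read within this job;
# it is presumably consumed by downstream tooling.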