-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
78 lines (68 loc) · 2.28 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
'use strict';
const { getRecordsFromEvent } = require('./lib/get-records');
const logger = require('./lib/logger');
const loadenv = require('./lib/loadenv');
const timestamp = require('./lib/timestamp');
const { BigqueryInputs, DynamoInputs, PingbackInputs, FrequencyInputs } = require('./lib/inputs');
exports.handler = async event => {
let records = [];
// debug timeouts
let timer = null;
if (process.env.DEBUG) {
timer = setTimeout(() => logger.error('TIMEOUT', { event }), 29000);
}
if (!event || !event.Records) {
logger.error(`Invalid event input: ${JSON.stringify(event)}`);
} else {
records = (await getRecordsFromEvent(event)).filter(r => {
if (process.env.PROCESS_AFTER && r && r.timestamp) {
const after = timestamp.toEpochSeconds(parseInt(process.env.PROCESS_AFTER));
const time = timestamp.toEpochSeconds(r.timestamp);
return time > after;
} else if (process.env.PROCESS_UNTIL && r && r.timestamp) {
const until = timestamp.toEpochSeconds(parseInt(process.env.PROCESS_UNTIL));
const time = timestamp.toEpochSeconds(r.timestamp);
return time <= until;
} else {
return !!r;
}
});
}
// nothing to do
if (records.length === 0) {
clearTimeout(timer);
return;
}
// log the raw/decoded input counts
logger.info('Event records', { raw: event.Records.length, decoded: records.length });
await new Promise((resolve, reject) => {
loadenv.load(() => {
resolve();
});
});
// figure out what type of records we process
let inputs;
if (process.env.PINGBACKS) {
inputs = new PingbackInputs(records);
} else if (process.env.DYNAMODB) {
inputs = new DynamoInputs(records);
} else if (process.env.FREQUENCY) {
inputs = new FrequencyInputs(records);
} else {
inputs = new BigqueryInputs(records);
}
// complain very loudly about unrecognized input records
inputs.unrecognized.forEach(r => {
logger.error(`Unrecognized input record: ${JSON.stringify(r)}`);
});
// run inserts in parallel
try {
const results = await inputs.insertAll();
clearTimeout(timer);
let total = results.reduce((acc, r) => acc + r.count, 0);
return `Inserted ${total} rows`;
} catch (err) {
clearTimeout(timer);
throw logger.errors(err);
}
};