forked from nerdalert/nflow-generator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
nflow-generator.go
337 lines (266 loc) · 7.89 KB
/
nflow-generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
// Run using:
//
// $ ./a.out
//
// go run nflow-generator.go nflow_logging.go nflow_payload.go -t 172.16.86.138 -p 9995
// Or:
// go build
// ./nflow-generator -t <ip> -p <port>
package main
import (
"fmt"
"net"
"os"
"strconv"
"time"
)
// Proto is an integer identifier for one of the application protocols
// enumerated in the constant group below.
type Proto int

// Protocol identifiers. The values are consecutive, starting at 1 for FTP
// and ending at 12 for BITTORRENT; the zero value of Proto is deliberately
// not assigned to any protocol.
const (
	FTP        Proto = 1
	SSH        Proto = 2
	DNS        Proto = 3
	HTTP       Proto = 4
	HTTPS      Proto = 5
	NTP        Proto = 6
	SNMP       Proto = 7
	IMAPS      Proto = 8
	MYSQL      Proto = 9
	HTTPS_ALT  Proto = 10
	P2P        Proto = 11
	BITTORRENT Proto = 12
)
// Tunables for the send loop in main. Both are left untyped so they can be
// used directly in int and time.Duration expressions.
const (
	// MAX_FLOWS_PER_RECORD is the maximum number of flow records that main
	// packs into a single netflow packet.
	MAX_FLOWS_PER_RECORD = 30

	// TICK_INTERVAL_MS is the length of one send tick, in milliseconds.
	TICK_INTERVAL_MS = 1000
)
// main is the entry point of the netflow generator.
//
// It parses the command line and flow configuration file, optionally emits
// one of the auxiliary files (docker compose, prometheus targets, graph) and
// exits, and otherwise builds the list of flows enabled for this host and
// sends them to the configured collector over UDP, one batch of records per
// tick, until every flow with a finite Count has been sent that many times.
// Flows with Count == 0 are never exhausted, so the loop then runs forever.
//
// Configuration and setup errors are reported by panicking; a failed UDP
// write terminates the process through log.Fatal.
func main() {
	// Parse arguments from command line and environment variables
	configArgs, err := ParseConfigArgs()
	if err != nil {
		panic(err)
	}

	// Read flow configuration file
	var config ConfigFile
	err = ReadFlowConfigFile(&config, configArgs.ConfigFile)
	if err != nil {
		panic(err)
	}

	// For generating docker compose configuration file
	if opts.GenComposeFile != "" {
		if err := GenComposeFile(opts.GenComposeFile, config); err != nil {
			panic(err)
		}
		return
	}

	// For generating prometheus service discovery file
	if opts.GenTargetsFile != "" {
		if err := GenTargetsFile(opts.GenTargetsFile, config); err != nil {
			panic(err)
		}
		return
	}

	if configArgs.HostName == "" {
		panic(fmt.Errorf("host name not provided"))
	}
	if (config.CollectorIp == "" || config.CollectorPort == 0) && !opts.Simulate {
		panic(fmt.Errorf("collector ip/port not provided"))
	}

	hostName := configArgs.HostName
	fmt.Println("Host: " + hostName)

	// Initialize random number generator using seed value
	randGen := InitRandGen(config)

	// Parse the field values for each flow
	multiFlowConfigs := ParseUserFlows(&config)

	// Expand flows with multiple values into multiple flows
	flowConfigs := ExpandMultiFlows(multiFlowConfigs)

	// Populate all missing fields using seeded randgen before sending
	// so multiple generators will have the same values
	SeedFlows(flowConfigs, randGen, configArgs, config)

	// Filter flows for this host
	enabledFlows := FilterEnabledFlows(flowConfigs)

	// Generate graph file for topology visualization (WIP)
	if opts.GenGraphFile != "" {
		if err := GenGraphFile(opts.GenGraphFile, flowConfigs); err != nil {
			panic(err)
		}
		os.Exit(0)
	}

	// Initialize flow state for each flow
	configFlowStates := InitFlowState(enabledFlows)

	// Print all configured flows for this host
	if !opts.DisableLogging {
		for i := 0; i < len(enabledFlows); i++ {
			flowConfig := flowConfigs[enabledFlows[i].ConfigIndex]
			fmt.Printf(
				"%15s = %15s %5d -> %15s %5d [%3d] = %d (tick %d) (count %d)\n",
				hostName,
				flowConfig.SrcAddr,
				flowConfig.SrcPort,
				flowConfig.DstAddr,
				flowConfig.DstPort,
				flowConfig.Proto,
				flowConfig.Bytes,
				flowConfig.Tick,
				flowConfig.Count,
			)
		}
	}

	// The flow timeout is used below as a divisor and to derive the length
	// of a tick cycle. A non-positive value would either crash with an
	// opaque divide-by-zero or produce a cycle length the tick counter never
	// reaches, so reject it explicitly.
	if config.FlowTimeout <= 0 {
		panic(fmt.Errorf("flow timeout must be positive, got %d", config.FlowTimeout))
	}

	// Print flow configuration information
	numEnabledFlows := len(enabledFlows)
	fmt.Println("Number of user flow configs: " + strconv.Itoa(len(config.Flows)))
	fmt.Println("Number of flows configured: " + strconv.Itoa(len(flowConfigs)))
	fmt.Println("Number of active flows: " + strconv.Itoa(numEnabledFlows))
	fmt.Println("Expected average flow rate: " + strconv.Itoa(numEnabledFlows/config.FlowTimeout))
	if numEnabledFlows == 0 {
		fmt.Println("No flows configured for this host")
		return
	}

	// Initialize UDP connection to netflow collector
	var conn *net.UDPConn
	if !opts.Simulate {
		conn, err = InitUdpConn(config)
		if err != nil {
			panic(err)
		}
		defer conn.Close()
	}

	// Initialize prometheus metrics server
	go HandleMetricsServer()

	// Sleep until the start of the next 10 second wall-clock interval so
	// that multiple generators start at the same time. The boundary is
	// computed from the full timestamp (not just whole seconds) so that the
	// sub-second part of the start time does not skew the alignment.
	const startAlign = 10 * time.Second
	now := time.Now()
	diff := now.Truncate(startAlign).Add(startAlign).Sub(now)
	fmt.Printf("Sleeping for %v\n", diff)
	time.Sleep(diff)

	// Flows are sent every TICK_INTERVAL_MS. A ticker is used instead of a
	// fixed sleep after each tick so that the time spent building and
	// sending packets does not accumulate as drift between generators.
	ticker := time.NewTicker(TICK_INTERVAL_MS * time.Millisecond)
	defer ticker.Stop()

	// Layout used when logging flow timestamps. The trailing "Z" is a
	// literal, so the formatted time must be in UTC for it to be truthful.
	const logTimeLayout = "2006-01-02T15:04:05.000Z"

	tick := 0
	// skipped stays true across a whole tick cycle only when every enabled
	// flow has already reached its configured Count.
	skipped := true
	maxTick := config.FlowTimeout * 1000 / TICK_INTERVAL_MS

	fmt.Println("Sending flows...")
	for {
		// Initialize bytes value for this tick
		// Note we initialize bytes for all flows, even if they are not enabled
		// so that the same values are used for all generators
		for i := 0; i < len(flowConfigs); i++ {
			flowConfigs[i].Bytes = GenBytesValue(randGen)
		}

		// Send flows for this tick. The index i is advanced by the inner
		// loop, which consumes up to MAX_FLOWS_PER_RECORD flows per packet.
		for i := 0; i < len(enabledFlows); {
			var records []NetflowPayload

			// Calculate system uptime for this tick
			// This value is used in the netflow packet header
			uptime := CreateCalcUptime()

			// We send MAX_FLOWS_PER_RECORD netflow records per netflow packet
			// so we use a nested loop to send all flows for this tick
			for j := 0; j < MAX_FLOWS_PER_RECORD && i < len(enabledFlows); i, j = i+1, j+1 {
				flowConfig := flowConfigs[enabledFlows[i].ConfigIndex]

				// Check if the flow count has been reached
				if flowConfig.Count != 0 && configFlowStates[i].Count+1 > flowConfig.Count {
					continue
				}
				// At least one flow still has records left to send in
				// this cycle, even if it is not due on this tick.
				skipped = false

				if flowConfig.Tick != tick {
					continue
				}

				// If the flow has multiple hops, check if we should provide a value
				// for the next hop field
				numHops := len(flowConfig.Hops)
				nextHopHostName := ""
				if flowConfig.HostIndex < numHops-1 {
					nextHopHostName = flowConfig.Hops[flowConfig.HostIndex+1]
				}

				// Create the netflow record
				// NOTE(review): the switched offsets divide by numHops, so this
				// assumes every enabled flow has at least one hop - confirm.
				payload := CreateCustomFlow(
					flowConfig.SrcAddr,
					flowConfig.SrcPort,
					flowConfig.DstAddr,
					flowConfig.DstPort,
					flowConfig.Proto,
					FindHostIp(config.Hosts, nextHopHostName),
					flowConfig.Bytes,
					// TODO improve the logic for first_switched and last_switched
					int(TICK_INTERVAL_MS/numHops)*(numHops-flowConfig.HostIndex),
					int(TICK_INTERVAL_MS/numHops)*(numHops-flowConfig.HostIndex-1),
				)

				// Update the flow state
				configFlowStates[i].Count++
				configFlowStates[i].Bytes += flowConfig.Bytes

				// Print the flow record
				if !opts.DisableLogging {
					startTime := time.Unix(int64(uptime.UnixSec+payload.SysUptimeStart/1000), int64(uptime.UnixMsec)).UTC()
					endTime := time.Unix(int64(uptime.UnixSec+payload.SysUptimeEnd/1000), int64(uptime.UnixMsec)).UTC()
					fmt.Printf(
						"%15s = %15s %5d -> %15s %5d [%3d] = %s -> %s = %d\n",
						hostName,
						ConvertIntToIp(payload.SrcIP).String(),
						payload.SrcPort,
						ConvertIntToIp(payload.DstIP).String(),
						payload.DstPort,
						payload.IpProtocol,
						startTime.Format(logTimeLayout),
						endTime.Format(logTimeLayout),
						payload.NumOctets,
					)
				}

				records = append(records, payload)
			}

			// Nothing in this batch was due on this tick
			if len(records) == 0 {
				continue
			}

			// Create the netflow packet
			data := Netflow{
				Header:  CreateNFlowHeader(len(records)),
				Records: records,
			}
			buffer := BuildNFlowPayload(data)

			// Write the netflow packet to the UDP connection
			if !opts.Simulate {
				bytesWritten, err := conn.Write(buffer.Bytes())
				if err != nil {
					log.Fatal("Failed to write: ", err)
				}
				sentRecordsTotalBytesCounter.Add(float64(bytesWritten))
			}

			// Update prometheus metrics
			sentNetflowTotalCounter.Inc()
			sentRecordsTotalCounter.Add(float64(len(records)))
		}

		tick++
		if tick == maxTick {
			// If we went through a whole tick cycle without sending any flows
			// then we are done sending flows
			if skipped {
				fmt.Println("No more flows to send")
				break
			}
			tick = 0
			skipped = true
		}

		// Wait for the next tick
		<-ticker.C
	}

	if !opts.DisableLogging {
		fmt.Println("Done sending flows, here are the stats:")
		for i := 0; i < len(configFlowStates); i++ {
			flowConfig := flowConfigs[enabledFlows[i].ConfigIndex]
			flowConfigState := configFlowStates[i]
			fmt.Printf(
				"%15s = %15s %5d -> %15s %5d [%3d] = %d total = %d bytes\n",
				hostName,
				flowConfig.SrcAddr,
				flowConfig.SrcPort,
				flowConfig.DstAddr,
				flowConfig.DstPort,
				flowConfig.Proto,
				flowConfigState.Count,
				flowConfigState.Bytes,
			)
		}
	}

	// Optionally persist the per-flow statistics
	if opts.StatsOutFile != "" {
		if err := GenStatsFile(opts.StatsOutFile, configFlowStates, enabledFlows, flowConfigs); err != nil {
			panic(err)
		}
	}
}