-
Notifications
You must be signed in to change notification settings - Fork 11
/
main.py
executable file
·91 lines (69 loc) · 2.34 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python3
"""Nitro.
Usage:
nitro [options] <vm_name>
Options:
-h --help Show this screen
--nobackend Don't analyze events
-o FILE --out=FILE Output file (stdout if not specified)
"""
import logging
import signal
import json
from pprint import pprint
import libvirt
from libvmi import LibvmiError
from docopt import docopt
from nitro.nitro import Nitro
def init_logger():
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
class NitroRunner:
def __init__(self, vm_name, analyze_enabled, output=None):
self.vm_name = vm_name
self.analyze_enabled = analyze_enabled
self.output = output
# get domain from libvirt
con = libvirt.open('qemu:///system')
self.domain = con.lookupByName(vm_name)
self.events = []
self.nitro = None
# define new SIGINT handler, to stop nitro
signal.signal(signal.SIGINT, self.sigint_handler)
def run(self):
self.nitro = Nitro(self.domain, self.analyze_enabled)
self.nitro.listener.set_traps(True)
for event in self.nitro.listen():
event_info = event.as_dict()
if self.analyze_enabled:
try:
syscall = self.nitro.backend.process_event(event)
except LibvmiError:
logging.error("Backend event processing failure")
else:
event_info = syscall.as_dict()
if self.output is None:
pprint(event_info, width=1)
else:
self.events.append(event_info)
if self.analyze_enabled:
# we can safely stop the backend
self.nitro.backend.stop()
if self.output is not None:
logging.info('Writing events')
with open(self.output, 'w') as f:
json.dump(self.events, f, indent=4)
def sigint_handler(self, *args, **kwargs):
logging.info('CTRL+C received, stopping Nitro')
self.nitro.stop()
def main():
init_logger()
args = docopt(__doc__)
vm_name = args['<vm_name>']
analyze_enabled = False if args['--nobackend'] else True
output = args['--out']
runner = NitroRunner(vm_name, analyze_enabled, output)
runner.run()
if __name__ == '__main__':
main()