This repository has been archived by the owner on Jun 7, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 11
/
clamav.py
113 lines (93 loc) · 3.62 KB
/
clamav.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding: utf-8 -*-
# This file is part of Viper - https://github.com/viper-framework/viper
# See the file 'LICENSE' for copying permission.
try:
import pyclamd
HAVE_CLAMD = True
except ImportError:
HAVE_CLAMD = False
import os
from viper.common.abstracts import Module
from viper.core.database import Database, Malware
from viper.core.session import __sessions__
from viper.core.storage import get_sample_path
class ClamAV(Module):
cmd = 'clamav'
description = 'Scan file from local ClamAV daemon'
authors = ['neriberto']
def __init__(self):
super(ClamAV, self).__init__()
self.parser.add_argument('-s', '--socket', help='Specify an unix socket (default: Clamd Unix Socket)')
self.parser.add_argument('-a', '--all', action='store_true', help='Scan all files')
self.parser.add_argument('-t', '--tag', action='store_true', help='Tag file(s) with the signature name when detect as malware')
self.db = Database()
def run(self):
super(ClamAV, self).run()
if self.args is None:
return
if not HAVE_CLAMD:
self.log('error', "Missing dependency, install pyclamd (`pip install pyclamd`)")
return
if not self.Connect():
self.log('error', 'Daemon is not responding!')
return
if self.args.all:
self.ScanAll()
elif __sessions__.is_set():
self.ScanFile(__sessions__.current.file)
else:
self.log('error', 'No open session')
def ScanAll(self):
samples = self.db.find(key='all')
for sample in samples:
if sample.size == 0:
continue
self.ScanFile(sample)
def Connect(self):
self.daemon = None
self.socket = self.args.socket
try:
if self.socket is not None:
self.daemon = pyclamd.ClamdUnixSocket(self.socket)
self.log('info', 'Using socket {0} to scan'.format(self.socket))
else:
self.daemon = pyclamd.ClamdUnixSocket()
self.socket = 'Clamav'
return self.daemon.ping()
except Exception as ex:
msg = 'Daemon connection failure, {0}'.format(ex)
self.log('error,', msg)
return False
def ScanFile(self, sample):
if isinstance(sample, Malware):
sample_path = get_sample_path(sample.sha256)
else:
sample_path = sample.path
if not os.path.exists(sample_path):
self.log('error', 'The file does not exists at path {0}'.format(sample_path))
return
try:
if self.daemon.ping():
with open(sample_path, 'rb') as fd:
results = self.daemon.scan_stream(fd.read())
else:
self.log('error', "Unable to connect to the daemon")
except Exception as ex:
msg = 'Unable to scan file {0} with antivirus daemon, {1}'.format(sample.sha256, ex)
self.log('error', msg)
return
found = None
name = None
if results:
for item in results:
found = results[item][0]
name = results[item][1]
if found == 'ERROR':
self.log('error', "Check permissions of the binary folder, {0}".format(name))
else:
if name is not None:
if self.args.tag:
self.db.add_tags(sample.sha256, name)
else:
name = 'Threat not found!'
self.log('info', "{0} identify {1} as : {2}".format(self.socket, sample.sha256, name))