-
Notifications
You must be signed in to change notification settings - Fork 16
/
FFanalyse.py
208 lines (181 loc) · 7.81 KB
/
FFanalyse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/python
"""
Domain analysis script. Performs a DNS query and examines the response to determine if domain is Fast-Flux.
Also performs a check using multiple methods to determine whether domain name was algorithmically generated (DGA).
Author: Etienne Stalmans
Version: 1.0 (2013)
"""
import sys,string
import getopt
import dns.resolver
import Geolocate
import pygeoip
import argparse
import URLAnalysis
class ffanalyse():
def main(self,domain,verbose):
self.defaults= { 'protocol':'udp', 'port':53, 'opcode':'query',
'qtype':'A', 'rd':1, 'timing':1, 'timeout': 3,
'server_rotate': 0,'server':[] }
self.verbose = verbose
self.domain = domain
self.gl = Geolocate.Geolocate('GeoLiteCity.dat')
self.geoIP = pygeoip.GeoIP('GeoIPASNum.dat')
self.urla = URLAnalysis.urlanalyse()
self.urla.main('output_b.dgt','output_m.dgt')
self.resolve_nameservers()
self.get_dns(domain)
def resolve_nameservers(self):
#check if windows system
if sys.platform in ('win32', 'nt'):
import win32dns
self.defaults['server']=win32dns.RegistryResolve()
else:
self.ParseResolvConf()
#get nameservers for UNIX type systems
def ParseResolvConf(self,resolv_path="/etc/resolv.conf"):
"parses the /etc/resolv.conf file and sets defaults for name servers"
lines=open(resolv_path).readlines()
for line in lines:
line = string.strip(line)
if not line or line[0]==';' or line[0]=='#':
continue
fields=string.split(line)
if len(fields) < 2:
continue
if fields[0]=='domain' and len(fields) > 1:
self.defaults['domain']=fields[1]
if fields[0]=='search':
pass
if fields[0]=='options':
pass
if fields[0]=='sortlist':
pass
if fields[0]=='nameserver':
self.defaults['server'].append(fields[1])
def get_asn(self,ip):
"""
Method to return the country and ASN of an IP address
@param ip to get data for
@return [asn record, country]
"""
asnrec = self.geoIP.org_by_addr(ip)
country = self.gl.getCountry(ip)
if self.verbose:
print asnrec,country
if asnrec == None:
return ['Unknown','Unknown']
else:
return [asnrec.split(' ')[0],country]
def get_dns(self,qname):
"""
Perform the DNS query and send the result to the analyzer.
@param qname the domain to query
"""
rdtype=dns.rdatatype.A
rdclass=dns.rdataclass.IN
request = dns.message.make_query(qname, rdtype, rdclass)
response = dns.query.udp(request,self.defaults['server'][0])
rcode = dns.rcode.from_flags(response.flags,response.ednsflags)
if rcode == 3:
#print '%s NXDomain, domain not found'%qname
return
if rcode == 2:
#print '%s SERVFail, server failed trying to find domain'%qname
return
if rcode != 0:
#print '%s Invalid response from dns server'%qname
return
if self.verbose:
print 'QUERY RESPONSE:\n%s'%str(response)
self.analyse(response)
def analyse(self,response):
"""
Perform analysis on a DNS query response.
Checks for Fast-Flux using modified Holz classifier, Jaroslaw/Patrycja classifier.
Checks for Fast-Flux using Geolocation
Checks for DGA using multiple statistical classifiers
@param response from the DNS query
"""
qname = str(response.question).split()[1] #Query Name
network_ranges = [] #A record responses
nameserver_net_ranges = []
a_count = 0
ns_count = 0
diff_count = 0
ns_diff_count = 0
a_ttl = 86400
ns_ttl = 86400
ar_count = -1
country_count = 0
asn_count = 0
countries = []
nameservers = []
asns = []
answers = []
ttl_score = 0
#check answers
if response.rcode == 'NONE':
print "We have a NONE record..."
return False
if len(response.authority)>0:
rdata = response.authority[0]
ans = str(rdata).split('\n')
for a in ans:
nameservers.append(a.split()[4])
if len(response.answer)==0:
print 'Empty Response section'
return
for a in response.answer[0]: #parse each line in the answer
if '.' in str(a): #weak check that it is an IP address returned
answers.append(str(a))
a_ttl = response.answer[0].ttl #get the TTL
a_count = len(answers) #the number of IP addresses returned
if a_count > 0:
for ip in answers:
ip = str(ip)
st = ip[:ip.rfind('.')]
asnd = self.get_asn(ip)
if asnd:
asn,country = asnd
if country not in countries:
countries.append(country)
if asn not in asns:
asns.append(asn)
if st not in network_ranges:
network_ranges.append(st)
diff_count = len(network_ranges) #number of IP ranges we have
country_count= len(countries) #number of countries that host A records
asn_count = len(asns) #number of netblocks
if a_ttl <= 300:
ttl_score = 1
print "Qname{0:20}|TTL{0:5}|A Records{0:2}|Ranges{0:2}|ASNs{0:2}|Countries{0:2}|Nameservers{0:2}|".format('')
print "{:25}|{:8}|{:11}|{:8}|{:6}|{:11}|{:13}|".format(qname,a_ttl,a_count,diff_count,asn_count,country_count,ns_count)
#calculate score according to Thorsten
#===================================================
t_score = (1.32*a_count+18.54*asn_count+0*ns_count+ttl_score*5)-50
#===================================================
#calculate Jaroslaw/Patrycja score
#===================================================
jp_score = a_count+ns_count+diff_count*1.5+asn_count*1.5+ttl_score+country_count*2
#===================================================
print "\n---- Fast-Flux Scores ----"
print "Modified Thorsten/Holz: Score (%i) Classified (%s)"%(t_score,"\033[91mFast-Flux\033[0m" if t_score>0 else "\033[92mClean\033[0m")
print "Modified Jaroslaw/Patrycja: Score (%i) Classified (%s)"%(jp_score,"\033[91mFast-Flux\033[0m" if jp_score>=18 else "\033[92mClean\033[0m")
print "Rule Based: %s"%("\033[91mFast-Flux\033[0m" if diff_count!=0 and a_count>=2 or ns_count>1 and ((diff_count>=1 and asn_count>1)or ttl_score == 1) else "\033[92mClean\033[0m")
print "\n---- Geolocation ----"
self.gl.calcValues(answers) #do geolocation check
print "\n---- URL Analysis ----"
self.urla.checkDomain(qname) #do check for DGA
def setOpts(argv):
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--domain',dest='domain',action='store',required=True,
help="A Domain to analyse")
parser.add_argument('-v', dest='verbose', action='store_true',default=False,
help="Verbose output")
arg = parser.parse_args()
return (arg.__dict__['domain'],arg.__dict__['verbose'])
if __name__ == "__main__":
opts = setOpts(sys.argv[1:])
ff = ffanalyse()
ff.main(opts[0],opts[1])