# Source path: MiaCTFer/client-1/webinfo/wafw00f/main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Copyright (C) 2020, WAFW00F Developers.
See the LICENSE file for copying permission.
'''
# For keeping python2 support for now
from __future__ import print_function
import csv
import io
import json
import logging
import os
import random
import re
import sys
from collections import defaultdict
from optparse import OptionParser
from client.webinfo.wafw00f.lib.asciiarts import *
from client.webinfo.wafw00f import __version__, __license__
from client.webinfo.wafw00f.manager import load_plugins
from client.webinfo.wafw00f.wafprio import wafdetectionsprio
from client.webinfo.wafw00f.lib.evillib import urlParser, waftoolsengine, def_headers
class WAFW00F(waftoolsengine):
    '''
    WAF fingerprinting engine layered on ``waftoolsengine``.

    It sends a handful of deliberately malicious-looking requests and lets the
    loaded plugins (``wafdetections``) inspect the responses through the
    ``match*`` helpers. ``Request``, ``path`` and ``headers`` are not defined
    in this class; presumably they come from ``waftoolsengine`` -- confirm
    against ``lib/evillib``.
    '''

    # Payloads appended to the path or passed as query parameters by the *Attack methods.
    xsstring = '<script>alert("XSS");</script>'
    sqlistring = "UNION SELECT ALL FROM information_schema AND ' or SLEEP(5) or '"
    lfistring = '../../../../etc/passwd'
    rcestring = '/bin/cat /etc/passwd; ping 127.0.0.1; curl google.com'
    xxestring = '<!ENTITY xxe SYSTEM "file:///etc/shadow">]><pwn>&hack;</pwn>'

    def __init__(self, target='www.example.com', debuglevel=0, path='/',
                 followredirect=True, extraheaders=None, proxies=None):
        '''
        Set up logging and detection state, then initialise the base engine.

        ``extraheaders`` defaults to ``None`` (treated as an empty dict) so that
        no dict object is shared between instances through the default value.
        '''
        self.log = logging.getLogger('wafw00f')
        # Response to centralAttack(); filled in by identwaf().
        self.attackres = None
        if extraheaders is None:
            extraheaders = {}
        waftoolsengine.__init__(self, target, debuglevel, path, proxies, followredirect, extraheaders)
        self.knowledge = dict(generic=dict(found=False, reason=''), wafname=list())

    def normalRequest(self):
        '''Send a plain request with the instance defaults.'''
        return self.Request()

    def customRequest(self, headers=None):
        '''Send a plain request using the given headers.'''
        return self.Request(headers=headers)

    def nonExistent(self):
        '''Request a randomly numbered ``.html`` page under ``self.path``.'''
        return self.Request(path=self.path + str(random.randrange(100, 999)) + '.html')

    def xssAttack(self):
        '''Send the XSS payload as query parameter ``s``.'''
        return self.Request(path=self.path, params={'s': self.xsstring})

    def xxeAttack(self):
        '''Send the XXE payload as query parameter ``s``.'''
        return self.Request(path=self.path, params={'s': self.xxestring})

    def lfiAttack(self):
        '''Append the directory traversal payload to the path.'''
        return self.Request(path=self.path + self.lfistring)

    def centralAttack(self):
        '''Send the XSS, SQLi and LFI payloads together in a single request.'''
        return self.Request(path=self.path, params={'a': self.xsstring, 'b': self.sqlistring, 'c': self.lfistring})

    def sqliAttack(self):
        '''Send the SQL injection payload as query parameter ``s``.'''
        return self.Request(path=self.path, params={'s': self.sqlistring})

    def oscAttack(self):
        '''Send the OS command injection payload as query parameter ``s``.'''
        return self.Request(path=self.path, params={'s': self.rcestring})

    def performCheck(self, request_method):
        '''
        Call ``request_method`` and return its response.

        Raises ``RequestBlocked`` when the call yields ``None``.
        '''
        r = request_method()
        if r is None:
            raise RequestBlocked()
        return r

    # Most common attacks used to detect WAFs
    attcom = [xssAttack, sqliAttack, lfiAttack]
    attacks = [xssAttack, xxeAttack, lfiAttack, sqliAttack, oscAttack]

    def _genericfound(self, reason):
        '''Record a positive generic detection with its reason; always returns True.'''
        self.knowledge['generic']['reason'] = reason
        self.knowledge['generic']['found'] = True
        return True

    def genericdetect(self):
        '''
        Heuristically decide whether some unidentified WAF is in front of the target.

        Compares a normal response against responses to a request sent with the
        default headers and to XSS, LFI and SQLi probes, then compares the
        ``Server`` header of the stored attack response. The first difference
        found is written to ``self.knowledge['generic']``.

        Returns True when a difference (or a blocked request) is observed,
        False otherwise.
        '''
        reasons = ['Blocking is being done at connection/packet level.',
                   'The server header is different when an attack is detected.',
                   'The server returns a different response code when an attack string is used.',
                   'It closed the connection for a normal request.',
                   'The response was different when the request wasn\'t made from a browser.'
                   ]
        # (request method, log message, wording used inside the reason text)
        attack_checks = (
            (self.xssAttack,
             'Server returned a different response when a XSS attack vector was tried.',
             'cross-site scripting attack'),
            (self.lfiAttack,
             'Server returned a different response when a directory traversal was attempted.',
             'a file inclusion attack'),
            (self.sqliAttack,
             'Server returned a different response when a SQLi was attempted.',
             'a SQL injection attack'),
        )
        try:
            # Testing for no user-agent response. Detects almost all WAFs out there.
            resp1 = self.performCheck(self.normalRequest)
            if 'User-Agent' in self.headers:
                del self.headers['User-Agent']  # Deleting the user-agent key from object not dict.
            resp3 = self.customRequest(headers=def_headers)
            # customRequest() is not routed through performCheck(), so it may be None.
            if resp3 is not None and resp1.status_code != resp3.status_code:
                self.log.info('Server returned a different response when request didn\'t contain the User-Agent header.')
                reason = reasons[4]
                reason += '\r\n'
                reason += 'Normal response code is "%s",' % resp1.status_code
                reason += ' while the response code to a modified request is "%s"' % resp3.status_code
                return self._genericfound(reason)

            # Testing the status code upon sending each attack, stopping at the first mismatch
            for attack, logmsg, label in attack_checks:
                resp2 = self.performCheck(attack)
                if resp1.status_code != resp2.status_code:
                    self.log.info(logmsg)
                    reason = reasons[2]
                    reason += '\r\n'
                    reason += 'Normal response code is "%s",' % resp1.status_code
                    reason += ' while the response code to %s is "%s"' % (label, resp2.status_code)
                    return self._genericfound(reason)

            # Checking for the Server header after sending malicious requests.
            # attackres stays None when identwaf() was blocked or never called.
            response = self.attackres
            if response is not None:
                normalserver = resp1.headers.get('Server')
                attackresponse_server = response.headers.get('Server')
                if attackresponse_server and attackresponse_server != normalserver:
                    self.log.info('Server header changed, WAF possibly detected')
                    self.log.debug('Attack response: %s' % attackresponse_server)
                    self.log.debug('Normal response: %s' % normalserver)
                    reason = reasons[1]
                    reason += '\r\nThe server header for a normal response is "%s",' % normalserver
                    reason += ' while the server header a response to an attack is "%s",' % attackresponse_server
                    return self._genericfound(reason)

        # If at all request doesn't go, press F
        except RequestBlocked:
            return self._genericfound(reasons[0])
        return False

    def _responseFor(self, attack):
        '''
        Return the response a matcher should inspect.

        ``attack=True`` selects the stored attack response; otherwise the
        module-level ``rq`` set by ``main()``/``main1()`` is used, or ``None``
        when it has not been assigned yet.
        '''
        if attack:
            return self.attackres
        return globals().get('rq')

    def matchHeader(self, headermatch, attack=False):
        '''
        Case-insensitively search a response header with a regex.

        ``headermatch`` is a ``(header name, regex)`` pair. Returns True on a
        match, False otherwise, and None when there is no response to inspect.
        '''
        r = self._responseFor(attack)
        if r is None:
            return None
        header, match = headermatch
        headerval = r.headers.get(header)
        if not headerval:
            return False
        # set-cookie can have multiple headers, python gives it to us
        # concatenated with a comma
        if header == 'Set-Cookie':
            headervals = headerval.split(', ')
        else:
            headervals = [headerval]
        return any(re.search(match, value, re.I) for value in headervals)

    def matchStatus(self, statuscode, attack=True):
        '''Return True when the response status code equals ``statuscode`` (None without a response).'''
        r = self._responseFor(attack)
        if r is None:
            return None
        return r.status_code == statuscode

    def matchCookie(self, match, attack=False):
        '''Shortcut for ``matchHeader`` on the ``Set-Cookie`` header.'''
        return self.matchHeader(('Set-Cookie', match), attack=attack)

    def matchReason(self, reasoncode, attack=True):
        '''Return True when the response reason phrase equals ``reasoncode`` (None without a response).'''
        r = self._responseFor(attack)
        if r is None:
            return None
        return str(r.reason) == reasoncode

    def matchContent(self, regex, attack=True):
        '''Case-insensitively search the response body with ``regex`` (None without a response).'''
        r = self._responseFor(attack)
        if r is None:
            return None
        return bool(re.search(regex, r.text, re.I))

    # Map of WAF name -> detection callable, filled from the loaded plugins at class creation.
    wafdetections = dict()
    plugin_dict = load_plugins()
    result_dict = {}
    for plugin_module in plugin_dict.values():
        wafdetections[plugin_module.NAME] = plugin_module.is_waf

    # Check for prioritized ones first, then check those added externally
    # NOTE: checklist aliases wafdetectionsprio, so the += below extends that
    # imported list in place as well.
    checklist = wafdetectionsprio
    checklist += list(set(wafdetections.keys()) - set(checklist))

    def identwaf(self, findall=False):
        '''
        Send the combined attack request and run the plugins in checklist order.

        Returns the list of matching WAF names; it holds at most one name
        unless ``findall`` is true, and is empty when the attack request got
        no response.
        '''
        detected = list()
        try:
            self.attackres = self.performCheck(self.centralAttack)
        except RequestBlocked:
            return detected
        for wafvendor in self.checklist:
            self.log.info('Checking for %s' % wafvendor)
            if self.wafdetections[wafvendor](self):
                detected.append(wafvendor)
                if not findall:
                    break
        self.knowledge['wafname'] = detected
        return detected
def calclogginglevel(verbosity):
    '''
    Turn a verbosity count into a ``logging`` level number.

    Starts at 40 (errors only) and drops 10 per verbosity step, never
    going below 0.
    '''
    errors_only = 40
    wanted = errors_only - (verbosity * 10)
    return max(wanted, 0)
def buildResultRecord(url, waf):
    '''
    Build one result row for the output writers.

    ``waf`` is either a falsy value (nothing detected), the literal
    ``'generic'``, or a name of the form ``'Firewall (Manufacturer)'``.
    A name without a parenthesised manufacturer is reported with the
    manufacturer ``'Unknown'``.

    Returns a dict with the keys ``url``, ``detected``, ``firewall`` and
    ``manufacturer``, inserted in that order.
    '''
    if not waf:
        detected, firewall, manufacturer = False, 'None', 'None'
    elif waf == 'generic':
        detected, firewall, manufacturer = True, 'Generic', 'Unknown'
    else:
        parts = waf.split('(')
        detected = True
        firewall = parts[0].strip()
        if len(parts) > 1:
            manufacturer = parts[1].replace(')', '').strip()
        else:
            # No "(Manufacturer)" suffix in the name.
            manufacturer = 'Unknown'
    result = {}
    result['url'] = url
    result['detected'] = detected
    result['firewall'] = firewall
    result['manufacturer'] = manufacturer
    return result
def getTextResults(res=None):
    '''
    Render result records as right-aligned text lines.

    Each record is expected to provide, in order and after the ``detected``
    key is dropped, the url, the firewall name and the manufacturer; the last
    two are merged into a single ``firewall (manufacturer)`` column. Every
    column is padded to its widest cell plus three characters.

    Returns a list of strings, one per record; an empty list when ``res`` is
    ``None`` or empty.
    '''
    # leaving out some space for future possibilities of newer columns
    # newer columns can be added to this tuple below
    keys = ('detected',)  # trailing comma matters: a bare string would do substring matching
    rows = []
    for record in res or []:
        row = [str(value) for key, value in record.items() if key not in keys]
        row[1] = '%s (%s)' % (row[1], row[2])
        row.pop()
        rows.append(row)
    if not rows:
        return []
    widths = [max(len(cell) for cell in column) + 3 for column in zip(*rows)]
    rwfmt = ''.join('{:>' + str(width) + '}' for width in widths)
    return [rwfmt.format(*row) for row in rows]
def disableStdOut():
    '''Replace ``sys.stdout`` with ``None`` until ``enableStdOut`` restores it.'''
    setattr(sys, 'stdout', None)
def enableStdOut():
    '''Point ``sys.stdout`` back at the interpreter's original ``sys.__stdout__``.'''
    original_stream = sys.__stdout__
    sys.stdout = original_stream
def getheaders(fn):
    '''
    Read ``Name: value`` header lines from the UTF-8 text file ``fn``.

    Each line is split on its first colon only, so values that themselves
    contain colons (URLs, ``host:port``, timestamps) are kept whole. Lines
    without a colon or with an empty name are ignored.

    Returns a dict of header name to value, or ``None`` (after logging a
    critical message) when the file does not exist.
    '''
    if not os.path.exists(fn):
        logging.getLogger('wafw00f').critical('Headers file "%s" does not exist!' % fn)
        return None
    headers = {}
    with io.open(fn, 'r', encoding='utf-8') as f:
        for line in f:
            name, colon, value = line.partition(':')
            name = name.strip()
            if colon and name:
                headers[name] = value.strip()
    return headers
class RequestBlocked(Exception):
    '''Raised when a request made during a check yields no response.'''
def main():
    '''
    Command-line entry point.

    Parses the options, collects targets from the arguments or from an input
    file (json, csv or plain text), runs the detection for every target and
    finally writes the collected results to stdout or to a json/csv/text
    file, depending on ``--output``.

    The first response of each target is stored in the module-level ``rq``,
    which the ``WAFW00F.match*`` helpers read for non-attack matching.
    '''
    global rq
    parser = OptionParser(usage='%prog url1 [url2 [url3 ... ]]\r\nexample: %prog http://www.victim.org/')
    parser.add_option('-v', '--verbose', action='count', dest='verbose', default=0,
                      help='Enable verbosity, multiple -v options increase verbosity')
    parser.add_option('-a', '--findall', action='store_true', dest='findall', default=False,
                      help='Find all WAFs which match the signatures, do not stop testing on the first one')
    parser.add_option('-r', '--noredirect', action='store_false', dest='followredirect',
                      default=True, help='Do not follow redirections given by 3xx responses')
    parser.add_option('-t', '--test', dest='test', help='Test for one specific WAF')
    parser.add_option('-o', '--output', dest='output', help='Write output to csv, json or text file depending on file extension. For stdout, specify - as filename.',
                      default=None)
    parser.add_option('-i', '--input-file', dest='input', help='Read targets from a file. Input format can be csv, json or text. For csv and json, a `url` column name or element is required.',
                      default=None)
    parser.add_option('-l', '--list', dest='list', action='store_true',
                      default=False, help='List all WAFs that WAFW00F is able to detect')
    parser.add_option('-p', '--proxy', dest='proxy', default=None,
                      help='Use an HTTP proxy to perform requests, examples: http://hostname:8080, socks5://hostname:1080, http://user:pass@hostname:8080')
    parser.add_option('--version', '-V', dest='version', action='store_true',
                      default=False, help='Print out the current version of WafW00f and exit.')
    parser.add_option('--headers', '-H', dest='headers', action='store', default=None,
                      help='Pass custom headers via a text file to overwrite the default header set.')
    options, args = parser.parse_args()
    logging.basicConfig(level=calclogginglevel(options.verbose))
    log = logging.getLogger('wafw00f')
    # With "-o -" only the final result table should reach stdout.
    if options.output == '-':
        disableStdOut()
    print(randomArt())
    if options.list:
        print('[+] Can test for these WAFs:\r\n')
        # Kept from the original flow; the instance itself is not used below.
        attacker = WAFW00F(None)
        try:
            m = [i.replace(')', '').split(' (') for i in wafdetectionsprio]
            print(R+' WAF Name'+' '*24+'Manufacturer\n '+'-'*8+' '*24+'-'*12+'\n')
            max_len = max(len(str(x)) for k in m for x in k)
            for inner in m:
                first = True
                for elem in inner:
                    if first:
                        text = Y+" {:<{}} ".format(elem, max_len+2)
                        first = False
                    else:
                        text = W+"{:<{}} ".format(elem, max_len+2)
                    print(text, E, end="")
                print()
            sys.exit(0)
        except Exception:
            # Listing is best-effort, but report the cause instead of returning silently.
            log.exception('Could not print the list of supported WAFs')
            return
    if options.version:
        print('[+] The version of WAFW00F you have is %sv%s%s' % (B, __version__, E))
        print('[+] WAFW00F is provided under the %s%s%s license.' % (C, __license__, E))
        return
    extraheaders = {}
    if options.headers:
        log.info('Getting extra headers from %s' % options.headers)
        extraheaders = getheaders(options.headers)
        if extraheaders is None:
            parser.error('Please provide a headers file with colon delimited header names and values')
    if len(args) == 0 and not options.input:
        parser.error('No test target specified.')
    # check if input file is present
    if options.input:
        log.debug("Loading file '%s'" % options.input)
        try:
            if options.input.endswith('.json'):
                with open(options.input) as f:
                    try:
                        urls = json.loads(f.read())
                    except json.decoder.JSONDecodeError:
                        log.critical("JSON file %s did not contain well-formed JSON", options.input)
                        sys.exit(1)
                log.info("Found: %s urls to check." % (len(urls)))
                targets = [item['url'] for item in urls]
            elif options.input.endswith('.csv'):
                columns = defaultdict(list)
                with open(options.input) as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        for (k, v) in row.items():
                            columns[k].append(v)
                targets = columns['url']
            else:
                with open(options.input) as f:
                    # Blank lines would otherwise turn into the bogus target "https://".
                    targets = [line.strip() for line in f.read().splitlines() if line.strip()]
        except FileNotFoundError:
            log.error('File %s could not be read. No targets loaded.', options.input)
            sys.exit(1)
    else:
        targets = args
    results = []
    for target in targets:
        if not target.startswith('http'):
            log.info('The url %s should start with http:// or https:// .. fixing (might make this unusable)' % target)
            target = 'https://' + target
        print('[*] Checking %s' % target)
        pret = urlParser(target)
        if pret is None:
            log.critical('The url %s is not well formed' % target)
            sys.exit(1)
        (hostname, _, path, _, _) = pret
        log.info('starting wafw00f on %s' % target)
        proxies = dict()
        if options.proxy:
            proxies = {
                "http": options.proxy,
                "https": options.proxy,
            }
        attacker = WAFW00F(target, debuglevel=options.verbose, path=path,
                           followredirect=options.followredirect, extraheaders=extraheaders,
                           proxies=proxies)
        rq = attacker.normalRequest()
        if rq is None:
            log.error('Site %s appears to be down' % hostname)
            continue
        if options.test:
            # Single-WAF mode: only the first reachable target is tested, then main() returns.
            if options.test in attacker.wafdetections:
                waf = attacker.wafdetections[options.test](attacker)
                if waf:
                    print('[+] The site %s%s%s is behind %s%s%s WAF.' % (B, target, E, C, options.test, E))
                else:
                    print('[-] WAF %s was not detected on %s' % (options.test, target))
            else:
                print('[-] WAF %s was not found in our list\r\nUse the --list option to see what is available' % options.test)
            return
        waf = attacker.identwaf(options.findall)
        log.info('Identified WAF: %s' % waf)
        if len(waf) > 0:
            for i in waf:
                results.append(buildResultRecord(target, i))
            print('[+] The site %s%s%s is behind %s%s%s WAF.' % (B, target, E, C, (E+' and/or '+C).join(waf), E))
        if (options.findall) or len(waf) == 0:
            print('[+] Generic Detection results:')
            if attacker.genericdetect():
                log.info('Generic Detection: %s' % attacker.knowledge['generic']['reason'])
                print('[*] The site %s seems to be behind a WAF or some sort of security solution' % target)
                print('[~] Reason: %s' % attacker.knowledge['generic']['reason'])
                results.append(buildResultRecord(target, 'generic'))
            else:
                print('[-] No WAF detected by the generic detection')
                results.append(buildResultRecord(target, None))
        print('[~] Number of requests: %s' % attacker.requestnumber)
    # print table of results
    if len(results) > 0:
        log.info("Found: %s matches." % (len(results)))
    if options.output:
        if options.output == '-':
            enableStdOut()
            print(os.linesep.join(getTextResults(results)))
        elif options.output.endswith('.json'):
            log.debug("Exporting data in json format to file: %s" % (options.output))
            with open(options.output, 'w') as outfile:
                json.dump(results, outfile, indent=2)
        elif options.output.endswith('.csv'):
            log.debug("Exporting data in csv format to file: %s" % (options.output))
            # newline='' lets the csv module control line endings (no blank rows on Windows).
            with open(options.output, 'w', newline='') as outfile:
                csvwriter = csv.writer(outfile, delimiter=',', quotechar='"',
                                       quoting=csv.QUOTE_MINIMAL)
                if results:
                    # Header row comes from the keys of the first record.
                    csvwriter.writerow(results[0].keys())
                for result in results:
                    csvwriter.writerow(result.values())
        else:
            log.debug("Exporting data in text format to file: %s" % (options.output))
            with open(options.output, 'w') as outfile:
                outfile.write(os.linesep.join(getTextResults(results)))
def main1(target):
    '''
    Programmatic entry point: probe a single target for a WAF.

    Stores the first response in the module-level ``rq`` and then runs
    ``identwaf`` with ``findall`` enabled.

    Returns ``(True, name)`` with the first detected WAF name, or
    ``(False, None)`` when nothing was detected, the target gave no response,
    or any step raised an exception (a message is printed in those cases).
    '''
    global rq
    attacker = WAFW00F(target)

    # Baseline request; both an exception and a None response count as unreachable.
    try:
        rq = attacker.normalRequest()
    except Exception as e:
        print(f'ALERT', f'waf检测[{target}]访问出错{e}')
        return False, None
    if rq is None:
        print(f'ALERT', f'waf检测[{target}]无法访问')
        return False, None

    try:
        waf = attacker.identwaf(True)
    except Exception as e:
        print(f'ALERT', f'waf检测[{target}]检测出错{e}')
        return False, None

    if len(waf) == 0:
        return False, None
    print(f'INFOR', f'waf检测[{target}]存在waf [{waf[0]}]')
    return True, waf[0]
if __name__ == '__main__':
    # The version check only warns on stderr; main() is still run afterwards.
    interpreter_too_old = sys.version_info[:2] < (2, 6)
    if interpreter_too_old:
        sys.stderr.write('Your version of python is way too old... please update to 2.6 or later\r\n')
    main()