You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
MiaCTFer/client-1/webinfo/wafw00f/main.py

505 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Copyright (C) 2020, WAFW00F Developers.
See the LICENSE file for copying permission.
'''
# For keeping python2 support for now
from __future__ import print_function
import csv
import io
import json
import logging
import os
import random
import re
import sys
from collections import defaultdict
from optparse import OptionParser
from client.webinfo.wafw00f.lib.asciiarts import *
from client.webinfo.wafw00f import __version__, __license__
from client.webinfo.wafw00f.manager import load_plugins
from client.webinfo.wafw00f.wafprio import wafdetectionsprio
from client.webinfo.wafw00f.lib.evillib import urlParser, waftoolsengine, def_headers
class WAFW00F(waftoolsengine):
xsstring = '<script>alert("XSS");</script>'
sqlistring = "UNION SELECT ALL FROM information_schema AND ' or SLEEP(5) or '"
lfistring = '../../../../etc/passwd'
rcestring = '/bin/cat /etc/passwd; ping 127.0.0.1; curl google.com'
xxestring = '<!ENTITY xxe SYSTEM "file:///etc/shadow">]><pwn>&hack;</pwn>'
def __init__(self, target='www.example.com', debuglevel=0, path='/',
followredirect=True, extraheaders={}, proxies=None):
self.log = logging.getLogger('wafw00f')
self.attackres = None
waftoolsengine.__init__(self, target, debuglevel, path, proxies, followredirect, extraheaders)
self.knowledge = dict(generic=dict(found=False, reason=''), wafname=list())
def normalRequest(self):
return self.Request()
def customRequest(self, headers=None):
return self.Request(headers=headers)
def nonExistent(self):
return self.Request(path=self.path + str(random.randrange(100, 999)) + '.html')
def xssAttack(self):
return self.Request(path=self.path, params= {'s': self.xsstring})
def xxeAttack(self):
return self.Request(path=self.path, params= {'s': self.xxestring})
def lfiAttack(self):
return self.Request(path=self.path + self.lfistring)
def centralAttack(self):
return self.Request(path=self.path, params={'a': self.xsstring, 'b': self.sqlistring, 'c': self.lfistring})
def sqliAttack(self):
return self.Request(path=self.path, params= {'s': self.sqlistring})
def oscAttack(self):
return self.Request(path=self.path, params= {'s': self.rcestring})
def performCheck(self, request_method):
r = request_method()
if r is None:
raise RequestBlocked()
return r
# Most common attacks used to detect WAFs
attcom = [xssAttack, sqliAttack, lfiAttack]
attacks = [xssAttack, xxeAttack, lfiAttack, sqliAttack, oscAttack]
def genericdetect(self):
reason = ''
reasons = ['Blocking is being done at connection/packet level.',
'The server header is different when an attack is detected.',
'The server returns a different response code when an attack string is used.',
'It closed the connection for a normal request.',
'The response was different when the request wasn\'t made from a browser.'
]
try:
# Testing for no user-agent response. Detects almost all WAFs out there.
resp1 = self.performCheck(self.normalRequest)
if 'User-Agent' in self.headers:
del self.headers['User-Agent'] # Deleting the user-agent key from object not dict.
resp3 = self.customRequest(headers=def_headers)
if resp1.status_code != resp3.status_code:
self.log.info('Server returned a different response when request didn\'t contain the User-Agent header.')
reason = reasons[4]
reason += '\r\n'
reason += 'Normal response code is "%s",' % resp1.status_code
reason += ' while the response code to a modified request is "%s"' % resp3.status_code
self.knowledge['generic']['reason'] = reason
self.knowledge['generic']['found'] = True
return True
# Testing the status code upon sending a xss attack
resp2 = self.performCheck(self.xssAttack)
if resp1.status_code != resp2.status_code:
self.log.info('Server returned a different response when a XSS attack vector was tried.')
reason = reasons[2]
reason += '\r\n'
reason += 'Normal response code is "%s",' % resp1.status_code
reason += ' while the response code to cross-site scripting attack is "%s"' % resp2.status_code
self.knowledge['generic']['reason'] = reason
self.knowledge['generic']['found'] = True
return True
# Testing the status code upon sending a lfi attack
resp2 = self.performCheck(self.lfiAttack)
if resp1.status_code != resp2.status_code:
self.log.info('Server returned a different response when a directory traversal was attempted.')
reason = reasons[2]
reason += '\r\n'
reason += 'Normal response code is "%s",' % resp1.status_code
reason += ' while the response code to a file inclusion attack is "%s"' % resp2.status_code
self.knowledge['generic']['reason'] = reason
self.knowledge['generic']['found'] = True
return True
# Testing the status code upon sending a sqli attack
resp2 = self.performCheck(self.sqliAttack)
if resp1.status_code != resp2.status_code:
self.log.info('Server returned a different response when a SQLi was attempted.')
reason = reasons[2]
reason += '\r\n'
reason += 'Normal response code is "%s",' % resp1.status_code
reason += ' while the response code to a SQL injection attack is "%s"' % resp2.status_code
self.knowledge['generic']['reason'] = reason
self.knowledge['generic']['found'] = True
return True
# Checking for the Server header after sending malicious requests
response = self.attackres
normalserver = resp1.headers.get('Server')
attackresponse_server = response.headers.get('Server')
if attackresponse_server:
if attackresponse_server != normalserver:
self.log.info('Server header changed, WAF possibly detected')
self.log.debug('Attack response: %s' % attackresponse_server)
self.log.debug('Normal response: %s' % normalserver)
reason = reasons[1]
reason += '\r\nThe server header for a normal response is "%s",' % normalserver
reason += ' while the server header a response to an attack is "%s",' % attackresponse_server
self.knowledge['generic']['reason'] = reason
self.knowledge['generic']['found'] = True
return True
# If at all request doesn't go, press F
except RequestBlocked:
self.knowledge['generic']['reason'] = reasons[0]
self.knowledge['generic']['found'] = True
return True
return False
def matchHeader(self, headermatch, attack=False):
if attack:
r = self.attackres
else: r = rq
if r is None:
return
header, match = headermatch
headerval = r.headers.get(header)
if headerval:
# set-cookie can have multiple headers, python gives it to us
# concatinated with a comma
if header == 'Set-Cookie':
headervals = headerval.split(', ')
else:
headervals = [headerval]
for headerval in headervals:
if re.search(match, headerval, re.I):
return True
return False
def matchStatus(self, statuscode, attack=True):
if attack:
r = self.attackres
else: r = rq
if r is None:
return
if r.status_code == statuscode:
return True
return False
def matchCookie(self, match, attack=False):
return self.matchHeader(('Set-Cookie', match), attack=attack)
def matchReason(self, reasoncode, attack=True):
if attack:
r = self.attackres
else: r = rq
if r is None:
return
# We may need to match multiline context in response body
if str(r.reason) == reasoncode:
return True
return False
def matchContent(self, regex, attack=True):
if attack:
r = self.attackres
else: r = rq
if r is None:
return
# We may need to match multiline context in response body
if re.search(regex, r.text, re.I):
return True
return False
wafdetections = dict()
plugin_dict = load_plugins()
result_dict = {}
for plugin_module in plugin_dict.values():
wafdetections[plugin_module.NAME] = plugin_module.is_waf
# Check for prioritized ones first, then check those added externally
checklist = wafdetectionsprio
checklist += list(set(wafdetections.keys()) - set(checklist))
def identwaf(self, findall=False):
detected = list()
try:
self.attackres = self.performCheck(self.centralAttack)
except RequestBlocked:
return detected
for wafvendor in self.checklist:
self.log.info('Checking for %s' % wafvendor)
if self.wafdetections[wafvendor](self):
detected.append(wafvendor)
if not findall:
break
self.knowledge['wafname'] = detected
return detected
def calclogginglevel(verbosity):
default = 40 # errors are printed out
level = default - (verbosity * 10)
if level < 0:
level = 0
return level
def buildResultRecord(url, waf):
result = {}
result['url'] = url
if waf:
result['detected'] = True
if waf == 'generic':
result['firewall'] = 'Generic'
result['manufacturer'] = 'Unknown'
else:
result['firewall'] = waf.split('(')[0].strip()
result['manufacturer'] = waf.split('(')[1].replace(')', '').strip()
else:
result['detected'] = False
result['firewall'] = 'None'
result['manufacturer'] = 'None'
return result
def getTextResults(res=None):
# leaving out some space for future possibilities of newer columns
# newer columns can be added to this tuple below
keys = ('detected')
res = [({key: ba[key] for key in ba if key not in keys}) for ba in res]
rows = []
for dk in res:
p = [str(x) for _, x in dk.items()]
rows.append(p)
for m in rows:
m[1] = '%s (%s)' % (m[1], m[2])
m.pop()
defgen = [
(max([len(str(row[i])) for row in rows]) + 3)
for i in range(len(rows[0]))
]
rwfmt = "".join(["{:>"+str(dank)+"}" for dank in defgen])
textresults = []
for row in rows:
textresults.append(rwfmt.format(*row))
return textresults
def disableStdOut():
sys.stdout = None
def enableStdOut():
sys.stdout = sys.__stdout__
def getheaders(fn):
headers = {}
if not os.path.exists(fn):
logging.getLogger('wafw00f').critical('Headers file "%s" does not exist!' % fn)
return
with io.open(fn, 'r', encoding='utf-8') as f:
for line in f.readlines():
_t = line.split(':', 2)
if len(_t) == 2:
h, v = map(lambda x: x.strip(), _t)
headers[h] = v
return headers
class RequestBlocked(Exception):
pass
def main():
parser = OptionParser(usage='%prog url1 [url2 [url3 ... ]]\r\nexample: %prog http://www.victim.org/')
parser.add_option('-v', '--verbose', action='count', dest='verbose', default=0,
help='Enable verbosity, multiple -v options increase verbosity')
parser.add_option('-a', '--findall', action='store_true', dest='findall', default=False,
help='Find all WAFs which match the signatures, do not stop testing on the first one')
parser.add_option('-r', '--noredirect', action='store_false', dest='followredirect',
default=True, help='Do not follow redirections given by 3xx responses')
parser.add_option('-t', '--test', dest='test', help='Test for one specific WAF')
parser.add_option('-o', '--output', dest='output', help='Write output to csv, json or text file depending on file extension. For stdout, specify - as filename.',
default=None)
parser.add_option('-i', '--input-file', dest='input', help='Read targets from a file. Input format can be csv, json or text. For csv and json, a `url` column name or element is required.',
default=None)
parser.add_option('-l', '--list', dest='list', action='store_true',
default=False, help='List all WAFs that WAFW00F is able to detect')
parser.add_option('-p', '--proxy', dest='proxy', default=None,
help='Use an HTTP proxy to perform requests, examples: http://hostname:8080, socks5://hostname:1080, http://user:pass@hostname:8080')
parser.add_option('--version', '-V', dest='version', action='store_true',
default=False, help='Print out the current version of WafW00f and exit.')
parser.add_option('--headers', '-H', dest='headers', action='store', default=None,
help='Pass custom headers via a text file to overwrite the default header set.')
options, args = parser.parse_args()
logging.basicConfig(level=calclogginglevel(options.verbose))
log = logging.getLogger('wafw00f')
if options.output == '-':
disableStdOut()
print(randomArt())
if options.list:
print('[+] Can test for these WAFs:\r\n')
attacker = WAFW00F(None)
try:
m = [i.replace(')', '').split(' (') for i in wafdetectionsprio]
print(R+' WAF Name'+' '*24+'Manufacturer\n '+'-'*8+' '*24+'-'*12+'\n')
max_len = max(len(str(x)) for k in m for x in k)
for inner in m:
first = True
for elem in inner:
if first:
text = Y+" {:<{}} ".format(elem, max_len+2)
first = False
else:
text = W+"{:<{}} ".format(elem, max_len+2)
print(text, E, end="")
print()
sys.exit(0)
except Exception:
return
if options.version:
print('[+] The version of WAFW00F you have is %sv%s%s' % (B, __version__, E))
print('[+] WAFW00F is provided under the %s%s%s license.' % (C, __license__, E))
return
extraheaders = {}
if options.headers:
log.info('Getting extra headers from %s' % options.headers)
extraheaders = getheaders(options.headers)
if extraheaders is None:
parser.error('Please provide a headers file with colon delimited header names and values')
if len(args) == 0 and not options.input:
parser.error('No test target specified.')
#check if input file is present
if options.input:
log.debug("Loading file '%s'" % options.input)
try:
if options.input.endswith('.json'):
with open(options.input) as f:
try:
urls = json.loads(f.read())
except json.decoder.JSONDecodeError:
log.critical("JSON file %s did not contain well-formed JSON", options.input)
sys.exit(1)
log.info("Found: %s urls to check." %(len(urls)))
targets = [ item['url'] for item in urls ]
elif options.input.endswith('.csv'):
columns = defaultdict(list)
with open(options.input) as f:
reader = csv.DictReader(f)
for row in reader:
for (k,v) in row.items():
columns[k].append(v)
targets = columns['url']
else:
with open(options.input) as f:
targets = [x for x in f.read().splitlines()]
except FileNotFoundError:
log.error('File %s could not be read. No targets loaded.', options.input)
sys.exit(1)
else:
targets = args
results = []
for target in targets:
if not target.startswith('http'):
log.info('The url %s should start with http:// or https:// .. fixing (might make this unusable)' % target)
target = 'https://' + target
print('[*] Checking %s' % target)
pret = urlParser(target)
if pret is None:
log.critical('The url %s is not well formed' % target)
sys.exit(1)
(hostname, _, path, _, _) = pret
log.info('starting wafw00f on %s' % target)
proxies = dict()
if options.proxy:
proxies = {
"http": options.proxy,
"https": options.proxy,
}
attacker = WAFW00F(target, debuglevel=options.verbose, path=path,
followredirect=options.followredirect, extraheaders=extraheaders,
proxies=proxies)
global rq
rq = attacker.normalRequest()
if rq is None:
log.error('Site %s appears to be down' % hostname)
continue
if options.test:
if options.test in attacker.wafdetections:
waf = attacker.wafdetections[options.test](attacker)
if waf:
print('[+] The site %s%s%s is behind %s%s%s WAF.' % (B, target, E, C, options.test, E))
else:
print('[-] WAF %s was not detected on %s' % (options.test, target))
else:
print('[-] WAF %s was not found in our list\r\nUse the --list option to see what is available' % options.test)
return
waf = attacker.identwaf(options.findall)
log.info('Identified WAF: %s' % waf)
if len(waf) > 0:
for i in waf:
results.append(buildResultRecord(target, i))
print('[+] The site %s%s%s is behind %s%s%s WAF.' % (B, target, E, C, (E+' and/or '+C).join(waf), E))
if (options.findall) or len(waf) == 0:
print('[+] Generic Detection results:')
if attacker.genericdetect():
log.info('Generic Detection: %s' % attacker.knowledge['generic']['reason'])
print('[*] The site %s seems to be behind a WAF or some sort of security solution' % target)
print('[~] Reason: %s' % attacker.knowledge['generic']['reason'])
results.append(buildResultRecord(target, 'generic'))
else:
print('[-] No WAF detected by the generic detection')
results.append(buildResultRecord(target, None))
print('[~] Number of requests: %s' % attacker.requestnumber)
#print table of results
if len(results) > 0:
log.info("Found: %s matches." % (len(results)))
if options.output:
if options.output == '-':
enableStdOut()
print(os.linesep.join(getTextResults(results)))
elif options.output.endswith('.json'):
log.debug("Exporting data in json format to file: %s" % (options.output))
with open(options.output, 'w') as outfile:
json.dump(results, outfile, indent=2)
elif options.output.endswith('.csv'):
log.debug("Exporting data in csv format to file: %s" % (options.output))
with open(options.output, 'w') as outfile:
csvwriter = csv.writer(outfile, delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
count = 0
for result in results:
if count == 0:
header = result.keys()
csvwriter.writerow(header)
count += 1
csvwriter.writerow(result.values())
else:
log.debug("Exporting data in text format to file: %s" % (options.output))
with open(options.output, 'w') as outfile:
outfile.write(os.linesep.join(getTextResults(results)))
def main1(target):
attacker = WAFW00F(target)
global rq
try:
rq = attacker.normalRequest()
except Exception as e:
print(f'ALERT', f'waf检测[{target}]访问出错{e}')
return False, None
if rq is None:
print(f'ALERT', f'waf检测[{target}]无法访问')
return False, None
try:
waf = attacker.identwaf(True)
except Exception as e:
print(f'ALERT', f'waf检测[{target}]检测出错{e}')
return False, None
if len(waf) > 0:
print(f'INFOR', f'waf检测[{target}]存在waf [{waf[0]}]')
return True, waf[0]
else:
return False, None
if __name__ == '__main__':
if sys.hexversion < 0x2060000:
sys.stderr.write('Your version of python is way too old... please update to 2.6 or later\r\n')
main()