diff --git a/doc/Apt-hunter_analyse泛读报告.docx b/doc/Apt-hunter_analyse泛读报告.docx
index 30de859..7fb65e2 100644
Binary files a/doc/Apt-hunter_analyse泛读报告.docx and b/doc/Apt-hunter_analyse泛读报告.docx differ
diff --git a/src/lib/EvtxHunt.py b/src/lib/EvtxHunt.py
index 54c242e..c3eebc3 100644
--- a/src/lib/EvtxHunt.py
+++ b/src/lib/EvtxHunt.py
@@ -1,72 +1,75 @@
 import csv
 import re
-from netaddr import *
-import xml.etree.ElementTree as ET
-import pandas as pd
-from datetime import datetime , timezone
-from evtx import PyEvtxParser
-from dateutil.parser import parse
-from dateutil.parser import isoparse
-from pytz import timezone
-minlength=1000
-
-Hunting_events=[{'Date and Time':[],'timestamp':[],'Channel':[],'Computer':[],'Event ID':[],'Original Event Log':[]}]
-
+from netaddr import *  # import everything from netaddr, used for handling network addresses
+import xml.etree.ElementTree as ET  # XML parser
+import pandas as pd  # data-analysis library
+from datetime import datetime, timezone  # date and time handling
+from evtx import PyEvtxParser  # parser for Windows EVTX event log files
+from dateutil.parser import parse, isoparse  # parsing of date/time strings
+from pytz import timezone  # time-zone handling
+minlength = 1000  # presumably a string-length threshold, but unused in this file
+# List holding a single dict that accumulates the hunted events
+Hunting_events = [{'Date and Time': [], 'timestamp': [], 'Channel': [], 'Computer': [], 'Event ID': [], 'Original Event Log': []}]
+# Regular expressions used to extract specific fields from the event XML
 EventID_rex = re.compile('(.*)<\/EventID>', re.IGNORECASE)
 Channel_rex = re.compile('(.*)<\/Channel>', re.IGNORECASE)
 Computer_rex = re.compile('(.*)<\/Computer>', re.IGNORECASE)
 
-def Evtx_hunt(files,str_regexes,eid,input_timzone,output,timestart,timeend):
+def Evtx_hunt(files, str_regexes, eid, input_timzone, output, timestart, timeend):
+    """
+    Parse Windows event log files and search them for specific events.
+
+    Parameters:
+    - files: list of event log files to parse
+    - str_regexes: list of regular expressions matched against the event data
+    - eid: event ID; if provided, only events with this ID are searched
+    - input_timzone: time zone of the input logs
+    - output: output file name
+    - timestart, timeend: time range to search
+    """
     for file in files:
-        file=str(file)
-        print("Analyzing "+file)
+        file = str(file)
+        print("Analyzing " + file)
         try:
             parser = PyEvtxParser(file)
         except:
-            print("Issue analyzing "+file +"\nplease check if its not corrupted")
+            print("Issue analyzing " + file + "\nplease check if its not corrupted")
             continue
-        try:
-
-            for record in parser.records():
-
+
+        for record in parser.records():
+            try:
+                # Extract the event ID
                 EventID = EventID_rex.findall(record['data'])
-
+                # If a time range was supplied, check whether the event falls inside it
                 if timestart is not None and timeend is not None:
                     timestamp = datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat()))
                     if not (timestamp > timestart and timestamp < timeend):
-                        return
-                if len(EventID) > 0:
-                    if eid is not None and EventID[0]!=eid:
-                        continue
-
+                        continue  # event outside the time range, skip it
+                # Proceed if an EventID was found and it matches eid (when eid is not None)
+                if len(EventID) > 0 and (eid is None or EventID[0] == eid):
                     Computer = Computer_rex.findall(record['data'])
                     Channel = Channel_rex.findall(record['data'])
-                    if len(Channel)>0:
-                        channel=Channel[0]
-                    else:
-                        channel=" "
-                    #print(record['data'])
-                    # if record['data'].lower().find(str_regex.lower())>-1:
-                    #print(str_regexes)
+                    channel = Channel[0] if len(Channel) > 0 else " "
+                    # Try every supplied regular expression
                     for str_regex in str_regexes:
-                        rex=re.compile(str_regex, re.IGNORECASE)
-                        #print(rex)
-                        #print(rex.findall(record['data']))
+                        rex = re.compile(str_regex, re.IGNORECASE)
                         if rex.findall(record['data']):
-                            #print("EventID : "+EventID[0]+" , Data : "+record['data'])
+                            # The regex matched: record the event details
                             Hunting_events[0]['timestamp'].append(datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat())))
                             Hunting_events[0]['Date and Time'].append(parse(record["timestamp"]).astimezone(input_timzone).isoformat())
                             Hunting_events[0]['Channel'].append(channel)
                             Hunting_events[0]['Event ID'].append(EventID[0])
                             Hunting_events[0]['Computer'].append(Computer[0])
                             Hunting_events[0]['Original Event Log'].append(str(record['data']).replace("\r", " ").replace("\n", " "))
-        except Exception as e:
-            print("issue searching log : "+record['data']+"\n Error : "+print(e))
+            except Exception as e:
+                print("issue searching log : " + record['data'] + "\n Error : " + str(e))  # fixed: use str(e) instead of the broken nested print(e) call
     hunt_report(output)
-
-
 def hunt_report(output):
+    """
+    Generate the report of the hunted events.
+
+    Parameters:
+    - output: prefix of the output CSV file
+    """
     global Hunting_events
     Events = pd.DataFrame(Hunting_events[0])
-    print("Found "+str(len(Hunting_events[0]["timestamp"]))+" Events")
-    Events.to_csv(output+"_hunting.csv", index=False)
+    print("Found " + str(len(Hunting_events[0]["timestamp"])) + " Events")
+    Events.to_csv(output + "_hunting.csv", index=False)
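The refactored Evtx_hunt() keeps its original positional parameters, so a caller only supplies the EVTX paths, the regex list and the reporting options. A minimal usage sketch, assuming the src/lib layout of this repository; the paths and patterns below are illustrative only:

    from pathlib import Path
    from dateutil import tz
    from lib.EvtxHunt import Evtx_hunt  # assumed import path for src/lib/EvtxHunt.py

    evtx_files = list(Path("C:/triage/winevt").glob("*.evtx"))  # hypothetical EVTX folder
    hunt_regexes = [r"mimikatz", r"lsass\.dmp"]                 # case-insensitive hunt patterns
    Evtx_hunt(evtx_files, hunt_regexes,
              eid=None,                  # or a specific Event ID string such as "4688"
              input_timzone=tz.tzutc(),  # tzinfo applied to every record timestamp
              output="case01",           # report is written to case01_hunting.csv
              timestart=None,            # optional epoch bounds; None disables the time filter
              timeend=None)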
diff --git a/src/lib/O365Hunter.py b/src/lib/O365Hunter.py
index 7df256a..c842205 100644
--- a/src/lib/O365Hunter.py
+++ b/src/lib/O365Hunter.py
@@ -7,25 +7,23 @@ import pandas as pd
 import geoip2.database
 import requests
 from dateutil import parser, tz
-import pandas as pd
-import json
-import csv
 from pathlib import Path
 
-start_time=0
-end_time=0
+# Global variables used for timing the analysis
+start_time = 0
+end_time = 0
+
+# SQL query used to detect password-spray attacks
 password_spray_query = '''
 WITH FailedLogins AS (
-
     SELECT
         UserId,
-        ClientIP,
+        ClientIP,
         datetime(CreationTime) AS LoginDate
     FROM
         events
     WHERE
         Operation = 'UserLoginFailed'
-
 )
 SELECT
     UserId,
@@ -33,18 +31,18 @@ SELECT
     COUNT(DISTINCT ClientIP) AS UniqueIPCount,
     COUNT(*) AS FailedLoginAttempts,
     LoginDate
-
 FROM
     FailedLogins
 GROUP BY
     UserId,
-     strftime('%Y-%m-%d %H', LoginDate)
+    strftime('%Y-%m-%d %H', LoginDate)
 HAVING
     COUNT(*) > 5 AND UniqueIPCount > 3
 ORDER BY
     FailedLoginAttempts DESC;
-
-    '''
+'''
+
+# SQL query used to track user logon activity
 user_logon_query = '''
 SELECT
     UserId,
@@ -52,18 +50,19 @@ SELECT
     COUNT(*) AS TotalLoginAttempts,
     SUM(CASE WHEN Operation = 'UserLoggedIn' THEN 1 ELSE 0 END) AS SuccessfulLogins,
     SUM(CASE WHEN Operation = 'UserLoginFailed' THEN 1 ELSE 0 END) AS FailedLogins
-    FROM
-    events
-    where
+FROM
+    events
+WHERE
     Operation = 'UserLoggedIn' OR Operation = 'UserLoginFailed'
-    GROUP BY
-    UserId,
-    LoginDate
-    ORDER BY
-    LoginDate,
-    UserId;
+GROUP BY
+    UserId,
+    LoginDate
+ORDER BY
+    LoginDate,
+    UserId;
 '''
+
+# SQL query used to count the operations performed by each user
 User_operations_query = '''
 SELECT
     UserId,
@@ -77,12 +76,13 @@ ORDER BY
     OperationCount DESC;
 '''
 
+# SQL query used to count user operations per day
 user_operation_by_day_query = '''
 SELECT
     UserId,
     DATE(CreationTime) AS OperationDate,
     COUNT(DISTINCT Operation) AS OperationCount,
-    GROUP_CONCAT( Operation, ', ') AS UniqueOperations
+    GROUP_CONCAT(Operation, ', ') AS UniqueOperations
 FROM
     events
 GROUP BY
@@ -92,138 +92,162 @@ ORDER BY
     OperationCount DESC
 '''
 
+def convert_csv(input_file, temp):
+    """
+    Convert a CSV audit export into a JSON Lines file.
 
-def convert_csv(input_file,temp):
-    with open(input_file, 'r', encoding='utf-8') as csv_file:
-        # Create a CSV reader
-        reader = csv.DictReader(csv_file)
-
-        json_file = 'audit_data.json'
-        json_file=os.path.join(temp, json_file)
-        with open(json_file, 'w', encoding='utf-8') as jsonl_file:
-            # Extract and write the AuditData column to a file as JSON Lines
-            for row in reader:
-                # Extract the AuditData which is already a JSON formatted string
-                json_data = json.loads(row['AuditData'])
-                # Convert the JSON object back to a string to store in the file
-                json_string = json.dumps(json_data)
-                # Write the JSON string to the file with a newline
-                jsonl_file.write(json_string + '\n')
+    Parameters:
+    - input_file: path of the input CSV file
+    - temp: path of the temporary directory
+
+    Returns:
+    - json_file: path of the generated JSON file
+    """
+    json_file = os.path.join(temp, 'audit_data.json')
+    with open(input_file, 'r', encoding='utf-8') as csv_file, open(json_file, 'w', encoding='utf-8') as jsonl_file:
+        reader = csv.DictReader(csv_file)
+        for row in reader:
+            json_data = json.loads(row['AuditData'])
+            json_string = json.dumps(json_data)
+            jsonl_file.write(json_string + '\n')
     return json_file
 
-
 def flatten_json_file(input_file, timezone, chunk_size=10000):
-    # Read the JSON file in chunks
+    """
+    Flatten the JSON file and normalise the timestamps.
+
+    Parameters:
+    - input_file: path of the input JSON file
+    - timezone: target time zone
+    - chunk_size: number of lines processed per chunk
+
+    Returns:
+    - DataFrame: the flattened data
+    """
     chunks = []
     with open(input_file, 'r') as file:
         lines = file.readlines()
         for i in range(0, len(lines), chunk_size):
             chunk = [json.loads(line) for line in lines[i:i + chunk_size]]
-
-            # Convert the CreationTime to the desired timezone
             for record in chunk:
                 if 'CreationTime' in record:
-                    # Parse the CreationTime
                     creation_time = parser.parse(record['CreationTime'])
-
-                    # Check if the datetime object is timezone aware
                     if creation_time.tzinfo is None:
-                        # Assume the original time is in UTC if no timezone info is present
                         creation_time = creation_time.replace(tzinfo=tz.tzutc())
-
-                    # Convert the CreationTime to the desired timezone
                     record['CreationTime'] = creation_time.astimezone(timezone).isoformat()
-
             chunks.append(pd.json_normalize(chunk))
-
-    # Concatenate all chunks into a single DataFrame
-    flattened_records = pd.concat(chunks, ignore_index=True)
-
-    return flattened_records
-
+    return pd.concat(chunks, ignore_index=True)
 
 def create_sqlite_db_from_dataframe(dataframe, db_name):
-    conn = sqlite3.connect(db_name)
+    """
+    Create an SQLite database from a pandas DataFrame.
 
-    # Convert all columns to string
+    Parameters:
+    - dataframe: pandas DataFrame holding the data
+    - db_name: SQLite database file name
+    """
+    conn = sqlite3.connect(db_name)
     dataframe = dataframe.astype(str)
-
-    # Write the DataFrame to SQLite, treating all fields as text
     dataframe.to_sql('events', conn, if_exists='replace', index=False,
                      dtype={col_name: 'TEXT' for col_name in dataframe.columns})
-
     conn.close()
 
-
 def read_detection_rules(rule_file):
-    with open(rule_file, 'r') as file:
-        rules = json.load(file)
-    return rules
+    """
+    Read the detection rules from a file.
+
+    Parameters:
+    - rule_file: path of the JSON file containing the detection rules
 
+    Returns:
+    - rules: list of rules
+    """
+    with open(rule_file, 'r') as file:
+        return json.load(file)
 
 def apply_detection_logic_sqlite(db_name, rules):
+    """
+    Apply the detection logic to the SQLite database.
+
+    Parameters:
+    - db_name: SQLite database file name
+    - rules: list of detection rules
+
+    Returns:
+    - DataFrame: the detected (anomalous) events
+    """
     conn = sqlite3.connect(db_name)
     all_detected_events = []
-
     for rule in rules:
         rule_name = rule['name']
         severity = rule['severity']
         query = rule['query']
-
         detected_events = pd.read_sql_query(query, conn)
         detected_events['RuleName'] = rule_name
         detected_events['Severity'] = severity
-
         all_detected_events.append(detected_events)
-
     conn.close()
-
-    if all_detected_events:
-        result = pd.concat(all_detected_events, ignore_index=True)
-    else:
-        result = pd.DataFrame()
-
-    return result
+    return pd.concat(all_detected_events, ignore_index=True) if all_detected_events else pd.DataFrame()
 
 def download_geolite_db(geolite_db_path):
+    """
+    Download the GeoLite2 database used for IP geolocation.
+
+    Parameters:
+    - geolite_db_path: path where the GeoLite2 database is saved
+    """
     url = "https://git.io/GeoLite2-Country.mmdb"
     print(f"Downloading GeoLite2 database from {url}...")
     response = requests.get(url)
-    response.raise_for_status()  # Check if the download was successful
-
+    response.raise_for_status()
     with open(geolite_db_path, 'wb') as file:
         file.write(response.content)
     print(f"GeoLite2 database downloaded and saved to {geolite_db_path}")
 
 def get_country_from_ip(ip, reader):
+    """
+    Resolve an IP address to a country name.
+
+    Parameters:
+    - ip: IP address
+    - reader: GeoLite2 database reader
+
+    Returns:
+    - str: the country name, or 'Unknown' if the IP cannot be resolved
+    """
     try:
-        response = reader.country(ip)
-        return response.country.name
+        return reader.country(ip).country.name
     except Exception as e:
-        #print(f"Could not resolve IP {ip}: {e}")
+        print(f"Could not resolve IP {ip}: {e}")
         return 'Unknown'
 
-
 def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data=False,
                   geolite_db_path='GeoLite2-Country.mmdb'):
+    """
+    Analyse an Office 365 audit log and generate a report.
+
+    Parameters:
+    - auditfile: path of the Office 365 audit log file
+    - rule_file: path of the detection rules file
+    - output: output directory
+    - timezone: target time zone
+    - include_flattened_data: whether to include the flattened data in the report
+    - geolite_db_path: path of the GeoLite2 database file
+    """
+    global start_time, end_time
     start_time = time.time()
     temp_dir = ".temp"
     if output is None or output == "":
        output = os.path.splitext(auditfile)[0]
+
    try:
-        # Create necessary directories
        os.makedirs(output, exist_ok=True)
        os.makedirs(temp_dir, exist_ok=True)
 
-        # Check if the GeoLite2 database exists, and download it if not
        if not os.path.exists(geolite_db_path):
            download_geolite_db(geolite_db_path)
 
-        # Convert CSV to JSON (assuming convert_csv is a valid function that you have)
        json_file = convert_csv(auditfile, temp_dir)
-
-        # Input and output file paths
        input_file = json_file
        db_name = os.path.join(temp_dir, 'audit_data.db')
@@ -231,36 +255,28 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
        rule_file = 'O365_detection_rules.json'
        output_file = f"{output}_o365_report.xlsx"
 
-        # Measure the start time
-
-
-        # Flatten the JSON file
+        # Flatten the JSON data and normalise the timestamps
        flattened_df = flatten_json_file(input_file, timezone)
 
-        # Create SQLite database from the flattened DataFrame
+        # Create the SQLite database
        create_sqlite_db_from_dataframe(flattened_df, db_name)
 
-        # Open the GeoLite2 database
+        # Resolve client IP addresses with the GeoLite2 database
        with geoip2.database.Reader(geolite_db_path) as reader:
-            # Resolve ClientIP to country names
            if 'ClientIP' in flattened_df.columns:
                flattened_df['Country'] = flattened_df['ClientIP'].apply(lambda ip: get_country_from_ip(ip, reader))
 
-        # Read detection rules
+        # Read the detection rules and apply them
        rules = read_detection_rules(rule_file)
-
-        # Apply detection logic using SQLite
        detected_events = apply_detection_logic_sqlite(db_name, rules)
 
-        # Reorder columns to make RuleName the first column
+        # Reorder the DataFrame columns so that RuleName comes first
        if not detected_events.empty:
-            columns = ['RuleName', 'Severity'] + [col for col in detected_events.columns if
-                                                  col not in ['RuleName', 'Severity']]
+            columns = ['RuleName', 'Severity'] + [col for col in detected_events.columns if col not in ['RuleName', 'Severity']]
            detected_events = detected_events[columns]
 
-        # Perform the brute-force detection query
+        # Run the remaining SQL queries
        conn = sqlite3.connect(db_name)
-
        try:
            user_login_tracker_df = pd.read_sql_query(user_logon_query, conn)
            password_spray_df = pd.read_sql_query(password_spray_query, conn)
@@ -269,20 +285,19 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
        finally:
            conn.close()
 
-        # Create a new workbook with the detection results
+        # Generate the Excel report
        with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
            if include_flattened_data:
-                # Split the flattened data into multiple sheets if needed
+                # Split the flattened data across several sheets if needed
                max_rows_per_sheet = 65000
                num_sheets = len(flattened_df) // max_rows_per_sheet + 1
-
                for i in range(num_sheets):
                    start_row = i * max_rows_per_sheet
                    end_row = (i + 1) * max_rows_per_sheet
                    sheet_name = f'Flattened Data {i + 1}'
                    flattened_df.iloc[start_row:end_row].to_excel(writer, sheet_name=sheet_name, index=False)
 
-            # Write statistics for various fields
+            # Write the statistics for the various fields to separate sheets
            detected_events.to_excel(writer, sheet_name='Detection Results', index=False)
            user_login_tracker_df.to_excel(writer, sheet_name='User Login Tracker', index=False)
            password_spray_df.to_excel(writer, sheet_name='Password Spray Attacks', index=False)
@@ -293,10 +308,8 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
            flattened_df['Country'].value_counts().to_frame().to_excel(writer, sheet_name='Country Stats')
            flattened_df['UserAgent'].value_counts().to_frame().to_excel(writer, sheet_name='UserAgent Stats')
            flattened_df['UserId'].value_counts().to_frame().to_excel(writer, sheet_name='UserId Stats')
-            flattened_df['AuthenticationType'].value_counts().to_frame().to_excel(writer,
-                                                                                  sheet_name='AuthenticationType Stats')
+            flattened_df['AuthenticationType'].value_counts().to_frame().to_excel(writer, sheet_name='AuthenticationType Stats')
 
-        # Measure the end time
        end_time = time.time()
        print(f"Office365 analysis finished in time: {end_time - start_time:.2f} seconds")
 
@@ -304,18 +317,12 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
        print(f"An error occurred during the analysis: {e}")
 
    finally:
-        #Clean up the temporary directory
+        # Clean up the temporary directory
        if os.path.exists(temp_dir):
            for file in Path(temp_dir).glob('*'):
-                file.unlink()  # Delete the file
-            os.rmdir(temp_dir)  # Remove the directory
+                file.unlink()
+            os.rmdir(temp_dir)
 
-
-    # Write the User Login Tracker results to a new sheet
-
-    # Measure the end time
    end_time = time.time()
-
-    # Calculate and print the running time
    running_time = end_time - start_time
    print(f"Office365 hunter finished in time: {running_time:.2f} seconds")
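read_detection_rules() and apply_detection_logic_sqlite() expect the rule file to be a JSON list of objects with name, severity and query keys, each query running against the flattened events table; note that analyzeoff365() still overwrites its rule_file argument with the hard-coded 'O365_detection_rules.json'. A hedged sketch of how such a rule file could be produced; the rule itself is illustrative and not part of the repository:

    import json

    example_rule = {
        "name": "Mass file download by a single user",  # hypothetical rule name
        "severity": "Medium",
        "query": (
            "SELECT UserId, COUNT(*) AS Downloads "
            "FROM events "
            "WHERE Operation = 'FileDownloaded' "
            "GROUP BY UserId "
            "HAVING Downloads > 100;"
        ),
    }

    with open("O365_detection_rules.json", "w") as f:
        json.dump([example_rule], f, indent=2)  # the loader expects a JSON list of rules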
diff --git a/src/lib/SigmaHunter.py b/src/lib/SigmaHunter.py
index 9114758..c374685 100644
--- a/src/lib/SigmaHunter.py
+++ b/src/lib/SigmaHunter.py
@@ -493,71 +493,115 @@ def optimised_parse_mp(file):
                'ParentUser': ['Event_EventData_ParentUser']}
 
     parser = PyEvtxParser(str(file))
-    for record in parser.records_json():
-
-        data=flatten(json.loads(record["data"]))
-        for key in mapping.keys():
-            requiredfield = "None"
-            for field in mapping[key]:
-                if field in data:
-                    requiredfield=field
-                    break
-
-            if requiredfield!="None":
-                if isinstance(data[requiredfield], list):
-                    Alldata[key].append(",".join(data[requiredfield]))
-                else:
-                    Alldata[key].append(str(data[requiredfield]))
+    for record in parser.records_json():
+        # Flatten the JSON event data so that it is easier to process afterwards
+        data = flatten(json.loads(record["data"]))
+
+        for key in mapping.keys():
+            requiredfield = "None"
+            # Walk the candidate fields in the mapping and keep the first one present in the data
+            for field in mapping[key]:
+                if field in data:
+                    requiredfield = field
+                    break
+
+            if requiredfield != "None":
+                # If the field value is a list, join it with commas before adding it to Alldata
+                if isinstance(data[requiredfield], list):
+                    Alldata[key].append(",".join(data[requiredfield]))
                 else:
-                if field == "Original_Event_Log":
-                    Alldata[key].append(record["data"])
-                    #Alldata[key].append(None)
-                else:
-
-                    Alldata[key].append(None)
+                    # Otherwise convert the value to a string and add it to Alldata
+                    Alldata[key].append(str(data[requiredfield]))
+            else:
+                # No matching field was found
+                if field == "Original_Event_Log":
+                    # For the original event log column, store the whole event record
+                    Alldata[key].append(record["data"])
+                else:
+                    # For any other missing field, append None
+                    Alldata[key].append(None)
     #print("finished Parsing")
     #print(Alldata)
+
+    # Use the lock so that database inserts stay safe across multiple processes
     l.acquire()
     #print("Inserting data into "+DB)
     insert_into_db_mp(Alldata, DB)
     l.release()
-    print("Done Parsing : "+str(file))
-
+    print("Done Parsing : " + str(file))
 
 def clean(DBName):
-    # specify the path to the file to be removed
+    """
+    Remove the temporary SQLite database file.
+
+    Parameters:
+    - DBName: SQLite database file name
+    """
     file_path = DBName
 
-    # check if the file exists
+    # Check whether the file exists
     if os.path.isfile(file_path):
-        # remove the file
+        # Delete the file
         os.remove(file_path)
         print(f"Temp Database has been removed.")
     else:
         print(f"Temp Database does not exist.")
 
-
 def init(l):
+    """
+    Initialise the global lock for a worker process.
+
+    Parameters:
+    - l: lock object
+    """
     global lock
     lock = l
 
-
-def Sigma_Analyze(Path, rules,output, DBName="Events.sqlite"):
-    global l,DBconn,DB
+def Sigma_Analyze(Path, rules, output, DBName="Events.sqlite"):
+    """
+    Analyse Windows event logs with Sigma rules.
+
+    Parameters:
+    - Path: path of the event log files
+    - rules: path of the Sigma rules file
+    - output: prefix of the output files
+    - DBName: SQLite database file name
+    """
+    global l, DBconn, DB
     tic_start = time.time()
-    DB=DBName
+    DB = DBName
+    # Create the SQLite database
     Create_DB(DB)
     print("Analyzing logs using Sigma with below config : ")
     print(f"Logs Path : {Path}\nSigma Rules file : {rules}\nProfile : {output}")
+
+    # Use multiprocessing to speed up the parsing
     pool = multiprocessing.Pool(multiprocessing.cpu_count(), initializer=init, initargs=(l,))
+
+    # Auto-detect the log files
     files = auto_detect(Path)
+
+    # Parse the log files in parallel
     results = pool.map(optimised_parse_mp, files)
+
+    # Insert the Sigma rules into the database
     RulesToDB(rules, DB)
+
+    # Connect to the database
     DBconn = sqlite3.connect(DB)
-    optimised_search(DB,output)
+
+    # Run the optimised search that applies the Sigma rules
+    optimised_search(DB, output)
+
+    # Clean up the temporary database
     clean(DBName)
+
+    # Close the database connection
     DBconn.close()
+
     toc_end = time.time()
-    print("Analysis results availble as CSV file with Name "+output+'_'+'Detections.csv')
-    print("Analysis results availble as Excel file with statistics as "+output+'_'+'Detections.xlsx')
+    # Print the names of the output files
+    print("Analysis results available as CSV file with Name " + output + '_' + 'Detections.csv')
+    print("Analysis results available as Excel file with statistics as " + output + '_' + 'Detections.xlsx')
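Sigma_Analyze() ties the SigmaHunter pieces together (Create_DB, auto_detect, the multiprocessing pool, RulesToDB, optimised_search and clean). A minimal driver sketch, assuming the module-level lock l used by the pool initializer is created at import time; the paths below are hypothetical:

    from lib.SigmaHunter import Sigma_Analyze  # assumed import path for src/lib/SigmaHunter.py

    if __name__ == "__main__":  # guard needed because Sigma_Analyze spawns a multiprocessing pool
        Sigma_Analyze(
            Path="C:/triage/winevt/Logs",  # EVTX folder handed to auto_detect()
            rules="sigma_rules.json",      # Sigma rules file loaded by RulesToDB()
            output="case01",               # results land in case01_Detections.csv / .xlsx
            DBName="Events.sqlite",        # temporary database, removed by clean() at the end
        )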