2024.12.16 First revision_AizenSousuke

AizenSousuke
DengYanjia 2 months ago
parent b2abccfa83
commit 51da9a2377

@ -1,72 +1,75 @@
import csv
import re
from netaddr import *  # import everything from netaddr for handling network addresses
import xml.etree.ElementTree as ET  # XML parser
import pandas as pd  # data analysis library
from datetime import datetime, timezone  # date and time handling
from evtx import PyEvtxParser  # parser for Windows EVTX event log files
from dateutil.parser import parse, isoparse  # parse date/time strings
from pytz import timezone  # timezone handling (note: this shadows datetime.timezone imported above)
minlength = 1000  # presumably meant for a string-length check, but unused in this file
# List holding one dict that accumulates the hunted event fields
Hunting_events = [{'Date and Time': [], 'timestamp': [], 'Channel': [], 'Computer': [], 'Event ID': [], 'Original Event Log': []}]
# Regular expressions used to extract specific fields from the event XML
EventID_rex = re.compile('<EventID.*>(.*)<\/EventID>', re.IGNORECASE)
Channel_rex = re.compile('<Channel.*>(.*)<\/Channel>', re.IGNORECASE)
Computer_rex = re.compile('<Computer.*>(.*)<\/Computer>', re.IGNORECASE)
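# A minimal sketch (hypothetical record string, not taken from a real log) of what the
# three regexes above pull out of one EVTX record rendered as XML:
#   data = '<Event><System><EventID>4624</EventID><Channel>Security</Channel><Computer>DC01.example.local</Computer></System></Event>'
#   EventID_rex.findall(data)   -> ['4624']
#   Channel_rex.findall(data)   -> ['Security']
#   Computer_rex.findall(data)  -> ['DC01.example.local']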
def Evtx_hunt(files, str_regexes, eid, input_timzone, output, timestart, timeend):
"""
Parse Windows event log files and hunt for specific events.
Parameters:
- files: list of event log files to parse
- str_regexes: list of regular expressions matched against the event data
- eid: event ID; if provided, only events with this ID are searched
- input_timzone: timezone of the input logs
- output: output file name prefix
- timestart, timeend: time range to search within
"""
for file in files:
file = str(file)
print("Analyzing " + file)
try:
parser = PyEvtxParser(file)
except:
print("Issue analyzing " + file + "\nplease check that it is not corrupted")
continue
try:
for record in parser.records():
try:
# Extract the event ID
EventID = EventID_rex.findall(record['data'])
# If a time range was provided, check whether the event falls inside it
if timestart is not None and timeend is not None:
timestamp = datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat()))
if not (timestamp > timestart and timestamp < timeend):
continue # event falls outside the requested time range, skip it
# If an EventID was found and it matches eid (when an eid filter was supplied)
if len(EventID) > 0 and (eid is None or EventID[0] == eid):
Computer = Computer_rex.findall(record['data'])
Channel = Channel_rex.findall(record['data'])
channel = Channel[0] if len(Channel) > 0 else " "
# Iterate over all supplied regular expressions
for str_regex in str_regexes:
rex = re.compile(str_regex, re.IGNORECASE)
if rex.findall(record['data']):
# If the regex matches, record the event details
Hunting_events[0]['timestamp'].append(datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat())))
Hunting_events[0]['Date and Time'].append(parse(record["timestamp"]).astimezone(input_timzone).isoformat())
Hunting_events[0]['Channel'].append(channel)
Hunting_events[0]['Event ID'].append(EventID[0])
Hunting_events[0]['Computer'].append(Computer[0])
Hunting_events[0]['Original Event Log'].append(str(record['data']).replace("\r", " ").replace("\n", " "))
except Exception as e:
print("issue searching log : " + record['data'] + "\n Error : " + str(e))  # use str(e) rather than a nested print(e)
hunt_report(output)
def hunt_report(output):
"""
Generate the report of hunted events.
Parameters:
- output: prefix of the output CSV file
"""
global Hunting_events
Events = pd.DataFrame(Hunting_events[0])
print("Found " + str(len(Hunting_events[0]["timestamp"])) + " Events")
Events.to_csv(output + "_hunting.csv", index=False)
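# A hedged usage sketch, not part of the original module; the file name, regex and output
# prefix are hypothetical. With eid=None every event ID is searched, and with
# timestart/timeend set to None the time-range filter is skipped.
if __name__ == "__main__":
    from pytz import timezone as _tz
    Evtx_hunt(files=["Security.evtx"], str_regexes=[r"mimikatz"], eid=None,
              input_timzone=_tz("UTC"), output="case1",
              timestart=None, timeend=None)
    # Matching records are collected in Hunting_events and written to case1_hunting.csv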

@ -7,25 +7,23 @@ import pandas as pd
import geoip2.database
import requests
from dateutil import parser, tz
import pandas as pd
import json
import csv
from pathlib import Path
# Global variables used for timing the analysis
start_time = 0
end_time = 0
# SQL query used to detect password-spray attacks
password_spray_query = '''
WITH FailedLogins AS (
SELECT
UserId,
ClientIP,
datetime(CreationTime) AS LoginDate
FROM
events
WHERE
Operation = 'UserLoginFailed'
)
SELECT
UserId,
@ -33,18 +31,18 @@ SELECT
COUNT(DISTINCT ClientIP) AS UniqueIPCount,
COUNT(*) AS FailedLoginAttempts,
LoginDate
FROM
FailedLogins
GROUP BY
UserId,
strftime('%Y-%m-%d %H', LoginDate)
HAVING
COUNT(*) > 5 AND UniqueIPCount > 3
ORDER BY
FailedLoginAttempts DESC;
'''
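# The query above groups failed logins per UserId and per hour and keeps only users with
# more than 5 failed attempts coming from more than 3 distinct ClientIP values in that hour,
# which is the pattern this report flags as password spraying.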
# SQL query used to track user logon activity
user_logon_query = '''
SELECT
UserId,
@ -52,18 +50,19 @@ SELECT
COUNT(*) AS TotalLoginAttempts,
SUM(CASE WHEN Operation = 'UserLoggedIn' THEN 1 ELSE 0 END) AS SuccessfulLogins,
SUM(CASE WHEN Operation = 'UserLoginFailed' THEN 1 ELSE 0 END) AS FailedLogins
FROM
events
WHERE
Operation = 'UserLoggedIn' OR Operation = 'UserLoginFailed'
GROUP BY
UserId,
LoginDate
ORDER BY
LoginDate,
UserId;
'''
# SQL query used to count the operations performed by each user
User_operations_query = '''
SELECT
UserId,
@ -77,12 +76,13 @@ ORDER BY
OperationCount DESC;
'''
# SQL query used to summarize user operations per day
user_operation_by_day_query = '''
SELECT
UserId,
DATE(CreationTime) AS OperationDate,
COUNT(DISTINCT Operation) AS OperationCount,
GROUP_CONCAT(Operation, ', ') AS UniqueOperations
FROM
events
GROUP BY
@ -92,138 +92,162 @@ ORDER BY
OperationCount DESC
'''
def convert_csv(input_file, temp):
"""
Convert the audit CSV file into a JSON Lines file.
Parameters:
- input_file: path of the input CSV file
- temp: path of the temporary directory
Returns:
- json_file: path of the generated JSON file
"""
json_file = os.path.join(temp, 'audit_data.json')
with open(input_file, 'r', encoding='utf-8') as csv_file, open(json_file, 'w', encoding='utf-8') as jsonl_file:
reader = csv.DictReader(csv_file)
# The AuditData column already holds a JSON formatted string;
# write it back out as one JSON object per line (JSON Lines)
for row in reader:
json_data = json.loads(row['AuditData'])
json_string = json.dumps(json_data)
jsonl_file.write(json_string + '\n')
return json_file
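# Expected input shape (hedged example, the values are hypothetical): the only column this
# function relies on is 'AuditData', which must contain one JSON object per row, e.g.
#   AuditData
#   {"CreationTime": "2024-01-01T10:00:00", "Operation": "UserLoggedIn", "ClientIP": "203.0.113.5"}
# Each of those objects is copied to one line of audit_data.json (JSON Lines).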
def flatten_json_file(input_file, timezone, chunk_size=10000):
"""
Flatten the JSON Lines file into a DataFrame and normalize the timestamps, reading the file in chunks.
Parameters:
- input_file: path of the input JSON file
- timezone: target timezone
- chunk_size: number of records processed per chunk
Returns:
- DataFrame: the flattened records
"""
chunks = []
with open(input_file, 'r') as file:
lines = file.readlines()
for i in range(0, len(lines), chunk_size):
chunk = [json.loads(line) for line in lines[i:i + chunk_size]]
# Convert the CreationTime to the desired timezone
for record in chunk:
if 'CreationTime' in record:
# Parse the CreationTime
creation_time = parser.parse(record['CreationTime'])
# Check if the datetime object is timezone aware
if creation_time.tzinfo is None:
# Assume the original time is in UTC if no timezone info is present
creation_time = creation_time.replace(tzinfo=tz.tzutc())
# Convert the CreationTime to the desired timezone
record['CreationTime'] = creation_time.astimezone(timezone).isoformat()
chunks.append(pd.json_normalize(chunk))
# Concatenate all chunks into a single DataFrame
return pd.concat(chunks, ignore_index=True)
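# Note on the flattening step: pd.json_normalize turns nested objects into dotted column
# names. For a hypothetical record {"Operation": "UserLoggedIn", "Client": {"IP": "203.0.113.5"}}
# the resulting columns are 'Operation' and 'Client.IP'; CreationTime is converted to the
# requested timezone beforehand (UTC is assumed when the timestamp carries no timezone info).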
def create_sqlite_db_from_dataframe(dataframe, db_name):
"""
Create a SQLite database from a pandas DataFrame.
Parameters:
- dataframe: pandas DataFrame containing the data
- db_name: SQLite database file name
"""
conn = sqlite3.connect(db_name)
# Convert all columns to string
dataframe = dataframe.astype(str)
# Write the DataFrame to SQLite, treating all fields as text
dataframe.to_sql('events', conn, if_exists='replace', index=False,
dtype={col_name: 'TEXT' for col_name in dataframe.columns})
conn.close()
def read_detection_rules(rule_file):
"""
Read the detection rules from a file.
Parameters:
- rule_file: path of the JSON file containing the detection rules
Returns:
- rules: the list of rules
"""
with open(rule_file, 'r') as file:
return json.load(file)
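# Hedged sketch of the rule-file layout this loader expects: apply_detection_logic_sqlite()
# reads rule['name'], rule['severity'] and rule['query'], so a minimal entry in
# O365_detection_rules.json would look like (hypothetical rule, not shipped with the tool):
#   [
#     {
#       "name": "Mass file download",
#       "severity": "Medium",
#       "query": "SELECT UserId, COUNT(*) AS Downloads FROM events WHERE Operation = 'FileDownloaded' GROUP BY UserId HAVING COUNT(*) > 100"
#     }
#   ]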
def apply_detection_logic_sqlite(db_name, rules):
"""
Apply the detection logic to the SQLite database.
Parameters:
- db_name: SQLite database file name
- rules: list of detection rules
Returns:
- DataFrame: the detected anomalous events
"""
conn = sqlite3.connect(db_name)
all_detected_events = []
for rule in rules:
rule_name = rule['name']
severity = rule['severity']
query = rule['query']
detected_events = pd.read_sql_query(query, conn)
detected_events['RuleName'] = rule_name
detected_events['Severity'] = severity
all_detected_events.append(detected_events)
conn.close()
return pd.concat(all_detected_events, ignore_index=True) if all_detected_events else pd.DataFrame()
def download_geolite_db(geolite_db_path):
"""
Download the GeoLite2 database used for IP geolocation.
Parameters:
- geolite_db_path: path where the GeoLite2 database is saved
"""
url = "https://git.io/GeoLite2-Country.mmdb"
print(f"Downloading GeoLite2 database from {url}...")
response = requests.get(url)
response.raise_for_status()  # Check if the download was successful
with open(geolite_db_path, 'wb') as file:
file.write(response.content)
print(f"GeoLite2 database downloaded and saved to {geolite_db_path}")
def get_country_from_ip(ip, reader):
"""
Resolve an IP address to a country name.
Parameters:
- ip: the IP address
- reader: GeoLite2 database reader
Returns:
- str: the country name, or 'Unknown' if the IP cannot be resolved
"""
try:
return reader.country(ip).country.name
except Exception as e:
print(f"Could not resolve IP {ip}: {e}")
return 'Unknown'
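# Minimal usage sketch (hypothetical IP address; GeoLite2-Country.mmdb must already exist locally):
#   with geoip2.database.Reader('GeoLite2-Country.mmdb') as reader:
#       get_country_from_ip('203.0.113.5', reader)   # -> a country name, or 'Unknown' on lookup failure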
def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data=False,
geolite_db_path='GeoLite2-Country.mmdb'):
"""
Analyze Office 365 audit logs and generate a report.
Parameters:
- auditfile: path of the Office 365 audit log file
- rule_file: path of the detection rules file
- output: output directory and report name prefix
- timezone: target timezone
- include_flattened_data: whether to include the flattened data in the report
- geolite_db_path: path of the GeoLite2 database file
"""
global start_time, end_time
start_time = time.time()
temp_dir = ".temp"
if output is None or output == "":
output = os.path.splitext(auditfile)[0]
try:
# Create necessary directories
os.makedirs(output, exist_ok=True)
os.makedirs(temp_dir, exist_ok=True)
# Check if the GeoLite2 database exists, and download it if not
if not os.path.exists(geolite_db_path):
download_geolite_db(geolite_db_path)
# Convert CSV to JSON (assuming convert_csv is a valid function that you have)
json_file = convert_csv(auditfile, temp_dir)
# Input and output file paths
input_file = json_file
db_name = os.path.join(temp_dir, 'audit_data.db')
@ -231,36 +255,28 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
rule_file = 'O365_detection_rules.json'
output_file = f"{output}_o365_report.xlsx"
# Flatten the JSON data and normalize the timestamps
flattened_df = flatten_json_file(input_file, timezone)
# Create the SQLite database from the flattened DataFrame
create_sqlite_db_from_dataframe(flattened_df, db_name)
# Resolve IP addresses using the GeoLite2 database
with geoip2.database.Reader(geolite_db_path) as reader:
# Resolve ClientIP to country names
if 'ClientIP' in flattened_df.columns:
flattened_df['Country'] = flattened_df['ClientIP'].apply(lambda ip: get_country_from_ip(ip, reader))
# Read the detection rules and apply them
rules = read_detection_rules(rule_file)
# Apply detection logic using SQLite
detected_events = apply_detection_logic_sqlite(db_name, rules)
# Reorder the DataFrame columns so RuleName and Severity come first
if not detected_events.empty:
columns = ['RuleName', 'Severity'] + [col for col in detected_events.columns if col not in ['RuleName', 'Severity']]
detected_events = detected_events[columns]
# Run the remaining SQL queries
conn = sqlite3.connect(db_name)
try:
user_login_tracker_df = pd.read_sql_query(user_logon_query, conn)
password_spray_df = pd.read_sql_query(password_spray_query, conn)
@ -269,20 +285,19 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
finally:
conn.close()
# Generate the Excel report
with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
if include_flattened_data:
# Split the flattened data across multiple sheets if needed
max_rows_per_sheet = 65000
num_sheets = len(flattened_df) // max_rows_per_sheet + 1
for i in range(num_sheets):
start_row = i * max_rows_per_sheet
end_row = (i + 1) * max_rows_per_sheet
sheet_name = f'Flattened Data {i + 1}'
flattened_df.iloc[start_row:end_row].to_excel(writer, sheet_name=sheet_name, index=False)
# Write statistics for various fields to separate sheets
detected_events.to_excel(writer, sheet_name='Detection Results', index=False)
user_login_tracker_df.to_excel(writer, sheet_name='User Login Tracker', index=False)
password_spray_df.to_excel(writer, sheet_name='Password Spray Attacks', index=False)
@ -293,10 +308,8 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
flattened_df['Country'].value_counts().to_frame().to_excel(writer, sheet_name='Country Stats')
flattened_df['UserAgent'].value_counts().to_frame().to_excel(writer, sheet_name='UserAgent Stats')
flattened_df['UserId'].value_counts().to_frame().to_excel(writer, sheet_name='UserId Stats')
flattened_df['AuthenticationType'].value_counts().to_frame().to_excel(writer, sheet_name='AuthenticationType Stats')
# Measure the end time
end_time = time.time()
print(f"Office365 analysis finished in time: {end_time - start_time:.2f} seconds")
@ -304,18 +317,12 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
print(f"An error occurred during the analysis: {e}")
finally:
# Clean up the temporary directory
if os.path.exists(temp_dir):
for file in Path(temp_dir).glob('*'):
file.unlink()  # Delete the file
os.rmdir(temp_dir)  # Remove the directory
# Write the User Login Tracker results to a new sheet
# Measure the end time
end_time = time.time()
# Calculate and print the running time
running_time = end_time - start_time
print(f"Office365 hunter finished in time: {running_time:.2f} seconds")
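# Hedged end-to-end sketch (hypothetical file names; the timezone object is an assumption):
#   from dateutil import tz
#   analyzeoff365(auditfile="AuditLog.csv",
#                 rule_file="O365_detection_rules.json",
#                 output="case1",
#                 timezone=tz.gettz("UTC"),
#                 include_flattened_data=False,
#                 geolite_db_path="GeoLite2-Country.mmdb")
# This writes case1_o365_report.xlsx containing the detection results and the statistics sheets.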
