from 邓焱嘉

main
DengYanjia 1 month ago
parent 71f46cb052
commit 4cbd982f74

@ -1,13 +1,13 @@
import json
import sqlite3
import tempfile
import os
import time
import pandas as pd
import geoip2.database
import requests
from dateutil import parser, tz
from pathlib import Path
import json # 导入用于处理JSON数据的模块
import sqlite3 # 导入用于操作SQLite数据库的模块
import tempfile # 导入用于创建临时文件和目录的模块
import os # 导入用于操作系统功能的模块
import time # 导入用于处理时间相关功能的模块
import pandas as pd # 导入用于数据处理和分析的Pandas库
import geoip2.database # 导入用于GeoLite2数据库的GeoIP2库
import requests # 导入用于发送HTTP请求的模块
from dateutil import parser, tz # 导入用于解析和处理日期时间的模块
from pathlib import Path # 导入用于处理文件路径的模块
# 初始化全局变量用于计时
start_time = 0
@ -103,13 +103,22 @@ def convert_csv(input_file, temp):
返回:
- json_file: 生成的JSON文件路径
"""
# 创建一个新的JSON文件路径结合临时目录和文件名
json_file = os.path.join(temp, 'audit_data.json')
# 同时打开输入的CSV文件进行读取和新的JSON文件进行写入设置编码为UTF-8
# 使用上下文管理器确保文件正确关闭
with open(input_file, 'r', encoding='utf-8') as csv_file, open(json_file, 'w', encoding='utf-8') as jsonl_file:
# 使用csv.DictReader来读取CSV文件每行会转换为字典
reader = csv.DictReader(csv_file)
# 迭代读取CSV文件的每一行
for row in reader:
# 将CSV文件中'AuditData'字段的字符串解析为JSON对象
json_data = json.loads(row['AuditData'])
# 将JSON对象再次转换为字符串
json_string = json.dumps(json_data)
# 将转换后的JSON字符串写入json文件每行一个JSON对象以换行符结束
jsonl_file.write(json_string + '\n')
# 返回新创建的JSON文件的路径
return json_file
def flatten_json_file(input_file, timezone, chunk_size=10000):
@ -124,18 +133,30 @@ def flatten_json_file(input_file, timezone, chunk_size=10000):
返回:
- DataFrame: 展平后的数据
"""
# 初始化一个空列表用于存储数据块
chunks = []
# 打开输入的JSON文件进行读取
with open(input_file, 'r') as file:
# 读取所有行到一个列表中
lines = file.readlines()
# 按块大小迭代处理行
for i in range(0, len(lines), chunk_size):
# 将当前块的每一行解析为JSON对象
chunk = [json.loads(line) for line in lines[i:i + chunk_size]]
# 处理每个记录
for record in chunk:
# 如果记录中包含'CreationTime'字段
if 'CreationTime' in record:
# 解析'CreationTime'字段为日期时间对象
creation_time = parser.parse(record['CreationTime'])
# 如果日期时间对象没有时区信息设置为UTC
if creation_time.tzinfo is None:
creation_time = creation_time.replace(tzinfo=tz.tzutc())
# 将日期时间对象转换为目标时区并格式化为ISO格式字符串
record['CreationTime'] = creation_time.astimezone(timezone).isoformat()
# 将当前块展平并添加到数据块列表中
chunks.append(pd.json_normalize(chunk))
# 合并所有数据块为一个DataFrame并返回
return pd.concat(chunks, ignore_index=True)
def create_sqlite_db_from_dataframe(dataframe, db_name):
@ -146,10 +167,14 @@ def create_sqlite_db_from_dataframe(dataframe, db_name):
- dataframe: 包含数据的Pandas DataFrame
- db_name: SQLite数据库文件名
"""
# 连接到SQLite数据库如果数据库不存在则会创建
conn = sqlite3.connect(db_name)
# 将DataFrame中的所有列转换为字符串类型
dataframe = dataframe.astype(str)
# 将DataFrame写入SQLite数据库中的'table'表,如果表已存在则替换
dataframe.to_sql('events', conn, if_exists='replace', index=False,
dtype={col_name: 'TEXT' for col_name in dataframe.columns})
# 关闭数据库连接
conn.close()
def read_detection_rules(rule_file):
@ -176,17 +201,29 @@ def apply_detection_logic_sqlite(db_name, rules):
返回:
- DataFrame: 检测到的异常事件
"""
# 连接到SQLite数据库
conn = sqlite3.connect(db_name)
# 初始化一个空列表用于存储所有检测到的事件
all_detected_events = []
# 遍历每个检测规则
for rule in rules:
# 获取规则名称
rule_name = rule['name']
# 获取规则严重性
severity = rule['severity']
# 获取规则的SQL查询
query = rule['query']
# 执行SQL查询并将结果存储到DataFrame中
detected_events = pd.read_sql_query(query, conn)
# 添加规则名称列到DataFrame
detected_events['RuleName'] = rule_name
# 添加严重性列到DataFrame
detected_events['Severity'] = severity
# 将当前规则检测到的事件添加到列表中
all_detected_events.append(detected_events)
# 关闭数据库连接
conn.close()
# 合并所有检测到的事件为一个DataFrame并返回如果没有检测到事件则返回空DataFrame
return pd.concat(all_detected_events, ignore_index=True) if all_detected_events else pd.DataFrame()
def download_geolite_db(geolite_db_path):
@ -234,26 +271,22 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
- include_flattened_data: 是否包含展平后的数据
- geolite_db_path: GeoLite2数据库文件路径
"""
global start_time, end_time
start_time = time.time()
temp_dir = ".temp"
if output is None or output == "":
output = os.path.splitext(auditfile)[0]
global start_time, end_time # 声明全局变量start_time和end_time
start_time = time.time() # 记录开始时间
temp_dir = ".temp" # 设置临时目录路径
if output is None or output == "": # 如果输出目录未指定或为空
output = os.path.splitext(auditfile)[0] # 使用审计文件的基础名称作为输出目录
try:
os.makedirs(output, exist_ok=True)
os.makedirs(temp_dir, exist_ok=True)
if not os.path.exists(geolite_db_path):
download_geolite_db(geolite_db_path)
json_file = convert_csv(auditfile, temp_dir)
input_file = json_file
db_name = os.path.join(temp_dir, 'audit_data.db')
if rule_file is None:
rule_file = 'O365_detection_rules.json'
output_file = f"{output}_o365_report.xlsx"
os.makedirs(output, exist_ok=True) # 创建输出目录,如果不存在则创建
os.makedirs(temp_dir, exist_ok=True) # 创建临时目录,如果不存在则创建
if not os.path.exists(geolite_db_path): # 如果GeoLite2数据库文件不存在
download_geolite_db(geolite_db_path) # 下载GeoLite2数据库
json_file = convert_csv(auditfile, temp_dir) # 将CSV文件转换为JSON文件
input_file = json_file # 设置输入文件路径为转换后的JSON文件
db_name = os.path.join(temp_dir, 'audit_data.db') # 设置SQLite数据库文件路径
if rule_file is None: # 如果规则文件未指定
rule_file = 'O365_detection_rules.json' # 使用默认的规则文件名
output_file = f"{output}_o365_report.xlsx" # 设置输出的Excel报告文件路径
# 展平JSON数据并处理时间戳
flattened_df = flatten_json_file(input_file, timezone)

@ -75,22 +75,27 @@ def optimised_search(DB,output=""):
'falsepositives': [], 'level': [], 'rule': [], 'id': [], 'filename': []}
# 遍历所有的规则,执行查询并收集结果
for usecase in rules:
# 获取规则中的查询语句
query = usecase["rule"]
detected_events=search_db(query, DB)
# 执行查询,获取检测到的事件
detected_events = search_db(query, DB)
# 如果没有检测到事件,跳过当前规则
if len(detected_events) == 0:
continue
for detected in detected_events :
# 遍历检测到的事件
for detected in detected_events:
# 遍历所有字段,将规则中的信息填充到检测结果中
for field in Detections:
if field in usecase:
# print(usecase)
# 如果规则中包含该字段,添加到检测结果中
if isinstance(usecase[field], list):
Detections[field].append(",".join(usecase[field]))
else:
Detections[field].append(usecase[field])
else:
# 如果规则中不包含该字段,根据字段类型填充默认值
if field == "Original_Event_Log":
Detections['Original_Event_Log'].append(str(detected[0]))
elif field == "DateTime":
@ -98,13 +103,15 @@ def optimised_search(DB,output=""):
else:
Detections[field].append(" ")
# 将检测结果转换为DataFrame
Report = pd.DataFrame(Detections)
# 统计每个规则的检测次数
grouped = Report['title'].value_counts()
# 获取数据库游标
cursor = DBconn.cursor()
# 写入Excel文件
writer = pd.ExcelWriter(output+'_'+'Detections.xlsx', engine='xlsxwriter', options={'encoding': 'utf-8'})
# 写入Excel文件
writer = pd.ExcelWriter(output + '_' + 'Detections.xlsx', engine='xlsxwriter', options={'encoding': 'utf-8'})
grouped.to_excel(writer, sheet_name='Result Summary')
Report.to_excel(writer, sheet_name='Detailed Report', index=False)
writer.book.use_zip64()
@ -115,34 +122,42 @@ def optimised_search(DB,output=""):
#print('Done in {:.4f} seconds'.format(toc - tic))
def auto_detect(path):
global input_timezone
global input_timezone # 声明全局变量 input_timezone
if os.path.isdir(path):
files=list(libPath(path).rglob("*.[eE][vV][tT][xX]"))
if os.path.isdir(path): # 检查路径是否为目录
files = list(libPath(path).rglob("*.[eE][vV][tT][xX]")) # 获取目录下所有.evtx文件
elif os.path.isfile(path):
files=glob.glob(path)
elif os.path.isfile(path): # 检查路径是否为文件
files = glob.glob(path) # 获取文件路径
else:
print("Issue with the path" )
print("Issue with the path") # 打印路径错误信息
return
return files
return files # 返回文件列表
def Create_DB(db):
# Connect to SQLite database
# 连接到SQLite数据库
conn = sqlite3.connect(db)
# 将Alldata字典转换为DataFrame
Events = pd.DataFrame(Alldata)
# 获取数据库游标
c = conn.cursor()
Create="CREATE TABLE IF NOT EXISTS Events ( "
# 创建Events表的SQL语句
Create = "CREATE TABLE IF NOT EXISTS Events ( "
# 遍历Alldata字典的键添加字段到创建表的SQL语句中
for key in Alldata.keys():
Create+="\'"+key+"\'"+" TEXT COLLATE NOCASE,"
Create+="ID INTEGER, PRIMARY KEY(ID AUTOINCREMENT) )"
#print(Create)
Index="""CREATE INDEX IF NOT EXISTS "EVENTID_INDEX" ON "Events" ("EventID");"""
Create += "\'" + key + "\'" + " TEXT COLLATE NOCASE,"
# 添加ID字段作为主键
Create += "ID INTEGER, PRIMARY KEY(ID AUTOINCREMENT) )"
# 执行创建表的SQL语句
c.execute(Create)
# 创建索引的SQL语句
Index = """CREATE INDEX IF NOT EXISTS "EVENTID_INDEX" ON "Events" ("EventID");"""
# 执行创建索引的SQL语句
c.execute(Index)
# 关闭游标
c.close()
@ -159,31 +174,44 @@ Fields={}
def RulesToDB(rules_file,DB):
# 打开规则文件并加载JSON数据
with open(rules_file) as f:
rules = json.load(f)
# Connect to SQLite database
# 连接到SQLite数据库
conn = sqlite3.connect(DB)
c = conn.cursor()
# 初始化一个字典,用于存储规则的各个字段
Detections = {'title': [], 'id': [], 'status': [], 'description': [], 'author': [], 'tags': [],
'falsepositives': [], 'level': [], 'rule': [], 'filename': []}
# 遍历所有规则
for usecase in rules:
# 遍历每个字段
for field in Detections:
if field in usecase:
# print(usecase)
# 如果字段在规则中存在
if isinstance(usecase[field], list):
# 如果字段值是列表,将列表转换为逗号分隔的字符串
Detections[field].append(",".join(usecase[field]))
else:
# 否则,直接添加字段值
Detections[field].append(usecase[field])
else:
# 如果字段在规则中不存在,添加空字符串
Detections[field].append("")
print("Number of rules "+str(len(Detections["rule"])))
# 打印规则的数量
print("Number of rules " + str(len(Detections["rule"])))
# 将规则数据转换为DataFrame
Report = pd.DataFrame(Detections)
# 将规则数据插入到数据库的Rules表中
Report.to_sql('Rules', conn, if_exists='append', index=False)
# 提交事务并关闭数据库连接
conn.commit()
conn.close()

Loading…
Cancel
Save