Compare commits

main...NineLamp

No commits in common. 'main' and 'NineLamp' have entirely different histories.

@@ -1,24 +1,15 @@
#!/bin/bash
# Check that the script received exactly one argument
if [ "$#" -ne 1 ]; then
    echo "Please enter rules path as argument "
    exit 1
fi
# Clone the Sigma converter tool
echo "Getting Sigma Converter Tool"
git clone https://github.com/SigmaHQ/legacy-sigmatools.git
# Convert the sigma rules
echo "Converting sigma rules "
# Run the Sigma converter to turn sigma rule files into JSON:
# --recurse: process all rule files under the given directory recursively
# --target sqlite: use sqlite as the conversion target
# --backend-option table=Events: name the output table Events
# -d "$1": directory of sigma rule files (first script argument)
# -c lib/config/sigma-converter-rules-config.yml: configuration file path
# -o rules.json: output file name
# --output-fields: fields to include in the output
legacy-sigmatools/tools/sigmac --recurse --target sqlite --backend-option table=Events --output-format json -d "$1" -c lib/config/sigma-converter-rules-config.yml -o rules.json --output-fields title,id,description,author,tags,level,falsepositives,filename,status
echo "Rules created with file name : rules.json "

@@ -1,23 +1,11 @@
#!/bin/bash
# Clone the Sigma converter tool
echo "Getting Sigma Converter Tool"
git clone https://github.com/SigmaHQ/legacy-sigmatools.git
# Clone the SigmaHQ rules repository
echo "Getting Sigma Rules"
git clone https://github.com/SigmaHQ/sigma.git
# Convert the sigma rules
echo "Converting sigma rules "
# Run the Sigma converter to turn the Windows sigma rules into JSON:
# --recurse: process all rule files under the given directory recursively
# --target sqlite: use sqlite as the conversion target
# --backend-option table=Events: name the output table Events
# -d sigma/rules/windows/: use the windows rules directory of the cloned sigma repository
# -c lib/config/sigma-converter-rules-config.yml: configuration file path
# -o rules.json: output file name
# --output-fields: fields to include in the output
legacy-sigmatools/tools/sigmac --recurse --target sqlite --backend-option table=Events --output-format json -d sigma/rules/windows/ -c lib/config/sigma-converter-rules-config.yml -o rules.json --output-fields title,id,description,author,tags,level,falsepositives,filename,status
echo "Rules created with file name : rules.json "

@@ -1,100 +1,99 @@
[
  {
    "name": "Suspicious User Agent",
    "severity": "High",
    "query": "SELECT * FROM events WHERE UserAgent LIKE '%python%' OR UserAgent LIKE '%ruler%' OR UserAgent LIKE '%curl%' OR UserAgent LIKE '%Wget%' OR UserAgent LIKE '%python-requests%' OR UserAgent LIKE '%AADInternals%' OR UserAgent LIKE '%azurehound%' OR UserAgent LIKE '%axios%' OR UserAgent LIKE '%BAV2ROPC%' "
  },
  {
    "name": "User adding or removing Inbox Rule",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE Operation LIKE '%InboxRule%' OR Operation LIKE 'Set-Mailbox' OR Operation LIKE '%DeliverToMailboxAndForward%' OR Operation LIKE '%ForwardingAddress%' OR Operation LIKE '%ForwardingAddress%' "
  },
  {
    "name": "After Hours Activity",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE (CASE WHEN CAST(substr(CreationTime, 12, 2) AS INTEGER) < 0 THEN 24 + (CAST(substr(CreationTime, 12, 2) AS INTEGER)) ELSE CAST(substr(CreationTime, 12, 2) AS INTEGER) END >= 20 OR CASE WHEN CAST(substr(CreationTime, 12, 2) AS INTEGER) < 0 THEN 24 + (CAST(substr(CreationTime, 12, 2) AS INTEGER)) ELSE CAST(substr(CreationTime, 12, 2) AS INTEGER) END < 6) AND NOT (Operation LIKE 'File%' OR Operation LIKE 'List%' OR Operation LIKE 'Page%' OR Operation LIKE '%UserLogin%');"
  },
  {
    "name": "Possible file exfiltration",
    "severity": "Low",
    "query": "SELECT * FROM events WHERE Operation LIKE '%FileUploaded%' "
  },
  {
    "name": "Admin searching in emails of other users",
    "severity": "Low",
    "query": "SELECT * FROM events WHERE Operation LIKE '%SearchStarted%' OR Operation LIKE '%SearchExportDownloaded%' OR Operation LIKE '%ViewedSearchExported%' "
  },
  {
    "name": "Strong Authentication Disabled",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE Operation LIKE '%disable strong authentication%'"
  },
  {
    "name": "User added to admin group",
    "severity": "High",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%add member to group%' AND ModifiedProperties Like '%admin%') OR ( Operation LIKE '%AddedToGroup%' AND TargetUserOrGroupName Like '%admin%') "
  },
  {
    "name": "New Policy created",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%add policy%' ) "
  },
  {
    "name": "Security Alert triggered",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%AlertTriggered%' AND NOT Severity Like '%Low%') "
  },
  {
    "name": "Transport rules ( mail flow rules ) modified",
    "severity": "High",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%TransportRule%') "
  },
  {
    "name": "An application was registered in Azure AD",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%Add service principal.%') "
  },
  {
    "name": "Add app role assignment grant to user",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%Add app role assignment grant to user.%') "
  },
  {
    "name": "eDiscovery Abuse",
    "severity": "High",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%New-ComplianceSearch%') "
  },
  {
    "name": "Operations affecting OAuth Applications",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE ( Operation = 'Add application.' OR Operation = 'Update application' OR Operation = 'Add service principal.' OR Operation = 'Update application Certificates and secrets management' OR Operation = 'Update applicationUpdate service principal.' OR Operation = 'Add app role assignment grant to user.' OR Operation = 'Add delegated permission grant.' OR Operation = 'Add owner to application.' OR Operation = 'Add owner to service principal.') "
  },
  {
    "name": "Suspicious Operations affecting Mailbox ",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE ( Operation = 'Set-MailboxJunkEmailConfiguration' OR Operation = 'SoftDelete' OR Operation = 'SendAs' OR Operation = 'HardDelete' OR Operation = 'MoveToDeletedItems' ) "
  },
  {
    "name": "Suspicious Operations affecting SharePoint ",
    "severity": "Medium",
    "query": "SELECT * FROM events WHERE ( Operation = 'AddedToSecureLink' OR Operation = 'SearchQueryPerformed' OR Operation = 'SecureLinkCreated' OR Operation = 'SecureLinkUpdated' OR Operation = 'SharingInvitationCreated' ) "
  },
  {
    "name": "User Modifying RetentionPolicy ",
    "severity": "High",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%UnifiedAuditLogRetentionPolicy%' ) "
  },
  {
    "name": "User Modifying Audit Logging ",
    "severity": "High",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%AdminAuditLogConfig%' ) "
  },
  {
    "name": "Strong Authentication Disabled ",
    "severity": "High",
    "query": "SELECT * FROM events WHERE ( Operation LIKE '%Disable Strong Authentication.%' ) "
  }
]
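As a minimal sketch of how one of these rules is consumed (mirroring the read_detection_rules and apply_detection_logic_sqlite flow shown later in this diff; the database path is a placeholder):

import json, sqlite3
import pandas as pd

with open("O365_detection_rules.json") as f:
    rules = json.load(f)                               # list of {name, severity, query} objects

conn = sqlite3.connect("audit_data.db")                # placeholder path to the events database
hits = pd.read_sql_query(rules[0]["query"], conn)      # run the first rule's SQL against the events table
print(rules[0]["name"], rules[0]["severity"], len(hits))
conn.close()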

File diff suppressed because it is too large

File diff suppressed because one or more lines are too long

@@ -1,75 +1,72 @@
import csv
import re
from netaddr import *  # netaddr utilities for working with network addresses
import xml.etree.ElementTree as ET  # XML parser
import pandas as pd  # data analysis library
from datetime import datetime, timezone  # date and time handling
from evtx import PyEvtxParser  # parser for Windows EVTX event log files
from dateutil.parser import parse, isoparse  # date/time string parsing
from pytz import timezone  # timezone handling

minlength = 1000  # minimum-length threshold; not used in this module

# List holding a dictionary of the hunted events
Hunting_events = [{'Date and Time': [], 'timestamp': [], 'Channel': [], 'Computer': [], 'Event ID': [], 'Original Event Log': []}]

# Regular expressions used to extract fields from the raw event XML
EventID_rex = re.compile('<EventID.*>(.*)<\/EventID>', re.IGNORECASE)
Channel_rex = re.compile('<Channel.*>(.*)<\/Channel>', re.IGNORECASE)
Computer_rex = re.compile('<Computer.*>(.*)<\/Computer>', re.IGNORECASE)


def Evtx_hunt(files, str_regexes, eid, input_timzone, output, timestart, timeend):
    """
    Parse Windows event log files and search them for specific events.

    Arguments:
    - files: list of event log files to parse
    - str_regexes: list of regular expressions matched against the event data
    - eid: event ID; if provided, only events with this ID are searched
    - input_timzone: timezone of the input logs
    - output: output file name prefix
    - timestart, timeend: search time range
    """
    for file in files:
        file = str(file)
        print("Analyzing " + file)
        try:
            parser = PyEvtxParser(file)
        except:
            print("Issue analyzing " + file + "\nplease check if its not corrupted")
            continue
        for record in parser.records():
            try:
                # Extract the event ID
                EventID = EventID_rex.findall(record['data'])
                # If a time range was provided, skip events that fall outside of it
                if timestart is not None and timeend is not None:
                    timestamp = datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat()))
                    if not (timestamp > timestart and timestamp < timeend):
                        continue  # event outside the time range
                # Require an event ID and, when eid is given, require a match
                if len(EventID) > 0 and (eid is None or EventID[0] == eid):
                    Computer = Computer_rex.findall(record['data'])
                    Channel = Channel_rex.findall(record['data'])
                    channel = Channel[0] if len(Channel) > 0 else " "
                    # Try every provided regular expression against the raw event data
                    for str_regex in str_regexes:
                        rex = re.compile(str_regex, re.IGNORECASE)
                        if rex.findall(record['data']):
                            # The event matched; record its details
                            Hunting_events[0]['timestamp'].append(datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat())))
                            Hunting_events[0]['Date and Time'].append(parse(record["timestamp"]).astimezone(input_timzone).isoformat())
                            Hunting_events[0]['Channel'].append(channel)
                            Hunting_events[0]['Event ID'].append(EventID[0])
                            Hunting_events[0]['Computer'].append(Computer[0])
                            Hunting_events[0]['Original Event Log'].append(str(record['data']).replace("\r", " ").replace("\n", " "))
            except Exception as e:
                print("issue searching log : " + record['data'] + "\n Error : " + str(e))
    hunt_report(output)


def hunt_report(output):
    """
    Write the hunted events to a CSV report.

    Arguments:
    - output: prefix of the output CSV file
    """
    global Hunting_events
    Events = pd.DataFrame(Hunting_events[0])
    print("Found " + str(len(Hunting_events[0]["timestamp"])) + " Events")
    Events.to_csv(output + "_hunting.csv", index=False)
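As a usage sketch (assuming Evtx_hunt is imported from this module; the file names, search patterns, and output prefix below are placeholders):

from pytz import timezone

Evtx_hunt(
    files=["wineventlog/Security.evtx", "wineventlog/Sysmon.evtx"],
    str_regexes=["mimikatz", "psexec"],
    eid=None,                       # no event-ID filter
    input_timzone=timezone("UTC"),  # timezone of the source logs
    output="hunt_results",          # writes hunt_results_hunting.csv
    timestart=None, timeend=None)   # no time-range filter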

@@ -1,29 +1,31 @@
import json  # JSON handling
import csv  # CSV parsing (used by convert_csv below)
import sqlite3  # SQLite database access
import tempfile  # temporary files and directories
import os  # operating-system helpers
import time  # timing helpers
import pandas as pd  # data processing and analysis
import geoip2.database  # reader for the GeoLite2 database
import requests  # HTTP requests
from dateutil import parser, tz  # date/time parsing and timezone handling
from pathlib import Path  # filesystem path handling

# Globals used to time the analysis
start_time = 0
end_time = 0

# SQL query used to detect password-spray attacks
password_spray_query = '''
WITH FailedLogins AS (
    SELECT
        UserId,
        ClientIP,
        datetime(CreationTime) AS LoginDate
    FROM
        events
    WHERE
        Operation = 'UserLoginFailed'
)
SELECT
    UserId,
@@ -31,18 +33,18 @@ SELECT
    COUNT(DISTINCT ClientIP) AS UniqueIPCount,
    COUNT(*) AS FailedLoginAttempts,
    LoginDate
FROM
    FailedLogins
GROUP BY
    UserId,
    strftime('%Y-%m-%d %H', LoginDate)
HAVING
    COUNT(*) > 5 AND UniqueIPCount > 3
ORDER BY
    FailedLoginAttempts DESC;
'''

# SQL query used to track user logon activity
user_logon_query = '''
SELECT
    UserId,
@@ -50,19 +52,18 @@ SELECT
    COUNT(*) AS TotalLoginAttempts,
    SUM(CASE WHEN Operation = 'UserLoggedIn' THEN 1 ELSE 0 END) AS SuccessfulLogins,
    SUM(CASE WHEN Operation = 'UserLoginFailed' THEN 1 ELSE 0 END) AS FailedLogins
FROM
    events
WHERE
    Operation = 'UserLoggedIn' OR Operation = 'UserLoginFailed'
GROUP BY
    UserId,
    LoginDate
ORDER BY
    LoginDate,
    UserId;
'''

# SQL query used to count the operations performed by each user
User_operations_query = '''
SELECT
    UserId,
@@ -76,13 +77,12 @@ ORDER BY
    OperationCount DESC;
'''

# SQL query used to count user operations per day
user_operation_by_day_query = '''
SELECT
    UserId,
    DATE(CreationTime) AS OperationDate,
    COUNT(DISTINCT Operation) AS OperationCount,
    GROUP_CONCAT(Operation, ', ') AS UniqueOperations
FROM
    events
GROUP BY
@@ -92,224 +92,175 @@ ORDER BY
    OperationCount DESC
'''


def convert_csv(input_file, temp):
    """
    Convert a CSV audit export into a JSON Lines file.

    Arguments:
    - input_file: path of the input CSV file
    - temp: path of the temporary directory
    Returns:
    - json_file: path of the generated JSON file
    """
    # Build the output path inside the temporary directory
    json_file = os.path.join(temp, 'audit_data.json')
    # Open the CSV for reading and the JSON Lines file for writing (UTF-8)
    with open(input_file, 'r', encoding='utf-8') as csv_file, open(json_file, 'w', encoding='utf-8') as jsonl_file:
        # Each CSV row becomes a dictionary keyed by the header
        reader = csv.DictReader(csv_file)
        for row in reader:
            # The 'AuditData' column already holds a JSON-formatted string
            json_data = json.loads(row['AuditData'])
            # Serialize it back to a compact string
            json_string = json.dumps(json_data)
            # Write one JSON object per line
            jsonl_file.write(json_string + '\n')
    # Return the path of the newly created JSON Lines file
    return json_file


def flatten_json_file(input_file, timezone, chunk_size=10000):
    """
    Flatten a JSON Lines file into a DataFrame and normalize timestamps.

    Arguments:
    - input_file: path of the input JSON Lines file
    - timezone: target timezone
    - chunk_size: number of lines processed per chunk
    Returns:
    - DataFrame with the flattened records
    """
    chunks = []
    with open(input_file, 'r') as file:
        lines = file.readlines()
        # Process the file in chunks to limit memory usage
        for i in range(0, len(lines), chunk_size):
            chunk = [json.loads(line) for line in lines[i:i + chunk_size]]
            for record in chunk:
                if 'CreationTime' in record:
                    # Parse CreationTime into a datetime object
                    creation_time = parser.parse(record['CreationTime'])
                    # Assume UTC when no timezone information is present
                    if creation_time.tzinfo is None:
                        creation_time = creation_time.replace(tzinfo=tz.tzutc())
                    # Convert to the target timezone and store as an ISO string
                    record['CreationTime'] = creation_time.astimezone(timezone).isoformat()
            # Flatten the chunk and collect it
            chunks.append(pd.json_normalize(chunk))
    # Concatenate all chunks into a single DataFrame
    return pd.concat(chunks, ignore_index=True)


def create_sqlite_db_from_dataframe(dataframe, db_name):
    """
    Create a SQLite database from a Pandas DataFrame.

    Arguments:
    - dataframe: DataFrame holding the data
    - db_name: SQLite database file name
    """
    conn = sqlite3.connect(db_name)
    # Store every column as text
    dataframe = dataframe.astype(str)
    # Write the DataFrame into the 'events' table, replacing it if it already exists
    dataframe.to_sql('events', conn, if_exists='replace', index=False,
                     dtype={col_name: 'TEXT' for col_name in dataframe.columns})
    conn.close()


def read_detection_rules(rule_file):
    """
    Read the detection rules from a JSON file.

    Arguments:
    - rule_file: path of the JSON file holding the detection rules
    Returns:
    - rules: list of rules
    """
    with open(rule_file, 'r') as file:
        return json.load(file)


def apply_detection_logic_sqlite(db_name, rules):
    """
    Run the detection rules against the SQLite database.

    Arguments:
    - db_name: SQLite database file name
    - rules: list of detection rules
    Returns:
    - DataFrame with the detected events
    """
    conn = sqlite3.connect(db_name)
    all_detected_events = []
    for rule in rules:
        rule_name = rule['name']
        severity = rule['severity']
        query = rule['query']
        # Run the rule query and collect the matching events
        detected_events = pd.read_sql_query(query, conn)
        detected_events['RuleName'] = rule_name
        detected_events['Severity'] = severity
        all_detected_events.append(detected_events)
    conn.close()
    # Return an empty DataFrame when no rule produced results
    return pd.concat(all_detected_events, ignore_index=True) if all_detected_events else pd.DataFrame()


def download_geolite_db(geolite_db_path):
    """
    Download the GeoLite2 country database used for IP geolocation.

    Arguments:
    - geolite_db_path: path where the GeoLite2 database is saved
    """
    url = "https://git.io/GeoLite2-Country.mmdb"
    print(f"Downloading GeoLite2 database from {url}...")
    response = requests.get(url)
    response.raise_for_status()  # Check that the download was successful
    with open(geolite_db_path, 'wb') as file:
        file.write(response.content)
    print(f"GeoLite2 database downloaded and saved to {geolite_db_path}")


def get_country_from_ip(ip, reader):
    """
    Resolve an IP address to a country name.

    Arguments:
    - ip: IP address
    - reader: GeoLite2 database reader
    Returns:
    - country name, or 'Unknown' when the IP cannot be resolved
    """
    try:
        return reader.country(ip).country.name
    except Exception as e:
        print(f"Could not resolve IP {ip}: {e}")
        return 'Unknown'


def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data=False,
                  geolite_db_path='GeoLite2-Country.mmdb'):
    """
    Analyze an Office 365 audit log export and generate a report.

    Arguments:
    - auditfile: path of the Office 365 audit log file
    - rule_file: path of the detection rules file
    - output: output directory
    - timezone: target timezone
    - include_flattened_data: whether to include the flattened data in the report
    - geolite_db_path: path of the GeoLite2 database file
    """
    global start_time, end_time
    start_time = time.time()  # Record the start time
    temp_dir = ".temp"
    if output is None or output == "":
        # Default to the audit file's base name
        output = os.path.splitext(auditfile)[0]
    try:
        # Create the output and temporary directories if needed
        os.makedirs(output, exist_ok=True)
        os.makedirs(temp_dir, exist_ok=True)
        # Download the GeoLite2 database if it is not already present
        if not os.path.exists(geolite_db_path):
            download_geolite_db(geolite_db_path)
        # Convert the CSV audit export into a JSON Lines file
        json_file = convert_csv(auditfile, temp_dir)
        input_file = json_file
        db_name = os.path.join(temp_dir, 'audit_data.db')
        if rule_file is None:
            rule_file = 'O365_detection_rules.json'
        output_file = f"{output}_o365_report.xlsx"
        # Flatten the JSON data and normalize timestamps
        flattened_df = flatten_json_file(input_file, timezone)
        # Create the SQLite database from the flattened DataFrame
        create_sqlite_db_from_dataframe(flattened_df, db_name)
        # Resolve ClientIP addresses to country names with the GeoLite2 database
        with geoip2.database.Reader(geolite_db_path) as reader:
            if 'ClientIP' in flattened_df.columns:
                flattened_df['Country'] = flattened_df['ClientIP'].apply(lambda ip: get_country_from_ip(ip, reader))
        # Read the detection rules and apply them
        rules = read_detection_rules(rule_file)
        detected_events = apply_detection_logic_sqlite(db_name, rules)
        # Reorder the columns so that RuleName and Severity come first
        if not detected_events.empty:
            columns = ['RuleName', 'Severity'] + [col for col in detected_events.columns if col not in ['RuleName', 'Severity']]
            detected_events = detected_events[columns]
        # Run the remaining SQL queries
        conn = sqlite3.connect(db_name)
        try:
            user_login_tracker_df = pd.read_sql_query(user_logon_query, conn)
            password_spray_df = pd.read_sql_query(password_spray_query, conn)
@@ -318,19 +269,20 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
        finally:
            conn.close()

        # Create a workbook with the detection results
        with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
            if include_flattened_data:
                # Split the flattened data across multiple sheets if needed
                max_rows_per_sheet = 65000
                num_sheets = len(flattened_df) // max_rows_per_sheet + 1
                for i in range(num_sheets):
                    start_row = i * max_rows_per_sheet
                    end_row = (i + 1) * max_rows_per_sheet
                    sheet_name = f'Flattened Data {i + 1}'
                    flattened_df.iloc[start_row:end_row].to_excel(writer, sheet_name=sheet_name, index=False)

            # Write the detection results and per-field statistics to separate sheets
            detected_events.to_excel(writer, sheet_name='Detection Results', index=False)
            user_login_tracker_df.to_excel(writer, sheet_name='User Login Tracker', index=False)
            password_spray_df.to_excel(writer, sheet_name='Password Spray Attacks', index=False)
@@ -341,8 +293,10 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
            flattened_df['Country'].value_counts().to_frame().to_excel(writer, sheet_name='Country Stats')
            flattened_df['UserAgent'].value_counts().to_frame().to_excel(writer, sheet_name='UserAgent Stats')
            flattened_df['UserId'].value_counts().to_frame().to_excel(writer, sheet_name='UserId Stats')
            flattened_df['AuthenticationType'].value_counts().to_frame().to_excel(writer, sheet_name='AuthenticationType Stats')

        # Record the end time
        end_time = time.time()
        print(f"Office365 analysis finished in time: {end_time - start_time:.2f} seconds")
@@ -350,12 +304,18 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
        print(f"An error occurred during the analysis: {e}")

    finally:
        # Clean up the temporary directory
        if os.path.exists(temp_dir):
            for file in Path(temp_dir).glob('*'):
                file.unlink()  # delete each file
            os.rmdir(temp_dir)  # remove the directory itself

    # Record the end time and report the total running time
    end_time = time.time()
    running_time = end_time - start_time
    print(f"Office365 hunter finished in time: {running_time:.2f} seconds")

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary files not shown (five images added: 10 KiB, 236 KiB, 86 KiB, 70 KiB, 222 KiB).

@@ -1,107 +1,101 @@
# Try to create a directory named "wineventlog"
try {
    New-Item -ItemType "directory" -Path "wineventlog"
}
catch {
    echo "can't create a new directory"
}
# Try to export the Security log to a CSV file
try {
    get-eventlog -log Security | export-csv wineventlog/Security.csv
}
catch {
    echo "Can't retrieve Security Logs"
}
# Try to export the System log to a CSV file
try {
    Get-WinEvent -LogName System | export-csv wineventlog/System.csv
}
catch {
    echo "Can't retrieve System Logs"
}
# Try to export the Application log to a CSV file
try {
    Get-WinEvent -LogName Application | export-csv wineventlog/Application.csv
}
catch {
    echo "Can't retrieve Application Logs"
}
# Try to export the Windows PowerShell log to a CSV file
try {
    Get-WinEvent -LogName "Windows PowerShell" | export-csv wineventlog/Windows_PowerShell.csv
}
catch {
    echo "Can't retrieve Windows PowerShell Logs"
}
# Try to export the Microsoft-Windows-TerminalServices-LocalSessionManager/Operational log to a CSV file
try {
    Get-WinEvent -LogName "Microsoft-Windows-TerminalServices-LocalSessionManager/Operational" | export-csv wineventlog/LocalSessionManager.csv
}
catch {
    echo "Can't retrieve Microsoft-Windows-TerminalServices-LocalSessionManager/Operational Logs"
}
# Try to export the Microsoft-Windows-Windows Defender/Operational log to a CSV file
try {
    Get-WinEvent -LogName "Microsoft-Windows-Windows Defender/Operational" | export-csv wineventlog/Windows_Defender.csv
}
catch {
    echo "Can't retrieve Microsoft-Windows-Windows Defender/Operational Logs"
}
# Try to export the Microsoft-Windows-TaskScheduler/Operational log to a CSV file
try {
    Get-WinEvent -LogName Microsoft-Windows-TaskScheduler/Operational | export-csv wineventlog/TaskScheduler.csv
}
catch {
    echo "Can't retrieve Microsoft-Windows-TaskScheduler/Operational Logs"
}
# Try to export the Microsoft-Windows-WinRM/Operational log to a CSV file
try {
    Get-WinEvent -LogName Microsoft-Windows-WinRM/Operational | export-csv wineventlog/WinRM.csv
}
catch {
    echo "Can't retrieve Microsoft-Windows-WinRM/Operational Logs"
}
# Try to export the Microsoft-Windows-Sysmon/Operational log to a CSV file
try {
    Get-WinEvent -LogName Microsoft-Windows-Sysmon/Operational | export-csv wineventlog/Sysmon.csv
}
catch {
    echo "Can't retrieve Microsoft-Windows-Sysmon/Operational Logs"
}
# Try to export the Microsoft-Windows-PowerShell/Operational log to a CSV file
try {
    Get-WinEvent -LogName Microsoft-Windows-PowerShell/Operational | export-csv wineventlog/Powershell_Operational.csv
}
catch {
    echo "Can't retrieve Microsoft-Windows-PowerShell/Operational Logs"
}
# Try to compress the "wineventlog" directory into logs.zip
try {
    Compress-Archive -Path wineventlog -DestinationPath ./logs.zip
}
catch {
    echo "couldn't compress the log folder"
}

@@ -1,107 +1,101 @@
# Try to create a directory named "wineventlog"
try {
    New-Item -ItemType "directory" -Path "wineventlog"
}
catch {
    echo "can't create a new directory"
}
# Try to export the Security log to an EVTX file
try {
    wevtutil epl Security wineventlog/Security.evtx
}
catch {
    echo "Can't retrieve Security Logs"
}
# Try to export the System log to an EVTX file
try {
    wevtutil epl System wineventlog/System.evtx
}
catch {
    echo "Can't retrieve System Logs"
}
# Try to export the Application log to an EVTX file
try {
    wevtutil epl Application wineventlog/Application.evtx
}
catch {
    echo "Can't retrieve Application Logs"
}
# Try to export the Windows PowerShell log to an EVTX file
try {
    wevtutil epl "Windows PowerShell" wineventlog/Windows_PowerShell.evtx
}
catch {
    echo "Can't retrieve Windows PowerShell Logs"
}
# Try to export the Microsoft-Windows-TerminalServices-LocalSessionManager/Operational log to an EVTX file
try {
    wevtutil epl "Microsoft-Windows-TerminalServices-LocalSessionManager/Operational" wineventlog/LocalSessionManager.evtx
}
catch {
    echo "Can't retrieve Microsoft-Windows-TerminalServices-LocalSessionManager/Operational Logs"
}
# Try to export the Microsoft-Windows-Windows Defender/Operational log to an EVTX file
try {
    wevtutil epl "Microsoft-Windows-Windows Defender/Operational" wineventlog/Windows_Defender.evtx
}
catch {
    echo "Can't retrieve Microsoft-Windows-Windows Defender/Operational Logs"
}
# Try to export the Microsoft-Windows-TaskScheduler/Operational log to an EVTX file
try {
    wevtutil epl Microsoft-Windows-TaskScheduler/Operational wineventlog/TaskScheduler.evtx
}
catch {
    echo "Can't retrieve Microsoft-Windows-TaskScheduler/Operational Logs"
}
# Try to export the Microsoft-Windows-WinRM/Operational log to an EVTX file
try {
    wevtutil epl Microsoft-Windows-WinRM/Operational wineventlog/WinRM.evtx
}
catch {
    echo "Can't retrieve Microsoft-Windows-WinRM/Operational Logs"
}
# Try to export the Microsoft-Windows-Sysmon/Operational log to an EVTX file
try {
    wevtutil epl Microsoft-Windows-Sysmon/Operational wineventlog/Sysmon.evtx
}
catch {
    echo "Can't retrieve Microsoft-Windows-Sysmon/Operational Logs"
}
# Try to export the Microsoft-Windows-PowerShell/Operational log to an EVTX file
try {
    wevtutil epl Microsoft-Windows-PowerShell/Operational wineventlog/Powershell_Operational.evtx
}
catch {
    echo "Can't retrieve Microsoft-Windows-PowerShell/Operational Logs"
}
# Try to compress the "wineventlog" directory into logs.zip
try {
    Compress-Archive -Path wineventlog -DestinationPath ./logs.zip
}
catch {
    echo "couldn't compress the log folder"
}

@@ -0,0 +1,38 @@
I. Source Code Structure and Functionality
APT-Hunter's source code is written mainly in Python and consists of several modules and scripts that implement log collection, parsing, analysis, and result output.
Log collection:
The source includes PowerShell scripts for collecting Windows event logs (windows-log-collector-full-v3-CSV && windows-log-collector-full-v3-EVTX); the scripts can extract logs in both CSV and EVTX format.
Users can run these scripts to collect the required logs automatically instead of locating and extracting them by hand.
Log parsing:
APT-Hunter parses CSV log files with the built-in csv library and EVTX log files with an external library (evtx).
During parsing, APT-Hunter uses regular expressions (regex) to extract the fields of each event for later analysis (see the sketch below).
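A minimal sketch of that parsing step, mirroring the PyEvtxParser iteration and the EventID_rex/Channel_rex expressions shown in the diff above (the input file name is a placeholder):

import re
from evtx import PyEvtxParser

EventID_rex = re.compile('<EventID.*>(.*)</EventID>', re.IGNORECASE)
Channel_rex = re.compile('<Channel.*>(.*)</Channel>', re.IGNORECASE)

parser = PyEvtxParser("Security.evtx")        # placeholder path to an exported EVTX file
for record in parser.records():               # each record carries the event XML in record['data']
    event_id = EventID_rex.findall(record['data'])
    channel = Channel_rex.findall(record['data'])
    print(event_id[0] if event_id else "?", channel[0] if channel else "?")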
Log analysis:
The source contains the analysis logic, which maps attack indicators to Windows event logs based on MITRE ATT&CK tactics and techniques.
During analysis, APT-Hunter checks for a range of suspicious activity, such as malware installation or unauthorized network connections, and generates the corresponding report.
Result output:
Analysis results can be exported as Excel worksheets and CSV files for review and further analysis.
The Excel workbook contains every event detected in each Windows log, while the CSV files can be used for timeline analysis.
II. Key Modules and Code Analysis
Log collection module:
This module consists mainly of PowerShell scripts that extract logs from Windows systems.
The scripts use the Windows event log API or PowerShell commands to obtain the log data and save it in CSV or EVTX format.
Log parsing module:
This module is written in Python and contains the functions that parse CSV and EVTX log files.
CSV files are read with Python's csv library, which extracts the fields of each row.
EVTX files are read and parsed with an external library such as pyevtx (a minimal CSV-side sketch follows below).
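A minimal sketch of the CSV side, following the csv.DictReader pattern used by convert_csv in the diff above (the file name is a placeholder for an O365 audit export):

import csv, json

with open("AuditLog.csv", newline="", encoding="utf-8") as f:   # placeholder path
    for row in csv.DictReader(f):                # each row becomes a dict keyed by the CSV header
        audit = json.loads(row["AuditData"])     # the AuditData column holds a JSON string
        print(audit.get("Operation"), audit.get("UserId"))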
Log analysis module:
This module is the core of APT-Hunter and contains the logic for detecting suspicious activity.
It defines multiple detection rules, based on MITRE ATT&CK tactics and techniques, that identify a variety of APT attack indicators.
During analysis, APT-Hunter iterates over the events in the log files and evaluates and classifies them against these detection rules (an illustrative sketch follows below).
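The Windows detection logic itself is not part of this diff; as an illustrative sketch only (the rule fields, event ID, and pattern below are hypothetical, not APT-Hunter's actual rule format), a check of this kind reduces to matching extracted event fields against a rule:

import re

# Hypothetical rule: flag a suspicious service installation (event ID 7045)
rule = {"name": "Suspicious service installed", "technique": "T1543.003",
        "event_id": "7045", "pattern": re.compile(r"(powershell|cmd\.exe|\\temp\\)", re.IGNORECASE)}

def matches(event):
    # event: dict with 'EventID' and 'Data' fields extracted during parsing
    return event["EventID"] == rule["event_id"] and bool(rule["pattern"].search(event["Data"]))

print(matches({"EventID": "7045", "Data": "ImagePath: C:\\temp\\payload.exe"}))  # True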
Result output module:
This module writes the analysis results in a user-readable format.
Excel worksheets are created and populated with Python's pandas library.
CSV files are written directly with Python's file-handling functions (see the sketch below).
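A minimal sketch of that output step, matching the pandas ExcelWriter and to_csv usage visible in the diff above (the sheet, column, and file names are placeholders):

import pandas as pd

events = pd.DataFrame({"EventID": ["4624", "7045"], "Detection": ["Logon", "Service installed"]})
with pd.ExcelWriter("report.xlsx", engine="xlsxwriter") as writer:   # Excel report
    events.to_excel(writer, sheet_name="Detection Results", index=False)
events.to_csv("timeline.csv", index=False)                           # CSV for timeline analysis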
III. Technical Highlights and Advantages
Efficiency: APT-Hunter collects, parses, and analyzes large volumes of Windows event logs quickly, which improves the speed and accuracy of threat detection.
Usability: the tool offers a friendly interface and a simple workflow, so users can get started and become productive quickly.
Compatibility: APT-Hunter supports several log parsing and output formats, so it can be integrated flexibly into existing security monitoring systems.
Open source: as an open-source tool, APT-Hunter's source code is public, and users can extend or customize it as needed.
IV. Conclusion and Outlook
The source code analysis shows that APT-Hunter hunts threats in Windows event logs efficiently and accurately. Its friendly interface, simple workflow, and flexible parsing and output options make threat detection and analysis straightforward. However, as APT attacks keep evolving, APT-Hunter will need to keep updating its detection rules and features to counter new threats. Future work could further optimize its performance and efficiency, improve its applicability and usability, and explore integration with other security monitoring systems to achieve more comprehensive and effective protection.