Compare commits

..

28 Commits

Author SHA1 Message Date
lancymorry f238aeaf05 项目阅读报告和代码注释均已完成,有劳过目
7 months ago
郭吉民 32e37585cd from 郭吉民
7 months ago
DengYanjia 4cbd982f74 from 邓焱嘉
7 months ago
lly 71f46cb052 from 李林原
7 months ago
lly 1f9b203953 from 李林原
7 months ago
lly 3356a0b7bc from 李林原
7 months ago
DengYanjia 2b5e393a58 from 邓焱嘉
7 months ago
林协鸿 4629e94830 from 林协鸿
7 months ago
郭吉民 7e5af04ca5 by 郭吉民
7 months ago
黎星辰 09007270c3 from 黎星辰
7 months ago
lancymorry aadd24d025 重新把所有代码文件清空了
7 months ago
pex7hfbnt 58eb17b849 Merge pull request 'Linn' (#8) from testL into main
7 months ago
linxiehong 8ad48ca390 Fin
7 months ago
pex7hfbnt 340a7f96e3 issue5
7 months ago
郭吉民 fc00532c2d guojimin
7 months ago
pex7hfbnt e222c39967 Delete 'lly.txt'
7 months ago
pex7hfbnt bf5a070658 issue2
7 months ago
pex7hfbnt 8bc3507637 issue4
7 months ago
pex7hfbnt e9cff0e553 Merge pull request '为了方便点击所以不得不取了一个非常长的标题.jpg' (#4) from NineLamp into main
7 months ago
lly 7424167841 finish
7 months ago
DengYanjia 602d857b22 2024.12.17第二次修改
8 months ago
郭吉民 541df41774 到此一游
8 months ago
pex7hfbnt 865d77c51c Merge pull request '1' (#3) from NineLamp into main
8 months ago
2813826100@qq.com ca84da7bc3 try
8 months ago
lly c32c950dc4 lly first time
8 months ago
linxiehong 19501d4439 123 added
8 months ago
linxiehong 2cb80c58a1 123456
8 months ago
DengYanjia 51da9a2377 2024.12.16第一次修改_AizenSousuke
8 months ago

@ -1,15 +1,24 @@
#!/bin/bash #!/bin/bash
# 检查脚本是否只有一个参数输入
if [ "$#" -ne 1 ]; then if [ "$#" -ne 1 ]; then
echo "Please enter rules path as argument " echo "Please enter rules path as argument "
exit 1 exit 1
fi fi
# 输出正在克隆Sigma转换工具的信息
echo "Getting Sigma Converter Toot" echo "Getting Sigma Converter Toot"
# 使用git克隆SigmaHQ的legacy-sigmatools仓库到当前目录
git clone https://github.com/SigmaHQ/legacy-sigmatools.git git clone https://github.com/SigmaHQ/legacy-sigmatools.git
# 输出正在转换sigma规则的信息
echo "Converting sigma rules " echo "Converting sigma rules "
# 执行Sigma转换工具将sigma规则文件转换为json格式
# --recurse: 递归处理指定目录下的所有规则文件
# --target sqlite: 指定转换的目标格式为sqlite
# --backend-option table=Events: 指定输出的表名为Events
# -d $1: 指定sigma规则文件的目录为脚本的第一个参数
# -c lib/config/sigma-converter-rules-config.yml: 指定配置文件路径
# -o rules.json: 指定输出文件名为rules.json
# --output-fields: 指定输出的字段内容
legacy-sigmatools/tools/sigmac --recurse --target sqlite --backend-option table=Events --output-format json -d $1 -c lib/config/sigma-converter-rules-config.yml -o rules.json --output-fields title,id,description,author,tags,level,falsepositives,filename,status legacy-sigmatools/tools/sigmac --recurse --target sqlite --backend-option table=Events --output-format json -d $1 -c lib/config/sigma-converter-rules-config.yml -o rules.json --output-fields title,id,description,author,tags,level,falsepositives,filename,status
# 输出转换完成的信息,包括生成的文件名
echo "Rules created with file name : rules.json " echo "Rules created with file name : rules.json "

@ -1,11 +1,23 @@
#!/bin/bash #!/bin/bash
# 输出转换完成的信息,包括生成的文件名
echo "Getting Sigma Converter Toot" echo "Getting Sigma Converter Toot"
# 使用git克隆SigmaHQ的legacy-sigmatools仓库到当前目录
git clone https://github.com/SigmaHQ/legacy-sigmatools.git git clone https://github.com/SigmaHQ/legacy-sigmatools.git
# 使用git克隆SigmaHQ的legacy-sigmatools仓库到当前目录
echo "Getting Sigma Rules" echo "Getting Sigma Rules"
# 使用git克隆SigmaHQ的legacy-sigmatools仓库到当前目录
git clone https://github.com/SigmaHQ/sigma.git git clone https://github.com/SigmaHQ/sigma.git
# 输出正在转换sigma规则的信息
echo "Converting sigma rules " echo "Converting sigma rules "
# 执行Sigma转换工具将sigma规则文件转换为json格式
# --recurse: 递归处理指定目录下的所有规则文件
# --target sqlite: 指定转换的目标格式为sqlite
# --backend-option table=Events: 指定输出的表名为Events
# -d sigma/rules/windows/: 指定sigma规则文件的目录为sigma仓库中的windows规则目录
# -c lib/config/sigma-converter-rules-config.yml: 指定配置文件路径
# -o rules.json: 指定输出文件名为rules.json
# --output-fields: 指定输出的字段内容
legacy-sigmatools/tools/sigmac --recurse --target sqlite --backend-option table=Events --output-format json -d sigma/rules/windows/ -c lib/config/sigma-converter-rules-config.yml -o rules.json --output-fields title,id,description,author,tags,level,falsepositives,filename,status legacy-sigmatools/tools/sigmac --recurse --target sqlite --backend-option table=Events --output-format json -d sigma/rules/windows/ -c lib/config/sigma-converter-rules-config.yml -o rules.json --output-fields title,id,description,author,tags,level,falsepositives,filename,status
# 输出转换完成的信息,包括生成的文件名
echo "Rules created with file name : rules.json " echo "Rules created with file name : rules.json "

@ -1,99 +1,100 @@
[ {
{ "description": "此 JSON 文件包含与 O365 安全检测相关的规则,每条规则包括名称、严重性等级和查询语句。",
"name": "Suspicious User Agent", "rules": [
"severity": "High", {
"query": "SELECT * FROM events WHERE UserAgent LIKE '%python%' OR UserAgent LIKE '%ruler%' OR UserAgent LIKE '%curl%' OR UserAgent LIKE '%Wget%' OR UserAgent LIKE '%python-requests%' OR UserAgent LIKE '%AADInternals%' OR UserAgent LIKE '%azurehound%' OR UserAgent LIKE '%axios%' OR UserAgent LIKE '%BAV2ROPC%' " "name": "Suspicious User Agent",
}, "severity": "High",
{ "query": "SELECT * FROM events WHERE UserAgent LIKE '%python%' OR UserAgent LIKE '%ruler%' OR UserAgent LIKE '%curl%' OR UserAgent LIKE '%Wget%' OR UserAgent LIKE '%python-requests%' OR UserAgent LIKE '%AADInternals%' OR UserAgent LIKE '%azurehound%' OR UserAgent LIKE '%axios%' OR UserAgent LIKE '%BAV2ROPC%' "
"name": "User adding or removing Inbox Rule", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE Operation LIKE '%InboxRule%' OR Operation LIKE 'Set-Mailbox' OR Operation LIKE '%DeliverToMailboxAndForward%' OR Operation LIKE '%ForwardingAddress%' OR Operation LIKE '%ForwardingAddress%' " "name": "User adding or removing Inbox Rule",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE Operation LIKE '%InboxRule%' OR Operation LIKE 'Set-Mailbox' OR Operation LIKE '%DeliverToMailboxAndForward%' OR Operation LIKE '%ForwardingAddress%' OR Operation LIKE '%ForwardingAddress%' "
"name": "After Hours Activity", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE (CASE WHEN CAST(substr(CreationTime, 12, 2) AS INTEGER) < 0 THEN 24 + (CAST(substr(CreationTime, 12, 2) AS INTEGER)) ELSE CAST(substr(CreationTime, 12, 2) AS INTEGER) END >= 20 OR CASE WHEN CAST(substr(CreationTime, 12, 2) AS INTEGER) < 0 THEN 24 + (CAST(substr(CreationTime, 12, 2) AS INTEGER)) ELSE CAST(substr(CreationTime, 12, 2) AS INTEGER) END < 6) AND NOT (Operation LIKE 'File%' OR Operation LIKE 'List%' OR Operation LIKE 'Page%' OR Operation LIKE '%UserLogin%');" "name": "After Hours Activity",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE (CASE WHEN CAST(substr(CreationTime, 12, 2) AS INTEGER) < 0 THEN 24 + (CAST(substr(CreationTime, 12, 2) AS INTEGER)) ELSE CAST(substr(CreationTime, 12, 2) AS INTEGER) END >= 20 OR CASE WHEN CAST(substr(CreationTime, 12, 2) AS INTEGER) < 0 THEN 24 + (CAST(substr(CreationTime, 12, 2) AS INTEGER)) ELSE CAST(substr(CreationTime, 12, 2) AS INTEGER) END < 6) AND NOT (Operation LIKE 'File%' OR Operation LIKE 'List%' OR Operation LIKE 'Page%' OR Operation LIKE '%UserLogin%');"
"name": "Possible file exfiltration", },
"severity": "Low", {
"query": "SELECT * FROM events WHERE Operation LIKE '%FileUploaded%' " "name": "Possible file exfiltration",
}, "severity": "Low",
{ "query": "SELECT * FROM events WHERE Operation LIKE '%FileUploaded%' "
"name": "Admin searching in emails of other users", },
"severity": "Low", {
"query": "SELECT * FROM events WHERE Operation LIKE '%SearchStarted%' OR Operation LIKE '%SearchExportDownloaded%' OR Operation LIKE '%ViewedSearchExported%' " "name": "Admin searching in emails of other users",
}, "severity": "Low",
{ "query": "SELECT * FROM events WHERE Operation LIKE '%SearchStarted%' OR Operation LIKE '%SearchExportDownloaded%' OR Operation LIKE '%ViewedSearchExported%' "
"name": "Strong Authentication Disabled", },
"severity": "medium", {
"query": "SELECT * FROM events WHERE Operation LIKE '%disable strong authentication%'" "name": "Strong Authentication Disabled",
}, "severity": "medium",
{ "query": "SELECT * FROM events WHERE Operation LIKE '%disable strong authentication%'"
"name": "User added to admin group", },
"severity": "High", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%add member to group%' AND ModifiedProperties Like '%admin%') OR ( Operation LIKE '%AddedToGroup%' AND TargetUserOrGroupName Like '%admin%') " "name": "User added to admin group",
}, "severity": "High",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%add member to group%' AND ModifiedProperties Like '%admin%') OR ( Operation LIKE '%AddedToGroup%' AND TargetUserOrGroupName Like '%admin%') "
"name": "New Policy created", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%add policy%' ) " "name": "New Policy created",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%add policy%' ) "
"name": "Security Alert triggered", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%AlertTriggered%' AND NOT Severity Like '%Low%') " "name": "Security Alert triggered",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%AlertTriggered%' AND NOT Severity Like '%Low%') "
"name": "Transport rules ( mail flow rules ) modified", },
"severity": "High", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%TransportRule%') " "name": "Transport rules ( mail flow rules ) modified",
}, "severity": "High",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%TransportRule%') "
"name": "An application was registered in Azure AD", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%Add service principal.%') " "name": "An application was registered in Azure AD",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%Add service principal.%') "
"name": "Add app role assignment grant to user", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%Add app role assignment grant to user.%') " "name": "Add app role assignment grant to user",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%Add app role assignment grant to user.%') "
"name": "eDiscovery Abuse", },
"severity": "High", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%New-ComplianceSearch%') " "name": "eDiscovery Abuse",
}, "severity": "High",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%New-ComplianceSearch%') "
"name": "Operations affecting OAuth Applications", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE ( Operation = 'Add application.' OR Operation = 'Update application' OR Operation = 'Add service principal.' OR Operation = 'Update application Certificates and secrets management' OR Operation = 'Update applicationUpdate service principal.' OR Operation = 'Add app role assignment grant to user.' OR Operation = 'Add delegated permission grant.' OR Operation = 'Add owner to application.' OR Operation = 'Add owner to service principal.') " "name": "Operations affecting OAuth Applications",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE ( Operation = 'Add application.' OR Operation = 'Update application' OR Operation = 'Add service principal.' OR Operation = 'Update application Certificates and secrets management' OR Operation = 'Update applicationUpdate service principal.' OR Operation = 'Add app role assignment grant to user.' OR Operation = 'Add delegated permission grant.' OR Operation = 'Add owner to application.' OR Operation = 'Add owner to service principal.') "
"name": "Suspicious Operations affecting Mailbox ", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE ( Operation = 'Set-MailboxJunkEmailConfiguration' OR Operation = 'SoftDelete' OR Operation = 'SendAs' OR Operation = 'HardDelete' OR Operation = 'MoveToDeletedItems' ) " "name": "Suspicious Operations affecting Mailbox ",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE ( Operation = 'Set-MailboxJunkEmailConfiguration' OR Operation = 'SoftDelete' OR Operation = 'SendAs' OR Operation = 'HardDelete' OR Operation = 'MoveToDeletedItems' ) "
"name": "Suspicious Operations affecting SharePoint ", },
"severity": "Medium", {
"query": "SELECT * FROM events WHERE ( Operation = 'AddedToSecureLink' OR Operation = 'SearchQueryPerformed' OR Operation = 'SecureLinkCreated' OR Operation = 'SecureLinkUpdated' OR Operation = 'SharingInvitationCreated' ) " "name": "Suspicious Operations affecting SharePoint ",
}, "severity": "Medium",
{ "query": "SELECT * FROM events WHERE ( Operation = 'AddedToSecureLink' OR Operation = 'SearchQueryPerformed' OR Operation = 'SecureLinkCreated' OR Operation = 'SecureLinkUpdated' OR Operation = 'SharingInvitationCreated' ) "
"name": "User Modifying RetentionPolicy ", },
"severity": "High", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%UnifiedAuditLogRetentionPolicy%' ) " "name": "User Modifying RetentionPolicy ",
}, "severity": "High",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%UnifiedAuditLogRetentionPolicy%' ) "
"name": "User Modifying Audit Logging ", },
"severity": "High", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%AdminAuditLogConfig%' ) " "name": "User Modifying Audit Logging ",
}, "severity": "High",
{ "query": "SELECT * FROM events WHERE ( Operation LIKE '%AdminAuditLogConfig%' ) "
"name": "String Authentication Disabled ", },
"severity": "High", {
"query": "SELECT * FROM events WHERE ( Operation LIKE '%Disable Strong Authentication.%' ) " "name": "String Authentication Disabled ",
} "severity": "High",
"query": "SELECT * FROM events WHERE ( Operation LIKE '%Disable Strong Authentication.%' ) "
}
] ]
}

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

@ -1,72 +1,75 @@
import csv import csv
import re import re
from netaddr import * from netaddr import * # 导入netaddr库的所有内容用于处理网络地址
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET # XML解析器
import pandas as pd import pandas as pd # 数据分析库
from datetime import datetime , timezone from datetime import datetime, timezone # 日期时间处理
from evtx import PyEvtxParser from evtx import PyEvtxParser # 解析Windows事件日志文件的库
from dateutil.parser import parse from dateutil.parser import parse, isoparse # 解析日期时间字符串
from dateutil.parser import isoparse from pytz import timezone # 处理时区
from pytz import timezone minlength = 1000 # 可能用于某个字符串长度的检查,但在这个文件中未使用
minlength=1000 # 初始化一个字典列表,用于存储猎取的事件信息
Hunting_events = [{'Date and Time': [], 'timestamp': [], 'Channel': [], 'Computer': [], 'Event ID': [], 'Original Event Log': []}]
Hunting_events=[{'Date and Time':[],'timestamp':[],'Channel':[],'Computer':[],'Event ID':[],'Original Event Log':[]}] # 正则表达式用于从事件日志中提取特定信息
EventID_rex = re.compile('<EventID.*>(.*)<\/EventID>', re.IGNORECASE) EventID_rex = re.compile('<EventID.*>(.*)<\/EventID>', re.IGNORECASE)
Channel_rex = re.compile('<Channel.*>(.*)<\/Channel>', re.IGNORECASE) Channel_rex = re.compile('<Channel.*>(.*)<\/Channel>', re.IGNORECASE)
Computer_rex = re.compile('<Computer.*>(.*)<\/Computer>', re.IGNORECASE) Computer_rex = re.compile('<Computer.*>(.*)<\/Computer>', re.IGNORECASE)
def Evtx_hunt(files,str_regexes,eid,input_timzone,output,timestart,timeend): def Evtx_hunt(files, str_regexes, eid, input_timzone, output, timestart, timeend):
"""
解析并搜索Windows事件日志文件中的特定事件
参数:
- files: 要解析的事件日志文件列表
- str_regexes: 用于匹配事件数据的正则表达式列表
- eid: 事件ID如果提供则只搜索此ID的事件
- input_timzone: 输入日志的时区
- output: 输出文件名
- timestart, timeend: 搜索时间范围
"""
for file in files: for file in files:
file=str(file) file = str(file)
print("Analyzing "+file) print("Analyzing " + file)
try: try:
parser = PyEvtxParser(file) parser = PyEvtxParser(file)
except: except:
print("Issue analyzing "+file +"\nplease check if its not corrupted") print("Issue analyzing " + file + "\nplease check if its not corrupted")
continue continue
try:
for record in parser.records():
for record in parser.records(): try:
# 提取事件ID
EventID = EventID_rex.findall(record['data']) EventID = EventID_rex.findall(record['data'])
# 如果提供了时间范围,则检查事件是否在该范围内
if timestart is not None and timeend is not None: if timestart is not None and timeend is not None:
timestamp = datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat())) timestamp = datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat()))
if not (timestamp > timestart and timestamp < timeend): if not (timestamp > timestart and timestamp < timeend):
return continue # 事件不在时间范围内,跳过
if len(EventID) > 0: # 如果有EventID并且匹配eid如果eid不为None
if eid is not None and EventID[0]!=eid: if len(EventID) > 0 and (eid is None or EventID[0] == eid):
continue
Computer = Computer_rex.findall(record['data']) Computer = Computer_rex.findall(record['data'])
Channel = Channel_rex.findall(record['data']) Channel = Channel_rex.findall(record['data'])
if len(Channel)>0: channel = Channel[0] if len(Channel) > 0 else " "
channel=Channel[0] # 遍历所有提供的正则表达式
else:
channel=" "
#print(record['data'])
# if record['data'].lower().find(str_regex.lower())>-1:
#print(str_regexes)
for str_regex in str_regexes: for str_regex in str_regexes:
rex=re.compile(str_regex, re.IGNORECASE) rex = re.compile(str_regex, re.IGNORECASE)
#print(rex)
#print(rex.findall(record['data']))
if rex.findall(record['data']): if rex.findall(record['data']):
#print("EventID : "+EventID[0]+" , Data : "+record['data']) # 如果匹配到正则表达式,记录事件信息
Hunting_events[0]['timestamp'].append(datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat()))) Hunting_events[0]['timestamp'].append(datetime.timestamp(isoparse(parse(record["timestamp"]).astimezone(input_timzone).isoformat())))
Hunting_events[0]['Date and Time'].append(parse(record["timestamp"]).astimezone(input_timzone).isoformat()) Hunting_events[0]['Date and Time'].append(parse(record["timestamp"]).astimezone(input_timzone).isoformat())
Hunting_events[0]['Channel'].append(channel) Hunting_events[0]['Channel'].append(channel)
Hunting_events[0]['Event ID'].append(EventID[0]) Hunting_events[0]['Event ID'].append(EventID[0])
Hunting_events[0]['Computer'].append(Computer[0]) Hunting_events[0]['Computer'].append(Computer[0])
Hunting_events[0]['Original Event Log'].append(str(record['data']).replace("\r", " ").replace("\n", " ")) Hunting_events[0]['Original Event Log'].append(str(record['data']).replace("\r", " ").replace("\n", " "))
except Exception as e: except Exception as e:
print("issue searching log : "+record['data']+"\n Error : "+print(e)) print("issue searching log : " + record['data'] + "\n Error : " + str(e)) # 修正了错误的打印函数调用
hunt_report(output) hunt_report(output)
def hunt_report(output): def hunt_report(output):
"""
生成猎取事件的报告
参数:
- output: 输出CSV文件的前缀
"""
global Hunting_events global Hunting_events
Events = pd.DataFrame(Hunting_events[0]) Events = pd.DataFrame(Hunting_events[0])
print("Found "+str(len(Hunting_events[0]["timestamp"]))+" Events") print("Found " + str(len(Hunting_events[0]["timestamp"])) + " Events")
Events.to_csv(output+"_hunting.csv", index=False) Events.to_csv(output + "_hunting.csv", index=False)

@ -1,31 +1,29 @@
import json import json # 导入用于处理JSON数据的模块
import sqlite3 import sqlite3 # 导入用于操作SQLite数据库的模块
import tempfile import tempfile # 导入用于创建临时文件和目录的模块
import os import os # 导入用于操作系统功能的模块
import time import time # 导入用于处理时间相关功能的模块
import pandas as pd import pandas as pd # 导入用于数据处理和分析的Pandas库
import geoip2.database import geoip2.database # 导入用于GeoLite2数据库的GeoIP2库
import requests import requests # 导入用于发送HTTP请求的模块
from dateutil import parser, tz from dateutil import parser, tz # 导入用于解析和处理日期时间的模块
import pandas as pd from pathlib import Path # 导入用于处理文件路径的模块
import json
import csv # 初始化全局变量用于计时
from pathlib import Path start_time = 0
end_time = 0
start_time=0
end_time=0 # SQL查询语句用于检测密码喷洒攻击
password_spray_query = ''' password_spray_query = '''
WITH FailedLogins AS ( WITH FailedLogins AS (
SELECT SELECT
UserId, UserId,
ClientIP, ClientIP,
datetime(CreationTime) AS LoginDate datetime(CreationTime) AS LoginDate
FROM FROM
events events
WHERE WHERE
Operation = 'UserLoginFailed' Operation = 'UserLoginFailed'
) )
SELECT SELECT
UserId, UserId,
@ -33,18 +31,18 @@ SELECT
COUNT(DISTINCT ClientIP) AS UniqueIPCount, COUNT(DISTINCT ClientIP) AS UniqueIPCount,
COUNT(*) AS FailedLoginAttempts, COUNT(*) AS FailedLoginAttempts,
LoginDate LoginDate
FROM FROM
FailedLogins FailedLogins
GROUP BY GROUP BY
UserId, UserId,
strftime('%Y-%m-%d %H', LoginDate) strftime('%Y-%m-%d %H', LoginDate)
HAVING HAVING
COUNT(*) > 5 AND UniqueIPCount > 3 COUNT(*) > 5 AND UniqueIPCount > 3
ORDER BY ORDER BY
FailedLoginAttempts DESC; FailedLoginAttempts DESC;
''' '''
# SQL查询语句用于跟踪用户登录活动
user_logon_query = ''' user_logon_query = '''
SELECT SELECT
UserId, UserId,
@ -52,18 +50,19 @@ SELECT
COUNT(*) AS TotalLoginAttempts, COUNT(*) AS TotalLoginAttempts,
SUM(CASE WHEN Operation = 'UserLoggedIn' THEN 1 ELSE 0 END) AS SuccessfulLogins, SUM(CASE WHEN Operation = 'UserLoggedIn' THEN 1 ELSE 0 END) AS SuccessfulLogins,
SUM(CASE WHEN Operation = 'UserLoginFailed' THEN 1 ELSE 0 END) AS FailedLogins SUM(CASE WHEN Operation = 'UserLoginFailed' THEN 1 ELSE 0 END) AS FailedLogins
FROM FROM
events events
where WHERE
Operation = 'UserLoggedIn' OR Operation = 'UserLoginFailed' Operation = 'UserLoggedIn' OR Operation = 'UserLoginFailed'
GROUP BY GROUP BY
UserId, UserId,
LoginDate LoginDate
ORDER BY ORDER BY
LoginDate, LoginDate,
UserId; UserId;
''' '''
# SQL查询语句用于统计用户执行的操作
User_operations_query = ''' User_operations_query = '''
SELECT SELECT
UserId, UserId,
@ -77,12 +76,13 @@ ORDER BY
OperationCount DESC; OperationCount DESC;
''' '''
# SQL查询语句用于按天统计用户操作
user_operation_by_day_query = ''' user_operation_by_day_query = '''
SELECT SELECT
UserId, UserId,
DATE(CreationTime) AS OperationDate, DATE(CreationTime) AS OperationDate,
COUNT(DISTINCT Operation) AS OperationCount, COUNT(DISTINCT Operation) AS OperationCount,
GROUP_CONCAT( Operation, ', ') AS UniqueOperations GROUP_CONCAT(Operation, ', ') AS UniqueOperations
FROM FROM
events events
GROUP BY GROUP BY
@ -92,175 +92,224 @@ ORDER BY
OperationCount DESC OperationCount DESC
''' '''
def convert_csv(input_file, temp):
def convert_csv(input_file,temp): """
with open(input_file, 'r', encoding='utf-8') as csv_file: 将CSV文件转换为JSON格式的文件
# Create a CSV reader
参数:
- input_file: 输入的CSV文件路径
- temp: 临时目录路径
返回:
- json_file: 生成的JSON文件路径
"""
# 创建一个新的JSON文件路径结合临时目录和文件名
json_file = os.path.join(temp, 'audit_data.json')
# 同时打开输入的CSV文件进行读取和新的JSON文件进行写入设置编码为UTF-8
# 使用上下文管理器确保文件正确关闭
with open(input_file, 'r', encoding='utf-8') as csv_file, open(json_file, 'w', encoding='utf-8') as jsonl_file:
# 使用csv.DictReader来读取CSV文件每行会转换为字典
reader = csv.DictReader(csv_file) reader = csv.DictReader(csv_file)
# 迭代读取CSV文件的每一行
json_file = 'audit_data.json' for row in reader:
json_file=os.path.join(temp, json_file) # 将CSV文件中'AuditData'字段的字符串解析为JSON对象
with open(json_file, 'w', encoding='utf-8') as jsonl_file: json_data = json.loads(row['AuditData'])
# Extract and write the AuditData column to a file as JSON Lines # 将JSON对象再次转换为字符串
for row in reader: json_string = json.dumps(json_data)
# Extract the AuditData which is already a JSON formatted string # 将转换后的JSON字符串写入json文件每行一个JSON对象以换行符结束
json_data = json.loads(row['AuditData']) jsonl_file.write(json_string + '\n')
# Convert the JSON object back to a string to store in the file # 返回新创建的JSON文件的路径
json_string = json.dumps(json_data)
# Write the JSON string to the file with a newline
jsonl_file.write(json_string + '\n')
return json_file return json_file
def flatten_json_file(input_file, timezone, chunk_size=10000): def flatten_json_file(input_file, timezone, chunk_size=10000):
# Read the JSON file in chunks """
将JSON文件展平并处理时间戳
参数:
- input_file: 输入的JSON文件路径
- timezone: 目标时区
- chunk_size: 处理的块大小
返回:
- DataFrame: 展平后的数据
"""
# 初始化一个空列表用于存储数据块
chunks = [] chunks = []
# 打开输入的JSON文件进行读取
with open(input_file, 'r') as file: with open(input_file, 'r') as file:
# 读取所有行到一个列表中
lines = file.readlines() lines = file.readlines()
# 按块大小迭代处理行
for i in range(0, len(lines), chunk_size): for i in range(0, len(lines), chunk_size):
# 将当前块的每一行解析为JSON对象
chunk = [json.loads(line) for line in lines[i:i + chunk_size]] chunk = [json.loads(line) for line in lines[i:i + chunk_size]]
# 处理每个记录
# Convert the CreationTime to the desired timezone
for record in chunk: for record in chunk:
# 如果记录中包含'CreationTime'字段
if 'CreationTime' in record: if 'CreationTime' in record:
# Parse the CreationTime # 解析'CreationTime'字段为日期时间对象
creation_time = parser.parse(record['CreationTime']) creation_time = parser.parse(record['CreationTime'])
# 如果日期时间对象没有时区信息设置为UTC
# Check if the datetime object is timezone aware
if creation_time.tzinfo is None: if creation_time.tzinfo is None:
# Assume the original time is in UTC if no timezone info is present
creation_time = creation_time.replace(tzinfo=tz.tzutc()) creation_time = creation_time.replace(tzinfo=tz.tzutc())
# 将日期时间对象转换为目标时区并格式化为ISO格式字符串
# Convert the CreationTime to the desired timezone
record['CreationTime'] = creation_time.astimezone(timezone).isoformat() record['CreationTime'] = creation_time.astimezone(timezone).isoformat()
# 将当前块展平并添加到数据块列表中
chunks.append(pd.json_normalize(chunk)) chunks.append(pd.json_normalize(chunk))
# 合并所有数据块为一个DataFrame并返回
# Concatenate all chunks into a single DataFrame return pd.concat(chunks, ignore_index=True)
flattened_records = pd.concat(chunks, ignore_index=True)
return flattened_records
def create_sqlite_db_from_dataframe(dataframe, db_name): def create_sqlite_db_from_dataframe(dataframe, db_name):
"""
从Pandas DataFrame创建SQLite数据库
参数:
- dataframe: 包含数据的Pandas DataFrame
- db_name: SQLite数据库文件名
"""
# 连接到SQLite数据库如果数据库不存在则会创建
conn = sqlite3.connect(db_name) conn = sqlite3.connect(db_name)
# 将DataFrame中的所有列转换为字符串类型
# Convert all columns to string
dataframe = dataframe.astype(str) dataframe = dataframe.astype(str)
# 将DataFrame写入SQLite数据库中的'table'表,如果表已存在则替换
# Write the DataFrame to SQLite, treating all fields as text
dataframe.to_sql('events', conn, if_exists='replace', index=False, dataframe.to_sql('events', conn, if_exists='replace', index=False,
dtype={col_name: 'TEXT' for col_name in dataframe.columns}) dtype={col_name: 'TEXT' for col_name in dataframe.columns})
# 关闭数据库连接
conn.close() conn.close()
def read_detection_rules(rule_file): def read_detection_rules(rule_file):
with open(rule_file, 'r') as file: """
rules = json.load(file) 从文件中读取检测规则
return rules
参数:
- rule_file: 包含检测规则的JSON文件路径
返回:
- rules: 规则列表
"""
with open(rule_file, 'r') as file:
return json.load(file)
def apply_detection_logic_sqlite(db_name, rules): def apply_detection_logic_sqlite(db_name, rules):
"""
应用检测逻辑到SQLite数据库
参数:
- db_name: SQLite数据库文件名
- rules: 检测规则列表
返回:
- DataFrame: 检测到的异常事件
"""
# 连接到SQLite数据库
conn = sqlite3.connect(db_name) conn = sqlite3.connect(db_name)
# 初始化一个空列表用于存储所有检测到的事件
all_detected_events = [] all_detected_events = []
# 遍历每个检测规则
for rule in rules: for rule in rules:
# 获取规则名称
rule_name = rule['name'] rule_name = rule['name']
# 获取规则严重性
severity = rule['severity'] severity = rule['severity']
# 获取规则的SQL查询
query = rule['query'] query = rule['query']
# 执行SQL查询并将结果存储到DataFrame中
detected_events = pd.read_sql_query(query, conn) detected_events = pd.read_sql_query(query, conn)
# 添加规则名称列到DataFrame
detected_events['RuleName'] = rule_name detected_events['RuleName'] = rule_name
# 添加严重性列到DataFrame
detected_events['Severity'] = severity detected_events['Severity'] = severity
# 将当前规则检测到的事件添加到列表中
all_detected_events.append(detected_events) all_detected_events.append(detected_events)
# 关闭数据库连接
conn.close() conn.close()
# 合并所有检测到的事件为一个DataFrame并返回如果没有检测到事件则返回空DataFrame
if all_detected_events: return pd.concat(all_detected_events, ignore_index=True) if all_detected_events else pd.DataFrame()
result = pd.concat(all_detected_events, ignore_index=True)
else:
result = pd.DataFrame()
return result
def download_geolite_db(geolite_db_path): def download_geolite_db(geolite_db_path):
"""
下载GeoLite2数据库用于IP地理定位
参数:
- geolite_db_path: 保存GeoLite2数据库的路径
"""
url = "https://git.io/GeoLite2-Country.mmdb" url = "https://git.io/GeoLite2-Country.mmdb"
print(f"Downloading GeoLite2 database from {url}...") print(f"Downloading GeoLite2 database from {url}...")
response = requests.get(url) response = requests.get(url)
response.raise_for_status() # Check if the download was successful response.raise_for_status()
with open(geolite_db_path, 'wb') as file: with open(geolite_db_path, 'wb') as file:
file.write(response.content) file.write(response.content)
print(f"GeoLite2 database downloaded and saved to {geolite_db_path}") print(f"GeoLite2 database downloaded and saved to {geolite_db_path}")
def get_country_from_ip(ip, reader): def get_country_from_ip(ip, reader):
"""
根据IP地址获取国家名称
参数:
- ip: IP地址
- reader: GeoLite2数据库的读取器
返回:
- str: 国家名称或'Unknown'如果无法解析
"""
try: try:
response = reader.country(ip) return reader.country(ip).country.name
return response.country.name
except Exception as e: except Exception as e:
#print(f"Could not resolve IP {ip}: {e}") print(f"Could not resolve IP {ip}: {e}")
return 'Unknown' return 'Unknown'
def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data=False, def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data=False,
geolite_db_path='GeoLite2-Country.mmdb'): geolite_db_path='GeoLite2-Country.mmdb'):
start_time = time.time() """
temp_dir = ".temp" 分析Office 365审计日志并生成报告
if output is None or output == "":
output = os.path.splitext(auditfile)[0] 参数:
- auditfile: Office 365审计日志文件路径
- rule_file: 检测规则文件路径
- output: 输出目录
- timezone: 目标时区
- include_flattened_data: 是否包含展平后的数据
- geolite_db_path: GeoLite2数据库文件路径
"""
global start_time, end_time # 声明全局变量start_time和end_time
start_time = time.time() # 记录开始时间
temp_dir = ".temp" # 设置临时目录路径
if output is None or output == "": # 如果输出目录未指定或为空
output = os.path.splitext(auditfile)[0] # 使用审计文件的基础名称作为输出目录
try: try:
# Create necessary directories os.makedirs(output, exist_ok=True) # 创建输出目录,如果不存在则创建
os.makedirs(output, exist_ok=True) os.makedirs(temp_dir, exist_ok=True) # 创建临时目录,如果不存在则创建
os.makedirs(temp_dir, exist_ok=True) if not os.path.exists(geolite_db_path): # 如果GeoLite2数据库文件不存在
download_geolite_db(geolite_db_path) # 下载GeoLite2数据库
# Check if the GeoLite2 database exists, and download it if not json_file = convert_csv(auditfile, temp_dir) # 将CSV文件转换为JSON文件
if not os.path.exists(geolite_db_path): input_file = json_file # 设置输入文件路径为转换后的JSON文件
download_geolite_db(geolite_db_path) db_name = os.path.join(temp_dir, 'audit_data.db') # 设置SQLite数据库文件路径
if rule_file is None: # 如果规则文件未指定
# Convert CSV to JSON (assuming convert_csv is a valid function that you have) rule_file = 'O365_detection_rules.json' # 使用默认的规则文件名
json_file = convert_csv(auditfile, temp_dir) output_file = f"{output}_o365_report.xlsx" # 设置输出的Excel报告文件路径
# Input and output file paths # 展平JSON数据并处理时间戳
input_file = json_file
db_name = os.path.join(temp_dir, 'audit_data.db')
if rule_file is None:
rule_file = 'O365_detection_rules.json'
output_file = f"{output}_o365_report.xlsx"
# Measure the start time
# Flatten the JSON file
flattened_df = flatten_json_file(input_file, timezone) flattened_df = flatten_json_file(input_file, timezone)
# Create SQLite database from the flattened DataFrame # 创建SQLite数据库
create_sqlite_db_from_dataframe(flattened_df, db_name) create_sqlite_db_from_dataframe(flattened_df, db_name)
# Open the GeoLite2 database # 使用GeoLite2数据库解析IP地址
with geoip2.database.Reader(geolite_db_path) as reader: with geoip2.database.Reader(geolite_db_path) as reader:
# Resolve ClientIP to country names
if 'ClientIP' in flattened_df.columns: if 'ClientIP' in flattened_df.columns:
flattened_df['Country'] = flattened_df['ClientIP'].apply(lambda ip: get_country_from_ip(ip, reader)) flattened_df['Country'] = flattened_df['ClientIP'].apply(lambda ip: get_country_from_ip(ip, reader))
# Read detection rules # 读取检测规则并应用
rules = read_detection_rules(rule_file) rules = read_detection_rules(rule_file)
# Apply detection logic using SQLite
detected_events = apply_detection_logic_sqlite(db_name, rules) detected_events = apply_detection_logic_sqlite(db_name, rules)
# Reorder columns to make RuleName the first column # 重新排序DataFrame列以便RuleName在前
if not detected_events.empty: if not detected_events.empty:
columns = ['RuleName', 'Severity'] + [col for col in detected_events.columns if columns = ['RuleName', 'Severity'] + [col for col in detected_events.columns if col not in ['RuleName', 'Severity']]
col not in ['RuleName', 'Severity']]
detected_events = detected_events[columns] detected_events = detected_events[columns]
# Perform the brute-force detection query # 执行其他SQL查询
conn = sqlite3.connect(db_name) conn = sqlite3.connect(db_name)
try: try:
user_login_tracker_df = pd.read_sql_query(user_logon_query, conn) user_login_tracker_df = pd.read_sql_query(user_logon_query, conn)
password_spray_df = pd.read_sql_query(password_spray_query, conn) password_spray_df = pd.read_sql_query(password_spray_query, conn)
@ -269,20 +318,19 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
finally: finally:
conn.close() conn.close()
# Create a new workbook with the detection results # 生成Excel报告
with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer: with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
if include_flattened_data: if include_flattened_data:
# Split the flattened data into multiple sheets if needed # 将展平后的数据分成多个工作表
max_rows_per_sheet = 65000 max_rows_per_sheet = 65000
num_sheets = len(flattened_df) // max_rows_per_sheet + 1 num_sheets = len(flattened_df) // max_rows_per_sheet + 1
for i in range(num_sheets): for i in range(num_sheets):
start_row = i * max_rows_per_sheet start_row = i * max_rows_per_sheet
end_row = (i + 1) * max_rows_per_sheet end_row = (i + 1) * max_rows_per_sheet
sheet_name = f'Flattened Data {i + 1}' sheet_name = f'Flattened Data {i + 1}'
flattened_df.iloc[start_row:end_row].to_excel(writer, sheet_name=sheet_name, index=False) flattened_df.iloc[start_row:end_row].to_excel(writer, sheet_name=sheet_name, index=False)
# Write statistics for various fields # 写入各种统计信息到不同的工作表
detected_events.to_excel(writer, sheet_name='Detection Results', index=False) detected_events.to_excel(writer, sheet_name='Detection Results', index=False)
user_login_tracker_df.to_excel(writer, sheet_name='User Login Tracker', index=False) user_login_tracker_df.to_excel(writer, sheet_name='User Login Tracker', index=False)
password_spray_df.to_excel(writer, sheet_name='Password Spray Attacks', index=False) password_spray_df.to_excel(writer, sheet_name='Password Spray Attacks', index=False)
@ -293,10 +341,8 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
flattened_df['Country'].value_counts().to_frame().to_excel(writer, sheet_name='Country Stats') flattened_df['Country'].value_counts().to_frame().to_excel(writer, sheet_name='Country Stats')
flattened_df['UserAgent'].value_counts().to_frame().to_excel(writer, sheet_name='UserAgent Stats') flattened_df['UserAgent'].value_counts().to_frame().to_excel(writer, sheet_name='UserAgent Stats')
flattened_df['UserId'].value_counts().to_frame().to_excel(writer, sheet_name='UserId Stats') flattened_df['UserId'].value_counts().to_frame().to_excel(writer, sheet_name='UserId Stats')
flattened_df['AuthenticationType'].value_counts().to_frame().to_excel(writer, flattened_df['AuthenticationType'].value_counts().to_frame().to_excel(writer, sheet_name='AuthenticationType Stats')
sheet_name='AuthenticationType Stats')
# Measure the end time
end_time = time.time() end_time = time.time()
print(f"Office365 analysis finished in time: {end_time - start_time:.2f} seconds") print(f"Office365 analysis finished in time: {end_time - start_time:.2f} seconds")
@ -304,18 +350,12 @@ def analyzeoff365(auditfile, rule_file, output, timezone, include_flattened_data
print(f"An error occurred during the analysis: {e}") print(f"An error occurred during the analysis: {e}")
finally: finally:
#Clean up the temporary directory # 清理临时目录
if os.path.exists(temp_dir): if os.path.exists(temp_dir):
for file in Path(temp_dir).glob('*'): for file in Path(temp_dir).glob('*'):
file.unlink() # Delete the file file.unlink()
os.rmdir(temp_dir) # Remove the directory os.rmdir(temp_dir)
# Write the User Login Tracker results to a new sheet
# Measure the end time
end_time = time.time() end_time = time.time()
# Calculate and print the running time
running_time = end_time - start_time running_time = end_time - start_time
print(f"Office365 hunter finished in time: {running_time:.2f} seconds") print(f"Office365 hunter finished in time: {running_time:.2f} seconds")

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 10 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 236 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 222 KiB

@ -1,101 +1,107 @@
try{ # 尝试创建一个名为 "wineventlog" 的目录
New-Item -ItemType "directory" -Path "wineventlog" try {
New-Item -ItemType "directory" -Path "wineventlog"
} }
catch catch {
{ # 如果创建目录失败,输出错误信息
echo "can't create a new directory" echo "can't create a new directory"
} }
try{ # 尝试获取安全日志并导出为 CSV 文件
get-eventlog -log Security | export-csv wineventlog/Security.csv try {
get-eventlog -log Security | export-csv wineventlog/Security.csv
} }
catch catch {
{ # 如果获取安全日志失败,输出错误信息
echo "Can't retrieve Security Logs" echo "Can't retrieve Security Logs"
} }
try # 尝试获取系统日志并导出为 CSV 文件
{ try {
Get-WinEvent -LogName System | export-csv wineventlog/System.csv Get-WinEvent -LogName System | export-csv wineventlog/System.csv
} }
catch catch {
{ # 如果获取系统日志失败,输出错误信息
echo "Can't retrieve System Logs" echo "Can't retrieve System Logs"
} }
try{ # 尝试获取应用程序日志并导出为 CSV 文件
Get-WinEvent -LogName Application | export-csv wineventlog/Application.csv try {
Get-WinEvent -LogName Application | export-csv wineventlog/Application.csv
} }
catch catch {
{ # 如果获取应用程序日志失败,输出错误信息
echo "Can't retrieve Application Logs" echo "Can't retrieve Application Logs"
} }
# 尝试获取 Windows PowerShell 日志并导出为 CSV 文件
try{ try {
Get-WinEvent -LogName "Windows PowerShell" | export-csv wineventlog/Windows_PowerShell.csv Get-WinEvent -LogName "Windows PowerShell" | export-csv wineventlog/Windows_PowerShell.csv
} }
catch catch {
{ # 如果获取 Windows PowerShell 日志失败,输出错误信息
echo "Can't retrieve Windows PowerShell Logs" echo "Can't retrieve Windows PowerShell Logs"
} }
try{ # 尝试获取 Microsoft-Windows-TerminalServices-LocalSessionManager/Operational 日志并导出为 CSV 文件
Get-WinEvent -LogName "Microsoft-Windows-TerminalServices-LocalSessionManager/Operational" | export-csv wineventlog/LocalSessionManager.csv try {
Get-WinEvent -LogName "Microsoft-Windows-TerminalServices-LocalSessionManager/Operational" | export-csv wineventlog/LocalSessionManager.csv
} }
catch catch {
{ # 如果获取 LocalSessionManager 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-TerminalServices-LocalSessionManager/Operational Logs" echo "Can't retrieve Microsoft-Windows-TerminalServices-LocalSessionManager/Operational Logs"
} }
try{ # 尝试获取 Microsoft-Windows-Windows Defender/Operational 日志并导出为 CSV 文件
Get-WinEvent -LogName "Microsoft-Windows-Windows Defender/Operational" | export-csv wineventlog/Windows_Defender.csv try {
Get-WinEvent -LogName "Microsoft-Windows-Windows Defender/Operational" | export-csv wineventlog/Windows_Defender.csv
} }
catch catch {
{ # 如果获取 Windows Defender 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-Windows Defender/Operational Logs" echo "Can't retrieve Microsoft-Windows-Windows Defender/Operational Logs"
} }
try{ # 尝试获取 Microsoft-Windows-TaskScheduler/Operational 日志并导出为 CSV 文件
Get-WinEvent -LogName Microsoft-Windows-TaskScheduler/Operational | export-csv wineventlog/TaskScheduler.csv try {
Get-WinEvent -LogName Microsoft-Windows-TaskScheduler/Operational | export-csv wineventlog/TaskScheduler.csv
} }
catch catch {
{ # 如果获取 TaskScheduler 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-TaskScheduler/Operational Logs" echo "Can't retrieve Microsoft-Windows-TaskScheduler/Operational Logs"
} }
try{ # 尝试获取 Microsoft-Windows-WinRM/Operational 日志并导出为 CSV 文件
Get-WinEvent -LogName Microsoft-Windows-WinRM/Operational | export-csv wineventlog/WinRM.csv try {
Get-WinEvent -LogName Microsoft-Windows-WinRM/Operational | export-csv wineventlog/WinRM.csv
} }
catch catch {
{ # 如果获取 WinRM 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-WinRM/Operational Logs" echo "Can't retrieve Microsoft-Windows-WinRM/Operational Logs"
} }
try{ # 尝试获取 Microsoft-Windows-Sysmon/Operational 日志并导出为 CSV 文件
Get-WinEvent -LogName Microsoft-Windows-Sysmon/Operational | export-csv wineventlog/Sysmon.csv try {
Get-WinEvent -LogName Microsoft-Windows-Sysmon/Operational | export-csv wineventlog/Sysmon.csv
} }
catch catch {
{ # 如果获取 Sysmon 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-Sysmon/Operational Logs" echo "Can't retrieve Microsoft-Windows-Sysmon/Operational Logs"
} }
# 尝试获取 Microsoft-Windows-PowerShell/Operational 日志并导出为 CSV 文件
try{ try {
Get-WinEvent -LogName Microsoft-Windows-PowerShell/Operational | export-csv wineventlog/Powershell_Operational.csv Get-WinEvent -LogName Microsoft-Windows-PowerShell/Operational | export-csv wineventlog/Powershell_Operational.csv
} }
catch catch {
{ # 如果获取 PowerShell Operational 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-PowerShell/Operational Logs" echo "Can't retrieve Microsoft-Windows-PowerShell/Operational Logs"
} }
# 尝试压缩 "wineventlog" 目录为 logs.zip
try try {
{ Compress-Archive -Path wineventlog -DestinationPath ./logs.zip
Compress-Archive -Path wineventlog -DestinationPath ./logs.zip
} }
catch catch {
{ # 如果压缩失败,输出错误信息
echo "couldn't compress the the log folder " echo "couldn't compress the log folder"
} }

@ -1,101 +1,107 @@
try{ # 尝试创建一个名为 "wineventlog" 的目录
New-Item -ItemType "directory" -Path "wineventlog" try {
New-Item -ItemType "directory" -Path "wineventlog"
} }
catch catch {
{ # 如果创建目录失败,输出错误信息
echo "can't create a new directory" echo "can't create a new directory"
} }
try{ # 尝试导出安全日志到指定的 EVTX 文件
wevtutil epl Security wineventlog/Security.evtx try {
wevtutil epl Security wineventlog/Security.evtx
} }
catch catch {
{ # 如果导出安全日志失败,输出错误信息
echo "Can't retrieve Security Logs" echo "Can't retrieve Security Logs"
} }
try # 尝试导出系统日志到指定的 EVTX 文件
{ try {
wevtutil epl System wineventlog/System.evtx wevtutil epl System wineventlog/System.evtx
} }
catch catch {
{ # 如果导出系统日志失败,输出错误信息
echo "Can't retrieve System Logs" echo "Can't retrieve System Logs"
} }
try{ # 尝试导出应用程序日志到指定的 EVTX 文件
wevtutil epl Application wineventlog/Application.evtx try {
wevtutil epl Application wineventlog/Application.evtx
} }
catch catch {
{ # 如果导出应用程序日志失败,输出错误信息
echo "Can't retrieve Application Logs" echo "Can't retrieve Application Logs"
} }
# 尝试导出 Windows PowerShell 日志到指定的 EVTX 文件
try{ try {
wevtutil epl "Windows PowerShell" wineventlog/Windows_PowerShell.evtx wevtutil epl "Windows PowerShell" wineventlog/Windows_PowerShell.evtx
} }
catch catch {
{ # 如果导出 Windows PowerShell 日志失败,输出错误信息
echo "Can't retrieve Windows PowerShell Logs" echo "Can't retrieve Windows PowerShell Logs"
} }
try{ # 尝试导出 Microsoft-Windows-TerminalServices-LocalSessionManager/Operational 日志到指定的 EVTX 文件
wevtutil epl "Microsoft-Windows-TerminalServices-LocalSessionManager/Operational" wineventlog/LocalSessionManager.evtx try {
wevtutil epl "Microsoft-Windows-TerminalServices-LocalSessionManager/Operational" wineventlog/LocalSessionManager.evtx
} }
catch catch {
{ # 如果导出 LocalSessionManager 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-TerminalServices-LocalSessionManager/Operational Logs" echo "Can't retrieve Microsoft-Windows-TerminalServices-LocalSessionManager/Operational Logs"
} }
try{ # 尝试导出 Microsoft-Windows-Windows Defender/Operational 日志到指定的 EVTX 文件
wevtutil epl "Microsoft-Windows-Windows Defender/Operational" wineventlog/Windows_Defender.evtx try {
wevtutil epl "Microsoft-Windows-Windows Defender/Operational" wineventlog/Windows_Defender.evtx
} }
catch catch {
{ # 如果导出 Windows Defender 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-Windows Defender/Operational Logs" echo "Can't retrieve Microsoft-Windows-Windows Defender/Operational Logs"
} }
try{ # 尝试导出 Microsoft-Windows-TaskScheduler/Operational 日志到指定的 EVTX 文件
wevtutil epl Microsoft-Windows-TaskScheduler/Operational wineventlog/TaskScheduler.evtx try {
wevtutil epl Microsoft-Windows-TaskScheduler/Operational wineventlog/TaskScheduler.evtx
} }
catch catch {
{ # 如果导出 TaskScheduler 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-TaskScheduler/Operational Logs" echo "Can't retrieve Microsoft-Windows-TaskScheduler/Operational Logs"
} }
try{ # 尝试导出 Microsoft-Windows-WinRM/Operational 日志到指定的 EVTX 文件
wevtutil epl Microsoft-Windows-WinRM/Operational wineventlog/WinRM.evtx try {
wevtutil epl Microsoft-Windows-WinRM/Operational wineventlog/WinRM.evtx
} }
catch catch {
{ # 如果导出 WinRM 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-WinRM/Operational Logs" echo "Can't retrieve Microsoft-Windows-WinRM/Operational Logs"
} }
try{ # 尝试导出 Microsoft-Windows-Sysmon/Operational 日志到指定的 EVTX 文件
wevtutil epl Microsoft-Windows-Sysmon/Operational wineventlog/Sysmon.evtx try {
wevtutil epl Microsoft-Windows-Sysmon/Operational wineventlog/Sysmon.evtx
} }
catch catch {
{ # 如果导出 Sysmon 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-Sysmon/Operational Logs" echo "Can't retrieve Microsoft-Windows-Sysmon/Operational Logs"
} }
# 尝试导出 Microsoft-Windows-PowerShell/Operational 日志到指定的 EVTX 文件
try{ try {
wevtutil epl Microsoft-Windows-PowerShell/Operational wineventlog/Powershell_Operational.evtx wevtutil epl Microsoft-Windows-PowerShell/Operational wineventlog/Powershell_Operational.evtx
} }
catch catch {
{ # 如果导出 PowerShell Operational 日志失败,输出错误信息
echo "Can't retrieve Microsoft-Windows-PowerShell/Operational Logs" echo "Can't retrieve Microsoft-Windows-PowerShell/Operational Logs"
} }
# 尝试压缩 "wineventlog" 目录为 logs.zip
try try {
{ Compress-Archive -Path wineventlog -DestinationPath ./logs.zip
Compress-Archive -Path wineventlog -DestinationPath ./logs.zip }
} catch {
catch # 如果压缩失败,输出错误信息
{ echo "couldn't compress the log folder"
echo "couldn't compress the the log folder " }
}

@ -1,38 +0,0 @@
一、源代码结构与功能
APT-Hunter的源代码主要由Python编写包含多个模块和脚本用于实现日志收集、解析、分析以及结果输出等功能。
日志收集:
源代码中包含了用于收集Windows事件日志的PowerShell脚本windows-log-collector-full-v3-CSV && windows-log-collector-full-v3-EVTX脚本能够提取CSV和EVTX格式的日志。
用户可以通过运行这些脚本来自动收集所需的日志,而无需手动查找和提取。
日志解析:
APT-Hunter使用内置库如csv库来解析CSV日志文件使用外部库如evtx库来解析EVTX日志文件。
解析过程中APT-Hunter会使用正则表达式Regex为每个事件提取字段以便后续分析。
日志分析:
源代码中包含了用于分析日志的逻辑这些逻辑基于Mitre ATT&CK战术和技术将攻击指标映射到Windows事件日志中。
分析过程中APT-Hunter会检测各种可疑活动如恶意软件的安装、未授权的网络连接等并生成相应的报告。
结果输出:
分析结果可以以Excel工作表和CSV文件的形式输出便于用户查看和分析。
其中Excel工作表包含了从每个Windows日志中检测到的所有事件而CSV文件则可以用于时间线分析。
二、关键模块与代码分析
日志收集模块:
该模块主要包含PowerShell脚本用于从Windows系统中提取日志。
脚本中使用了Windows事件日志API或PowerShell命令来获取日志数据并将其保存为CSV或EVTX格式。
日志解析模块:
该模块使用Python编写包含了用于解析CSV和EVTX日志文件的函数。
在解析CSV文件时使用了Python的csv库来读取文件并提取字段。
在解析EVTX文件时使用了外部库如pyevtx来读取文件并解析事件。
日志分析模块:
该模块是APT-Hunter的核心部分包含了用于检测可疑活动的逻辑。
逻辑中定义了多个检测规则这些规则基于Mitre ATT&CK战术和技术用于识别各种APT攻击指标。
分析过程中APT-Hunter会遍历日志文件中的事件并根据检测规则进行判断和分类。
结果输出模块:
该模块负责将分析结果输出为用户可读的格式。
在输出Excel工作表时使用了Python的pandas库来创建和填充工作表。
在输出CSV文件时则直接使用了Python的文件操作函数来写入数据。
三、技术亮点与优势
高效性APT-Hunter能够快速地收集、解析和分析大量的Windows事件日志提高了威胁检测的效率和准确性。
易用性:该工具提供了友好的用户界面和简洁的操作流程,使得用户能够轻松上手并快速掌握其使用方法。
兼容性APT-Hunter支持多种格式的日志解析和输出配置使得用户能够灵活地将其集成到现有的安全监控系统中。
开源性作为一款开源工具APT-Hunter的源代码是公开的用户可以根据需要进行二次开发或定制。
四、结论与展望
通过对APT-Hunter源代码的分析可以看出该工具在Windows事件日志的威胁搜寻方面具有较高的效率和准确性。其友好的用户界面、简洁的操作流程以及灵活的日志解析和输出配置使得用户能够轻松地使用该工具进行威胁检测和分析。然而随着APT攻击的不断发展和变化APT-Hunter也需要不断更新和完善其检测规则和功能以应对新的威胁和挑战。未来可以进一步优化APT-Hunter的性能和效率提高其适用性和易用性并探索与其他安全监控系统的集成和联动以实现更加全面和高效的安全防护。
Loading…
Cancel
Save