
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Multimodal network flow graph data extraction - data preprocessing pipeline (modified weight calculation)
Features: data cleaning, flow aggregation, triplet generation
"""
import pandas as pd
import numpy as np
import re
import ipaddress
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Any
import json
import csv
from collections import defaultdict, Counter
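
# Expected input schema (a sketch inferred from the column names used below, not a
# formal spec): the analysis CSV is assumed to provide the columns 数据包ID, 时间戳,
# 源IP, 目标IP, 协议, 源端口, 目标端口, 数据包大小, 载荷大小 and 标志. A minimal
# two-column file containing only 源IP and 目标IP is also accepted; load_data()
# then fills the remaining fields with placeholder values.
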
class DataPreprocessor:
    """Data preprocessing pipeline."""

    def __init__(self, csv_file: str):
        """
        Initialize the data preprocessor.

        Args:
            csv_file (str): Path to the CSV file.
        """
        self.csv_file = csv_file
        self.df = None
        self.cleaned_df = None
        self.flow_stats = {}
        self.triplets = []
    def load_data(self) -> pd.DataFrame:
        """
        Load the CSV data.

        Returns:
            pd.DataFrame: Raw data frame, or None if loading fails.
        """
        print(f"Loading CSV file: {self.csv_file}")
        try:
            self.df = pd.read_csv(self.csv_file, encoding='utf-8')
            # Support two-column CSVs that contain only a source IP and a destination IP.
            # Column names are stripped and lower-cased so stray whitespace in headers is tolerated.
            cols_lower = {str(c).strip().lower(): c for c in self.df.columns}
            possible_src = ['源ip', '源ip地址', 'source', 'src', 'source_ip']
            possible_dst = ['目的ip', '目标ip', 'dest', 'dst', 'destination_ip']
            def pick(cands):
                for k in cands:
                    lk = str(k).strip().lower()
                    if lk in cols_lower:
                        return cols_lower[lk]
                return None
            if ('源IP' not in self.df.columns or '目标IP' not in self.df.columns) and len(self.df.columns) >= 2:
                src_col = pick(possible_src) or self.df.columns[0]
                dst_col = pick(possible_dst) or self.df.columns[1]
                # Keep only these two columns and rename them to the standard fields
                self.df = self.df[[src_col, dst_col]].rename(columns={src_col: '源IP', dst_col: '目标IP'})
                # Fill in the remaining required fields so the rest of the pipeline stays compatible
                self.df['数据包ID'] = range(1, len(self.df) + 1)
                now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                self.df['时间戳'] = now
                self.df['协议'] = 'Unknown'
                self.df['源端口'] = 0
                self.df['目标端口'] = 0
                self.df['数据包大小'] = 0
                self.df['载荷大小'] = 0
                self.df['标志'] = ''
            print(f"Loaded {len(self.df)} records")
            print(f"Columns: {list(self.df.columns)}")
            return self.df
        except Exception as e:
            print(f"Error while loading the CSV file: {e}")
            return None
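
    # Illustrative example of the two-column fallback (hypothetical data, not part
    # of the pipeline): an input such as
    #
    #     源IP,目标IP
    #     10.0.0.1,10.0.0.2
    #     10.0.0.2,10.0.0.1
    #
    # is renamed and extended by load_data() into the standard schema, with 协议
    # set to 'Unknown', ports and sizes set to 0, and a single shared 时间戳.
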
    def clean_data(self) -> pd.DataFrame:
        """
        Clean the data.

        Returns:
            pd.DataFrame: Cleaned data frame.
        """
        if self.df is None:
            self.load_data()
        print("Starting data cleaning...")
        original_count = len(self.df)
        # 1. Filter out invalid packets
        print("1. Filtering invalid packets...")
        # Drop records whose source or destination IP is 'N/A'
        valid_ip_mask = (self.df['源IP'] != 'N/A') & (self.df['目标IP'] != 'N/A')
        self.df = self.df[valid_ip_mask]
        # For two-column input the packet size may be all zeros, so only filter on
        # size when at least one row has a positive value
        if (self.df['数据包大小'] > 0).any():
            self.df = self.df[self.df['数据包大小'] > 0]
        # 2. Do not deduplicate (all records are kept by requirement)
        print("2. Skipping duplicate handling (no deduplication)...")
        # 3. Handle abnormal data
        print("3. Handling abnormal data...")
        # Drop abnormally large packets (larger than the maximum IP packet size)
        max_packet_size = 65535  # maximum IP packet size
        self.df = self.df[self.df['数据包大小'] <= max_packet_size]
        # Parse timestamps and drop rows that cannot be parsed. In the two-column
        # case every row shares one fill value, so no further plausibility
        # filtering (future or stale times) is applied here.
        if '时间戳' in self.df.columns:
            self.df['时间戳'] = pd.to_datetime(self.df['时间戳'], errors='coerce')
            time_mask = self.df['时间戳'].notna()
            self.df = self.df[time_mask]
        # 4. Normalize IP addresses
        print("4. Normalizing IP addresses...")
        self.df = self._standardize_ip_addresses()
        # 5. Convert data types
        print("5. Converting data types...")
        for col in ['源端口', '目标端口', '数据包大小', '载荷大小']:
            if col in self.df.columns:
                self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
        # Drop rows whose numeric conversion failed
        required_numeric = [c for c in ['源端口', '目标端口', '数据包大小', '载荷大小'] if c in self.df.columns]
        if required_numeric:
            self.df = self.df.dropna(subset=required_numeric)
        self.cleaned_df = self.df.copy()
        cleaned_count = len(self.df)
        print("Data cleaning finished:")
        print(f"  Original records: {original_count}")
        print(f"  Cleaned records: {cleaned_count}")
        print(f"  Removed records: {original_count - cleaned_count}")
        return self.df
    def _standardize_ip_addresses(self) -> pd.DataFrame:
        """
        Normalize IP addresses.

        Returns:
            pd.DataFrame: Data frame with validated, normalized IP addresses.
        """
        def is_valid_ip(ip_str):
            try:
                ipaddress.ip_address(ip_str)
                return True
            except ValueError:
                return False
        # Drop rows with invalid IP addresses
        valid_src_ip = self.df['源IP'].astype(str).apply(is_valid_ip)
        valid_dst_ip = self.df['目标IP'].astype(str).apply(is_valid_ip)
        self.df = self.df[valid_src_ip & valid_dst_ip]
        # Normalize the IP address representation
        self.df['源IP'] = self.df['源IP'].apply(lambda x: str(ipaddress.ip_address(x)))
        self.df['目标IP'] = self.df['目标IP'].apply(lambda x: str(ipaddress.ip_address(x)))
        return self.df
    def aggregate_flows(self, time_window: int = 60) -> Dict[str, Any]:
        """
        Aggregate traffic into flows.

        Args:
            time_window (int): Time window in seconds.

        Returns:
            Dict[str, Any]: Aggregated flow statistics.
        """
        if self.cleaned_df is None:
            self.clean_data()
        print(f"Starting flow aggregation, time window: {time_window}s...")
        # 1. Merge bidirectional flows
        print("1. Merging bidirectional flows...")
        flows = self._merge_bidirectional_flows()
        # 2. Compute flow statistics
        print("2. Computing flow statistics...")
        flow_stats = self._calculate_flow_statistics(flows, time_window)
        # 3. Generate flow triplets
        print("3. Generating flow triplets...")
        triplets = self._generate_triplets(flows)
        self.flow_stats = flow_stats
        self.triplets = triplets
        print("Flow aggregation finished:")
        print(f"  Total flows: {len(flows)}")
        print(f"  Triplets: {len(triplets)}")
        return flow_stats
    def _merge_bidirectional_flows(self) -> Dict[str, Dict]:
        """
        Merge packets of both directions into bidirectional flows.

        Returns:
            Dict[str, Dict]: Merged flow information keyed by flow identifier.
        """
        flows = defaultdict(lambda: {
            'packet_count': 0,
            'total_bytes': 0,
            'payload_bytes': 0,
            'start_time': None,
            'end_time': None,
            'protocols': set(),
            'src_ports': set(),
            'dst_ports': set(),
            'flags': set()
        })
        for _, row in self.cleaned_df.iterrows():
            # Build the flow identifier (direction-independent)
            src_ip = row['源IP']
            dst_ip = row['目标IP']
            src_port = row['源端口']
            dst_port = row['目标端口']
            # Always put the smaller IP address first so both directions of a
            # conversation map to the same flow key
            if src_ip < dst_ip:
                flow_key = f"{src_ip}:{src_port}-{dst_ip}:{dst_port}"
            else:
                flow_key = f"{dst_ip}:{dst_port}-{src_ip}:{src_port}"
            # Update the flow statistics
            flows[flow_key]['packet_count'] += 1
            flows[flow_key]['total_bytes'] += row['数据包大小']
            flows[flow_key]['payload_bytes'] += row['载荷大小']
            flows[flow_key]['protocols'].add(row['协议'])
            flows[flow_key]['src_ports'].add(src_port)
            flows[flow_key]['dst_ports'].add(dst_port)
            flows[flow_key]['flags'].add(str(row['标志']))
            # Update the time range
            timestamp = row['时间戳']
            if flows[flow_key]['start_time'] is None or timestamp < flows[flow_key]['start_time']:
                flows[flow_key]['start_time'] = timestamp
            if flows[flow_key]['end_time'] is None or timestamp > flows[flow_key]['end_time']:
                flows[flow_key]['end_time'] = timestamp
        # Convert sets to lists so the result can be JSON-serialized
        for flow_key, flow_data in flows.items():
            flow_data['protocols'] = list(flow_data['protocols'])
            flow_data['src_ports'] = list(flow_data['src_ports'])
            flow_data['dst_ports'] = list(flow_data['dst_ports'])
            flow_data['flags'] = list(flow_data['flags'])
        return dict(flows)
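
    # Worked example of the key normalization above (hypothetical addresses): a
    # packet 192.168.1.5:443 -> 10.0.0.9:52100 and its reply
    # 10.0.0.9:52100 -> 192.168.1.5:443 both produce the key
    # "10.0.0.9:52100-192.168.1.5:443", because the string-wise smaller IP is
    # always written first, so the two directions aggregate into one flow.
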
    def _calculate_flow_statistics(self, flows: Dict[str, Dict], time_window: int) -> Dict[str, Any]:
        """
        Compute flow statistics.

        Args:
            flows (Dict[str, Dict]): Flow information.
            time_window (int): Time window in seconds.

        Returns:
            Dict[str, Any]: Flow statistics.
        """
        stats = {
            'total_flows': len(flows),
            'total_packets': sum(flow['packet_count'] for flow in flows.values()),
            'total_bytes': sum(flow['total_bytes'] for flow in flows.values()),
            'total_payload_bytes': sum(flow['payload_bytes'] for flow in flows.values()),
            'time_windows': [],
            'protocol_distribution': Counter(),
            'top_flows': [],
            'flow_duration_stats': {}
        }
        # Protocol distribution
        for flow in flows.values():
            for protocol in flow['protocols']:
                stats['protocol_distribution'][protocol] += 1
        # Per-window statistics
        if flows:
            start_time = min(flow['start_time'] for flow in flows.values())
            end_time = max(flow['end_time'] for flow in flows.values())
            current_time = start_time
            while current_time < end_time:
                window_end = current_time + timedelta(seconds=time_window)
                window_flows = []
                for flow_key, flow in flows.items():
                    if flow['start_time'] <= window_end and flow['end_time'] >= current_time:
                        window_flows.append({
                            'flow_key': flow_key,
                            'packet_count': flow['packet_count'],
                            'total_bytes': flow['total_bytes']
                        })
                stats['time_windows'].append({
                    'start_time': current_time.isoformat(),
                    'end_time': window_end.isoformat(),
                    'flow_count': len(window_flows),
                    'total_packets': sum(f['packet_count'] for f in window_flows),
                    'total_bytes': sum(f['total_bytes'] for f in window_flows)
                })
                current_time = window_end
        # Most active flows (by total bytes)
        sorted_flows = sorted(flows.items(), key=lambda x: x[1]['total_bytes'], reverse=True)
        stats['top_flows'] = [
            {
                'flow_key': flow_key,
                'packet_count': flow['packet_count'],
                'total_bytes': flow['total_bytes'],
                'duration': (flow['end_time'] - flow['start_time']).total_seconds()
            }
            for flow_key, flow in sorted_flows[:10]
        ]
        # Flow duration statistics
        durations = [(flow['end_time'] - flow['start_time']).total_seconds() for flow in flows.values()]
        if durations:
            stats['flow_duration_stats'] = {
                'min_duration': min(durations),
                'max_duration': max(durations),
                'avg_duration': np.mean(durations),
                'median_duration': np.median(durations)
            }
        return stats
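
    # Window sweep, illustrated (hypothetical times): with time_window=60 and
    # flows spanning 10:00:00-10:02:30, the loop above emits windows
    # 10:00:00-10:01:00, 10:01:00-10:02:00 and 10:02:00-10:03:00; a flow is
    # counted in every window that its start_time/end_time range overlaps, so
    # long flows contribute to several windows.
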
    def _generate_triplets(self, flows: Dict[str, Dict]) -> List[Dict[str, Any]]:
        """
        Generate flow triplets (source IP, destination IP, weight).

        The weight is the number of occurrences of the directed pair
        (source IP, destination IP) in the cleaned per-packet data.

        Args:
            flows (Dict[str, Dict]): Flow information (not used; weights are
                computed from the cleaned per-packet records).

        Returns:
            List[Dict[str, Any]]: List of triplets.
        """
        if self.cleaned_df is None:
            self.clean_data()
        # Count directed (source IP, destination IP) pairs in the cleaned data
        grouped = self.cleaned_df.groupby(['源IP', '目标IP']).size().reset_index(name='count')
        triplets = []
        for _, row in grouped.iterrows():
            src_ip = str(row['源IP'])
            dst_ip = str(row['目标IP'])
            count = int(row['count'])
            triplets.append({
                'source_ip': src_ip,
                'target_ip': dst_ip,
                'weight': count,
                'connection_count': count,
                'ip_pair': f"{src_ip}-{dst_ip}"
            })
        # Sort by weight, descending
        triplets.sort(key=lambda x: x['weight'], reverse=True)
        return triplets
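
    # Shape of the result, illustrated (hypothetical values): a cleaned table with
    # three rows 10.0.0.1 -> 10.0.0.2 and one row 10.0.0.2 -> 10.0.0.1 yields
    #   {'source_ip': '10.0.0.1', 'target_ip': '10.0.0.2', 'weight': 3, ...}
    #   {'source_ip': '10.0.0.2', 'target_ip': '10.0.0.1', 'weight': 1, ...}
    # i.e. direction matters, and the weight is a plain occurrence count rather
    # than a byte count.
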
    def export_results(self, output_dir: str = "preprocessing_results") -> Dict[str, str]:
        """
        Export the preprocessing results.

        Args:
            output_dir (str): Output directory.

        Returns:
            Dict[str, str]: Paths of the exported files.
        """
        import os
        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_files = {}
        # 1. Export the cleaned data
        if self.cleaned_df is not None:
            cleaned_file = os.path.join(output_dir, f"cleaned_data_{timestamp}.csv")
            self.cleaned_df.to_csv(cleaned_file, index=False, encoding='utf-8')
            output_files['cleaned_data'] = cleaned_file
            print(f"Cleaned data exported to: {cleaned_file}")
        # 2. Export the flow statistics as a flattened CSV
        if self.flow_stats:
            stats_file = os.path.join(output_dir, f"flow_statistics_{timestamp}.csv")
            # Flatten the nested statistics into rows with three columns:
            # 类别 (category), item and value
            flat_rows = []
            # Overall summary, one row per key
            base_keys = ['total_flows', 'total_packets', 'total_bytes', 'total_payload_bytes']
            for k in base_keys:
                flat_rows.append({'类别': '汇总', 'item': k, 'value': self.flow_stats.get(k, '')})
            # Protocol distribution
            for proto, cnt in (self.flow_stats.get('protocol_distribution', {}) or {}).items():
                flat_rows.append({'类别': '协议分布', 'item': str(proto), 'value': cnt})
            # Time windows
            for idx, w in enumerate(self.flow_stats.get('time_windows', []) or []):
                flat_rows.append({'类别': '时间窗口', 'item': f"{idx}_start", 'value': w.get('start_time', '')})
                flat_rows.append({'类别': '时间窗口', 'item': f"{idx}_end", 'value': w.get('end_time', '')})
                flat_rows.append({'类别': '时间窗口', 'item': f"{idx}_flow_count", 'value': w.get('flow_count', 0)})
                flat_rows.append({'类别': '时间窗口', 'item': f"{idx}_total_packets", 'value': w.get('total_packets', 0)})
                flat_rows.append({'类别': '时间窗口', 'item': f"{idx}_total_bytes", 'value': w.get('total_bytes', 0)})
            # Top flows
            for idx, item in enumerate(self.flow_stats.get('top_flows', []) or []):
                flat_rows.append({'类别': '最活跃流', 'item': f"{idx}_flow_key", 'value': item.get('flow_key', '')})
                flat_rows.append({'类别': '最活跃流', 'item': f"{idx}_packet_count", 'value': item.get('packet_count', 0)})
                flat_rows.append({'类别': '最活跃流', 'item': f"{idx}_total_bytes", 'value': item.get('total_bytes', 0)})
                flat_rows.append({'类别': '最活跃流', 'item': f"{idx}_duration", 'value': item.get('duration', 0)})
            # Flow duration statistics
            for k, v in (self.flow_stats.get('flow_duration_stats', {}) or {}).items():
                flat_rows.append({'类别': '流持续时间', 'item': k, 'value': v})
            pd.DataFrame(flat_rows).to_csv(stats_file, index=False, encoding='utf-8')
            output_files['flow_statistics'] = stats_file
            print(f"Flow statistics (CSV) exported to: {stats_file}")
        # 3. Export the triplets
        if self.triplets:
            triplets_file = os.path.join(output_dir, f"flow_triplets_{timestamp}.csv")
            triplets_df = pd.DataFrame(self.triplets)
            triplets_df.to_csv(triplets_file, index=False, encoding='utf-8')
            output_files['flow_triplets'] = triplets_file
            print(f"Flow triplets exported to: {triplets_file}")
        # Only CSV exports are produced
        return output_files
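
    # Layout of the flattened statistics CSV written above (values are
    # hypothetical): one row per (类别, item, value), e.g.
    #   类别,item,value
    #   汇总,total_flows,12
    #   协议分布,TCP,9
    #   时间窗口,0_flow_count,4
    #   最活跃流,0_flow_key,10.0.0.9:52100-192.168.1.5:443
    #   流持续时间,avg_duration,37.5
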
    def print_summary(self):
        """Print a summary of the preprocessing run."""
        print("\n" + "=" * 60)
        print("Preprocessing summary")
        print("=" * 60)
        if self.cleaned_df is not None:
            print(f"Cleaned records: {len(self.cleaned_df)}")
            print(f"Columns: {list(self.cleaned_df.columns)}")
        if self.flow_stats:
            print("\nFlow statistics:")
            print(f"  Total flows: {self.flow_stats.get('total_flows', 0)}")
            print(f"  Total packets: {self.flow_stats.get('total_packets', 0)}")
            print(f"  Total bytes: {self.flow_stats.get('total_bytes', 0)}")
            print(f"  Total payload bytes: {self.flow_stats.get('total_payload_bytes', 0)}")
            print("\nProtocol distribution:")
            for protocol, count in self.flow_stats.get('protocol_distribution', {}).items():
                print(f"  {protocol}: {count}")
        if self.triplets:
            print("\nFlow triplets:")
            print(f"  Number of triplets: {len(self.triplets)}")
            print("  Top 5 most active connections:")
            for i, triplet in enumerate(self.triplets[:5]):
                print(f"    {i + 1}. {triplet['source_ip']} -> {triplet['target_ip']} (weight: {triplet['weight']})")
        print("=" * 60)
def main():
    """Entry point."""
    print("Multimodal network flow graph data extraction - data preprocessing pipeline (modified weight calculation)")
    print("=" * 70)
    # Find the most recent CSV file produced by the PCAP analysis step
    import glob
    import os
    csv_files = glob.glob("规范化结果_*.csv")
    if not csv_files:
        print("Error: no PCAP analysis CSV file found (expected 规范化结果_*.csv)")
        return
    # Use the most recently created CSV file
    latest_csv = max(csv_files, key=os.path.getctime)
    print(f"Using CSV file: {latest_csv}")
    try:
        # Create the preprocessor
        preprocessor = DataPreprocessor(latest_csv)
        # Run the preprocessing pipeline
        print("\n1. Loading data...")
        preprocessor.load_data()
        print("\n2. Cleaning data...")
        preprocessor.clean_data()
        print("\n3. Aggregating flows...")
        preprocessor.aggregate_flows(time_window=60)
        print("\n4. Exporting results...")
        output_files = preprocessor.export_results()
        print("\n5. Printing summary...")
        preprocessor.print_summary()
        print("\nPreprocessing finished!")
        print("Output files:")
        for key, file_path in output_files.items():
            print(f"  {key}: {file_path}")
    except Exception as e:
        print(f"Error during preprocessing: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
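
# Programmatic use, as a minimal sketch ("traffic.csv" is a hypothetical path,
# not a file shipped with this repository):
#
#     pre = DataPreprocessor("traffic.csv")
#     pre.load_data()
#     pre.clean_data()
#     pre.aggregate_flows(time_window=60)
#     output_files = pre.export_results("preprocessing_results")
#     pre.print_summary()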