#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Multimodal network flow graph data extraction - data preprocessing pipeline (revised weight calculation)

Features: data cleaning, flow aggregation, triplet generation
"""

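# Example usage (a minimal sketch; "capture.csv" is a hypothetical input path):
#
#   pre = DataPreprocessor("capture.csv")
#   pre.load_data()           # raw DataFrame
#   pre.clean_data()          # drops invalid IPs, oversized packets, bad timestamps
#   pre.aggregate_flows(60)   # bidirectional flows + per-window statistics
#   pre.export_results()      # writes the three CSV result files
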
import pandas as pd
import numpy as np
import ipaddress
import glob
import os
from datetime import datetime, timedelta
from typing import Dict, List, Any
from collections import defaultdict, Counter


class DataPreprocessor:
    """Data preprocessing pipeline."""

    def __init__(self, csv_file: str):
        """
        Initialize the preprocessor.

        Args:
            csv_file (str): Path to the input CSV file.
        """
        self.csv_file = csv_file
        self.df = None
        self.cleaned_df = None
        self.flow_stats = {}
        self.triplets = []

    def load_data(self) -> pd.DataFrame:
        """
        Load the CSV data.

        Returns:
            pd.DataFrame: The raw data frame, or None on failure.
        """
        print(f"Loading CSV file: {self.csv_file}")

        try:
            self.df = pd.read_csv(self.csv_file, encoding='utf-8')
            # Support two-column CSVs (source IP and destination IP only).
            # Strip and lower-case header names so stray-whitespace variants all match.
            cols_lower = {str(c).strip().lower(): c for c in self.df.columns}
            possible_src = ['源ip', '源ip地址', 'source', 'src', 'source_ip']
            possible_dst = ['目的ip', '目标ip', 'dest', 'dst', 'destination_ip']

            def pick(cands):
                for k in cands:
                    lk = str(k).lower()
                    if lk in cols_lower:
                        return cols_lower[lk]
                return None

            if ('源IP' not in self.df.columns or '目标IP' not in self.df.columns) and len(self.df.columns) >= 2:
                src_col = pick(possible_src) or self.df.columns[0]
                dst_col = pick(possible_dst) or self.df.columns[1]
                # Keep only the two columns and rename them to the standard fields.
                self.df = self.df[[src_col, dst_col]].rename(columns={src_col: '源IP', dst_col: '目标IP'})
                # Fill in the remaining required fields so the rest of the pipeline still works.
                self.df['数据包ID'] = range(1, len(self.df) + 1)
                now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                self.df['时间戳'] = now
                self.df['协议'] = 'Unknown'
                self.df['源端口'] = 0
                self.df['目标端口'] = 0
                self.df['数据包大小'] = 0
                self.df['载荷大小'] = 0
                self.df['标志'] = ''
            print(f"Loaded {len(self.df)} records")
            print(f"Columns: {list(self.df.columns)}")
            return self.df
        except Exception as e:
            print(f"Error while loading CSV file: {e}")
            return None

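    # Illustration of the two-column fallback above (hypothetical headers "src"/"dest"):
    # an input of
    #     src,dest
    #     10.0.0.1,10.0.0.2
    # is renamed to 源IP/目标IP, and 数据包ID/时间戳/协议/端口/大小/标志 are filled
    # with synthetic defaults (0, 'Unknown', the load time) so downstream steps
    # run unchanged.
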
    def clean_data(self) -> pd.DataFrame:
        """
        Clean the data.

        Returns:
            pd.DataFrame: The cleaned data frame.
        """
        if self.df is None:
            self.load_data()

        print("Starting data cleaning...")
        original_count = len(self.df)

        # 1. Drop invalid packets.
        print("1. Filtering invalid packets...")
        # Drop records whose source or destination IP is missing or 'N/A'.
        valid_ip_mask = (
            self.df['源IP'].notna() & self.df['目标IP'].notna()
            & (self.df['源IP'] != 'N/A') & (self.df['目标IP'] != 'N/A')
        )
        self.df = self.df[valid_ip_mask]

        # For two-column input the packet size is filled with 0 everywhere, so only
        # apply the size filter when at least one row has a positive size.
        if (self.df['数据包大小'] > 0).any():
            self.df = self.df[self.df['数据包大小'] > 0]

        # 2. No deduplication (all records are kept by requirement).
        print("2. Skipping deduplication (keeping all records)...")

        # 3. Handle anomalous data.
        print("3. Handling anomalous data...")
        # Drop abnormally large packets (larger than the maximum IP packet size).
        max_packet_size = 65535  # maximum IP packet size in bytes
        self.df = self.df[self.df['数据包大小'] <= max_packet_size]

        # Drop rows with unparseable timestamps.
        if '时间戳' in self.df.columns:
            self.df['时间戳'] = pd.to_datetime(self.df['时间戳'], errors='coerce')
            # Two-column input uses one uniform fill value, so all timestamps may be
            # identical; only rows whose timestamp failed to parse are dropped.
            time_mask = self.df['时间戳'].notna()
            self.df = self.df[time_mask]

        # 4. Normalize IP addresses.
        print("4. Normalizing IP addresses...")
        self.df = self._standardize_ip_addresses()

        # 5. Convert data types.
        print("5. Converting data types...")
        for col in ['源端口', '目标端口', '数据包大小', '载荷大小']:
            if col in self.df.columns:
                self.df[col] = pd.to_numeric(self.df[col], errors='coerce')

        # Drop rows where numeric conversion failed.
        required_numeric = [c for c in ['源端口', '目标端口', '数据包大小', '载荷大小'] if c in self.df.columns]
        if required_numeric:
            self.df = self.df.dropna(subset=required_numeric)

        self.cleaned_df = self.df.copy()
        cleaned_count = len(self.df)

        print("Data cleaning finished:")
        print(f"  original records: {original_count}")
        print(f"  cleaned records:  {cleaned_count}")
        print(f"  removed records:  {original_count - cleaned_count}")

        return self.df

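    # Worked example of the cleaning rules (hypothetical rows): given
    #     (源IP='N/A', 数据包大小=60), (源IP='10.0.0.1', 数据包大小=70000),
    #     (源IP='10.0.0.1', 数据包大小=1500)
    # only the last row survives: the first fails the N/A filter and the second
    # exceeds the 65535-byte maximum IP packet size.
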
    def _standardize_ip_addresses(self) -> pd.DataFrame:
        """
        Normalize IP addresses.

        Returns:
            pd.DataFrame: The data frame with validated, normalized IPs.
        """
        def is_valid_ip(ip_str):
            try:
                ipaddress.ip_address(ip_str)
                return True
            except ValueError:
                return False

        # Drop rows with invalid IP addresses.
        valid_src_ip = self.df['源IP'].astype(str).apply(is_valid_ip)
        valid_dst_ip = self.df['目标IP'].astype(str).apply(is_valid_ip)
        self.df = self.df[valid_src_ip & valid_dst_ip]

        # Normalize the textual form (validation above used str(), so apply it here too).
        self.df['源IP'] = self.df['源IP'].apply(lambda x: str(ipaddress.ip_address(str(x))))
        self.df['目标IP'] = self.df['目标IP'].apply(lambda x: str(ipaddress.ip_address(str(x))))

        return self.df

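    # Normalization example: ipaddress canonicalizes the textual form, e.g.
    #     str(ipaddress.ip_address('2001:0DB8::0001'))  ->  '2001:db8::1'
    # while dotted-quad IPv4 such as '192.168.1.1' is left unchanged, and
    # is_valid_ip('N/A') is False, so such rows are dropped above.
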
    def aggregate_flows(self, time_window: int = 60) -> Dict[str, Any]:
        """
        Aggregate packets into flows.

        Args:
            time_window (int): Window length in seconds.

        Returns:
            Dict[str, Any]: Aggregated flow statistics.
        """
        if self.cleaned_df is None:
            self.clean_data()

        print(f"Starting flow aggregation, time window: {time_window}s...")

        # 1. Merge packets into bidirectional flows.
        print("1. Merging bidirectional flows...")
        flows = self._merge_bidirectional_flows()

        # 2. Compute flow statistics.
        print("2. Computing flow statistics...")
        flow_stats = self._calculate_flow_statistics(flows, time_window)

        # 3. Generate flow triplets.
        print("3. Generating flow triplets...")
        triplets = self._generate_triplets(flows)

        self.flow_stats = flow_stats
        self.triplets = triplets

        print("Flow aggregation finished:")
        print(f"  total flows:   {len(flows)}")
        print(f"  triplet count: {len(triplets)}")

        return flow_stats

    def _merge_bidirectional_flows(self) -> Dict[str, Dict]:
        """
        Merge packets of both directions into a single bidirectional flow.

        Returns:
            Dict[str, Dict]: Merged flow records keyed by canonical flow key.
        """
        flows = defaultdict(lambda: {
            'packet_count': 0,
            'total_bytes': 0,
            'payload_bytes': 0,
            'start_time': None,
            'end_time': None,
            'protocols': set(),
            'src_ports': set(),
            'dst_ports': set(),
            'flags': set()
        })

        for _, row in self.cleaned_df.iterrows():
            # Build the direction-independent flow identifier.
            src_ip = row['源IP']
            dst_ip = row['目标IP']
            src_port = row['源端口']
            dst_port = row['目标端口']

            # Put the lexicographically smaller IP first so both directions of a
            # conversation map to the same key.
            if src_ip < dst_ip:
                flow_key = f"{src_ip}:{src_port}-{dst_ip}:{dst_port}"
            else:
                flow_key = f"{dst_ip}:{dst_port}-{src_ip}:{src_port}"

            # Update flow statistics.
            flows[flow_key]['packet_count'] += 1
            flows[flow_key]['total_bytes'] += row['数据包大小']
            flows[flow_key]['payload_bytes'] += row['载荷大小']
            flows[flow_key]['protocols'].add(row['协议'])
            flows[flow_key]['src_ports'].add(src_port)
            flows[flow_key]['dst_ports'].add(dst_port)
            flows[flow_key]['flags'].add(str(row['标志']))

            # Update the time range.
            timestamp = row['时间戳']
            if flows[flow_key]['start_time'] is None or timestamp < flows[flow_key]['start_time']:
                flows[flow_key]['start_time'] = timestamp
            if flows[flow_key]['end_time'] is None or timestamp > flows[flow_key]['end_time']:
                flows[flow_key]['end_time'] = timestamp

        # Convert sets to lists so the result is JSON-serializable.
        for flow_key, flow_data in flows.items():
            flow_data['protocols'] = list(flow_data['protocols'])
            flow_data['src_ports'] = list(flow_data['src_ports'])
            flow_data['dst_ports'] = list(flow_data['dst_ports'])
            flow_data['flags'] = list(flow_data['flags'])

        return dict(flows)

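    # Canonical-key example: a packet 10.0.0.1:5555 -> 10.0.0.2:443 and its reply
    # 10.0.0.2:443 -> 10.0.0.1:5555 both map to the key
    #     "10.0.0.1:5555-10.0.0.2:443"
    # because the lexicographically smaller IP (with its own port) is always
    # written first, so the two directions accumulate into one flow record.
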
    def _calculate_flow_statistics(self, flows: Dict[str, Dict], time_window: int) -> Dict[str, Any]:
        """
        Compute flow statistics.

        Args:
            flows (Dict[str, Dict]): Flow records.
            time_window (int): Window length in seconds.

        Returns:
            Dict[str, Any]: Flow statistics.
        """
        stats = {
            'total_flows': len(flows),
            'total_packets': sum(flow['packet_count'] for flow in flows.values()),
            'total_bytes': sum(flow['total_bytes'] for flow in flows.values()),
            'total_payload_bytes': sum(flow['payload_bytes'] for flow in flows.values()),
            'time_windows': [],
            'protocol_distribution': Counter(),
            'top_flows': [],
            'flow_duration_stats': {}
        }

        # Protocol distribution.
        for flow in flows.values():
            for protocol in flow['protocols']:
                stats['protocol_distribution'][protocol] += 1

        # Per-window statistics.
        if flows:
            start_time = min(flow['start_time'] for flow in flows.values())
            end_time = max(flow['end_time'] for flow in flows.values())

            current_time = start_time
            # Use <= so a capture where all timestamps coincide (e.g. two-column
            # input with one fill value) still yields one window.
            while current_time <= end_time:
                window_end = current_time + timedelta(seconds=time_window)
                window_flows = []

                # A flow belongs to a window if its lifetime overlaps the window.
                for flow_key, flow in flows.items():
                    if flow['start_time'] <= window_end and flow['end_time'] >= current_time:
                        window_flows.append({
                            'flow_key': flow_key,
                            'packet_count': flow['packet_count'],
                            'total_bytes': flow['total_bytes']
                        })

                stats['time_windows'].append({
                    'start_time': current_time.isoformat(),
                    'end_time': window_end.isoformat(),
                    'flow_count': len(window_flows),
                    'total_packets': sum(f['packet_count'] for f in window_flows),
                    'total_bytes': sum(f['total_bytes'] for f in window_flows)
                })

                current_time = window_end

            # Most active flows by byte count.
            sorted_flows = sorted(flows.items(), key=lambda x: x[1]['total_bytes'], reverse=True)
            stats['top_flows'] = [
                {
                    'flow_key': flow_key,
                    'packet_count': flow['packet_count'],
                    'total_bytes': flow['total_bytes'],
                    'duration': (flow['end_time'] - flow['start_time']).total_seconds()
                }
                for flow_key, flow in sorted_flows[:10]
            ]

            # Flow duration statistics.
            durations = [(flow['end_time'] - flow['start_time']).total_seconds() for flow in flows.values()]
            if durations:
                stats['flow_duration_stats'] = {
                    'min_duration': min(durations),
                    'max_duration': max(durations),
                    'avg_duration': float(np.mean(durations)),
                    'median_duration': float(np.median(durations))
                }

        return stats

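    # Windowing example (hypothetical times, time_window=60): a capture spanning
    # 10:00:00-10:02:30 yields windows starting at 10:00:00, 10:01:00 and 10:02:00;
    # a flow is counted in every window its lifetime overlaps
    # (start <= window_end and end >= window_start), so long flows appear in
    # several windows and per-window totals may exceed the overall totals.
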
    def _generate_triplets(self, flows: Dict[str, Dict]) -> List[Dict[str, Any]]:
        """
        Generate flow triplets (source IP, destination IP, weight).

        The weight is the number of times the directed pair (src_ip, dst_ip)
        occurs in the cleaned per-packet data.

        Args:
            flows (Dict[str, Dict]): Flow records (unused here; kept for interface symmetry).

        Returns:
            List[Dict[str, Any]]: Triplet list, sorted by weight (descending).
        """
        if self.cleaned_df is None:
            self.clean_data()

        # Count occurrences of each directed (源IP, 目标IP) pair in the cleaned data.
        grouped = self.cleaned_df.groupby(['源IP', '目标IP']).size().reset_index(name='count')

        triplets = []
        for _, row in grouped.iterrows():
            src_ip = str(row['源IP'])
            dst_ip = str(row['目标IP'])
            count = int(row['count'])
            triplets.append({
                'source_ip': src_ip,
                'target_ip': dst_ip,
                'weight': count,
                'connection_count': count,
                'ip_pair': f"{src_ip}-{dst_ip}"
            })

        # Sort by weight, heaviest first.
        triplets.sort(key=lambda x: x['weight'], reverse=True)

        return triplets

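    # Weight example: cleaned rows (A->B), (A->B), (B->A) produce the directed
    # triplets (A, B, weight=2) and (B, A, weight=1); direction matters here,
    # unlike in the bidirectional flow merge above.
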
    def export_results(self, output_dir: str = "preprocessing_results") -> Dict[str, str]:
        """
        Export the preprocessing results (CSV only).

        Args:
            output_dir (str): Output directory.

        Returns:
            Dict[str, str]: Mapping of result name to output file path.
        """
        # Create the output directory.
        os.makedirs(output_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_files = {}

        # 1. Export the cleaned data.
        if self.cleaned_df is not None:
            cleaned_file = os.path.join(output_dir, f"cleaned_data_{timestamp}.csv")
            self.cleaned_df.to_csv(cleaned_file, index=False, encoding='utf-8')
            output_files['cleaned_data'] = cleaned_file
            print(f"Cleaned data exported: {cleaned_file}")

        # 2. Export the flow statistics, flattened into CSV rows.
        if self.flow_stats:
            stats_file = os.path.join(output_dir, f"flow_statistics_{timestamp}.csv")
            # Flatten the nested structure into key-value rows
            # (output columns: 类别 = category, 键 = key, 值 = value).
            flat_rows = []
            # Overall summary values.
            base_keys = ['total_flows', 'total_packets', 'total_bytes', 'total_payload_bytes']
            for k in base_keys:
                flat_rows.append({'类别': '汇总', '键': k, '值': self.flow_stats.get(k, '')})
            # Protocol distribution.
            for proto, cnt in (self.flow_stats.get('protocol_distribution', {}) or {}).items():
                flat_rows.append({'类别': '协议分布', '键': str(proto), '值': cnt})
            # Time windows.
            for idx, w in enumerate(self.flow_stats.get('time_windows', []) or []):
                flat_rows.append({'类别': '时间窗口', '键': f"{idx}_start", '值': w.get('start_time', '')})
                flat_rows.append({'类别': '时间窗口', '键': f"{idx}_end", '值': w.get('end_time', '')})
                flat_rows.append({'类别': '时间窗口', '键': f"{idx}_flow_count", '值': w.get('flow_count', 0)})
                flat_rows.append({'类别': '时间窗口', '键': f"{idx}_total_packets", '值': w.get('total_packets', 0)})
                flat_rows.append({'类别': '时间窗口', '键': f"{idx}_total_bytes", '值': w.get('total_bytes', 0)})
            # Top flows.
            for idx, item in enumerate(self.flow_stats.get('top_flows', []) or []):
                flat_rows.append({'类别': '最活跃流', '键': f"{idx}_flow_key", '值': item.get('flow_key', '')})
                flat_rows.append({'类别': '最活跃流', '键': f"{idx}_packet_count", '值': item.get('packet_count', 0)})
                flat_rows.append({'类别': '最活跃流', '键': f"{idx}_total_bytes", '值': item.get('total_bytes', 0)})
                flat_rows.append({'类别': '最活跃流', '键': f"{idx}_duration", '值': item.get('duration', 0)})
            # Duration statistics.
            for k, v in (self.flow_stats.get('flow_duration_stats', {}) or {}).items():
                flat_rows.append({'类别': '流持续时间', '键': k, '值': v})
            pd.DataFrame(flat_rows).to_csv(stats_file, index=False, encoding='utf-8')
            output_files['flow_statistics'] = stats_file
            print(f"Flow statistics (CSV) exported: {stats_file}")

        # 3. Export the triplets.
        if self.triplets:
            triplets_file = os.path.join(output_dir, f"flow_triplets_{timestamp}.csv")
            triplets_df = pd.DataFrame(self.triplets)
            triplets_df.to_csv(triplets_file, index=False, encoding='utf-8')
            output_files['flow_triplets'] = triplets_file
            print(f"Flow triplets exported: {triplets_file}")

        # Only CSV export is supported.
        return output_files

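    # Flattened statistics layout (hypothetical values), one key-value row each:
    #     类别,键,值
    #     汇总,total_flows,12
    #     协议分布,TCP,9
    #     时间窗口,0_flow_count,5
    #     最活跃流,0_flow_key,10.0.0.1:5555-10.0.0.2:443
    #     流持续时间,avg_duration,3.2
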
    def print_summary(self):
        """Print a preprocessing summary."""
        print("\n" + "=" * 60)
        print("Preprocessing summary")
        print("=" * 60)

        if self.cleaned_df is not None:
            print(f"Cleaned records: {len(self.cleaned_df)}")
            print(f"Columns: {list(self.cleaned_df.columns)}")

        if self.flow_stats:
            print("\nFlow statistics:")
            print(f"  total flows:         {self.flow_stats.get('total_flows', 0)}")
            print(f"  total packets:       {self.flow_stats.get('total_packets', 0)}")
            print(f"  total bytes:         {self.flow_stats.get('total_bytes', 0)}")
            print(f"  total payload bytes: {self.flow_stats.get('total_payload_bytes', 0)}")

            print("\nProtocol distribution:")
            for protocol, count in self.flow_stats.get('protocol_distribution', {}).items():
                print(f"  {protocol}: {count}")

        if self.triplets:
            print("\nFlow triplets:")
            print(f"  triplet count: {len(self.triplets)}")
            print("  Top 5 most active connections:")
            for i, triplet in enumerate(self.triplets[:5]):
                print(f"  {i+1}. {triplet['source_ip']} -> {triplet['target_ip']} (weight: {triplet['weight']})")

        print("=" * 60)


def main():
    """Entry point."""
    print("Multimodal network flow graph data extraction - data preprocessing pipeline (revised weight calculation)")
    print("=" * 70)

    # Find the CSV files produced by the upstream PCAP analysis step.
    csv_files = glob.glob("规范化结果_*.csv")
    if not csv_files:
        print("Error: no PCAP analysis CSV file found")
        return

    # Use the most recently created CSV file.
    latest_csv = max(csv_files, key=os.path.getctime)
    print(f"Using CSV file: {latest_csv}")

    try:
        # Create the preprocessor.
        preprocessor = DataPreprocessor(latest_csv)

        # Run the preprocessing pipeline.
        print("\n1. Loading data...")
        preprocessor.load_data()

        print("\n2. Cleaning data...")
        preprocessor.clean_data()

        print("\n3. Aggregating flows...")
        preprocessor.aggregate_flows(time_window=60)

        print("\n4. Exporting results...")
        output_files = preprocessor.export_results()

        print("\n5. Printing summary...")
        preprocessor.print_summary()

        print("\nPreprocessing finished!")
        print("Output files:")
        for key, file_path in output_files.items():
            print(f"  {key}: {file_path}")

    except Exception as e:
        print(f"Error during preprocessing: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()