Algorithm-side optimization scripts and additional front-end samples #22

Merged
hnu202326010204 merged 4 commits from hufan_branch into develop 1 month ago

@ -1,18 +1,4 @@
"""Stable Diffusion 注意力热力图差异可视化工具 (可靠版 - 语义阶段聚合)。
本模块使用一种健壮的方法通过在 Stable Diffusion 扩散模型U-Net
**早期时间步 (语义阶段)** 捕获并累加交叉注意力权重这种方法能确保捕获到的
注意力图信号集中且可靠用于对比分析干净输入和扰动输入生成的图像对模型
注意力机制的影响差异
典型用法:
python eva_gen_heatmap.py \\
--model_path /path/to/sd_model \\
--image_path_a /path/to/clean_image.png \\
--image_path_b /path/to/noisy_image.png \\
--prompt_text "a photo of sks person" \\
--target_word "sks" \\
--output_dir output/heatmap_reports
"""Stable Diffusion 双模态注意力热力图差异可视化工具。
"""
# 通用参数解析与文件路径管理
@ -25,7 +11,6 @@ from typing import Dict, Any, List, Tuple
import torch
import torch.nn.functional as F
import numpy as np
import itertools
import warnings
# 可视化依赖
@ -48,29 +33,14 @@ warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
# ============== 核心模块:注意力捕获与聚合 ==============
# ============== 核心模块:双模态注意力捕获 ==============
class AttentionMapProcessor:
"""自定义注意力处理器,用于捕获 U-Net 交叉注意力层的权重。
通过替换原始的 `Attention` 模块处理器该类在模型前向传播过程中
将所有交叉注意力层的注意力权重`attention_probs`捕获并存储
Attributes:
attention_maps (Dict[str, List[torch.Tensor]]): 存储捕获到的注意力图
键为层名称值为该层在不同时间步捕获到的注意力图列表
pipeline (StableDiffusionPipeline): 正在处理的 Stable Diffusion 管线
original_processors (Dict[str, Any]): 存储原始的注意力处理器用于恢复
current_layer_name (Optional[str]): 当前正在处理的注意力层的名称
"""
"""自定义注意力处理器,用于同时捕获 U-Net 的交叉注意力和自注意力权重。"""
def __init__(self, pipeline: StableDiffusionPipeline):
"""初始化注意力处理器。
Args:
pipeline: Stable Diffusion 模型管线实例
"""
self.attention_maps: Dict[str, List[torch.Tensor]] = {}
self.cross_attention_maps: Dict[str, List[torch.Tensor]] = {}
self.self_attention_maps: Dict[str, List[torch.Tensor]] = {}
self.pipeline = pipeline
self.original_processors = {}
self.current_layer_name = None
@ -83,35 +53,18 @@ class AttentionMapProcessor:
encoder_hidden_states: torch.Tensor = None,
attention_mask: torch.Tensor = None
) -> torch.Tensor:
"""重载 __call__ 方法,执行注意力计算并捕获权重。
此方法替代了原始的 `Attention.processor`在计算交叉注意力时进行捕获
Args:
attn: 当前的 `Attention` 模块实例
hidden_states: U-Net 隐状态 (query)
encoder_hidden_states: 文本编码器输出 (key/value)即交叉注意力输入
attention_mask: 注意力掩码
Returns:
计算后的输出隐状态
"""
# 如果不是交叉注意力(即 encoder_hidden_states 为 None则调用原始处理器
if encoder_hidden_states is None:
return attn.processor(
attn, hidden_states, encoder_hidden_states, attention_mask
)
# 1. 计算 Q, K, V
"""重载执行注意力计算并捕获权重 (支持 Self 和 Cross)。"""
is_cross = encoder_hidden_states is not None
sequence_input = encoder_hidden_states if is_cross else hidden_states
query = attn.to_q(hidden_states)
key = attn.to_k(encoder_hidden_states)
value = attn.to_v(encoder_hidden_states)
key = attn.to_k(sequence_input)
value = attn.to_v(sequence_input)
# 2. 准备矩阵乘法
query = attn.head_to_batch_dim(query)
key = attn.head_to_batch_dim(key)
# 3. 计算 Attention Scores (Q @ K^T)
attention_scores = torch.baddbmm(
torch.empty(
query.shape[0], query.shape[1], key.shape[1],
@ -123,398 +76,328 @@ class AttentionMapProcessor:
alpha=attn.scale,
)
# 4. 计算 Attention Probabilities
attention_probs = attention_scores.softmax(dim=-1)
layer_name = self.current_layer_name
map_to_store = attention_probs.detach().cpu()
# 5. 存储捕获的注意力图
if layer_name not in self.attention_maps:
self.attention_maps[layer_name] = []
# 存储当前时间步的注意力权重
self.attention_maps[layer_name].append(attention_probs.detach().cpu())
if is_cross:
if layer_name not in self.cross_attention_maps:
self.cross_attention_maps[layer_name] = []
self.cross_attention_maps[layer_name].append(map_to_store)
else:
# 内存保护:仅捕获中低分辨率层的自注意力 (防止 4096*4096 矩阵爆内存)
spatial_size = map_to_store.shape[-2]
if spatial_size <= 1024:
if layer_name not in self.self_attention_maps:
self.self_attention_maps[layer_name] = []
self.self_attention_maps[layer_name].append(map_to_store)
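# Note: spatial_size <= 1024 corresponds to feature maps of at most 32x32 queries, so the
# stored self-attention matrices are at most 1024x1024 per head; the 64x64 (4096-query)
# layers that would otherwise produce 4096x4096 matrices are skipped.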
# 6. 计算输出 (Attention @ V)
value = attn.head_to_batch_dim(value)
hidden_states = torch.bmm(attention_probs, value)
hidden_states = attn.batch_to_head_dim(hidden_states)
# 7. 输出层
hidden_states = attn.to_out[0](hidden_states)
hidden_states = attn.to_out[1](hidden_states)
return hidden_states
def _set_processors(self):
"""注册自定义处理器,捕获 U-Net 中所有交叉注意力层的权重。
遍历 U-Net 的所有子模块找到所有交叉注意力层`Attention` 且名称包含 `attn2`
并将其处理器替换为当前的实例
"""
for name, module in self.pipeline.unet.named_modules():
if isinstance(module, Attention) and 'attn2' in name:
# 存储原始处理器以便后续恢复
self.original_processors[name] = module.processor
# 定义一个新的闭包函数,用于在调用前设置当前层的名称
def set_layer_name(current_name):
def new_call(*args, **kwargs):
self.current_layer_name = current_name
return self.__call__(*args, **kwargs)
return new_call
module.processor = set_layer_name(name)
if isinstance(module, Attention):
if 'attn1' in name or 'attn2' in name:
self.original_processors[name] = module.processor
def set_layer_name(current_name):
def new_call(*args, **kwargs):
self.current_layer_name = current_name
return self.__call__(*args, **kwargs)
return new_call
module.processor = set_layer_name(name)
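# In the diffusers U-Net naming scheme, 'attn1' modules are self-attention and 'attn2'
# modules are cross-attention; the closure pins current_layer_name before delegating to
# __call__ so that captured maps are keyed by the layer they came from.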
def remove(self):
"""恢复 U-Net 的原始注意力处理器,清理钩子。"""
for name, original_processor in self.original_processors.items():
module = self.pipeline.unet.get_submodule(name)
module.processor = original_processor
self.attention_maps = {}
self.cross_attention_maps = {}
self.self_attention_maps = {}
# ============== 聚合逻辑 ==============
def aggregate_word_attention(
def aggregate_cross_attention(
attention_maps: Dict[str, List[torch.Tensor]],
tokenizer: CLIPTokenizer,
target_word: str,
input_ids: torch.Tensor
) -> np.ndarray:
"""聚合所有层和语义时间步中目标词汇的注意力图,并进行归一化。
聚合步骤
1. 识别目标词汇对应的 Token 索引
2. 对每个层将所有捕获时间步的注意力图求平均
3. 提取目标 Token 对应的注意力子图并对 Token 维度求和 Attention Heads 求平均
4. 将不同分辨率的注意力图上采样到统一尺寸64x64
5. 对所有层的结果进行累加求和
6. 最终归一化到 [0, 1]
Args:
attention_maps: 包含各层和时间步捕获的注意力图的字典
tokenizer: CLIP 分词器实例
target_word: 需要聚焦的关键词
input_ids: Prompt 对应的 Token ID 张量
Returns:
最终聚合并上采样到 64x64 尺寸的注意力热力图 (NumPy 数组)
Raises:
ValueError: 如果无法在 Prompt 中找到目标词汇
RuntimeError: 如果未捕获到任何注意力数据
"""
# 1. 识别目标词汇的 Token 索引
prompt_tokens = tokenizer.convert_ids_to_tokens(
input_ids.squeeze().cpu().tolist()
)
"""聚合交叉注意力:关注 Prompt 中的特定 Target Word。"""
prompt_tokens = tokenizer.convert_ids_to_tokens(input_ids.squeeze().cpu().tolist())
target_lower = target_word.lower()
target_indices = []
for i, token in enumerate(prompt_tokens):
cleaned_token = token.replace('Ġ', '').replace('_', '').lower()
# 查找目标词汇或以目标词汇开头的 token 索引,并排除特殊 token
if (input_ids.squeeze()[i] not in tokenizer.all_special_ids and
(target_lower in cleaned_token or
cleaned_token.startswith(target_lower))):
(target_lower in cleaned_token or cleaned_token.startswith(target_lower))):
target_indices.append(i)
if not target_indices:
print(f"[WARN] 目标词汇 '{target_word}' 未识别。请检查 Prompt 或 Target Word")
raise ValueError("无法识别目标词汇的 token 索引。")
print(f"[WARN] Cross-Attn: 目标词汇 '{target_word}' 未识别。")
return np.zeros((64, 64))
# 2. 聚合逻辑
all_attention_data = []
# U-Net 输出的最大分辨率64x64总像素点数
TARGET_SPATIAL_SIZE = 4096
TARGET_MAP_SIZE = 64
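# 4096 = 64 * 64: for a 512px SD 1.x U-Net the highest-resolution attention layers operate
# on a 64x64 spatial grid, so lower-resolution maps are upsampled to that size before summing.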
for layer_name, step_maps in attention_maps.items():
if not step_maps:
if not step_maps: continue
avg_map = torch.stack(step_maps).mean(dim=0)
if avg_map.dim() == 4: avg_map = avg_map.squeeze(0)
target_map = avg_map[:, :, target_indices].sum(dim=-1).mean(dim=0).float()
if target_map.shape[0] != TARGET_SPATIAL_SIZE:
map_size = int(np.sqrt(target_map.shape[0]))
map_2d = target_map.reshape(map_size, map_size).unsqueeze(0).unsqueeze(0)
resized = F.interpolate(map_2d, size=(TARGET_MAP_SIZE, TARGET_MAP_SIZE), mode='bilinear', align_corners=False)
all_attention_data.append(resized.squeeze().flatten())
else:
all_attention_data.append(target_map)
if not all_attention_data: return np.zeros((64, 64))
final_map_flat = torch.stack(all_attention_data).sum(dim=0).cpu().numpy()
final_map_flat = final_map_flat / (final_map_flat.max() + 1e-6)
return final_map_flat.reshape(TARGET_MAP_SIZE, TARGET_MAP_SIZE)
def aggregate_self_attention(
attention_maps: Dict[str, List[torch.Tensor]]
) -> np.ndarray:
"""聚合自注意力:计算高频空间能量 (Laplacian High-Frequency Energy)。
原理
风格和纹理通常体现为注意力图中的高频变化
通过对每个 Query Attention Map 应用拉普拉斯算子Laplacian Kernel
我们可以提取出那些变化剧烈的区域边缘纹理接缝
最后聚合这些高频能量得到的图在空间结构上与原图对齐但亮度代表了纹理/风格复杂度
"""
all_attention_data = []
TARGET_MAP_SIZE = 64
# 定义拉普拉斯卷积核用于提取高频信息
laplacian_kernel = torch.tensor([
[0, 1, 0],
[1, -4, 1],
[0, 1, 0]
], dtype=torch.float32).view(1, 1, 3, 3)
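# The kernel weights sum to zero, so flat (uniform) attention regions give zero response;
# only spatially abrupt changes in the attention maps survive this high-pass filter.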
for layer_name, step_maps in attention_maps.items():
if not step_maps: continue
# [Heads, H*W, H*W] -> [H*W, H*W] 取平均
avg_matrix = torch.stack(step_maps).mean(dim=0).mean(dim=0).float()
# 获取当前层尺寸
current_pixels = avg_matrix.shape[0]
map_size = int(np.sqrt(current_pixels))
# 如果尺寸太小,高频信息没有意义,跳过极小层
if map_size < 16:
continue
# 重塑为图像形式: [Batch(Pixels), Channels(1), H, W]
# 这里我们将 avg_matrix 视为:对于每一个 query pixel (行),它关注的 spatial map (列)
# 我们想知道每个 pixel 关注的区域是不是包含很多高频纹理
attn_maps = avg_matrix.reshape(current_pixels, 1, map_size, map_size) # [N, 1, H, W]
# 对该层捕获的所有时间步求平均,形状: (batch, heads, spatial_res, target_tokens_len)
avg_map_over_time = torch.stack(step_maps).mean(dim=0)
# 将 Kernel 移到同一设备
kernel = laplacian_kernel.to(avg_matrix.device)
# 移除批次维度 (假设 batch size = 1),形状: (heads, spatial_res, target_tokens_len)
attention_map = avg_map_over_time.squeeze(0)
# 批量卷积计算高频响应 (High-Pass Filter)
# padding=1 保持尺寸不变
high_freq_response = F.conv2d(attn_maps, kernel, padding=1)
# 提取目标 token 的注意力图。形状: (heads, spatial_res, target_indices_len)
target_token_maps = attention_map[:, :, target_indices]
# 计算能量 (取绝对值或平方),这里取绝对值代表梯度的强度
high_freq_energy = torch.abs(high_freq_response)
# 对目标 token 求和 (dim=-1),对注意力头求平均 (dim=0),形状: (spatial_res,)
aggregated_map_flat = target_token_maps.sum(dim=-1).mean(dim=0).float()
# 3. 跨分辨率上采样
if aggregated_map_flat.shape[0] != TARGET_SPATIAL_SIZE:
# 当前图的尺寸16x16 (256) 或 32x32 (1024)
map_size = int(np.sqrt(aggregated_map_flat.shape[0]))
map_2d = aggregated_map_flat.reshape(map_size, map_size)
map_to_interp = map_2d.unsqueeze(0).unsqueeze(0) # [1, 1, H, W]
# 使用双线性插值上采样到 64x64
resized_map_2d = F.interpolate(
map_to_interp,
size=(TARGET_MAP_SIZE, TARGET_MAP_SIZE),
mode='bilinear',
align_corners=False
)
resized_map_flat = resized_map_2d.squeeze().flatten()
all_attention_data.append(resized_map_flat)
else:
# 如果已经是 64x64直接使用
all_attention_data.append(aggregated_map_flat)
# 现在我们得到了 [N, 1, H, W] 的高频能量图。
# 我们需要将其聚合回一张 [H, W] 的图。
# 含义:对于图像上的位置 (i, j),其作为 Query 时,所关注的区域包含了多少高频信息?
# 或者:作为 Key 时,它贡献了多少高频信息?
# 这里采用 "Query-based Aggregation"
# 计算每个 Query pixel 对高频信息的总响应
# shape: [N, 1, H, W] -> sum(dim=(2,3)) -> [N]
# 这表示:位置 N 的像素,其注意力主要集中在高频纹理区域的程度。
spatial_score_flat = high_freq_energy.sum(dim=(2, 3)).squeeze() # [H*W]
# 归一化这一层的分数,防止数值爆炸
spatial_score_flat = spatial_score_flat / (spatial_score_flat.max() + 1e-6)
# 重塑为 2D 空间图
map_2d = spatial_score_flat.reshape(map_size, map_size).unsqueeze(0).unsqueeze(0)
# 插值统一到目标尺寸
resized = F.interpolate(map_2d, size=(TARGET_MAP_SIZE, TARGET_MAP_SIZE), mode='bilinear', align_corners=False)
all_attention_data.append(resized.squeeze().flatten())
if not all_attention_data:
raise RuntimeError("未捕获到注意力数据。可能模型或参数设置有误。")
if not all_attention_data: return np.zeros((64, 64))
# 4. 对所有层的结果进行累加 (求和)
# 聚合所有层
final_map_flat = torch.stack(all_attention_data).sum(dim=0).cpu().numpy()
# 5. 最终归一化到 [0, 1]
final_map_flat = final_map_flat / (final_map_flat.max() + 1e-6)
map_size = int(np.sqrt(final_map_flat.shape[0]))
final_map_np = final_map_flat.reshape(map_size, map_size) # 64x64
return final_map_np
# 最终归一化,保持 0-1 范围,方便可视化
final_map_flat = (final_map_flat - final_map_flat.min()) / (final_map_flat.max() - final_map_flat.min() + 1e-6)
return final_map_flat.reshape(TARGET_MAP_SIZE, TARGET_MAP_SIZE)
def get_attention_map_from_image(
def get_dual_attention_maps(
pipeline: StableDiffusionPipeline,
image_path: str,
prompt_text: str,
target_word: str
) -> Tuple[Image.Image, np.ndarray]:
"""执行多时间步前向传播,捕获指定图片和 Prompt 的注意力图。
通过只运行扩散过程中的语义阶段早期时间步来确保捕获到的注意力权重
具有高信号质量
Args:
pipeline: Stable Diffusion 模型管线实例
image_path: 待处理的输入图片路径
prompt_text: 用于生成图片的 Prompt 文本
target_word: 需要聚焦和可视化的关键词
Returns:
包含 (原始图片, 最终上采样后的注意力图) 的元组
"""
) -> Tuple[Image.Image, np.ndarray, np.ndarray]:
"""同时获取 Cross-Attention 和 Self-Attention 热力图。"""
print(f"\n-> 正在处理图片: {Path(image_path).name}")
image = Image.open(image_path).convert("RGB").resize((512, 512))
image_transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize([0.5], [0.5]),
])
image_tensor = (
image_transform(image)
.unsqueeze(0)
.to(pipeline.device)
.to(pipeline.unet.dtype)
)
# 1. 编码到 Latent 空间
with torch.no_grad():
latent = (
pipeline.vae.encode(image_tensor).latent_dist.sample() *
pipeline.vae.config.scaling_factor
)
image_tensor = transforms.Compose([
transforms.ToTensor(), transforms.Normalize([0.5], [0.5])
])(image).unsqueeze(0).to(pipeline.device).to(pipeline.unet.dtype)
# 2. 编码 Prompt
text_input = pipeline.tokenizer(
prompt_text,
padding="max_length",
max_length=pipeline.tokenizer.model_max_length,
truncation=True,
return_tensors="pt"
)
input_ids = text_input.input_ids
with torch.no_grad():
latent = (pipeline.vae.encode(image_tensor).latent_dist.sample() * pipeline.vae.config.scaling_factor)
text_input = pipeline.tokenizer(prompt_text, padding="max_length", max_length=pipeline.tokenizer.model_max_length, truncation=True, return_tensors="pt")
with torch.no_grad():
# 获取文本嵌入
prompt_embeds = pipeline.text_encoder(
input_ids.to(pipeline.device)
)[0]
prompt_embeds = pipeline.text_encoder(text_input.input_ids.to(pipeline.device))[0]
# 3. 定义语义时间步
scheduler = pipeline.scheduler
# 设置扩散步数 (例如 50 步)
scheduler.set_timesteps(50, device=pipeline.device)
# 只选择语义最丰富的早期 10 步进行捕获
scheduler.set_timesteps(50, device=pipeline.device)
semantic_steps = scheduler.timesteps[:10]
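# The first scheduler timesteps are the high-noise (early) phase of the reverse process,
# where the model resolves global semantics and layout rather than fine detail, which is
# why attention is captured only over these steps.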
print(f"-> 正在对语义阶段的 {len(semantic_steps)} 个时间步进行注意力捕获...")
processor = AttentionMapProcessor(pipeline)
try:
# 4. 运行多步 UNet Forward Pass
with torch.no_grad():
# 在选定的语义时间步上运行 U-Net 预测
for t in semantic_steps:
pipeline.unet(latent, t, prompt_embeds, return_dict=False)
# 5. 聚合捕获到的数据
raw_map_np = aggregate_word_attention(
processor.attention_maps,
pipeline.tokenizer,
target_word,
input_ids
cross_map_raw = aggregate_cross_attention(
processor.cross_attention_maps, pipeline.tokenizer, target_word, text_input.input_ids
)
self_map_raw = aggregate_self_attention(processor.self_attention_maps)
except Exception as e:
print(f"[ERROR] 注意力聚合失败: {e}")
# 确保清理钩子
raw_map_np = np.zeros(image.size)
# import traceback
# traceback.print_exc()
cross_map_raw = np.zeros((64, 64))
self_map_raw = np.zeros((64, 64))
finally:
processor.remove()
# 6. 注意力图上采样到图片尺寸 (512x512)
# PIL 进行上采样
heat_map_pil = Image.fromarray((raw_map_np * 255).astype(np.uint8))
heat_map_np_resized = (
np.array(heat_map_pil.resize(
image.size,
resample=Image.Resampling.LANCZOS # 使用高质量的 Lanczos 滤波器
)) / 255.0
)
def upsample(map_np):
pil_img = Image.fromarray((map_np * 255).astype(np.uint8))
return np.array(pil_img.resize(image.size, resample=Image.Resampling.LANCZOS)) / 255.0
return image, heat_map_np_resized
return image, upsample(cross_map_raw), upsample(self_map_raw)
def main():
"""主函数,负责解析参数,加载模型,计算差异并生成可视化报告。"""
parser = argparse.ArgumentParser(description="SD 图片注意力差异可视化报告生成")
parser.add_argument("--model_path", type=str, required=True,
help="Stable Diffusion 模型本地路径。")
parser.add_argument("--image_path_a", type=str, required=True,
help="干净输入图片 (X) 路径。")
parser.add_argument("--image_path_b", type=str, required=True,
help="扰动输入图片 (X') 路径。")
parser.add_argument("--prompt_text", type=str, default="a photo of sks person",
help="用于生成图片的 Prompt 文本。")
parser.add_argument("--target_word", type=str, default="sks",
help="需要在注意力图中聚焦和可视化的关键词。")
parser.add_argument("--output_dir", type=str, default="output",
help="报告 PNG 文件的输出目录。")
parser = argparse.ArgumentParser(description="SD 双模态注意力差异分析报告")
parser.add_argument("--model_path", type=str, required=True, help="Stable Diffusion 模型路径")
parser.add_argument("--image_path_a", type=str, required=True, help="Clean Image")
parser.add_argument("--image_path_b", type=str, required=True, help="Noisy Image")
parser.add_argument("--prompt_text", type=str, default="a photo of sks person")
parser.add_argument("--target_word", type=str, default="sks")
parser.add_argument("--output_dir", type=str, default="output")
args = parser.parse_args()
print(f"--- 正在生成 Stable Diffusion 注意力差异报告 ---")
print(f"--- 正在生成 Museguard 双模态分析报告 (High-Freq Energy Mode) ---")
# ---------------- 准备模型 ----------------
device = 'cuda' if torch.cuda.is_available() else 'cpu'
dtype = torch.float16 if device == 'cuda' else torch.float32
try:
# 加载 Stable Diffusion 管线
pipe = StableDiffusionPipeline.from_pretrained(
args.model_path,
torch_dtype=dtype,
local_files_only=True,
safety_checker=None,
# 从子文件夹加载调度器配置
args.model_path, torch_dtype=dtype, local_files_only=True, safety_checker=None,
scheduler=DPMSolverMultistepScheduler.from_pretrained(args.model_path, subfolder="scheduler")
).to(device)
except Exception as e:
print(f"[ERROR] 模型加载失败,请检查路径和环境依赖: {e}")
return
# ---------------- 获取数据 ----------------
# 获取干净图片 A 的注意力图 M_A
img_A, map_A = get_attention_map_from_image(pipe, args.image_path_a, args.prompt_text, args.target_word)
# 获取扰动图片 B 的注意力图 M_B
img_B, map_B = get_attention_map_from_image(pipe, args.image_path_b, args.prompt_text, args.target_word)
if map_A.shape != map_B.shape:
print("错误:注意力图尺寸不匹配。中止处理。")
return
# 计算差异图: Delta = M_A - M_B
diff_map = map_A - map_B
# 计算 L2 范数(差异距离)
l2_diff = np.linalg.norm(diff_map)
print(f"\n计算完毕,注意力图的 L2 范数差异值: {l2_diff:.4f}")
# ---------------- 绘制专业报告 ----------------
print(f"[ERROR] 模型加载失败: {e}"); return
img_A, cross_A, self_A = get_dual_attention_maps(pipe, args.image_path_a, args.prompt_text, args.target_word)
img_B, cross_B, self_B = get_dual_attention_maps(pipe, args.image_path_b, args.prompt_text, args.target_word)
diff_cross = cross_A - cross_B
l2_cross = np.linalg.norm(diff_cross)
diff_self = self_A - self_B
l2_self = np.linalg.norm(diff_self)
# 设置 Matplotlib 字体样式
plt.rcParams.update({
'font.family': 'serif',
'font.serif': ['DejaVu Serif', 'Times New Roman', 'serif'],
'mathtext.fontset': 'cm'
})
fig = plt.figure(figsize=(12, 16), dpi=120)
print(f"\nCross-Attn L2 Diff: {l2_cross:.4f}")
print(f"Self-Attn L2 Diff: {l2_self:.4f}")
# ---------------- 绘制增强版报告 ----------------
plt.rcParams.update({'font.family': 'serif', 'mathtext.fontset': 'cm'})
# 3行 x 4列 网格布局,用于图片和图例的精确控制
gs = gridspec.GridSpec(3, 4, figure=fig,
height_ratios=[1, 1, 1.3],
hspace=0.3, wspace=0.1)
fig = plt.figure(figsize=(14, 22), dpi=100)
gs = gridspec.GridSpec(4, 4, figure=fig, height_ratios=[1, 1, 1, 1.2], hspace=0.3, wspace=0.1)
# --- 第一行:原始图片 ---
# Row 1: Images
ax_img_a = fig.add_subplot(gs[0, 0:2])
ax_img_b = fig.add_subplot(gs[0, 2:4])
ax_img_a.imshow(img_A); ax_img_a.set_title("Clean Image ($X$)", fontsize=14); ax_img_a.axis('off')
ax_img_b.imshow(img_B); ax_img_b.set_title("Noisy Image ($X'$)", fontsize=14); ax_img_b.axis('off')
# Row 2: Cross Attention
ax_cA = fig.add_subplot(gs[1, 0:2])
ax_cB = fig.add_subplot(gs[1, 2:4])
ax_cA.imshow(cross_A, cmap='jet', vmin=0, vmax=1)
ax_cA.set_title(f"Cross-Attn ($M^{{cross}}_X$)\nTarget: \"{args.target_word}\"", fontsize=14); ax_cA.axis('off')
im_cB = ax_cB.imshow(cross_B, cmap='jet', vmin=0, vmax=1)
ax_cB.set_title(f"Cross-Attn ($M^{{cross}}_{{X'}}$)", fontsize=14); ax_cB.axis('off')
divider = make_axes_locatable(ax_cB)
cax = divider.append_axes("right", size="5%", pad=0.05)
fig.colorbar(im_cB, cax=cax, label='Semantic Alignment')
# 干净图片
ax_img_a.imshow(img_A)
ax_img_a.set_title(f"Clean Image ($X$)\nFilename: {Path(args.image_path_a).name}", fontsize=14, pad=10)
ax_img_a.axis('off')
# 扰动图片
ax_img_b.imshow(img_B)
ax_img_b.set_title(f"Noisy Image ($X'$)\nFilename: {Path(args.image_path_b).name}", fontsize=14, pad=10)
ax_img_b.axis('off')
# --- 第二行:注意力热力图 (Jet配色) ---
ax_map_a = fig.add_subplot(gs[1, 0:2])
ax_map_b = fig.add_subplot(gs[1, 2:4])
# 注意力图 A
im_map_a = ax_map_a.imshow(map_A, cmap='jet', vmin=0, vmax=1)
ax_map_a.set_title(f"Attention Heatmap ($M_X$)\nTarget: \"{args.target_word}\"", fontsize=14, pad=10)
ax_map_a.axis('off')
# 注意力图 B
im_map_b = ax_map_b.imshow(map_B, cmap='jet', vmin=0, vmax=1)
ax_map_b.set_title(f"Attention Heatmap ($M_{{X'}}$)\nTarget: \"{args.target_word}\"", fontsize=14, pad=10)
ax_map_b.axis('off')
# 为注意力图 B 绘制颜色指示条
divider = make_axes_locatable(ax_map_b)
cax_map = divider.append_axes("right", size="5%", pad=0.05)
cbar1 = fig.colorbar(im_map_b, cax=cax_map)
cbar1.set_label('Attention Intensity', fontsize=10)
# --- 第三行:差异对比 (完美居中) ---
# 差异图在网格的中间两列
ax_diff = fig.add_subplot(gs[2, 1:3])
vmax_diff = np.max(np.abs(diff_map))
# 使用 TwoSlopeNorm 确保 0 值位于色条中央
norm_diff = TwoSlopeNorm(vmin=-vmax_diff, vcenter=0., vmax=vmax_diff)
# 使用 Coolwarm 配色,蓝色表示负差异 (M_X' > M_X),红色表示正差异 (M_X > M_X')
im_diff = ax_diff.imshow(diff_map, cmap='coolwarm', norm=norm_diff)
# Row 3: Self Attention (High-Frequency Energy Mode)
ax_sA = fig.add_subplot(gs[2, 0:2])
ax_sB = fig.add_subplot(gs[2, 2:4])
title_text = (
r"Difference Map: $\Delta = M_X - M_{X'}$" +
f"\n$L_2$ Norm Distance: $\mathbf{{{l2_diff:.4f}}}$"
)
ax_diff.set_title(title_text, fontsize=16, pad=12)
ax_diff.axis('off')
# 差异图颜色指示条 (居中对齐)
cbar2 = fig.colorbar(im_diff, ax=ax_diff, fraction=0.046, pad=0.04)
cbar2.set_label(r'Scale: Red ($+$) $\leftrightarrow$ Blue ($-$)', fontsize=12)
# ---------------- 整体修饰与保存 ----------------
fig.suptitle(f"Museguard: SD Attention Analysis Report", fontsize=20, fontweight='bold', y=0.95)
# 恢复使用与 Cross Attention 一致的 'jet' colormap
ax_sA.imshow(self_A, cmap='jet', vmin=0, vmax=1)
ax_sA.set_title(f"Self-Attn ($M^{{self}}_X$)\nHigh-Freq Energy (Texture)", fontsize=14); ax_sA.axis('off')
output_filename = "heatmap_dif.png"
output_path = Path(args.output_dir) / output_filename
output_path.parent.mkdir(parents=True, exist_ok=True)
plt.savefig(output_path, bbox_inches='tight', facecolor='white')
print(f"\n专业分析报告已保存至:\n{output_path.resolve()}")
im_sB = ax_sB.imshow(self_B, cmap='jet', vmin=0, vmax=1)
ax_sB.set_title(f"Self-Attn ($M^{{self}}_{{X'}}$)", fontsize=14); ax_sB.axis('off')
divider = make_axes_locatable(ax_sB)
cax = divider.append_axes("right", size="5%", pad=0.05)
fig.colorbar(im_sB, cax=cax, label='Texture Intensity')
# Row 4: Differences
ax_diff_c = fig.add_subplot(gs[3, 0:2])
ax_diff_s = fig.add_subplot(gs[3, 2:4])
vmax_c = max(np.max(np.abs(diff_cross)), 0.1)
norm_c = TwoSlopeNorm(vmin=-vmax_c, vcenter=0., vmax=vmax_c)
im_dc = ax_diff_c.imshow(diff_cross, cmap='coolwarm', norm=norm_c)
ax_diff_c.set_title(f"Cross Diff ($\Delta_{{cross}}$)\n$L_2$: {l2_cross:.4f}", fontsize=14); ax_diff_c.axis('off')
plt.colorbar(im_dc, ax=ax_diff_c, fraction=0.046, pad=0.04)
vmax_s = max(np.max(np.abs(diff_self)), 0.1)
norm_s = TwoSlopeNorm(vmin=-vmax_s, vcenter=0., vmax=vmax_s)
im_ds = ax_diff_s.imshow(diff_self, cmap='coolwarm', norm=norm_s)
ax_diff_s.set_title(f"Self Diff ($\Delta_{{self}}$)\n$L_2$: {l2_self:.4f}", fontsize=14); ax_diff_s.axis('off')
plt.colorbar(im_ds, ax=ax_diff_s, fraction=0.046, pad=0.04)
fig.suptitle(f"Museguard: Dual-Mode Analysis (High-Freq Energy)", fontsize=20, fontweight='bold', y=0.92)
out_path = Path(args.output_dir) / "dual_heatmap_report.png"
out_path.parent.mkdir(parents=True, exist_ok=True)
plt.savefig(out_path, bbox_inches='tight', facecolor='white')
print(f"\n报告已保存至: {out_path}")
if __name__ == "__main__":
main()

@ -448,6 +448,40 @@ def generate_visual_report(
full_text += conclusion
# ---------------------------------------------------------------------
# 4. Metric definitions (ASCII-only / English-only to avoid font issues)
# ---------------------------------------------------------------------
metric_definitions = [
"",
"",
">>> METRIC DEFINITIONS (Detailed Explanations):",
"",
"1) FID (Frechet Inception Distance) [Goal: LOWER is better]",
" - Meaning: Measures how far the generated image distribution is from the reference image distribution in a deep feature space.",
" - What it represents: Overall realism + diversity at the dataset level; smaller means the generated set is closer to the reference set.",
"",
"2) SSIM (Structural Similarity Index) [Goal: HIGHER is better]",
" - Meaning: Compares two images using luminance, contrast, and structural similarity components.",
" - What it represents: Structural consistency (edges, textures, layouts); closer to 1 means more similar structure.",
"",
"3) PSNR (Peak Signal-to-Noise Ratio) [Goal: HIGHER is better]",
" - Meaning: Pixel-domain signal-to-noise measure derived from MSE, typically reported in dB.",
" - What it represents: Pixel-level closeness to the reference; higher means lower average pixel error.",
"",
"4) FDS (Face Detection Similarity) [Goal: HIGHER is better]",
" - Meaning: Face-identity similarity based on detected face embeddings.",
" - What it represents: Whether the generated faces preserve identity-like characteristics relative to the reference set; higher means more similar identity features.",
"",
"5) CLIP_IQS (CLIP Image Quality Score; text prompt = 'good image') [Goal: HIGHER is better]",
" - Meaning: Similarity between the image embedding and the embedding of the text concept 'good image' in CLIP space.",
" - What it represents: A coarse proxy of \"looks like a good image\" according to CLIP priors (semantic/aesthetic heuristic).",
"",
"6) BRISQUE (Blind/Referenceless Image Spatial Quality Evaluator) [Goal: LOWER is better]",
" - Meaning: A no-reference image quality metric based on natural scene statistics (NSS) features.",
" - What it represents: Distortion level without using a reference image; lower is usually interpreted as better perceptual quality.",
]
full_text += "\n" + "\n".join(metric_definitions)
ax_data.text(
0.05,
0.30,

@ -83,21 +83,27 @@ def parse_args(input_args=None):
"--enable_xformers_memory_efficient_attention", action="store_true", help="Whether or not to use xformers."
)
parser.add_argument(
'--eps',
"--eps",
type=float,
default=12.75,
help='perturbation budget'
help="perturbation budget",
)
parser.add_argument(
'--step_size',
"--step_size",
type=float,
default=1/255,
help='step size of each update'
default=1 / 255,
help="step size of each update",
)
parser.add_argument(
'--attack_type',
choices=['var', 'mean', 'KL', 'add-log', 'latent_vector', 'add'],
help='what is the attack target'
"--save_every",
type=int,
default=25,
help="Save all perturbed images every N steps (default=25 to keep original behavior).",
)
parser.add_argument(
"--attack_type",
choices=["var", "mean", "KL", "add-log", "latent_vector", "add"],
help="what is the attack target",
)
if input_args is not None:
@ -108,9 +114,9 @@ def parse_args(input_args=None):
env_local_rank = int(os.environ.get("LOCAL_RANK", -1))
if env_local_rank != -1 and env_local_rank != args.local_rank:
args.local_rank = env_local_rank
return args
class PIDDataset(Dataset):
"""
@ -118,20 +124,18 @@ class PIDDataset(Dataset):
It pre-processes the images and the tokenizes prompts.
"""
def __init__(
self,
instance_data_root,
size=512,
center_crop=False
):
def __init__(self, instance_data_root, size=512, center_crop=False):
self.size = size
self.center_crop = center_crop
self.instance_images_path = list(Path(instance_data_root).iterdir())
self.num_instance_images = len(self.instance_images_path)
self.image_transforms = transforms.Compose([
self.image_transforms = transforms.Compose(
[
transforms.Resize(size, interpolation=transforms.InterpolationMode.BILINEAR),
transforms.CenterCrop(size) if center_crop else transforms.RandomCrop(size),
transforms.ToTensor(),])
transforms.ToTensor(),
]
)
def __len__(self):
return self.num_instance_images
@ -144,8 +148,8 @@ class PIDDataset(Dataset):
if not instance_image.mode == "RGB":
instance_image = instance_image.convert("RGB")
example['index'] = index % self.num_instance_images
example['pixel_values'] = self.image_transforms(instance_image)
example["index"] = index % self.num_instance_images
example["pixel_values"] = self.image_transforms(instance_image)
return example
@ -154,11 +158,10 @@ def main(args):
if args.seed is not None:
torch.manual_seed(args.seed)
weight_dtype = torch.float32
device = torch.device('cuda')
device = torch.device("cuda")
# VAE encoder
vae = AutoencoderKL.from_pretrained(
args.pretrained_model_name_or_path, subfolder="vae", revision=args.revision)
vae = AutoencoderKL.from_pretrained(args.pretrained_model_name_or_path, subfolder="vae", revision=args.revision)
vae.requires_grad_(False)
vae.to(device, dtype=weight_dtype)
@ -170,103 +173,105 @@ def main(args):
)
dataloader = torch.utils.data.DataLoader(
dataset,
batch_size=1, # some parts of code don't support batching
batch_size=1, # some parts of code don't support batching
shuffle=True,
num_workers=args.dataloader_num_workers,
)
# Wrapper of the perturbations generator
class AttackModel(torch.nn.Module):
def __init__(self):
super().__init__()
to_tensor = transforms.ToTensor()
self.epsilon = args.eps/255
self.delta = [torch.empty_like(to_tensor(Image.open(path))).uniform_(-self.epsilon, self.epsilon)
for path in dataset.instance_images_path]
self.epsilon = args.eps / 255
self.delta = [
torch.empty_like(to_tensor(Image.open(path))).uniform_(-self.epsilon, self.epsilon)
for path in dataset.instance_images_path
]
self.size = dataset.size
def forward(self, vae, x, index, poison=False):
# Check whether we need to add perturbation
if poison:
self.delta[index].requires_grad_(True)
x = x + self.delta[index].to(dtype=weight_dtype)
# Normalize to [-1, 1]
input_x = 2 * x - 1
return vae.encode(input_x.to(device))
attackmodel = AttackModel()
# Just to zero-out the gradient
optimizer = torch.optim.SGD(attackmodel.delta, lr=0)
# Progress bar
progress_bar = tqdm(range(0, args.max_train_steps), desc="Steps")
# Make sure the dir exists
os.makedirs(args.output_dir, exist_ok=True)
# Start optimizing the perturbation
for step in progress_bar:
total_loss = 0.0
for batch in dataloader:
# Save images
if step%25 == 0:
# Save images (unchanged behavior by default: save_every=25)
if args.save_every > 0 and step % args.save_every == 0:
to_image = transforms.ToPILImage()
for i in range(0, len(dataset.instance_images_path)):
img = dataset[i]['pixel_values']
img = dataset[i]["pixel_values"]
img = to_image(img + attackmodel.delta[i])
img.save(os.path.join(args.output_dir, f"{i}.png"))
# Select target loss
clean_embedding = attackmodel(vae, batch['pixel_values'], batch['index'], False)
poison_embedding = attackmodel(vae, batch['pixel_values'], batch['index'], True)
clean_embedding = attackmodel(vae, batch["pixel_values"], batch["index"], False)
poison_embedding = attackmodel(vae, batch["pixel_values"], batch["index"], True)
clean_latent = clean_embedding.latent_dist
poison_latent = poison_embedding.latent_dist
if args.attack_type == 'var':
loss = F.mse_loss(clean_latent.std, poison_latent.std, reduction="mean")
elif args.attack_type == 'mean':
loss = F.mse_loss(clean_latent.mean, poison_latent.mean, reduction="mean")
elif args.attack_type == 'KL':
if args.attack_type == "var":
loss = F.mse_loss(clean_latent.std, poison_latent.std, reduction="mean")
elif args.attack_type == "mean":
loss = F.mse_loss(clean_latent.mean, poison_latent.mean, reduction="mean")
elif args.attack_type == "KL":
sigma_2, mu_2 = poison_latent.std, poison_latent.mean
sigma_1, mu_1 = clean_latent.std, clean_latent.mean
KL_diver = torch.log(sigma_2 / sigma_1) - 0.5 + (sigma_1 ** 2 + (mu_1 - mu_2) ** 2) / (2 * sigma_2 ** 2)
KL_diver = torch.log(sigma_2 / sigma_1) - 0.5 + (sigma_1**2 + (mu_1 - mu_2) ** 2) / (
2 * sigma_2**2
)
loss = KL_diver.flatten().mean()
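# Per-dimension closed form of the Gaussian KL divergence used above:
#   KL( N(mu_1, sigma_1^2) || N(mu_2, sigma_2^2) )
#     = log(sigma_2 / sigma_1) + (sigma_1^2 + (mu_1 - mu_2)^2) / (2 * sigma_2^2) - 1/2,
# averaged over all latent dimensions; here the clean latent is distribution 1 and the
# poisoned latent is distribution 2.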
elif args.attack_type == 'latent_vector':
elif args.attack_type == "latent_vector":
clean_vector = clean_latent.sample()
poison_vector = poison_latent.sample()
loss = F.mse_loss(clean_vector, poison_vector, reduction="mean")
elif args.attack_type == 'add':
loss_2 = F.mse_loss(clean_latent.std, poison_latent.std, reduction="mean")
loss_1 = F.mse_loss(clean_latent.mean, poison_latent.mean, reduction="mean")
loss = F.mse_loss(clean_vector, poison_vector, reduction="mean")
elif args.attack_type == "add":
loss_2 = F.mse_loss(clean_latent.std, poison_latent.std, reduction="mean")
loss_1 = F.mse_loss(clean_latent.mean, poison_latent.mean, reduction="mean")
loss = loss_1 + loss_2
elif args.attack_type == 'add-log':
elif args.attack_type == "add-log":
loss_1 = F.mse_loss(clean_latent.var.log(), poison_latent.var.log(), reduction="mean")
loss_2 = F.mse_loss(clean_latent.mean, poison_latent.mean, reduction='mean')
loss_2 = F.mse_loss(clean_latent.mean, poison_latent.mean, reduction="mean")
loss = loss_1 + loss_2
optimizer.zero_grad()
loss.backward()
# Perform PGD update on the loss
delta = attackmodel.delta[batch['index']]
# Perform PGD update on the loss (make --step_size effective)
delta = attackmodel.delta[batch["index"]]
delta.requires_grad_(False)
delta += delta.grad.sign() * 1/255
delta += delta.grad.sign() * args.step_size
delta = torch.clamp(delta, -attackmodel.epsilon, attackmodel.epsilon)
delta = torch.clamp(delta, -batch['pixel_values'].detach().cpu(), 1-batch['pixel_values'].detach().cpu())
attackmodel.delta[batch['index']] = delta.detach().squeeze(0)
delta = torch.clamp(delta, -batch["pixel_values"].detach().cpu(), 1 - batch["pixel_values"].detach().cpu())
attackmodel.delta[batch["index"]] = delta.detach().squeeze(0)
total_loss += loss.detach().cpu()
# Logging steps
logs = {"loss": total_loss.item()}
progress_bar.set_postfix(**logs)
if __name__ == "__main__":
args = parse_args()
main(args)
main(args)

@ -1,26 +1,26 @@
import argparse
import copy
import gc
import hashlib
import itertools
import logging
import os
import random
from pathlib import Path
import datasets
import diffusers
import random
from torch.backends import cudnn
import transformers
import numpy as np
import torch
import torch.nn.functional as F
import torch.utils.checkpoint
import transformers
from accelerate import Accelerator
from accelerate.logging import get_logger
from accelerate.utils import set_seed
from diffusers import AutoencoderKL, DDPMScheduler, DiffusionPipeline, UNet2DConditionModel
from diffusers.utils.import_utils import is_xformers_available
from PIL import Image
from torch.backends import cudnn
from torch.utils.data import Dataset
from torchvision import transforms
from tqdm.auto import tqdm
@ -30,8 +30,19 @@ from transformers import AutoTokenizer, PretrainedConfig
logger = get_logger(__name__)
def _cuda_gc() -> None:
"""Try to release unreferenced CUDA memory and reduce fragmentation.
This is a best-effort helper. It does not change algorithmic behavior but can
make long runs less prone to OOM due to fragmentation/reserved-memory growth.
"""
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
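# empty_cache() only returns cached, unreferenced blocks to the GPU allocator; memory held
# by tensors that are still referenced elsewhere is unaffected, hence "best-effort".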
class DreamBoothDatasetFromTensor(Dataset):
"""Just like DreamBoothDataset, but take instance_images_tensor instead of path"""
"""Just like DreamBoothDataset, but take instance_images_tensor instead of path."""
def __init__(
self,
@ -88,7 +99,7 @@ class DreamBoothDatasetFromTensor(Dataset):
if self.class_data_root:
class_image = Image.open(self.class_images_path[index % self.num_class_images])
if not class_image.mode == "RGB":
if class_image.mode != "RGB":
class_image = class_image.convert("RGB")
example["class_images"] = self.image_transforms(class_image)
example["class_prompt_ids"] = self.tokenizer(
@ -114,12 +125,11 @@ def import_model_class_from_model_name_or_path(pretrained_model_name_or_path: st
from transformers import CLIPTextModel
return CLIPTextModel
elif model_class == "RobertaSeriesModelWithTransformation":
if model_class == "RobertaSeriesModelWithTransformation":
from diffusers.pipelines.alt_diffusion.modeling_roberta_series import RobertaSeriesModelWithTransformation
return RobertaSeriesModelWithTransformation
else:
raise ValueError(f"{model_class} is not supported.")
raise ValueError(f"{model_class} is not supported.")
def parse_args(input_args=None):
@ -337,17 +347,13 @@ def parse_args(input_args=None):
"--max_steps",
type=int,
default=50,
help=(
"Maximum steps for adaptive greedy timestep selection."
),
help=("Maximum steps for adaptive greedy timestep selection."),
)
parser.add_argument(
"--delta_t",
type=int,
default=20,
help=(
"delete 2*delta_t for each adaptive greedy timestep selection."
),
help=("delete 2*delta_t for each adaptive greedy timestep selection."),
)
if input_args is not None:
args = parser.parse_args(input_args)
@ -358,7 +364,7 @@ def parse_args(input_args=None):
class PromptDataset(Dataset):
"A simple dataset to prepare the prompts to generate class images on multiple GPUs."
"""A simple dataset to prepare the prompts to generate class images on multiple GPUs."""
def __init__(self, prompt, num_samples):
self.prompt = prompt
@ -389,7 +395,6 @@ def load_data(data_dir, size=512, center_crop=True) -> torch.Tensor:
return images
def train_one_epoch(
args,
models,
@ -399,8 +404,6 @@ def train_one_epoch(
data_tensor: torch.Tensor,
num_steps=20,
):
# Load the tokenizer
unet, text_encoder = copy.deepcopy(models[0]), copy.deepcopy(models[1])
params_to_optimize = itertools.chain(unet.parameters(), text_encoder.parameters())
@ -422,7 +425,6 @@ def train_one_epoch(
args.center_crop,
)
# weight_dtype = torch.bfloat16
weight_dtype = torch.bfloat16
device = torch.device("cuda")
@ -443,24 +445,17 @@ def train_one_epoch(
latents = vae.encode(pixel_values).latent_dist.sample()
latents = latents * vae.config.scaling_factor
# Sample noise that we'll add to the latents
noise = torch.randn_like(latents)
bsz = latents.shape[0]
# Sample a random timestep for each image
timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (bsz,), device=latents.device)
timesteps = timesteps.long()
# Add noise to the latents according to the noise magnitude at each timestep
# (this is the forward diffusion process)
noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
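# noise_scheduler.add_noise implements the DDPM forward process,
#   x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * eps,
# i.e. the latents are noised to the sampled timestep before the U-Net denoising prediction.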
# Get the text embedding for conditioning
encoder_hidden_states = text_encoder(input_ids)[0]
# Predict the noise residual
model_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
# Get the target for loss depending on the prediction type
if noise_scheduler.config.prediction_type == "epsilon":
target = noise
elif noise_scheduler.config.prediction_type == "v_prediction":
@ -468,33 +463,39 @@ def train_one_epoch(
else:
raise ValueError(f"Unknown prediction type {noise_scheduler.config.prediction_type}")
# with prior preservation loss
if args.with_prior_preservation:
model_pred, model_pred_prior = torch.chunk(model_pred, 2, dim=0)
target, target_prior = torch.chunk(target, 2, dim=0)
# Compute instance loss
instance_loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
# Compute prior loss
prior_loss = F.mse_loss(model_pred_prior.float(), target_prior.float(), reduction="mean")
# Add the prior loss to the instance loss.
loss = instance_loss + args.prior_loss_weight * prior_loss
else:
prior_loss = torch.tensor(0.0, device=device)
instance_loss = torch.tensor(0.0, device=device)
loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
loss.backward()
torch.nn.utils.clip_grad_norm_(params_to_optimize, 1.0, error_if_nonfinite=True)
optimizer.step()
optimizer.zero_grad()
print(
f"Step #{step}, loss: {loss.detach().item()}, prior_loss: {prior_loss.detach().item()}, instance_loss: {instance_loss.detach().item()}"
f"Step #{step}, loss: {loss.detach().item()}, prior_loss: {prior_loss.detach().item()}, "
f"instance_loss: {instance_loss.detach().item()}"
)
# Best-effort: free per-step tensors earlier (no behavior change).
del step_data, pixel_values, input_ids, latents, noise, timesteps, noisy_latents, encoder_hidden_states
del model_pred, target, loss, prior_loss, instance_loss
# Best-effort: release optimizer state + dataset refs sooner.
del optimizer, train_dataset, params_to_optimize
_cuda_gc()
return [unet, text_encoder]
def set_unet_attr(unet):
def conv_forward(self):
def forward(input_tensor, temb):
@ -505,7 +506,6 @@ def set_unet_attr(unet):
hidden_states = self.nonlinearity(hidden_states)
if self.upsample is not None:
# upsample_nearest_nhwc fails with large batch sizes. see https://github.com/huggingface/diffusers/issues/984
if hidden_states.shape[0] >= 64:
input_tensor = input_tensor.contiguous()
hidden_states = hidden_states.contiguous()
@ -538,37 +538,33 @@ def set_unet_attr(unet):
input_tensor = self.conv_shortcut(input_tensor)
output_tensor = (input_tensor + hidden_states) / self.output_scale_factor
return output_tensor
return forward
# [MODIFIED] 只 hook 算法实际使用到的 up_blocks[3]
conv_module_list = [
unet.up_blocks[3].resnets[0], unet.up_blocks[3].resnets[1], unet.up_blocks[3].resnets[2],
]
unet.up_blocks[3].resnets[0],
unet.up_blocks[3].resnets[1],
unet.up_blocks[3].resnets[2],
]
for conv_module in conv_module_list:
conv_module.forward = conv_forward(conv_module)
setattr(conv_module, 'in_layers_features', None)
setattr(conv_module, 'out_layers_features', None)
setattr(conv_module, "in_layers_features", None)
setattr(conv_module, "out_layers_features", None)
def save_feature_maps(up_blocks, down_blocks):
out_layers_features_list_3 = []
res_3_list =[0,1,2]
res_3_list = [0, 1, 2]
# [MODIFIED] 只提取 up_blocks[3] 的特征
block = up_blocks[3]
for index in res_3_list:
out_layers_features_list_3.append(block.resnets[index].out_layers_features)
out_layers_features_list_3 = torch.stack(out_layers_features_list_3, dim=0)
# [MODIFIED] 只返回算法实际使用到的特征
return out_layers_features_list_3
def pgd_attack(
args,
models,
@ -579,10 +575,13 @@ def pgd_attack(
original_images: torch.Tensor,
target_tensor: torch.Tensor,
num_steps: int,
time_list
time_list,
):
"""Return new perturbed data"""
"""Return new perturbed data.
Note: This function keeps the external behavior identical, but tries to reduce
memory pressure by freeing tensors early and avoiding lingering references.
"""
unet, text_encoder = models
weight_dtype = torch.bfloat16
device = torch.device("cuda")
@ -595,6 +594,7 @@ def pgd_attack(
perturbed_images = data_tensor.detach().clone()
perturbed_images.requires_grad_(True)
# Keep input_ids on CPU; move to GPU only when encoding.
input_ids = tokenizer(
args.instance_prompt,
truncation=True,
@ -604,12 +604,13 @@ def pgd_attack(
).input_ids.repeat(len(data_tensor), 1)
for step in range(num_steps):
perturbed_images.requires_grad = True
perturbed_images.requires_grad_(True)
latents = vae.encode(perturbed_images.to(device, dtype=weight_dtype)).latent_dist.sample()
latents = latents * vae.config.scaling_factor
# Sample noise that we'll add to the latents
noise = torch.randn_like(latents)
bsz = latents.shape[0]
timesteps = []
for i in range(len(data_tensor)):
ts = time_list[i]
@ -618,58 +619,62 @@ def pgd_attack(
timestep = timestep.long()
timesteps.append(timestep)
timesteps = torch.cat(timesteps).to(device)
# Add noise to the latents according to the noise magnitude at each timestep
# (this is the forward diffusion process)
noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
# Get the text embedding for conditioning
encoder_hidden_states = text_encoder(input_ids.to(device))[0]
# Predict the noise residual
model_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
# Get the target for loss depending on the prediction type
if noise_scheduler.config.prediction_type == "epsilon":
target = noise
elif noise_scheduler.config.prediction_type == "v_prediction":
target = noise_scheduler.get_velocity(latents, noise, timesteps)
else:
raise ValueError(f"Unknown prediction type {noise_scheduler.config.prediction_type}")
# [MODIFIED] feature loss (只解包需要的特征)
noise_out_layers_features_3 = save_feature_maps(unet.up_blocks, unet.down_blocks)
with torch.no_grad():
clean_latents = vae.encode(data_tensor.to(device, dtype=weight_dtype)).latent_dist.sample()
clean_latents = clean_latents * vae.config.scaling_factor
noisy_clean_latents = noise_scheduler.add_noise(clean_latents, noise, timesteps)
clean_model_pred = unet(noisy_clean_latents, timesteps, encoder_hidden_states).sample
# [MODIFIED] (只解包需要的特征)
clean_out_layers_features_3 = save_feature_maps(unet.up_blocks, unet.down_blocks)
# [LOGIC UNCHANGED] 目标损失函数不变
target_loss = F.mse_loss(noise_out_layers_features_3.float(), clean_out_layers_features_3.float(), reduction="mean")
unet.zero_grad()
text_encoder.zero_grad()
_ = unet(noisy_clean_latents, timesteps, encoder_hidden_states).sample
clean_out_layers_features_3 = save_feature_maps(unet.up_blocks, unet.down_blocks)
target_loss = F.mse_loss(
noise_out_layers_features_3.float(),
clean_out_layers_features_3.float(),
reduction="mean",
)
unet.zero_grad(set_to_none=True)
text_encoder.zero_grad(set_to_none=True)
loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
loss = loss + target_loss.detach().item() # 保持原有的(奇怪的) loss.backward() 逻辑
# Keep original behavior: feature loss does not backprop (added as Python float).
loss = loss + target_loss.detach().item()
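# Consequently, perturbed_images.grad comes solely from the denoising MSE term; the detached
# feature loss only shifts the scalar that gets logged below.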
loss.backward()
alpha = args.pgd_alpha
eps = args.pgd_eps / 255
adv_images = perturbed_images + alpha * perturbed_images.grad.sign()
eta = torch.clamp(adv_images - original_images, min=-eps, max=+eps)
perturbed_images = torch.clamp(original_images + eta, min=-1, max=+1).detach_()
print(f"PGD loss - step {step}, loss: {loss.detach().item()}, target_loss : {target_loss.detach().item()}")
# [MODIFIED] 显式释放特征张量并清理缓存,以确保后续 train_one_epoch 有足够的显存
# 这部分代码在 PGD 循环结束后添加 (即在 return perturbed_images 之前)
del noise_out_layers_features_3
del clean_out_layers_features_3
del noise
del latents
del encoder_hidden_states
torch.cuda.empty_cache()
print(
f"PGD loss - step {step}, loss: {loss.detach().item()}, target_loss : {target_loss.detach().item()}"
)
# Best-effort: free per-step tensors early.
del latents, noise, timesteps, noisy_latents, encoder_hidden_states, model_pred, target
del noise_out_layers_features_3, clean_latents, noisy_clean_latents, clean_out_layers_features_3
del target_loss, loss, adv_images, eta
_cuda_gc()
return perturbed_images
def select_timestep(
args,
models,
@ -679,9 +684,11 @@ def select_timestep(
data_tensor: torch.Tensor,
original_images: torch.Tensor,
target_tensor: torch.Tensor,
):
"""Return new perturbed data"""
):
"""Return timestep lists for each image.
External behavior unchanged; add best-effort per-loop cleanup to lower memory pressure.
"""
unet, text_encoder = models
weight_dtype = torch.bfloat16
device = torch.device("cuda")
@ -693,7 +700,6 @@ def select_timestep(
perturbed_images = data_tensor.detach().clone()
perturbed_images.requires_grad_(True)
input_ids = tokenizer(
args.instance_prompt,
truncation=True,
@ -701,93 +707,39 @@ def select_timestep(
max_length=tokenizer.model_max_length,
return_tensors="pt",
).input_ids
time_list = []
for id in range(len(data_tensor)):
perturbed_image = perturbed_images[id, :].unsqueeze(0)
original_image = original_images[id, :].unsqueeze(0)
time_seq = torch.tensor(list(range(0, 1000)))
input_mask = torch.ones_like(time_seq)
id_image = perturbed_image.detach().clone()
for step in range(args.max_steps):
id_image.requires_grad_(True)
select_mask = torch.where(input_mask==1, True, False)
res_time_seq = torch.masked_select(time_seq, select_mask)
if len(res_time_seq) > 100:
min_score, max_score = 0.0, 0.0
for index in range(0, 5):
id_image.requires_grad_(True)
latents = vae.encode(id_image.to(device, dtype=weight_dtype)).latent_dist.sample()
latents = latents * vae.config.scaling_factor
# Sample noise that we'll add to the latents
noise = torch.randn_like(latents)
bsz = latents.shape[0]
# Sample a random timestep for each image
inner_index = torch.randint(0, len(res_time_seq), (bsz,))
timesteps = torch.IntTensor([res_time_seq[inner_index]]).to(device)
timesteps = timesteps.long()
# Add noise to the latents according to the noise magnitude at each timestep
# (this is the forward diffusion process)
noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
# Get the text embedding for conditioning
encoder_hidden_states = text_encoder(input_ids.to(device))[0]
# Predict the noise residual
model_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
# Get the target for loss depending on the prediction type
if noise_scheduler.config.prediction_type == "epsilon":
target = noise
elif noise_scheduler.config.prediction_type == "v_prediction":
target = noise_scheduler.get_velocity(latents, noise, timesteps)
else:
raise ValueError(f"Unknown prediction type {noise_scheduler.config.prediction_type}")
unet.zero_grad()
text_encoder.zero_grad()
loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
loss.backward()
score = torch.sum(torch.abs(id_image.grad.data))
index = index + 1
id_image.grad.zero_()
if index == 1:
min_score = score
max_score = score
del_t = res_time_seq[inner_index].item()
select_t = res_time_seq[inner_index].item()
else:
if min_score > score:
min_score = score
del_t = res_time_seq[inner_index].item()
if max_score < score:
max_score = score
select_t = res_time_seq[inner_index].item()
print(f"PGD loss - step {step}, index : {index}, loss: {loss.detach().item()}, score: {score}, t : {res_time_seq[inner_index]}, ts_len: {len(res_time_seq)}")
print("del_t", del_t, "max_t", select_t)
if del_t < args.delta_t :
del_t = args.delta_t
elif del_t > (1000 - args.delta_t):
del_t= 1000 - args.delta_t
input_mask[del_t - 20: del_t + 20] = input_mask[del_t - 20: del_t + 20] - 1
input_mask = torch.clamp(input_mask, min=0, max=+1)
time_list = []
for img_id in range(len(data_tensor)):
perturbed_image = perturbed_images[img_id, :].unsqueeze(0)
original_image = original_images[img_id, :].unsqueeze(0)
time_seq = torch.tensor(list(range(0, 1000)))
input_mask = torch.ones_like(time_seq)
id_image = perturbed_image.detach().clone()
for step in range(args.max_steps):
id_image.requires_grad_(True)
select_mask = torch.where(input_mask == 1, True, False)
res_time_seq = torch.masked_select(time_seq, select_mask)
if len(res_time_seq) > 100:
min_score, max_score = 0.0, 0.0
for inner_try in range(0, 5):
id_image.requires_grad_(True)
latents = vae.encode(id_image.to(device, dtype=weight_dtype)).latent_dist.sample()
latents = latents * vae.config.scaling_factor
# Sample noise that we'll add to the latents
noise = torch.randn_like(latents)
bsz = latents.shape[0]
timesteps = torch.IntTensor([select_t]).to(device)
inner_index = torch.randint(0, len(res_time_seq), (bsz,))
timesteps = torch.IntTensor([res_time_seq[inner_index]]).to(device)
timesteps = timesteps.long()
# Add noise to the latents according to the noise magnitude at each timestep
# (this is the forward diffusion process)
noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
# Get the text embedding for conditioning
encoder_hidden_states = text_encoder(input_ids.to(device))[0]
# Predict the noise residual
model_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
# Get the target for loss depending on the prediction type
if noise_scheduler.config.prediction_type == "epsilon":
target = noise
elif noise_scheduler.config.prediction_type == "v_prediction":
@ -795,26 +747,92 @@ def select_timestep(
else:
raise ValueError(f"Unknown prediction type {noise_scheduler.config.prediction_type}")
unet.zero_grad()
text_encoder.zero_grad()
unet.zero_grad(set_to_none=True)
text_encoder.zero_grad(set_to_none=True)
loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
loss.backward()
alpha = args.pgd_alpha
eps = args.pgd_eps / 255
adv_image = id_image + alpha * id_image.grad.sign()
eta = torch.clamp(adv_image - original_image, min=-eps, max=+eps)
score = torch.sum(torch.abs(id_image.grad.sign()))
id_image = torch.clamp(original_image + eta, min=-1, max=+1).detach_()
score = torch.sum(torch.abs(id_image.grad.data))
id_image.grad.zero_()
if inner_try == 0:
min_score = score
max_score = score
del_t = res_time_seq[inner_index].item()
select_t = res_time_seq[inner_index].item()
else:
if min_score > score:
min_score = score
del_t = res_time_seq[inner_index].item()
if max_score < score:
max_score = score
select_t = res_time_seq[inner_index].item()
print(
f"PGD loss - step {step}, index : {inner_try + 1}, loss: {loss.detach().item()}, "
f"score: {score}, t : {res_time_seq[inner_index]}, ts_len: {len(res_time_seq)}"
)
del latents, noise, timesteps, noisy_latents, encoder_hidden_states, model_pred, target, loss, score
print("del_t", del_t, "max_t", select_t)
if del_t < args.delta_t:
del_t = args.delta_t
elif del_t > (1000 - args.delta_t):
del_t = 1000 - args.delta_t
input_mask[del_t - 20 : del_t + 20] = input_mask[del_t - 20 : del_t + 20] - 1
input_mask = torch.clamp(input_mask, min=0, max=+1)
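# Each greedy round removes a 2*delta_t-wide window (hard-coded here as del_t - 20 .. del_t + 20)
# around the least-informative candidate timestep, shrinking the candidate pool until at most
# 100 timesteps remain for this image.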
id_image.requires_grad_(True)
latents = vae.encode(id_image.to(device, dtype=weight_dtype)).latent_dist.sample()
latents = latents * vae.config.scaling_factor
noise = torch.randn_like(latents)
timesteps = torch.IntTensor([select_t]).to(device)
timesteps = timesteps.long()
noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
encoder_hidden_states = text_encoder(input_ids.to(device))[0]
model_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
if noise_scheduler.config.prediction_type == "epsilon":
target = noise
elif noise_scheduler.config.prediction_type == "v_prediction":
target = noise_scheduler.get_velocity(latents, noise, timesteps)
else:
# print(id, res_time_seq, step, len(res_time_seq))
time_list.append(res_time_seq)
break
raise ValueError(f"Unknown prediction type {noise_scheduler.config.prediction_type}")
unet.zero_grad(set_to_none=True)
text_encoder.zero_grad(set_to_none=True)
loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
loss.backward()
alpha = args.pgd_alpha
eps = args.pgd_eps / 255
adv_image = id_image + alpha * id_image.grad.sign()
eta = torch.clamp(adv_image - original_image, min=-eps, max=+eps)
_ = torch.sum(torch.abs(id_image.grad.sign()))
id_image = torch.clamp(original_image + eta, min=-1, max=+1).detach_()
del latents, noise, timesteps, noisy_latents, encoder_hidden_states, model_pred, target, loss, adv_image, eta
else:
time_list.append(res_time_seq)
break
del perturbed_image, original_image, time_seq, input_mask, id_image
_cuda_gc()
del perturbed_images, input_ids
_cuda_gc()
return time_list
def setup_seeds():
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
@ -850,11 +868,11 @@ def main(args):
if args.seed is not None:
set_seed(args.seed)
setup_seeds()
# Generate class images if prior preservation is enabled.
if args.with_prior_preservation:
class_images_dir = Path(args.class_data_dir)
if not class_images_dir.exists():
class_images_dir.mkdir(parents=True)
class_images_dir.mkdir(parents=True, exist_ok=True)
cur_class_images = len(list(class_images_dir.iterdir()))
if cur_class_images < args.num_class_images:
@ -865,12 +883,12 @@ def main(args):
torch_dtype = torch.float16
elif args.mixed_precision == "bf16":
torch_dtype = torch.bfloat16
pipeline = DiffusionPipeline.from_pretrained(
args.pretrained_model_name_or_path,
torch_dtype=torch_dtype,
safety_checker=None,
revision=args.revision,
)
pipeline.set_progress_bar_config(disable=True)
@ -889,27 +907,25 @@ def main(args):
disable=not accelerator.is_local_main_process,
):
images = pipeline(example["prompt"]).images
for i, image in enumerate(images):
hash_image = hashlib.sha1(image.tobytes()).hexdigest()
image_filename = class_images_dir / f"{example['index'][i] + cur_class_images}-{hash_image}.jpg"
image.save(image_filename)
del pipeline
if torch.cuda.is_available():
torch.cuda.empty_cache()
del pipeline, sample_dataset, sample_dataloader
_cuda_gc()
# import correct text encoder class
text_encoder_cls = import_model_class_from_model_name_or_path(args.pretrained_model_name_or_path, args.revision)
# Load scheduler and models
text_encoder = text_encoder_cls.from_pretrained(
args.pretrained_model_name_or_path,
subfolder="text_encoder",
revision=args.revision,
)
unet = UNet2DConditionModel.from_pretrained(
args.pretrained_model_name_or_path, subfolder="unet", revision=args.revision,
args.pretrained_model_name_or_path,
subfolder="unet",
revision=args.revision,
)
tokenizer = AutoTokenizer.from_pretrained(
@ -919,12 +935,13 @@ def main(args):
use_fast=False,
)
noise_scheduler = DDPMScheduler.from_pretrained(args.pretrained_model_name_or_path, subfolder="scheduler", )
noise_scheduler = DDPMScheduler.from_pretrained(args.pretrained_model_name_or_path, subfolder="scheduler")
vae = AutoencoderKL.from_pretrained(
args.pretrained_model_name_or_path, subfolder="vae", revision=args.revision,
args.pretrained_model_name_or_path,
subfolder="vae",
revision=args.revision,
).cuda()
vae.requires_grad_(False)
if not args.train_text_encoder:
@ -967,22 +984,23 @@ def main(args):
target_latent_tensor = target_latent_tensor.repeat(len(perturbed_data), 1, 1, 1).cuda()
f = [unet, text_encoder]
time_list = select_timestep(
args,
f,
tokenizer,
noise_scheduler,
vae,
perturbed_data,
original_data,
target_latent_tensor,
args,
f,
tokenizer,
noise_scheduler,
vae,
perturbed_data,
original_data,
target_latent_tensor,
)
for t in time_list:
print(t)
for i in range(args.max_train_steps):
# 1. f' = f.clone()
f_sur = copy.deepcopy(f)
f_sur = train_one_epoch(
args,
f_sur,
@ -992,6 +1010,7 @@ def main(args):
clean_data,
args.max_f_train_steps,
)
perturbed_data = pgd_attack(
args,
f_sur,
@ -1002,8 +1021,13 @@ def main(args):
original_data,
target_latent_tensor,
args.max_adv_train_steps,
time_list
time_list,
)
# Free surrogate ASAP (best-effort, behavior unchanged).
del f_sur
_cuda_gc()
f = train_one_epoch(
args,
f,
@ -1015,24 +1039,31 @@ def main(args):
)
if (i + 1) % args.checkpointing_iterations == 0:
save_folder = args.output_dir
save_folder = args.output_dir
os.makedirs(save_folder, exist_ok=True)
noised_imgs = perturbed_data.detach()
img_names = [
str(instance_path).split("/")[-1].split(".")[0]
for instance_path in list(Path(args.instance_data_dir_for_adversarial).iterdir())
]
for img_pixel, img_name in zip(noised_imgs, img_names):
save_path = os.path.join(save_folder, f"perturbed_{img_name}.png")
save_path = os.path.join(save_folder, f"perturbed_{img_name}.png")
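# Convert pixels from the model's [-1, 1] range back to uint8 [0, 255] and reorder
# CHW -> HWC before writing the PNG.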
Image.fromarray(
(img_pixel * 127.5 + 128).clamp(0, 255).to(torch.uint8).permute(1, 2, 0).cpu().numpy()
(img_pixel * 127.5 + 128)
.clamp(0, 255)
.to(torch.uint8)
.permute(1, 2, 0)
.cpu()
.numpy()
).save(save_path)
print(f"Saved perturbed images at step {i+1} to {save_folder} (Files are overwritten)")
# Best-effort cleanup at the end of each outer iteration.
_cuda_gc()
if __name__ == "__main__":
args = parse_args()

@ -0,0 +1,57 @@
# Required environment: conda activate pid
### Generate images protected by PID
export HF_HUB_OFFLINE=1
# Force use of the local model cache; avoids downloading models over the network
### SD v2.1
# export HF_HOME="/root/autodl-tmp/huggingface_cache"
# export MODEL_PATH="stabilityai/stable-diffusion-2-1"
### SD v1.5
# export HF_HOME="/root/autodl-tmp/huggingface_cache"
# export MODEL_PATH="runwayml/stable-diffusion-v1-5"
export MODEL_PATH="../../static/hf_models/hub/models--runwayml--stable-diffusion-v1-5/snapshots/451f4fe16113bff5a5d2269ed5ad43b0592e9a14"
export TASKNAME="task001"
### Data to be protected
export INSTANCE_DIR="../../static/originals/${TASKNAME}"
### Path to save the protected data
export OUTPUT_DIR="../../static/perturbed/${TASKNAME}"
# ------------------------- Create required directories -------------------------
echo "Creating required directories..."
mkdir -p "$INSTANCE_DIR"
mkdir -p "$OUTPUT_DIR"
echo "Directories created successfully."
# ------------------------- Clear OUTPUT_DIR before training -------------------------
echo "Clearing output directory: $OUTPUT_DIR"
# Make sure the directory exists so the cleanup command does not fail
mkdir -p "$OUTPUT_DIR"
# Find and delete every file and subdirectory inside the directory (but not . or .. themselves)
find "$OUTPUT_DIR" -mindepth 1 -delete
### Generation command
# --max_train_steps: optimization steps
# --attack_type: loss used as the attack objective, choices=['var', 'mean', 'KL', 'add-log', 'latent_vector', 'add']
# See the source of pid.py for more usage details.
CUDA_VISIBLE_DEVICES=0 python ../algorithms/pid.py \
--pretrained_model_name_or_path=$MODEL_PATH \
--instance_data_dir=$INSTANCE_DIR \
--output_dir=$OUTPUT_DIR \
--resolution=512 \
--max_train_steps=2000 \
--center_crop \
--eps 10 \
--step_size 0.002 \
--save_every 200 \
--attack_type add-log \
--seed 0 \
--dataloader_num_workers 2
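# Example usage (the script filename is an assumption, not fixed by the repository):
#   bash run_pid_protection.sh
# To protect a different image set, change TASKNAME above so that INSTANCE_DIR and
# OUTPUT_DIR point at the corresponding folders under ../../static/.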

@ -0,0 +1,91 @@
#!/bin/bash
#=============================================================================
# Glaze style-protection attack script
# Protects artworks from style mimicry by AI models
#=============================================================================
### ===================== Environment configuration ===================== ###
export HF_HUB_OFFLINE=1
# Force use of the local model cache; avoids downloading models over the network
export PYTHONWARNINGS="ignore"
# Suppress all warnings
export CUDA_LAUNCH_BLOCKING=0
# Keep CUDA kernel launches asynchronous
### ===================== Model path ===================== ###
# Path to the Stable Diffusion v1.5 model
export MODEL_PATH="../../static/hf_models/hub/models--runwayml--stable-diffusion-v1-5/snapshots/451f4fe16113bff5a5d2269ed5ad43b0592e9a14"
### ===================== Task configuration ===================== ###
export TASKNAME="task003"
# Directory of original images to be protected
export INSTANCE_DIR="../../static/originals/${TASKNAME}"
# Output directory for the protected images
export OUTPUT_DIR="../../static/perturbed/${TASKNAME}"
### ===================== Create required directories ===================== ###
echo "=============================================="
echo " Artistic style protection"
echo "=============================================="
echo ""
echo "Creating required directories..."
mkdir -p "$INSTANCE_DIR"
mkdir -p "$OUTPUT_DIR"
echo "Directories created."
echo ""
### ===================== Clear the output directory ===================== ###
echo "Clearing output directory: $OUTPUT_DIR"
find "$OUTPUT_DIR" -mindepth 1 -delete 2>/dev/null || true
echo "输出目录已清空。"
echo ""
### ===================== Show configuration ===================== ###
echo "Current configuration:"
echo " - Model path: $MODEL_PATH"
echo " - Input directory: $INSTANCE_DIR"
echo " - Output directory: $OUTPUT_DIR"
echo " - Task name: $TASKNAME"
echo ""
# target_style is fixed to one of the following options; Glaze shifts the image's
# AI-perceived features toward the chosen style:
# - "impressionism painting by van gogh" (default)
# - "abstract art by kandinsky"
# - "cubism painting by picasso"
# - "oil painting in baroque style"
### ===================== Run Glaze ===================== ###
echo "Starting style-transfer protection..."
echo ""
CUDA_VISIBLE_DEVICES=0 python ../algorithms/glaze.py \
--pretrained_model_name_or_path=$MODEL_PATH \
--instance_data_dir=$INSTANCE_DIR \
--output_dir=$OUTPUT_DIR \
--resolution=512 \
--center_crop \
--max_train_steps=150 \
--eps=0.04 \
--target_style="impressionism painting by van gogh" \
--style_strength=0.75 \
--n_runs=3 \
--style_transfer_iter=15 \
--guidance_scale=7.5 \
--seed=42
echo ""
echo "=============================================="
echo " 艺术风格防护处理完成!"
echo " 输出保存至: $OUTPUT_DIR"
echo "=============================================="

@ -47,9 +47,9 @@ CUDA_VISIBLE_DEVICES=0 accelerate launch ../finetune_infras/train_ti_gen_trace.p
--instance_data_dir=$INSTANCE_DIR \
--output_dir=$TI_OUTPUT_DIR \
--validation_image_output_dir=$OUTPUT_INFER_DIR \
--placeholder_token="sks" \
--placeholder_token="<sks-concept>" \
--initializer_token="person" \
--instance_prompt="a photo of sks person" \
--instance_prompt="a photo of <sks-concept> person" \
--resolution=512 \
--train_batch_size=1 \
--gradient_accumulation_steps=1 \
@ -60,7 +60,7 @@ CUDA_VISIBLE_DEVICES=0 accelerate launch ../finetune_infras/train_ti_gen_trace.p
--checkpointing_steps=500 \
--seed=0 \
--mixed_precision=fp16 \
--validation_prompt="a photo of sks person" \
--validation_prompt="a close-up photo of <sks-concept> person" \
--num_validation_images 4 \
--validation_epochs 50 \
--coords_save_path="$COORD_DIR" \

[Binary image files added in this PR; their contents are not rendered in the diff. File sizes range from roughly 25 KiB to 621 KiB.]