# -*- coding: utf-8 -*-
"""
Created on 2024/9/28 10:35
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
@url: https://github.com/wwhenxuan/SymTime
"""
import os
import math
from datetime import datetime

import numpy as np
import matplotlib.pyplot as plt
import torch

# Use a non-interactive backend so figures can be saved on headless machines
plt.switch_backend("agg")


@torch.no_grad()
def concat_all_gather(tensor):
    """
    Performs an all_gather operation on the provided tensor.
    *** Warning ***: torch.distributed.all_gather has no gradient.
    """
    tensors_gather = [
        torch.ones_like(tensor) for _ in range(torch.distributed.get_world_size())
    ]
    torch.distributed.all_gather(tensors_gather, tensor, async_op=False)
    output = torch.cat(tensors_gather, dim=0)
    return output
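
# A minimal usage sketch (assumes torch.distributed is already initialised):
#     feats = model(x)                      # (B, D) on each rank
#     all_feats = concat_all_gather(feats)  # (world_size * B, D), no gradient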


def time_now():
    """Get the current time as a formatted string."""
    now = datetime.now()
    return now.strftime("%Y-%m-%d %H:%M:%S")


def makedir(directory: str, folder_name: str) -> None:
    """Create a folder with the given name inside the specified directory."""
    # Construct the complete path
    new_folder_path = os.path.join(directory, folder_name)
    # Create the directory only if it does not already exist
    try:
        if not os.path.exists(new_folder_path):
            os.makedirs(new_folder_path)
    except OSError as e:
        print(f"Error creating folder: {e}")
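
# For instance, makedir("./checkpoints", "exp1") (hypothetical paths) creates
# ./checkpoints/exp1 and is a no-op if the folder already exists.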


def adjust_learning_rate(optimizer, epoch, args):
    """Adjust the learning rate according to the schedule selected by args.lradj."""
    # lr = args.learning_rate * (0.2 ** (epoch // 2))
    if args.lradj == "type1":
        # Halve the learning rate every epoch
        lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 1))}
    elif args.lradj == "type2":
        # Fixed decay steps at hand-picked epochs
        lr_adjust = {2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6, 10: 5e-7, 15: 1e-7, 20: 5e-8}
    elif args.lradj == "cosine":
        # Cosine annealing from args.learning_rate down to 0 over args.train_epochs
        lr_adjust = {
            epoch: args.learning_rate
            / 2
            * (1 + math.cos(epoch / args.train_epochs * math.pi))
        }
    else:
        # Unknown schedule: leave the learning rate unchanged
        lr_adjust = {}
    if epoch in lr_adjust.keys():
        lr = lr_adjust[epoch]
        for param_group in optimizer.param_groups:
            param_group["lr"] = lr
        print("Updating learning rate to {}".format(lr))


class EarlyStopping:
    """Stop training when the validation loss has not improved for `patience` epochs."""

    def __init__(self, patience=7, verbose=False, delta=0):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta

    def __call__(self, val_loss, model, path):
        score = -val_loss
        if self.best_score is None:
            # First validation: record the score and save an initial checkpoint
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
        elif score < self.best_score + self.delta:
            # No sufficient improvement: count towards the patience budget
            self.counter += 1
            print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            # Improvement: save a checkpoint and reset the counter
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
            self.counter = 0

    def save_checkpoint(self, val_loss, model, path):
        if self.verbose:
            print(
                f"Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ..."
            )
        torch.save(model.state_dict(), os.path.join(path, "checkpoint.pth"))
        self.val_loss_min = val_loss
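
# A minimal usage sketch (hypothetical validate helper and data loaders):
#     early_stopping = EarlyStopping(patience=3, verbose=True)
#     for epoch in range(args.train_epochs):
#         train_one_epoch(model, train_loader)
#         val_loss = validate(model, vali_loader)
#         early_stopping(val_loss, model, path="./checkpoints")
#         if early_stopping.early_stop:
#             print("Early stopping")
#             break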


class dotdict(dict):
    """dot.notation access to dictionary attributes"""

    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__
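
# For example, cfg = dotdict({"lradj": "type1"}) supports both cfg.lradj and
# cfg["lradj"]; missing keys return None instead of raising AttributeError.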


class StandardScaler:
    """Normalize data with a fixed mean and standard deviation."""

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, data):
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        return (data * self.std) + self.mean
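
# For example (hypothetical data array), the two methods round-trip up to
# floating-point error:
#     scaler = StandardScaler(mean=data.mean(0), std=data.std(0))
#     restored = scaler.inverse_transform(scaler.transform(data))  # ~= data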


def visual(true, preds=None, name="./pic/test.pdf"):
    """Plot the ground truth against the predictions and save the figure."""
    plt.figure()
    plt.plot(true, label="GroundTruth", linewidth=2)
    if preds is not None:
        plt.plot(preds, label="Prediction", linewidth=2)
    plt.legend()
    plt.savefig(name, bbox_inches="tight")
    plt.close()
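
# For instance (hypothetical tensors), plotting the last channel of one window:
#     visual(true[0, :, -1], preds[0, :, -1], name="./pic/sample0.pdf")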


def adjustment(gt, pred):
    """Point adjustment for anomaly detection: if any point inside a
    ground-truth anomaly segment is detected, mark the whole segment
    as detected."""
    anomaly_state = False
    for i in range(len(gt)):
        if gt[i] == 1 and pred[i] == 1 and not anomaly_state:
            anomaly_state = True
            # Expand the detection backwards to the start of the segment
            for j in range(i, 0, -1):
                if gt[j] == 0:
                    break
                else:
                    if pred[j] == 0:
                        pred[j] = 1
            # Expand the detection forwards to the end of the segment
            for j in range(i, len(gt)):
                if gt[j] == 0:
                    break
                else:
                    if pred[j] == 0:
                        pred[j] = 1
        elif gt[i] == 0:
            anomaly_state = False
        if anomaly_state:
            pred[i] = 1
    return gt, pred
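
# A worked example (hypothetical lists): the single hit at index 2 is expanded
# over the whole ground-truth segment [1, 4):
#     gt   = [0, 1, 1, 1, 0]
#     pred = [0, 0, 1, 0, 0]
#     adjustment(gt, pred)  # pred becomes [0, 1, 1, 1, 0]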


def cal_accuracy(y_pred, y_true):
    """Return the element-wise accuracy between two label arrays."""
    return np.mean(y_pred == y_true)
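

# A minimal smoke test (illustrative only, not part of the training pipeline):
if __name__ == "__main__":
    print(time_now())
    gt, pred = adjustment([0, 1, 1, 1, 0], [0, 0, 1, 0, 0])
    print(pred)  # expected: [0, 1, 1, 1, 0]
    print(cal_accuracy(np.array(pred), np.array(gt)))  # expected: 1.0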