You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

144 lines
5.0 KiB

import numpy as np
import torch
def collate_fn(data, max_len=None):
"""Build mini-batch tensors from a list of (X, mask) tuples. Mask input. Create
Args:
data: len(batch_size) list of tuples (X, y).
- X: torch tensor of shape (seq_length, feat_dim); variable seq_length.
- y: torch tensor of shape (num_labels,) : class indices or numerical targets
(for classification or regression, respectively). num_labels > 1 for multi-task models
max_len: global fixed sequence length. Used for architectures requiring fixed length input,
where the batch length cannot vary dynamically. Longer sequences are clipped, shorter are padded with 0s
Returns:
X: (batch_size, padded_length, feat_dim) torch tensor of masked features (input)
targets: (batch_size, padded_length, feat_dim) torch tensor of unmasked features (output)
target_masks: (batch_size, padded_length, feat_dim) boolean torch tensor
0 indicates masked values to be predicted, 1 indicates unaffected/"active" feature values
padding_masks: (batch_size, padded_length) boolean tensor, 1 means keep vector at this position, 0 means padding
"""
batch_size = len(data)
features, labels = zip(*data)
# Stack and pad features and masks (convert 2D to 3D tensors, i.e. add batch dimension)
lengths = [
X.shape[0] for X in features
] # original sequence length for each time series
if max_len is None:
max_len = max(lengths)
X = torch.zeros(
batch_size, max_len, features[0].shape[-1]
) # (batch_size, padded_length, feat_dim)
for i in range(batch_size):
end = min(lengths[i], max_len)
X[i, :end, :] = features[i][:end, :]
targets = torch.stack(labels, dim=0) # (batch_size, num_labels)
padding_masks = padding_mask(
torch.tensor(lengths, dtype=torch.int16), max_len=max_len
) # (batch_size, padded_length) boolean tensor, "1" means keep
return X, targets, padding_masks
def padding_mask(lengths, max_len=None):
"""
Used to mask padded positions: creates a (batch_size, max_len) boolean mask from a tensor of sequence lengths,
where 1 means keep element at this position (time step)
"""
batch_size = lengths.numel()
max_len = (
max_len or lengths.max_val()
) # trick works because of overloading of 'or' operator for non-boolean types
return (
torch.arange(0, max_len, device=lengths.device)
.type_as(lengths)
.repeat(batch_size, 1)
.lt(lengths.unsqueeze(1))
)
class Normalizer(object):
"""
Normalizes dataframe across ALL contained rows (time steps). Different from per-sample normalization.
"""
def __init__(
self,
norm_type="standardization",
mean=None,
std=None,
min_val=None,
max_val=None,
):
"""
Args:
norm_type: choose from:
"standardization", "minmax": normalizes dataframe across ALL contained rows (time steps)
"per_sample_std", "per_sample_minmax": normalizes each sample separately (i.e. across only its own rows)
mean, std, min_val, max_val: optional (num_feat,) Series of pre-computed values
"""
self.norm_type = norm_type
self.mean = mean
self.std = std
self.min_val = min_val
self.max_val = max_val
def normalize(self, df):
"""
Args:
df: input dataframe
Returns:
df: normalized dataframe
"""
if self.norm_type == "standardization":
if self.mean is None:
self.mean = df.mean()
self.std = df.std()
return (df - self.mean) / (self.std + np.finfo(float).eps)
elif self.norm_type == "minmax":
if self.max_val is None:
self.max_val = df.max()
self.min_val = df.min()
return (df - self.min_val) / (
self.max_val - self.min_val + np.finfo(float).eps
)
elif self.norm_type == "per_sample_std":
grouped = df.groupby(by=df.index)
return (df - grouped.transform("mean")) / grouped.transform("std")
elif self.norm_type == "per_sample_minmax":
grouped = df.groupby(by=df.index)
min_vals = grouped.transform("min")
return (df - min_vals) / (
grouped.transform("max") - min_vals + np.finfo(float).eps
)
else:
raise (NameError(f'Normalize method "{self.norm_type}" not implemented'))
def interpolate_missing(y):
"""
Replaces NaN values in pd.Series `y` using linear interpolation
"""
if y.isna().any():
y = y.interpolate(method="linear", limit_direction="both")
return y
def subsample(y, limit=256, factor=2):
"""
If a given Series is longer than `limit`, returns subsampled sequence by the specified integer factor
"""
if len(y) > limit:
return y[::factor].reset_index(drop=True)
return y