In this blog post, I study and reproduce the results of the article "Learning Spatio Temporal Tactile Features with a ConvLSTM for the Direction Of Slip Detection" [1].
Our main contribution is an extensive evaluation of the usefulness of spatio-temporal tactile features when employed to detect the direction of slip. This evaluation is made by applying a new type of recurrent neural network (ConvLSTM), which is highly specialised in modelling spatio-temporal data. We chose to learn both spatial and temporal tactile features for this task because it seems reasonable to believe that the direction of slip has characteristics that belong to both components: slips occur in time (temporal aspect) and take place on a particular area of the hand (spatial aspect).
Construct the Dataset
In order to learn features from the BioTac SP sensor, it is possible to build an array $\theta$ that holds the readings from the 24 electrodes, such as $\theta = [e_1, e_2, \ldots, e_{24}]$, where $e_i$ is the $i$-th electrode. If this array is read in $T$ consecutive time steps, it is possible to create a sample $x = [\theta_1, \theta_2, \ldots, \theta_T]$ that holds $T \times 24$ tactile readings. A learning algorithm would then be able to learn temporal features if sufficient samples were collected, correctly labelling each with one of the slip directions considered.
Reproduce
Install Libraries
I used CodeX to generate requirements.txt:
numpy
pandas
scikit-learn
torch
matplotlib
livelossplot
joblib
scipy

Note: You may need to install a GPU version of Torch.
pip3 install torch torchvision --index-url https://download.pytorch.org/whl/cu130

Extract Tactile Info
code\extract_tactile_info.py
"""
Convert CSV tactile recordings stored under csvs/<label>/ into npy arrays.
This replicates the preprocessing that `Extract Tactile Info.ipynb` performed,
but reads the already-exported CSVs instead of ROS bag files. The output
matches the expected layout for the downstream dataset generators:
npys/<label>/<csv_stem>.npy -> shape: (frames, fingers, electrodes)
"""
from __future__ import annotations
import argparse
import re
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Tuple
import numpy as np
import pandas as pd
DEFAULT_LABELS: Sequence[str] = ("n", "s", "e", "w", "cw", "aw", "t")
DEFAULT_FINGER_PRIORITY: Sequence[str] = ("ff", "mf", "th")
def parse_finger_columns(
    columns: Iterable[str],
    electrodes: int,
    finger_priority: Sequence[str],
) -> Tuple[List[str], Dict[str, List[str]]]:
    """
    Detect finger prefixes in the CSV header and group electrode columns.

    A column is treated as tactile when its whole name matches
    ``<letters>[_]e<digits>`` (for example ``ff_e12``): the letters are the
    finger prefix and the digits the electrode index. Every other column is
    ignored.

    Args:
        columns: Header names to inspect.
        electrodes: Maximum number of columns kept per finger; the columns
            with the lowest electrode indices are kept.
        finger_priority: Prefixes that come first (in this order) when they
            are present. Remaining prefixes follow alphabetically.

    Returns:
        ``(ordered_fingers, ordered_columns)`` where ``ordered_columns`` maps
        each finger to its column names sorted by electrode index.
    """
    pattern = re.compile(r"^(?P<finger>[A-Za-z]+)_?e(?P<idx>\d+)$")

    indexed: Dict[str, List[Tuple[int, str]]] = {}
    for col in columns:
        match = pattern.match(col)
        if not match:
            continue
        indexed.setdefault(match.group("finger"), []).append(
            (int(match.group("idx")), col)
        )

    # Preferred fingers first, then the rest alphabetically. The membership
    # test also guards against a prefix listed twice in `finger_priority`,
    # which would otherwise duplicate that finger in the output.
    ordered_fingers: List[str] = []
    for finger in list(finger_priority) + sorted(indexed):
        if finger in indexed and finger not in ordered_fingers:
            ordered_fingers.append(finger)

    ordered_columns: Dict[str, List[str]] = {}
    for finger in ordered_fingers:
        by_index = sorted(indexed[finger], key=lambda item: item[0])
        ordered_columns[finger] = [col for _, col in by_index[:electrodes]]
    return ordered_fingers, ordered_columns
def convert_csv(
    csv_path: Path,
    out_path: Path,
    electrodes: int,
    finger_priority: Sequence[str],
) -> Tuple[Tuple[int, int, int], List[str]]:
    """
    Read one CSV file and save it as a (frames, fingers, electrodes) npy.

    Args:
        csv_path: CSV file to read; its header is scanned for tactile columns.
        out_path: Destination ``.npy`` file; parent folders are created.
        electrodes: Size of the last axis of the saved array.
        finger_priority: Preferred finger ordering along the second axis.

    Returns:
        The shape of the saved array and the finger prefixes in axis order.

    Raises:
        ValueError: If the header contains no tactile columns.
    """
    df = pd.read_csv(csv_path)
    fingers, finger_columns = parse_finger_columns(
        df.columns, electrodes=electrodes, finger_priority=finger_priority
    )
    if not fingers:
        raise ValueError(f"No tactile columns detected in {csv_path}")

    frames = df.shape[0]
    # The array starts zeroed, so a finger that has fewer than `electrodes`
    # columns is implicitly zero-padded and shapes stay consistent.
    data = np.zeros((frames, len(fingers), electrodes), dtype=np.int32)
    for finger_idx, finger in enumerate(fingers):
        cols = finger_columns[finger][:electrodes]
        if not cols:
            continue
        data[:, finger_idx, : len(cols)] = df[cols].to_numpy(dtype=np.int32)

    out_path.parent.mkdir(parents=True, exist_ok=True)
    np.save(out_path, data)
    return data.shape, fingers
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the CSV -> npy conversion script."""
    parser = argparse.ArgumentParser(
        description="Convert tactile CSV recordings into npy tensors "
        "compatible with the dataset generators."
    )
    parser.add_argument(
        "--csv-root",
        default="csvs",
        type=Path,
        help="Directory containing label subfolders with CSV files.",
    )
    parser.add_argument(
        "--output-root",
        default="npys",
        type=Path,
        help="Destination directory for generated npy files.",
    )
    parser.add_argument(
        "--labels",
        nargs="+",
        default=list(DEFAULT_LABELS),
        help="Which label subdirectories to process.",
    )
    parser.add_argument(
        "--electrodes",
        type=int,
        default=24,
        help="Number of electrodes per finger expected in each row.",
    )
    parser.add_argument(
        "--finger-order",
        nargs="+",
        default=list(DEFAULT_FINGER_PRIORITY),
        help="Preferred finger order when multiple prefixes are present. "
        "Any additional prefixes will be appended alphabetically.",
    )
    return parser
def main() -> None:
    """Convert every ``<csv-root>/<label>/*.csv`` into ``<output-root>/<label>/<stem>.npy``.

    Label folders that are missing or contain no CSV files are reported and
    skipped rather than treated as errors.
    """
    args = build_arg_parser().parse_args()
    for label in args.labels:
        csv_dir = args.csv_root / label
        if not csv_dir.exists():
            print(f"[skip] {csv_dir} not found")
            continue

        csv_files = sorted(csv_dir.glob("*.csv"))
        if not csv_files:
            print(f"[skip] No CSV files under {csv_dir}")
            continue

        print(f"[label {label}] found {len(csv_files)} csv files")
        for csv_file in csv_files:
            out_file = args.output_root / label / f"{csv_file.stem}.npy"
            shape, fingers = convert_csv(
                csv_file,
                out_file,
                electrodes=args.electrodes,
                finger_priority=args.finger_order,
            )
            print(f" saved {out_file} shape={shape} fingers={fingers}")


if __name__ == "__main__":
    main()
python3 code/extract_tactile_info.py --csv-root csvs --output-root npys --labels n s e w cw aw t

Generate Dataset
Array
code\generate_dataset_array.py
"""
Generate sequence datasets (array format) from tactile npys.
Replicates the logic of `Generate Dataset - Array.ipynb`, producing files that
`LSTM.ipynb` expects:
datasets/<set_type>/<label>-<time_window>-ff-data.npy
datasets/<set_type>/<label>-<time_window>-ff-labels.npy
"""
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Iterable, List, Sequence
import numpy as np
DEFAULT_LABELS: Sequence[str] = ("n", "s", "e", "w", "cw", "aw", "t")
def list_npys(npy_dir: Path, objects: Iterable[str]) -> List[Path]:
    """
    Return the ``*.npy`` files directly inside *npy_dir*, sorted by path.

    When *objects* is non-empty, only files whose name contains at least one
    of the given substrings are kept; an empty *objects* disables filtering.
    """
    files = sorted(npy_dir.glob("*.npy"))
    # Materialise once: `objects` may be a one-shot iterator and is scanned
    # again for every file below.
    object_filters = list(objects)
    if not object_filters:
        return files
    return [
        file for file in files if any(obj in file.name for obj in object_filters)
    ]
def ensure_3d(arr: np.ndarray) -> np.ndarray:
    """
    Force shape to (frames, fingers, electrodes).

    A 3-D array is returned unchanged; a 2-D array gets a singleton axis
    inserted in the middle (a single finger).

    Raises:
        ValueError: For any other number of dimensions.
    """
    if arr.ndim == 3:
        return arr
    if arr.ndim == 2:
        return arr[:, np.newaxis, :]
    raise ValueError(f"Unsupported array shape {arr.shape}")
def build_sequences(sensor_data: np.ndarray, time_window: int) -> np.ndarray:
    """
    Cut a (frames, electrodes) array into non-overlapping windows.

    Trailing frames that do not fill a complete window are dropped.

    Args:
        sensor_data: 2-D array with one row per frame.
        time_window: Number of consecutive frames per sequence; must be > 0.

    Returns:
        Array of shape (sequences, time_window, electrodes).

    Raises:
        ValueError: If *time_window* is not positive.
    """
    if time_window <= 0:
        # Otherwise this surfaces as a ZeroDivisionError or an opaque reshape
        # failure further down.
        raise ValueError(f"time_window must be positive, got {time_window}")
    usable = (sensor_data.shape[0] // time_window) * time_window
    return sensor_data[:usable].reshape(-1, time_window, sensor_data.shape[1])
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the array-format dataset generator."""
    parser = argparse.ArgumentParser(
        description="Create time-windowed tactile datasets in array form."
    )
    parser.add_argument(
        "--npys-root",
        default="npys",
        type=Path,
        help="Directory containing label subfolders with raw tactile npys.",
    )
    parser.add_argument(
        "--output-root",
        default="datasets",
        type=Path,
        help="Destination directory for generated dataset files.",
    )
    parser.add_argument(
        "--set-type",
        default="train",
        help="Subfolder under output where files will be written "
        "(e.g. train, test, test/textures).",
    )
    parser.add_argument(
        "--labels",
        nargs="+",
        default=list(DEFAULT_LABELS),
        help="Label names (and npys subfolders) to process.",
    )
    parser.add_argument(
        "--objects",
        nargs="*",
        default=[],
        help="If provided, only include npys whose filename contains any of "
        "these substrings.",
    )
    parser.add_argument(
        "--sensor-id",
        type=int,
        default=0,
        help="Finger index to keep (0-based).",
    )
    parser.add_argument(
        "--sensor-name",
        default="ff",
        help="String used in output filenames for compatibility with notebooks.",
    )
    parser.add_argument(
        "--time-window",
        type=int,
        default=5,
        help="Number of frames per sequence.",
    )
    return parser
def main() -> None:
    """Write ``<label>-<window>-<sensor>-data.npy`` / ``-labels.npy`` per label.

    The class id stored in the labels file is the position of the label in
    ``--labels``. Labels whose folder is missing or empty are skipped (their
    position is still counted).

    Raises:
        IndexError: If ``--sensor-id`` is beyond the finger axis of the data.
    """
    args = build_arg_parser().parse_args()
    for label_idx, label in enumerate(args.labels):
        npy_dir = args.npys_root / label
        if not npy_dir.exists():
            print(f"[skip] {npy_dir} not found")
            continue

        files = list_npys(npy_dir, args.objects)
        if not files:
            print(f"[skip] No npy files found for label {label}")
            continue
        print(f"[label {label}] using {len(files)} files")

        # `files` is non-empty here, so there is always at least one array.
        arrays = [ensure_3d(np.load(file)) for file in files]
        # NOTE(review): windows are cut after concatenating all recordings, so
        # a window can straddle the end of one file and the start of the next.
        # Presumably this mirrors the original notebook -- confirm before
        # changing it.
        concatenated = np.concatenate(arrays, axis=0)
        if args.sensor_id >= concatenated.shape[1]:
            raise IndexError(
                f"sensor-id {args.sensor_id} exceeds finger dimension "
                f"{concatenated.shape[1]} for label {label}"
            )

        sensor_data = concatenated[:, args.sensor_id, :]
        sequences = build_sequences(sensor_data, args.time_window)
        labels_np = np.full((sequences.shape[0], 1), label_idx, dtype=int)

        out_prefix = (
            args.output_root
            / args.set_type
            / f"{label}-{args.time_window}-{args.sensor_name}"
        )
        out_prefix.parent.mkdir(parents=True, exist_ok=True)
        np.save(f"{out_prefix}-data.npy", sequences)
        np.save(f"{out_prefix}-labels.npy", labels_np)
        print(
            f" saved {out_prefix}-data.npy shape={sequences.shape}, "
            f"labels shape={labels_np.shape}"
        )


if __name__ == "__main__":
    main()
python3 code/generate_dataset_array.py --npys-root npys --output-root datasets --set-type train --sensor-id 0 --sensor-name ff --time-window 5 --objects pen case sponge hd
python3 code/generate_dataset_array.py --npys-root npys --output-root datasets --set-type train --sensor-id 0 --sensor-name ff --time-window 5 --objects pen case sponge hd

Image
code\generate_dataset_image.py
"""
Generate tactile image datasets from tactile npys.
This mirrors `Generate Dataset - Image.ipynb`, building 12x11 electrode maps
per frame and saving them under datasets/<set_type>/ for ConvLSTM workflows.
"""
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Iterable, List, Sequence, Tuple
import numpy as np
DEFAULT_LABELS: Sequence[str] = ("n", "s", "e", "w", "cw", "aw", "t")
# Electrode layout replicated from the notebook
TACTILE_IMAGE_ROWS = 12
TACTILE_IMAGE_COLS = 11
ELECTRODES_INDEX_ROWS = np.array(
[0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 2, 3, 3, 5]
)
ELECTRODES_INDEX_COLS = np.array(
[1, 2, 0, 1, 3, 0, 1, 4, 2, 1, 9, 8, 10, 9, 7, 10, 9, 6, 8, 9, 5, 4, 6, 5]
)
def list_npys(npy_dir: Path, objects: Iterable[str]) -> List[Path]:
    """
    Return the ``*.npy`` files directly inside *npy_dir*, sorted by path.

    When *objects* is non-empty, only files whose name contains at least one
    of the given substrings are kept; an empty *objects* disables filtering.
    """
    files = sorted(npy_dir.glob("*.npy"))
    # Materialise once: `objects` may be a one-shot iterator and is scanned
    # again for every file below.
    object_filters = list(objects)
    if not object_filters:
        return files
    return [
        file for file in files if any(obj in file.name for obj in object_filters)
    ]
def ensure_3d(arr: np.ndarray) -> np.ndarray:
    """
    Force shape to (frames, fingers, electrodes).

    A 3-D array is returned unchanged; a 2-D array gets a singleton axis
    inserted in the middle (a single finger).

    Raises:
        ValueError: For any other number of dimensions.
    """
    if arr.ndim == 3:
        return arr
    if arr.ndim == 2:
        return arr[:, np.newaxis, :]
    raise ValueError(f"Unsupported array shape {arr.shape}")
def get_neighbours(tactile_image: np.ndarray, cell_x: int, cell_y: int) -> List[float]:
    """
    Return the 8 neighbours of the target cell (with zero padding).

    Neighbours are listed row by row (row above, same row, row below), left
    to right, skipping the cell itself. Positions outside the image count as
    0.

    Args:
        tactile_image: 2-D image.
        cell_x: Row index of the target cell (must lie inside the image).
        cell_y: Column index of the target cell (must lie inside the image).
    """
    # A one-cell border is enough: a 3x3 neighbourhood never reaches further.
    pad = 1
    padded = np.pad(tactile_image, pad, "constant")
    row, col = cell_x + pad, cell_y + pad
    return [
        padded[row + d_row, col + d_col]
        for d_row in (-1, 0, 1)
        for d_col in (-1, 0, 1)
        if (d_row, d_col) != (0, 0)
    ]
def zeros_to_mean(tactile_image: np.ndarray) -> np.ndarray:
    """
    Replace each zero cell with the mean of its strictly positive neighbours.

    Neighbour values are read from a snapshot taken before any cell is
    filled, so cells filled during this pass do not influence each other.
    A zero cell with no positive neighbour is left at zero.

    The input array is modified in place and also returned.
    """
    snapshot = tactile_image.copy()
    zero_rows, zero_cols = np.where(tactile_image == 0)
    for cell_x, cell_y in zip(zero_rows, zero_cols):
        positive = [
            value
            for value in get_neighbours(snapshot, cell_x, cell_y)
            if value > 0.0
        ]
        if positive:
            tactile_image[cell_x, cell_y] = np.mean(positive)
    return tactile_image
def create_finger_tactile_image(finger_biotac: np.ndarray) -> np.ndarray:
    """
    Turn one frame of electrode readings into a normalised tactile image.

    The readings are written to their (row, col) positions in a
    TACTILE_IMAGE_ROWS x TACTILE_IMAGE_COLS grid, remaining zero cells are
    filled from their neighbours (a second pass runs if zeros remain after
    the first), and the image is min-max scaled to [0, 1].

    Args:
        finger_biotac: Readings assigned to the positions given by
            ELECTRODES_INDEX_ROWS / ELECTRODES_INDEX_COLS.

    Returns:
        The scaled image. A constant image (max == min) is returned as all
        zeros instead of dividing by zero and producing NaN.
    """
    tactile_image = np.zeros(shape=(TACTILE_IMAGE_ROWS, TACTILE_IMAGE_COLS))
    tactile_image[ELECTRODES_INDEX_ROWS, ELECTRODES_INDEX_COLS] = finger_biotac

    tactile_image = zeros_to_mean(tactile_image)
    if np.min(tactile_image) == 0.0:
        # Cells far from any electrode are still empty after one pass.
        tactile_image = zeros_to_mean(tactile_image)

    lowest = np.min(tactile_image)
    value_range = np.max(tactile_image) - lowest
    if value_range == 0:
        return np.zeros_like(tactile_image)
    return (tactile_image - lowest) / value_range
def build_sequences(sensor_data: np.ndarray, time_window: int, step: int) -> np.ndarray:
    """
    Cut a (frames, electrodes) array into sequences of *time_window* frames.

    With ``step <= 0`` the frames are split into consecutive, non-overlapping
    windows and the input dtype is kept.

    With ``step > 0`` the frames are processed in segments of
    ``step * time_window`` frames. Each segment yields ``step`` sequences;
    sequence ``s`` takes frames ``s, s + step, s + 2 * step, ...`` relative to
    the segment start. The result of this branch is always float64.

    In both modes trailing frames that do not fill a whole window/segment are
    dropped.

    Raises:
        ValueError: If *time_window* is not positive.
    """
    if time_window <= 0:
        raise ValueError(f"time_window must be positive, got {time_window}")

    features = sensor_data.shape[1]
    if step <= 0:
        usable = (sensor_data.shape[0] // time_window) * time_window
        return sensor_data[:usable].reshape(-1, time_window, features)

    span = step * time_window
    segments = sensor_data.shape[0] // span
    # index[s, t] = s + t * step, relative to the segment start.
    relative_index = np.arange(step)[:, np.newaxis] + np.arange(time_window) * step

    # The leading empty float array keeps the result float64 and fixes the
    # shape when there are no complete segments. Chunks are gathered in a list
    # and joined once; concatenating inside the loop re-copies everything on
    # every iteration.
    chunks = [np.zeros((0, time_window, features))]
    for segment in range(segments):
        chunks.append(sensor_data[segment * span + relative_index])
    return np.concatenate(chunks)
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the tactile-image dataset generator."""
    parser = argparse.ArgumentParser(
        description="Create tactile image datasets (12x11 grids) from tactile npys."
    )
    parser.add_argument(
        "--npys-root",
        default="npys",
        type=Path,
        help="Directory containing label subfolders with raw tactile npys.",
    )
    parser.add_argument(
        "--output-root",
        default="datasets",
        type=Path,
        help="Destination directory for generated dataset files.",
    )
    parser.add_argument(
        "--set-type",
        default="train",
        help="Subfolder under output where files will be written "
        "(e.g. train, test, test/textures).",
    )
    parser.add_argument(
        "--labels",
        nargs="+",
        default=list(DEFAULT_LABELS),
        help="Label names (and npys subfolders) to process.",
    )
    parser.add_argument(
        "--objects",
        nargs="*",
        default=[],
        help="If provided, only include npys whose filename contains any of "
        "these substrings.",
    )
    parser.add_argument(
        "--sensor-id",
        type=int,
        default=0,
        help="Finger index to keep (0-based).",
    )
    parser.add_argument(
        "--sensor-name",
        default="ff",
        help="String used in output filenames for compatibility with notebooks.",
    )
    parser.add_argument(
        "--time-window",
        type=int,
        default=5,
        help="Number of frames per sequence.",
    )
    parser.add_argument(
        "--step",
        type=int,
        default=0,
        help="Sampling step. When >0, mimic SAMPLE_WITH_STEP=True behaviour.",
    )
    return parser
def main() -> None:
    """Write ``...-data-image.npy`` / ``...-labels-image.npy`` per label.

    Every frame of every sequence is converted to a tactile image. The class
    id stored in the labels file is the position of the label in
    ``--labels``. Labels whose folder is missing or empty are skipped (their
    position is still counted). When ``--step`` is positive it is included in
    the output file prefix.

    Raises:
        IndexError: If ``--sensor-id`` is beyond the finger axis of the data.
    """
    args = build_arg_parser().parse_args()
    for label_idx, label in enumerate(args.labels):
        npy_dir = args.npys_root / label
        if not npy_dir.exists():
            print(f"[skip] {npy_dir} not found")
            continue

        files = list_npys(npy_dir, args.objects)
        if not files:
            print(f"[skip] No npy files found for label {label}")
            continue
        print(f"[label {label}] using {len(files)} files")

        # `files` is non-empty here, so there is always at least one array.
        arrays = [ensure_3d(np.load(file)) for file in files]
        concatenated = np.concatenate(arrays, axis=0)
        if args.sensor_id >= concatenated.shape[1]:
            raise IndexError(
                f"sensor-id {args.sensor_id} exceeds finger dimension "
                f"{concatenated.shape[1]} for label {label}"
            )

        sensor_data = concatenated[:, args.sensor_id, :]
        sequences = build_sequences(sensor_data, args.time_window, args.step)

        tactile_images = np.zeros(
            (
                sequences.shape[0],
                sequences.shape[1],
                TACTILE_IMAGE_ROWS,
                TACTILE_IMAGE_COLS,
            )
        )
        for sample_idx, sample in enumerate(sequences):
            for frame_idx, frame in enumerate(sample):
                tactile_images[sample_idx, frame_idx] = create_finger_tactile_image(
                    frame
                )
        labels_np = np.full((sequences.shape[0], 1), label_idx, dtype=int)

        out_dir = args.output_root / args.set_type
        if args.step > 0:
            prefix = f"{label}-{args.step}-{args.time_window}-{args.sensor_name}"
        else:
            prefix = f"{label}-{args.time_window}-{args.sensor_name}"
        out_prefix = out_dir / prefix
        out_dir.mkdir(parents=True, exist_ok=True)
        np.save(f"{out_prefix}-data-image.npy", tactile_images)
        np.save(f"{out_prefix}-labels-image.npy", labels_np)
        print(
            f" saved {out_prefix}-data-image.npy shape={tactile_images.shape}, "
            f"labels shape={labels_np.shape}"
        )


if __name__ == "__main__":
    main()
python3 code/generate_dataset_image.py --npys-root npys --output-root datasets --set-type train --sensor-id 0 --sensor-name ff --time-window 5 --step 0
python3 code/generate_dataset_image.py --npys-root npys --output-root datasets --set-type test --sensor-id 0 --sensor-name ff --time-window 5 --step 0

Train LSTM
System Information
- CPU: Intel(R) Core(TM) i9-14900HX (32) @ 2.42 GHz
- GPU: NVIDIA GeForce RTX 5060 Ti @ 3.09 GHz (7.62 GiB) [Discrete]
- Memory: 31.64 GiB
- Disk (C:): 300.00 GiB - NTFS
- Disk (D:): 651.65 GiB - NTFS
Import Libraries
import numpy as np
import math
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
import torch
import torch.nn as nn
import torch.nn.functional as F
from livelossplot import PlotLosses
import itertools
import matplotlib.pyplot as plt
%matplotlib inline

Confirm that CUDA is available as the torch device:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

Function for Plotting the Confusion Matrix
Define a helper to plot confusion matrices with optional normalization and styling for colors and axes.
#https://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html#sphx-glr-auto-examples-model-selection-plot-confusion-matrix-py
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Print a short banner and plot the confusion matrix on the current figure.

    Args:
        cm: Square confusion matrix (rows = true labels, columns = predicted).
        classes: Tick labels, one per row/column of `cm`.
        normalize: When True, each row is divided by its sum so cells show
            per-class fractions; rows with no samples are shown as 0.
        title: Plot title.
        cmap: Matplotlib colormap used for the cells.
    """
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # A class with no true samples has an all-zero row; leave it at 0
        # instead of dividing by zero and filling the plot with NaN.
        cm = np.divide(
            cm.astype(float),
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()


# Baseline LSTM
Declare global constants.
FOLDER_TEST = './datasets/test/'
FOLDER = './datasets/train/'
CLASSES = ['n', 's', 'e', 'w', 'cw', 'aw', 't']
WINDOW = 5
ELECTRODES = 24
VAL_SPLIT = 0.20

Load & Split Training Data
# Gather the per-class training files into one dataset / label array.
dataset = np.zeros((0, WINDOW, ELECTRODES))
labels = np.zeros((0))
print(dataset.shape)
print(labels.shape)

for class_name in CLASSES:
    file = FOLDER + class_name + '-' + str(WINDOW) + '-ff'
    print(file)
    class_data = np.load(file + '-data.npy')
    class_labels = np.load(file + '-labels.npy')
    print(class_data.shape)
    print(class_labels.shape)
    dataset = np.append(dataset, class_data, axis=0)
    labels = np.append(labels, np.reshape(class_labels, (class_labels.shape[0])), axis=0)

print(dataset.shape)
print(labels.shape)

# Normalize data! Really necessary in this dataset
# Min-max scaling is done per frame (over the electrode axis).
x_min = dataset.min(axis=(2), keepdims=True)
x_max = dataset.max(axis=(2), keepdims=True)
x_range = x_max - x_min
# A frame whose electrodes all read the same value has zero range; map it to
# all zeros instead of dividing by zero and feeding NaN to the network.
x_range[x_range == 0] = 1.0
dataset = (dataset - x_min) / x_range

data_train, data_val, labels_train, labels_val = train_test_split(
    dataset, labels, test_size=VAL_SPLIT, random_state=42
)
print('Dataset split')
print(data_train.shape)
print(data_val.shape)
print(labels_train.shape)
print(labels_val.shape)

# Plot a histogram of training labels:
plt.hist(labels_train, bins=len(CLASSES), density=True, facecolor='g', alpha=0.75)
Change for GPU:
data_val = torch.from_numpy(data_val)
data_val = data_val.to(device, dtype=torch.float)
labels_val = torch.from_numpy(labels_val)
labels_val = labels_val.to(device, dtype=torch.long)

Define & train LSTM
# Recurrent neural network (many-to-one)
class RNN(nn.Module):
    """Stacked LSTM followed by a two-layer classifier on the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM state and of the hidden dense layer.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
        dropout: Dropout probability used between LSTM layers and before the
            output layer.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # nn.LSTM only applies dropout between stacked layers and warns when
        # it is requested for a single layer, so only pass it when it is used.
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0,
        )
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        # Not used in forward(); kept so existing state_dict keys still load.
        self.fc1_bn = nn.BatchNorm1d(hidden_size)
        self.fc1_drop = nn.Dropout(p=dropout)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Map a (batch, seq_length, input_size) tensor to (batch, num_classes) logits."""
        # Without explicit states nn.LSTM starts from zeros on the input's
        # own device, so no module-level `device` global is needed here.
        out, _ = self.lstm(x)  # out: (batch_size, seq_length, hidden_size)

        # Decode the hidden state of the last time step
        out = F.relu(self.fc1(out[:, -1, :]))
        out = self.fc1_drop(out)
        out = self.fc2(out)
        return out


# Set hyperparameters and create the model, cross-entropy loss, Adam optimizer, and step LR scheduler.
input_size = ELECTRODES
hidden_size = 32
num_layers = 3
num_classes = len(CLASSES)
dropout = 0.3
model = RNN(input_size, hidden_size, num_layers, num_classes, dropout=dropout).to(device)
# Loss and optimizer
learning_rate = 0.001
l2_reg = 0.001
step_size = 60
lr_decay = 0.5
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=l2_reg)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=lr_decay)

Run the training loop: shuffle batches, forward/backward passes, update weights, track loss/accuracy, evaluate on the validation set each epoch, and stream metrics to livelossplot.
epochs = 25
batch = 32

loss_history = []
loss_history_val = []
acc_history = []
acc_history_val = []

num_train = data_train.shape[0]
steps = math.ceil(num_train / batch)
print('Epochs:', epochs, 'Batch:', batch, 'Steps:', steps)

liveloss = PlotLosses()

for epoch in range(epochs):  # loop over the dataset multiple times
    print('===== epoch', epoch + 1, '=====')
    epoch_loss = 0.0
    epoch_correct = 0

    for param_group in optimizer.param_groups:
        print('>> lr:', param_group['lr'])

    # Shuffle dataset so epochs receive data in batches with different order
    data_train, labels_train = shuffle(data_train, labels_train)

    # Dropout must be active while training (it is switched off for
    # validation at the end of every epoch).
    model.train()
    for i in range(steps):
        # get the inputs
        start = batch * i
        batch_data = torch.from_numpy(data_train[start:start + batch])
        batch_labels = torch.from_numpy(labels_train[start:start + batch])

        # Changed for using GPU
        batch_data = batch_data.to(device, dtype=torch.float)
        batch_labels = batch_labels.to(device, dtype=torch.long)

        # Forward pass
        outputs = model(batch_data)
        loss = criterion(outputs, batch_labels)
        epoch_loss += loss.item()
        epoch_correct += (outputs.max(1)[1] == batch_labels).sum().item()

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i + 1) % 10 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.6f}'.format(epoch + 1, epochs, i + 1, steps, loss.item()))

    # Step the schedule after the epoch's optimizer updates; stepping before
    # the first optimizer.step() skips the first value of the schedule.
    scheduler.step()

    loss_history.append(epoch_loss / steps)
    # Divide by the real sample count: the last batch may be smaller than
    # `batch`, so `steps * batch` would under-report the accuracy.
    acc_history.append(epoch_correct / num_train * 100)

    # Evaluate with dropout disabled and without tracking gradients.
    model.eval()
    with torch.no_grad():
        outputs_val = model(data_val)
        loss = criterion(outputs_val, labels_val)
        epoch_loss_val = loss.item()
        _, predicted_val = torch.max(outputs_val, 1)
        total = labels_val.size(0)
        epoch_correct_val = (predicted_val == labels_val).sum().item()

    loss_history_val.append(epoch_loss_val)
    acc_history_val.append((epoch_correct_val / total) * 100)

    liveloss.update({
        'log loss': loss_history[-1],
        'val_log loss': loss_history_val[-1],
        'accuracy': acc_history[-1],
        'val_accuracy': acc_history_val[-1]
    })
    liveloss.draw()
accuracy
training (min: 34.994, max: 76.614, cur: 76.614)
validation (min: 52.614, max: 76.928, cur: 75.569)
log loss
training (min: 0.542, max: 1.527, cur: 0.542)
validation (min: 0.535, max: 1.107, cur: 0.589)

Confusion Matrix
Perform inference on the full validation set and print accuracy, precision, recall, and F1 scores.
# Inference must run in eval mode: the model contains dropout layers, and in
# train mode they make the predictions random from one call to the next.
model.eval()
with torch.no_grad():
    outputs_val = model(data_val)
    _, predicted_val = torch.max(outputs_val, 1)

total = labels_val.size(0)
correct = (predicted_val == labels_val).sum().item()
score = correct / total

y_true = labels_val.cpu().numpy()
y_pred = predicted_val.cpu().numpy()

# Pin the label set so every per-class array and the confusion matrix have one
# entry per item of CLASSES, even if a class is missing from this split.
class_ids = list(range(len(CLASSES)))
precision, recall, f1_score, _ = precision_recall_fscore_support(
    y_true, y_pred, labels=class_ids, average=None
)

print("accuracy:", score * 100)
print("precision:", precision * 100)
print("recall:", recall * 100)
print("f1_score:", f1_score * 100)

# Output recorded in the original post (that run never called model.eval(),
# so a rerun of this cell will not reproduce these numbers exactly):
# accuracy: 70.66469719350074
# precision: [100. 75.07788162 99.39393939 92.37113402 40.38461538
# 57.35849057 27.38095238]
# recall: [83.65180467 98.16700611 99.5951417 97.60348584 74.2632613 32.68817204
# 9.27419355]
# f1_score: [91.0982659 85.08384819 99.49443883 94.91525424 52.3183391 41.64383562
# 13.85542169]

# The predictions above are reused; a second forward pass is not needed.
y_true_val = y_true
y_pred_val = y_pred
cnf_matrix = confusion_matrix(y_true_val, y_pred_val, labels=class_ids)
plt.figure()
plot_confusion_matrix(cnf_matrix, normalize=False, classes=CLASSES, title='Confusion matrix')
Test Dataset
Load test npy data and labels per class, apply the same normalization, and aggregate into test arrays.
# Gather the per-class test files into one dataset / label array.
dataset_test = np.zeros((0, WINDOW, ELECTRODES))
labels_test = np.zeros((0))
print(dataset_test.shape)
print(labels_test.shape)

for class_name in CLASSES:
    file = FOLDER_TEST + class_name + '-' + str(WINDOW) + '-ff'
    print(file)
    class_data = np.load(file + '-data.npy')
    class_labels = np.load(file + '-labels.npy')
    print(class_data.shape)
    print(class_labels.shape)
    dataset_test = np.append(dataset_test, class_data, axis=0)
    labels_test = np.append(labels_test, np.reshape(class_labels, (class_labels.shape[0])), axis=0)

print(dataset_test.shape)
print(labels_test.shape)

# Normalize data! Really necessary in this dataset
# Min-max scaling is done per frame (over the electrode axis).
x_min = dataset_test.min(axis=(2), keepdims=True)
x_max = dataset_test.max(axis=(2), keepdims=True)
x_range = x_max - x_min
# A frame whose electrodes all read the same value has zero range; map it to
# all zeros instead of dividing by zero and feeding NaN to the network.
x_range[x_range == 0] = 1.0
dataset_test = (dataset_test - x_min) / x_range

plt.hist(labels_test, bins=len(CLASSES), density=True, facecolor='g', alpha=0.75)

# Changed for using GPU
dataset_test = torch.from_numpy(dataset_test)
dataset_test = dataset_test.to(device, dtype=torch.float)
labels_test = torch.from_numpy(labels_test)
labels_test = labels_test.to(device, dtype=torch.long)

# Inference must run in eval mode: the model contains dropout layers, and in
# train mode they make the predictions random from one call to the next.
model.eval()
with torch.no_grad():
    outputs_test = model(dataset_test)
    _, predicted_test = torch.max(outputs_test, 1)

total_test = labels_test.size(0)
correct_test = (predicted_test == labels_test).sum().item()
score_test = correct_test / total_test

# Convert to numpy first
y_true_test = labels_test.detach().cpu().numpy()
y_pred_test = predicted_test.detach().cpu().numpy()

# Pin the label set so every per-class array and the confusion matrix have one
# entry per item of CLASSES, even if a class is missing from the test set.
class_ids = list(range(len(CLASSES)))
precision_test, recall_test, f1_score_test, _ = precision_recall_fscore_support(
    y_true_test, y_pred_test, labels=class_ids, average=None
)

print("accuracy:", score_test * 100)
print("precision:", precision_test * 100)
print("recall:", recall_test * 100)
print("f1_score:", f1_score_test * 100)

# Output recorded in the original post (that run never called model.eval(),
# so a rerun of this cell will not reproduce these numbers exactly):
# accuracy: 77.21854304635761
# precision: [97.93510324 81.47590361 97.12460064 98.07121662 68.06569343 54.21965318
# 38.31578947]
# recall: [100. 89.56953642 100. 99.8489426 56.25942685
# 70.63253012 27.36842105]
# f1_score: [98.95678092 85.33123028 98.54132901 98.95209581 61.60198183 61.34728581
# 31.92982456]

cnf_matrix_test = confusion_matrix(y_true_test, y_pred_test, labels=class_ids)
plt.figure()
plot_confusion_matrix(cnf_matrix_test, normalize=False, classes=CLASSES, title='Confusion matrix TEST')
Footnotes
-
B. S. Zapata-Impata, P. Gil, and F. Torres, “Tactile-Driven Grasp Stability and Slip Prediction,” Robotics, vol. 8, no. 4, p. 85, Dec. 2019, doi: 10.3390/robotics8040085. ⤴