
Deep Learning with Spatio-Temporal Tactile Features

16 min read

In this blog post, I work through and reproduce the results of the article Learning Spatio-Temporal Tactile Features with a ConvLSTM for the Direction of Slip Detection¹.

From the paper:

"Our main contribution is an extensive evaluation of the usefulness of spatio-temporal tactile features when employed to detect the direction of slip. This evaluation is made by applying a new type of recurrent neural network (ConvLSTM), which is highly specialised in modelling spatio-temporal data. We chose to learn both spatial and temporal tactile features for this task because it seems reasonable to believe that the direction of slip has characteristics that belong to both components: slips occur in time (temporal aspect) and take place on a particular area of the hand (spatial aspect)."

Construct the Dataset

In order to learn features from the BioTac SP sensor, it is possible to build an array $\theta \in \mathbb{N}^{1}$ that holds the readings from the 24 electrodes, such as $\theta = \{e_1, e_2, \ldots, e_{24}\}$, where $e_i$ is the $i$-th electrode. If this array is read in $T$ consecutive time steps, it is possible to create a sample $\Theta = \{\theta^1, \theta^2, \ldots, \theta^T\}$ that holds $T$ tactile readings. A learning algorithm would then be able to learn temporal features if sufficient $\Theta$ samples were collected, correctly labelling each $\Theta$ with one of the slip directions considered.
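
To make this concrete, here is a minimal sketch (my own illustration, not code from the paper) that turns a stream of electrode readings into non-overlapping $\Theta$ samples of $T$ frames:

import numpy as np

T = 5                                                # frames per sample
stream = np.random.randint(0, 4096, size=(103, 24))  # fake electrode readings
usable = (stream.shape[0] // T) * T                  # drop the trailing partial window
samples = stream[:usable].reshape(-1, T, 24)         # stacked Theta samples
print(samples.shape)                                 # (20, 5, 24)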

Reproduce

Install Libraries

I used Codex to generate requirements.txt:

numpy
pandas
scikit-learn
torch
matplotlib
livelossplot
joblib
scipy

Note: you may need to install a GPU build of PyTorch.

pip3 install torch torchvision --index-url https://download.pytorch.org/whl/cu130
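
A quick way to confirm that the GPU build is actually picked up (my addition):

python3 -c "import torch; print(torch.__version__, torch.cuda.is_available())"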

Extract Tactile Info

code/extract_tactile_info.py

"""
Convert CSV tactile recordings stored under csvs/<label>/ into npy arrays.
 
This replicates the preprocessing that `Extract Tactile Info.ipynb` performed,
but reads the already-exported CSVs instead of ROS bag files. The output
matches the expected layout for the downstream dataset generators:
    npys/<label>/<csv_stem>.npy  -> shape: (frames, fingers, electrodes)
"""
from __future__ import annotations
 
import argparse
import re
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Tuple
 
import numpy as np
import pandas as pd
 
 
DEFAULT_LABELS: Sequence[str] = ("n", "s", "e", "w", "cw", "aw", "t")
DEFAULT_FINGER_PRIORITY: Sequence[str] = ("ff", "mf", "th")
 
 
def parse_finger_columns(
    columns: Iterable[str],
    electrodes: int,
    finger_priority: Sequence[str],
) -> Tuple[List[str], Dict[str, List[str]]]:
    """
    Detect finger prefixes in the CSV header and return an ordered mapping of
    finger -> column names sorted by electrode index.
    """
    regex = re.compile(r"^(?P<finger>[A-Za-z]+)_?e(?P<idx>\d+)$")
    finger_cols: Dict[str, List[Tuple[int, str]]] = {}
 
    for col in columns:
        match = regex.match(col)
        if not match:
            continue
        finger = match.group("finger")
        idx = int(match.group("idx"))
        finger_cols.setdefault(finger, []).append((idx, col))
 
    # Respect preferred finger ordering, then append any others alphabetically
    ordered_fingers: List[str] = []
    for finger in finger_priority:
        if finger in finger_cols:
            ordered_fingers.append(finger)
    for finger in sorted(finger_cols):
        if finger not in ordered_fingers:
            ordered_fingers.append(finger)
 
    ordered_columns: Dict[str, List[str]] = {}
    for finger in ordered_fingers:
        cols = sorted(finger_cols[finger], key=lambda item: item[0])
        ordered_columns[finger] = [col for _, col in cols[:electrodes]]
 
    return ordered_fingers, ordered_columns
 
 
def convert_csv(
    csv_path: Path,
    out_path: Path,
    electrodes: int,
    finger_priority: Sequence[str],
) -> Tuple[Tuple[int, int, int], List[str]]:
    """Read one CSV file and save it as a (frames, fingers, electrodes) npy."""
    df = pd.read_csv(csv_path)
    fingers, finger_columns = parse_finger_columns(
        df.columns, electrodes=electrodes, finger_priority=finger_priority
    )
 
    if not fingers:
        raise ValueError(f"No tactile columns detected in {csv_path}")
 
    frames = df.shape[0]
    data = np.zeros((frames, len(fingers), electrodes), dtype=np.int32)
 
    for finger_idx, finger in enumerate(fingers):
        cols = finger_columns[finger]
        if len(cols) == 0:
            continue
        finger_values = df[cols].to_numpy(dtype=np.int32)
        if finger_values.shape[1] < electrodes:
            # Pad missing electrodes with zeros to keep shapes consistent.
            padded = np.zeros((frames, electrodes), dtype=np.int32)
            padded[:, : finger_values.shape[1]] = finger_values
            finger_values = padded
        data[:, finger_idx, :] = finger_values[:, :electrodes]
 
    out_path.parent.mkdir(parents=True, exist_ok=True)
    np.save(out_path, data)
 
    return data.shape, fingers
 
 
def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Convert tactile CSV recordings into npy tensors "
        "compatible with the dataset generators."
    )
    parser.add_argument(
        "--csv-root",
        default="csvs",
        type=Path,
        help="Directory containing label subfolders with CSV files.",
    )
    parser.add_argument(
        "--output-root",
        default="npys",
        type=Path,
        help="Destination directory for generated npy files.",
    )
    parser.add_argument(
        "--labels",
        nargs="+",
        default=list(DEFAULT_LABELS),
        help="Which label subdirectories to process.",
    )
    parser.add_argument(
        "--electrodes",
        type=int,
        default=24,
        help="Number of electrodes per finger expected in each row.",
    )
    parser.add_argument(
        "--finger-order",
        nargs="+",
        default=list(DEFAULT_FINGER_PRIORITY),
        help="Preferred finger order when multiple prefixes are present. "
        "Any additional prefixes will be appended alphabetically.",
    )
    return parser
 
 
def main() -> None:
    args = build_arg_parser().parse_args()
 
    for label in args.labels:
        csv_dir = args.csv_root / label
        if not csv_dir.exists():
            print(f"[skip] {csv_dir} not found")
            continue
 
        csv_files = sorted(csv_dir.glob("*.csv"))
        if not csv_files:
            print(f"[skip] No CSV files under {csv_dir}")
            continue
 
        print(f"[label {label}] found {len(csv_files)} csv files")
        for csv_file in csv_files:
            out_file = args.output_root / label / f"{csv_file.stem}.npy"
            shape, fingers = convert_csv(
                csv_file,
                out_file,
                electrodes=args.electrodes,
                finger_priority=args.finger_order,
            )
            print(f"  saved {out_file} shape={shape} fingers={fingers}")
 
 
if __name__ == "__main__":
    main()
 
 
python3 code/extract_tactile_info.py --csv-root csvs --output-root npys --labels n s e w cw aw t
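
As a sanity check (my addition; pick any file the script produced), each npy should come out with shape (frames, fingers, electrodes):

python3 -c "import numpy as np, glob; f = glob.glob('npys/n/*.npy')[0]; print(f, np.load(f).shape)"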

Generate Dataset

Array

code/generate_dataset_array.py

"""
Generate sequence datasets (array format) from tactile npys.
 
Replicates the logic of `Generate Dataset - Array.ipynb`, producing files that
`LSTM.ipynb` expects:
    datasets/<set_type>/<label>-<time_window>-ff-data.npy
    datasets/<set_type>/<label>-<time_window>-ff-labels.npy
"""
from __future__ import annotations
 
import argparse
from pathlib import Path
from typing import Iterable, List, Sequence
 
import numpy as np
 
 
DEFAULT_LABELS: Sequence[str] = ("n", "s", "e", "w", "cw", "aw", "t")
 
 
def list_npys(npy_dir: Path, objects: Iterable[str]) -> List[Path]:
    files = sorted(npy_dir.glob("*.npy"))
    object_filters = list(objects)
    if not object_filters:
        return files
    filtered = []
    for file in files:
        if any(obj in file.name for obj in object_filters):
            filtered.append(file)
    return filtered
 
 
def ensure_3d(arr: np.ndarray) -> np.ndarray:
    """Force shape to (frames, fingers, electrodes)."""
    if arr.ndim == 3:
        return arr
    if arr.ndim == 2:
        return arr[:, None, :]
    raise ValueError(f"Unsupported array shape {arr.shape}")
 
 
def build_sequences(sensor_data: np.ndarray, time_window: int) -> np.ndarray:
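    # Drop trailing frames so the count divides evenly, then split into
    # non-overlapping windows of shape (time_window, electrodes).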
    usable = (sensor_data.shape[0] // time_window) * time_window
    trimmed = sensor_data[:usable]
    return trimmed.reshape(-1, time_window, sensor_data.shape[1])
 
 
def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Create time-windowed tactile datasets in array form."
    )
    parser.add_argument(
        "--npys-root",
        default="npys",
        type=Path,
        help="Directory containing label subfolders with raw tactile npys.",
    )
    parser.add_argument(
        "--output-root",
        default="datasets",
        type=Path,
        help="Destination directory for generated dataset files.",
    )
    parser.add_argument(
        "--set-type",
        default="train",
        help="Subfolder under output where files will be written "
        "(e.g. train, test, test/textures).",
    )
    parser.add_argument(
        "--labels",
        nargs="+",
        default=list(DEFAULT_LABELS),
        help="Label names (and npys subfolders) to process.",
    )
    parser.add_argument(
        "--objects",
        nargs="*",
        default=[],
        help="If provided, only include npys whose filename contains any of "
        "these substrings.",
    )
    parser.add_argument(
        "--sensor-id",
        type=int,
        default=0,
        help="Finger index to keep (0-based).",
    )
    parser.add_argument(
        "--sensor-name",
        default="ff",
        help="String used in output filenames for compatibility with notebooks.",
    )
    parser.add_argument(
        "--time-window",
        type=int,
        default=5,
        help="Number of frames per sequence.",
    )
    return parser
 
 
def main() -> None:
    args = build_arg_parser().parse_args()
 
    for label_idx, label in enumerate(args.labels):
        npy_dir = args.npys_root / label
        if not npy_dir.exists():
            print(f"[skip] {npy_dir} not found")
            continue
 
        files = list_npys(npy_dir, args.objects)
        if not files:
            print(f"[skip] No npy files found for label {label}")
            continue
 
        print(f"[label {label}] using {len(files)} files")
        arrays = []
        for file in files:
            arr = ensure_3d(np.load(file))
            arrays.append(arr)
 
        if not arrays:
            print(f"  no frames found in {label}, skipping")
            continue
 
        concatenated = np.concatenate(arrays, axis=0)
 
        if args.sensor_id >= concatenated.shape[1]:
            raise IndexError(
                f"sensor-id {args.sensor_id} exceeds finger dimension "
                f"{concatenated.shape[1]} for label {label}"
            )
 
        sensor_data = concatenated[:, args.sensor_id, :]
        sequences = build_sequences(sensor_data, args.time_window)
        labels_np = np.full((sequences.shape[0], 1), label_idx, dtype=int)
 
        out_prefix = (
            args.output_root / args.set_type / f"{label}-{args.time_window}-{args.sensor_name}"
        )
        out_prefix.parent.mkdir(parents=True, exist_ok=True)
        np.save(f"{out_prefix}-data.npy", sequences)
        np.save(f"{out_prefix}-labels.npy", labels_np)
 
        print(
            f"  saved {out_prefix}-data.npy shape={sequences.shape}, "
            f"labels shape={labels_np.shape}"
        )
 
 
if __name__ == "__main__":
    main()
 
python3 code/generate_dataset_array.py --npys-root npys --output-root datasets --set-type train --sensor-id 0 --sensor-name ff --time-window 5 --objects pen case sponge hd

Run the same command with --set-type test (and the objects you hold out for testing) to produce the split that the evaluation section later loads from datasets/test/:

python3 code/generate_dataset_array.py --npys-root npys --output-root datasets --set-type test --sensor-id 0 --sensor-name ff --time-window 5 --objects pen case sponge hd

Image

code/generate_dataset_image.py

"""
Generate tactile image datasets from tactile npys.
 
This mirrors `Generate Dataset - Image.ipynb`, building 12x11 electrode maps
per frame and saving them under datasets/<set_type>/ for ConvLSTM workflows.
"""
from __future__ import annotations
 
import argparse
from pathlib import Path
from typing import Iterable, List, Sequence, Tuple
 
import numpy as np
 
 
DEFAULT_LABELS: Sequence[str] = ("n", "s", "e", "w", "cw", "aw", "t")
 
# Electrode layout replicated from the notebook
TACTILE_IMAGE_ROWS = 12
TACTILE_IMAGE_COLS = 11
ELECTRODES_INDEX_ROWS = np.array(
    [0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 2, 3, 3, 5]
)
ELECTRODES_INDEX_COLS = np.array(
    [1, 2, 0, 1, 3, 0, 1, 4, 2, 1, 9, 8, 10, 9, 7, 10, 9, 6, 8, 9, 5, 4, 6, 5]
)
 
 
def list_npys(npy_dir: Path, objects: Iterable[str]) -> List[Path]:
    files = sorted(npy_dir.glob("*.npy"))
    object_filters = list(objects)
    if not object_filters:
        return files
    filtered = []
    for file in files:
        if any(obj in file.name for obj in object_filters):
            filtered.append(file)
    return filtered
 
 
def ensure_3d(arr: np.ndarray) -> np.ndarray:
    """Force shape to (frames, fingers, electrodes)."""
    if arr.ndim == 3:
        return arr
    if arr.ndim == 2:
        return arr[:, None, :]
    raise ValueError(f"Unsupported array shape {arr.shape}")
 
 
def get_neighbours(tactile_image: np.ndarray, cell_x: int, cell_y: int) -> List[float]:
    """Return the 8 neighbours of the target cell (with zero padding)."""
    pad = 2
    padded_x = cell_x + pad
    padded_y = cell_y + pad
 
    padded = np.pad(tactile_image, ((pad, pad), (pad, pad)), "constant")
 
    neighbours_xs = [
        padded_x - 1,
        padded_x - 1,
        padded_x - 1,
        padded_x,
        padded_x,
        padded_x + 1,
        padded_x + 1,
        padded_x + 1,
    ]
    neighbours_ys = [
        padded_y - 1,
        padded_y,
        padded_y + 1,
        padded_y - 1,
        padded_y + 1,
        padded_y - 1,
        padded_y,
        padded_y + 1,
    ]
 
    neighbours = []
    for some_x, some_y in zip(neighbours_xs, neighbours_ys):
        neighbours.append(padded[some_x, some_y])
 
    return neighbours
 
 
def zeros_to_mean(tactile_image: np.ndarray) -> np.ndarray:
    prev = tactile_image.copy()
    zero_xs, zero_ys = np.where(tactile_image == 0)
 
    for cell_x, cell_y in zip(zero_xs, zero_ys):
        neighs = get_neighbours(prev, cell_x, cell_y)
        neighs = [value for value in neighs if value > 0.0]
        if neighs:
            tactile_image[cell_x, cell_y] = np.mean(neighs)
 
    return tactile_image
 
 
def create_finger_tactile_image(finger_biotac: np.ndarray) -> np.ndarray:
    tactile_image = np.zeros(shape=(TACTILE_IMAGE_ROWS, TACTILE_IMAGE_COLS))
    tactile_image[ELECTRODES_INDEX_ROWS, ELECTRODES_INDEX_COLS] = finger_biotac
 
    tactile_image = zeros_to_mean(tactile_image)
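    # Re-run the fill in case some cells were surrounded only by zeros on
    # the first pass.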
    if np.min(tactile_image) == 0.0:
        tactile_image = zeros_to_mean(tactile_image)
 
    tactile_image = (tactile_image - np.min(tactile_image)) / (
        np.max(tactile_image) - np.min(tactile_image)
    )
 
    return tactile_image
 
 
def build_sequences(sensor_data: np.ndarray, time_window: int, step: int) -> np.ndarray:
    if step <= 0:
        usable = (sensor_data.shape[0] // time_window) * time_window
        trimmed = sensor_data[:usable]
        return trimmed.reshape(-1, time_window, sensor_data.shape[1])
 
    samples_idx = np.arange(sensor_data.shape[0])
    segments = int(len(samples_idx) / (step * time_window))
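    # Each block of step*time_window frames yields `step` interleaved
    # sequences; within a block, sequence k takes frames k, k+step, k+2*step, ...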
 
    dataset_aux = np.zeros((0, time_window, sensor_data.shape[1]))
    for i in range(segments):
        init = i * step * time_window
        end = init + step * time_window - step + 1
        sequence = samples_idx[init:end:step]
 
        index_rows: List[np.ndarray] = []
        for j in sequence:
            index_rows.append(samples_idx[j : j + step])

        index = np.array(index_rows).T  # (step, time_window)
        dataset_aux = np.concatenate((dataset_aux, sensor_data[index]))
 
    return dataset_aux
 
 
def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Create tactile image datasets (12x11 grids) from tactile npys."
    )
    parser.add_argument(
        "--npys-root",
        default="npys",
        type=Path,
        help="Directory containing label subfolders with raw tactile npys.",
    )
    parser.add_argument(
        "--output-root",
        default="datasets",
        type=Path,
        help="Destination directory for generated dataset files.",
    )
    parser.add_argument(
        "--set-type",
        default="train",
        help="Subfolder under output where files will be written "
        "(e.g. train, test, test/textures).",
    )
    parser.add_argument(
        "--labels",
        nargs="+",
        default=list(DEFAULT_LABELS),
        help="Label names (and npys subfolders) to process.",
    )
    parser.add_argument(
        "--objects",
        nargs="*",
        default=[],
        help="If provided, only include npys whose filename contains any of "
        "these substrings.",
    )
    parser.add_argument(
        "--sensor-id",
        type=int,
        default=0,
        help="Finger index to keep (0-based).",
    )
    parser.add_argument(
        "--sensor-name",
        default="ff",
        help="String used in output filenames for compatibility with notebooks.",
    )
    parser.add_argument(
        "--time-window",
        type=int,
        default=5,
        help="Number of frames per sequence.",
    )
    parser.add_argument(
        "--step",
        type=int,
        default=0,
        help="Sampling step. When >0, mimic SAMPLE_WITH_STEP=True behaviour.",
    )
    return parser
 
 
def main() -> None:
    args = build_arg_parser().parse_args()
 
    for label_idx, label in enumerate(args.labels):
        npy_dir = args.npys_root / label
        if not npy_dir.exists():
            print(f"[skip] {npy_dir} not found")
            continue
 
        files = list_npys(npy_dir, args.objects)
        if not files:
            print(f"[skip] No npy files found for label {label}")
            continue
 
        print(f"[label {label}] using {len(files)} files")
        arrays = []
        for file in files:
            arr = ensure_3d(np.load(file))
            arrays.append(arr)
 
        if not arrays:
            print(f"  no frames found in {label}, skipping")
            continue
 
        concatenated = np.concatenate(arrays, axis=0)
 
        if args.sensor_id >= concatenated.shape[1]:
            raise IndexError(
                f"sensor-id {args.sensor_id} exceeds finger dimension "
                f"{concatenated.shape[1]} for label {label}"
            )
 
        sensor_data = concatenated[:, args.sensor_id, :]
        sequences = build_sequences(sensor_data, args.time_window, args.step)
 
        tactile_images = np.zeros(
            (
                sequences.shape[0],
                sequences.shape[1],
                TACTILE_IMAGE_ROWS,
                TACTILE_IMAGE_COLS,
            )
        )
 
        for sample_idx in range(sequences.shape[0]):
            for frame_idx in range(sequences.shape[1]):
                tactile_images[sample_idx, frame_idx] = create_finger_tactile_image(
                    sequences[sample_idx, frame_idx]
                )
 
        labels_np = np.full((sequences.shape[0], 1), label_idx, dtype=int)
 
        out_dir = args.output_root / args.set_type
        if args.step > 0:
            prefix = f"{label}-{args.step}-{args.time_window}-{args.sensor_name}"
        else:
            prefix = f"{label}-{args.time_window}-{args.sensor_name}"
 
        out_prefix = out_dir / prefix
 
        out_dir.mkdir(parents=True, exist_ok=True)
        np.save(f"{out_prefix}-data-image.npy", tactile_images)
        np.save(f"{out_prefix}-labels-image.npy", labels_np)
 
        print(
            f"  saved {out_prefix}-data-image.npy shape={tactile_images.shape}, "
            f"labels shape={labels_np.shape}"
        )
 
 
if __name__ == "__main__":
    main()
 
python3 code/generate_dataset_image.py --npys-root npys --output-root datasets --set-type train --sensor-id 0 --sensor-name ff --time-window 5 --step 0
python3 code/generate_dataset_image.py --npys-root npys --output-root datasets --set-type test --sensor-id 0 --sensor-name ff --time-window 5 --step 0
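
To eyeball the result, you can render one generated frame (a quick sketch of mine; the path assumes the train split produced above):

import numpy as np
import matplotlib.pyplot as plt

frames = np.load("datasets/train/n-5-ff-data-image.npy")  # (n_seq, 5, 12, 11)
plt.imshow(frames[0, 0], cmap="viridis")  # first frame of the first sequence
plt.colorbar()
plt.show()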

Train LSTM

System Information

  • CPU: Intel(R) Core(TM) i9-14900HX (32) @ 2.42 GHz
  • GPU: NVIDIA GeForce RTX 5060 Ti @ 3.09 GHz (7.62 GiB) [Discrete]
  • Memory: 31.64 GiB
  • Disk (C:): 300.00 GiB - NTFS
  • Disk (D:): 651.65 GiB - NTFS

Import Libraries

import numpy as np
import math
 
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
 
import torch
import torch.nn as nn
import torch.nn.functional as F
 
from livelossplot import PlotLosses
import itertools
import matplotlib.pyplot as plt
%matplotlib inline

Confirm that CUDA is available as the torch device:

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

Function for Plotting the Confusion Matrix

Define a helper to plot confusion matrices with optional normalization and styling for colors and axes.

#https://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html#sphx-glr-auto-examples-model-selection-plot-confusion-matrix-py
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')
 
    #print(cm)
 
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
 
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
 
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

Baseline LSTM

Declare global constants.

FOLDER_TEST = './datasets/test/'
FOLDER = './datasets/train/'
 
CLASSES = ['n', 's', 'e', 'w', 'cw', 'aw', 't']
 
WINDOW = 5
ELECTRODES = 24
VAL_SPLIT = 0.20

Load & Split Training Data

dataset = np.zeros((0, WINDOW, ELECTRODES))
labels = np.zeros((0))
    
print(dataset.shape)
print(labels.shape)
 
for class_name in CLASSES:
    file = FOLDER + class_name + '-' + str(WINDOW) + '-ff'
    
    print(file)
    
    class_data = np.load(file + '-data.npy')
    class_labels = np.load(file + '-labels.npy')
    
    print(class_data.shape)
    print(class_labels.shape)
    
    dataset = np.append(dataset, class_data, axis=0)
    labels = np.append(labels, np.reshape(class_labels, (class_labels.shape[0])), axis=0)
    
    print(dataset.shape)
    print(labels.shape)
 
# Normalize data! Really necessary in this dataset
x_min = dataset.min(axis=(2), keepdims=True)
x_max = dataset.max(axis=(2), keepdims=True)
 
dataset = (dataset - x_min)/(x_max - x_min)
    
data_train, data_val, labels_train, labels_val = train_test_split(dataset, labels, test_size=VAL_SPLIT,
                                                                    random_state=42)
 
print('Dataset split')
print(data_train.shape)
print(data_val.shape)
print(labels_train.shape)
print(labels_val.shape)

Plot a histogram of training labels:

plt.hist(labels_train, bins=len(CLASSES), density=True, facecolor='g', alpha=0.75)

Move the validation tensors to the GPU:

data_val = torch.from_numpy(data_val)
data_val = data_val.to(device, dtype=torch.float)
 
labels_val = torch.from_numpy(labels_val)
labels_val = labels_val.to(device, dtype=torch.long)

Define & train LSTM

# Recurrent neural network (many-to-one)
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.fc1_bn = nn.BatchNorm1d(hidden_size)
        self.fc1_drop = nn.Dropout(p=dropout)
        self.fc2 = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        # Set initial hidden and cell states 
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) 
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)
        
        # Decode the hidden state of the last time step
        #out = F.relu(self.fc1_bn(self.fc1(out[:, -1, :])))
        out = F.relu(self.fc1(out[:, -1, :]))
        out = self.fc1_drop(out)
        out = self.fc2(out)
        
        return out
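
As a quick sanity check (my addition, not in the original notebook), a dummy batch should come out as logits of shape (batch, num_classes):

check_model = RNN(ELECTRODES, 32, 3, len(CLASSES), dropout=0.3).to(device)
dummy = torch.randn(8, WINDOW, ELECTRODES, device=device)
print(check_model(dummy).shape)  # torch.Size([8, 7])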

Set the hyperparameters and create the model, cross-entropy loss, Adam optimizer, and step LR scheduler. Note that with step_size=60 but only 25 training epochs below, the scheduler never actually decays the learning rate during this run.

input_size = ELECTRODES
hidden_size = 32
num_layers = 3
num_classes = len(CLASSES)
dropout = 0.3
 
model = RNN(input_size, hidden_size, num_layers, num_classes, dropout=dropout).to(device)
 
# Loss and optimizer
learning_rate = 0.001
l2_reg = 0.001
step_size = 60
lr_decay = 0.5
 
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=l2_reg)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=lr_decay)

Run the training loop: shuffle batches, forward/backward passes, update weights, track loss/accuracy, evaluate on the validation set each epoch, and stream metrics to livelossplot.

epochs = 25
batch = 32
loss_history = []
loss_history_val = []
acc_history = []
acc_history_val = []
 
steps = math.ceil(data_train.shape[0] / batch)
print('Epochs:', epochs, 'Batch:', batch, 'Steps:', steps)
 
liveloss = PlotLosses()
 
for epoch in range(epochs):  # loop over the dataset multiple times
    print('===== epoch', epoch + 1, '=====')
        
    model.train()  # enable dropout for the training passes
    epoch_loss = 0.0
    epoch_loss_val = 0.0
    epoch_correct = 0
    
    for param_group in optimizer.param_groups:
        print('>> lr:', param_group['lr'])
        
    # Shuffle dataset so epochs receive data in batches with different order
    data_train, labels_train = shuffle(data_train, labels_train)
    
    for i in range(steps):
        # get the inputs
        #batch, time, channels, height, width
        batch_data = torch.from_numpy(data_train[batch * i : batch * i + batch])
        batch_labels = torch.from_numpy(labels_train[batch * i : batch * i + batch])
        
        #print(batch_data.shape, batch_labels.shape)
        
        # Changed for using GPU
        batch_data = batch_data.to(device, dtype=torch.float)
        batch_labels = batch_labels.to(device, dtype=torch.long)
        
        # Forward pass
        outputs = model(batch_data)
        loss = criterion(outputs, batch_labels)
        
        epoch_loss += loss.item()
        epoch_correct += (outputs.max(1)[1] == batch_labels).sum().item()
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i+1) % 10 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.6f}'.format(epoch + 1, epochs, i + 1, steps, loss.item()))
            
    loss_history.append(epoch_loss / steps)
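    # Note: steps * batch slightly overcounts samples when the last batch is
    # partial, so this training accuracy is a small underestimate.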
    acc_history.append(epoch_correct / (steps * batch) * 100)
    # acc_history.append(epoch_correct.item() / (steps * batch) * 100)
    
    scheduler.step()  # step the LR schedule once per epoch, after the optimizer updates

    model.eval()  # switch off dropout for validation
    with torch.no_grad(): # We don't want to track gradients now because we are not training the network.
        outputs_val = model(data_val)
        loss = criterion(outputs_val, labels_val)
 
        epoch_loss_val = loss.item()
 
        _, predicted_val = torch.max(outputs_val.data, 1)
        total = labels_val.size(0)
        epoch_correct_val = (predicted_val == labels_val).sum().item()
 
    loss_history_val.append(epoch_loss_val)
    acc_history_val.append((epoch_correct_val / total) * 100)
 
    liveloss.update({
        'log loss': loss_history[-1],
        'val_log loss': loss_history_val[-1],
        'accuracy': acc_history[-1],
        'val_accuracy': acc_history_val[-1]
    })
    liveloss.draw()
accuracy
	training         	 (min:   34.994, max:   76.614, cur:   76.614)
	validation       	 (min:   52.614, max:   76.928, cur:   75.569)
log loss
	training         	 (min:    0.542, max:    1.527, cur:    0.542)
	validation       	 (min:    0.535, max:    1.107, cur:    0.589)

Confusion Matrix

Perform inference on the full validation set and print accuracy plus per-class precision, recall, and F1 scores (the arrays are ordered as in CLASSES).

with torch.no_grad():
    outputs_val = model(data_val)
_, predicted_val = torch.max(outputs_val, 1)
 
total = labels_val.size(0)
correct = (predicted_val == labels_val).sum().item()
score = correct / total
 
y_true = labels_val.cpu().numpy()
y_pred = predicted_val.cpu().numpy()
 
precision, recall, f1_score, _ = precision_recall_fscore_support(
    y_true, y_pred, average=None
)
 
print("accuracy:", score * 100)
print("precision:", precision * 100)
print("recall:", recall * 100)
print("f1_score:", f1_score * 100)
accuracy: 70.66469719350074
precision: [100.          75.07788162  99.39393939  92.37113402  40.38461538
  57.35849057  27.38095238]
recall: [83.65180467 98.16700611 99.5951417  97.60348584 74.2632613  32.68817204
  9.27419355]
f1_score: [91.0982659  85.08384819 99.49443883 94.91525424 52.3183391  41.64383562
 13.85542169]
with torch.no_grad():
    outputs_val = model(data_val)
_, predicted_val = torch.max(outputs_val, 1)
 
y_true_val = labels_val.detach().cpu().numpy()
y_pred_val = predicted_val.detach().cpu().numpy()
 
cnf_matrix = confusion_matrix(y_true_val, y_pred_val)
 
plt.figure()
plot_confusion_matrix(cnf_matrix, normalize=False, classes=CLASSES, title='Confusion matrix')

Test Dataset

Load test npy data and labels per class, apply the same normalization, and aggregate into test arrays.

dataset_test = np.zeros((0, WINDOW, ELECTRODES))
labels_test = np.zeros((0))
    
print(dataset_test.shape)
print(labels_test.shape)
 
for class_name in CLASSES:
    file = FOLDER_TEST + class_name + '-' + str(WINDOW) + '-ff'
    
    print(file)
    
    class_data = np.load(file + '-data.npy')
    class_labels = np.load(file + '-labels.npy')
    
    print(class_data.shape)
    print(class_labels.shape)
    
    dataset_test = np.append(dataset_test, class_data, axis=0)
    labels_test = np.append(labels_test, np.reshape(class_labels, (class_labels.shape[0])), axis=0)
    
    print(dataset_test.shape)
    print(labels_test.shape)
 
# Normalize data! Really necessary in this dataset
x_min = dataset_test.min(axis=(2), keepdims=True)
x_max = dataset_test.max(axis=(2), keepdims=True)
 
dataset_test = (dataset_test - x_min)/(x_max - x_min)

plt.hist(labels_test, bins=len(CLASSES), density=True, facecolor='g', alpha=0.75)

# Changed for using GPU
dataset_test = torch.from_numpy(dataset_test)
dataset_test = dataset_test.to(device, dtype=torch.float)
 
labels_test = torch.from_numpy(labels_test)
labels_test = labels_test.to(device, dtype=torch.long)

with torch.no_grad():
    outputs_test = model(dataset_test)
_, predicted_test = torch.max(outputs_test.data, 1)
 
total_test = labels_test.size(0)
correct_test = (predicted_test == labels_test).sum().item()
score_test = correct_test / total_test
 
# Convert to numpy first
y_true_test = labels_test.detach().cpu().numpy()
y_pred_test = predicted_test.detach().cpu().numpy()
 
precision_test, recall_test, f1_score_test, _ = precision_recall_fscore_support(
    y_true_test, y_pred_test, average=None
)
 
print("accuracy:", score_test * 100)
print("precision:", precision_test * 100)
print("recall:", recall_test * 100)
print("f1_score:", f1_score_test * 100)
accuracy: 77.21854304635761
precision: [97.93510324 81.47590361 97.12460064 98.07121662 68.06569343 54.21965318
 38.31578947]
recall: [100.          89.56953642 100.          99.8489426   56.25942685
  70.63253012  27.36842105]
f1_score: [98.95678092 85.33123028 98.54132901 98.95209581 61.60198183 61.34728581
 31.92982456]
cnf_matrix_test = confusion_matrix(y_true_test, y_pred_test)
 
plt.figure()
plot_confusion_matrix(cnf_matrix_test, normalize=False, classes=CLASSES, title='Confusion matrix TEST')

Footnotes

  1. B. S. Zapata-Impata, P. Gil, and F. Torres, “Tactile-Driven Grasp Stability and Slip Prediction,” Robotics, vol. 8, no. 4, p. 85, Dec. 2019, doi: 10.3390/robotics8040085.