File I/O & Grid Extraction

The hdsemg_shared.fileio module provides a interface to:

  • Load HD-sEMG data from MATLAB (.mat), OTB+ (.otb+, .otb) or OTB4 (.otb4) files
  • Automatically sanitize and reshape the data/time arrays
  • Extract electrode‐grid metadata (rows, columns, IED, reference channels, etc.)
  • Cache remote grid‐configuration JSON for one week
  • Save back to .mat if needed

Core Types

EMGFile

from hdsemg_shared.fileio.file_io import EMGFile

A single class that bundles:

  • Raw data & time vectors
  • Channel descriptions
  • Sampling frequency, file name, file size, file type
  • Electrode‐grid metadata via the .grids property

Loading

emg = EMGFile.load("session1.mat")
  • load(filepath: str) -> EMGFile Detects the extension and dispatches to the appropriate loader (.matMatFileIO.load, .otb+/.otbotb_plus_file_io, .otb4otb_4_file_io), then sanitizes and returns an EMGFile.

Attributes

emg.data               # np.ndarray, shape (nSamples × nChannels), float32
emg.time               # np.ndarray, shape (nSamples,)
emg.description        # list or array of channel‐description strings
emg.sampling_frequency # float
emg.file_name          # str
emg.file_size          # int (bytes)
emg.file_type          # "mat" | "otb" | "otb4"
emg.channel_count      # int, number of channels (= data.shape[1])

Grid Metadata

from hdsemg_shared.fileio.file_io import Grid

grids: list[Grid] = emg.grids
  • .grids (lazy‐loaded): a list of Grid objects (one per detected grid in the file).
  • .get_grid(grid_key=…) or .get_grid(grid_uid=…): retrieve a single Grid by its key (e.g. "8x4") or UUID.
Grid dataclass
@dataclass
class Grid:
    emg_indices: list[int]         # indices of EMG channels in data/time
    ref_indices: list[int]         # indices of reference channels
    rows: int                      # number of rows on the grid
    cols: int                      # number of columns on the grid
    ied_mm: int                    # inter‐electrode distance in millimeters
    electrodes: int                # total electrodes (rows × cols or remote lookup)
    grid_key: str                  # e.g. "8x4"
    grid_uid: str                  # unique UUID string
    requested_path_idx: int | None # index of “requested path” entry in description
    performed_path_idx: int | None # index of “performed path” entry in description

Saving

emg.save("subset.mat")
  • .save(save_path: str) -> None Currently only supports saving to .mat via MatFileIO.save. Raises ValueError for any other extension.

Utility

emg.copy()
  • .copy() -> EMGFile Returns a deep copy of the entire EMGFile (data, metadata, grids).

Low-Level MATLAB I/O

from hdsemg_shared.fileio.matlab_file_io import MatFileIO
  • MatFileIO.load(file_path: str) -> tuple Loads a .mat and returns exactly (data, time, description, sampling_frequency, file_name, file_size).

  • MatFileIO.save(save_path: str, data, time, description, sampling_frequency) Saves the provided arrays/metadata to a .mat file.


Under the Hood

  • Format dispatch in EMGFile.load:

  • MATLAB (.mat) → MatFileIO.load

  • OTB+ / OTB (.otb+, .otb) → otb_plus_file_io.load_otb_file
  • OTB4 (.otb4) → otb_4_file_io.load_otb4_file
  • Sanitization: ensures data is 2-D (samples × channels) and time is 1-D, swapping axes if needed.
  • Grid JSON cache: fetched from Google Drive once per week, stored in ~/.hdsemg_cache/.

Quick Example

# Load and inspect
emg = EMGFile.load("myrecording.otb+")
print(emg.data.shape, emg.sampling_frequency)

# List grids
for grid in emg.grids:
    print(f"{grid.grid_key}: {len(grid.emg_indices)} EMG, {len(grid.ref_indices)} refs")

# Find a specific grid
g2x8 = emg.get_grid(grid_key="2x8")

# Save a selection back to .mat
emg.save("selected_subset.mat")

API Documentation

EMGFile

Source code in hdsemg_shared/fileio/file_io.py
class EMGFile:
    GRID_JSON_URL = (
        "https://drive.google.com/uc?export=download&"
        "id=1FqR6-ZlT1U74PluFEjCSeIS7NXJQUT-v"
    )
    CACHE_PATH = os.path.join(
        os.path.expanduser("~"), ".hdsemg_cache", "grid_data_cache.json"
    )
    _grid_cache: list[dict] | None = None

    def __init__(self, data, time, description, sf, file_name, file_size, file_type):
        self.data = data
        self.time = time
        self.description = description
        self.sampling_frequency = sf
        self.file_name = file_name
        self.file_size = file_size
        self.file_type = file_type
        self.channel_count = data.shape[1] if data.ndim > 1 else 1

        # parse out grids *once* on demand
        self._grids: list[Grid] | None = None

    @classmethod
    def load(cls, filepath: str) -> "EMGFile":
        """Factory: pick the right underlying loader, sanitize, and return EMGFile."""
        suffix = Path(filepath).suffix.lower()
        if suffix == ".mat":
            raw = MatFileIO.load(filepath)
            file_type = "mat"
        elif suffix in {".otb+", ".otb"}:
            raw = load_otb_file(filepath)
            file_type = "otb"
        elif suffix == ".otb4":
            raw = load_otb4_file(filepath)
            file_type = "otb4"
        else:
            raise ValueError(f"Unsupported file type: {suffix!r}")

        data, time, desc, sf, fn, fs = raw

        if data.dtype == np.int16:
            data = data.astype(np.float32)

        data, time = cls._sanitize(data, time)
        return cls(data, time, desc, sf, fn, fs, file_type)

    @staticmethod
    def _sanitize(data: np.ndarray, time: np.ndarray):
        data = np.atleast_2d(data)
        if data.shape[0] < data.shape[1]:
            data = data.T

        time = np.squeeze(time)
        if time.ndim == 2:
            time = time[:, 0] if time.shape[1] == 1 else time[0, :]
        if time.ndim == 1 and time.shape[0] != data.shape[0]:
            if time.shape[0] == data.shape[1]:
                time = time.T
            else:
                raise ValueError(f"Incompatible time {time.shape} for data {data.shape}")
        return data, time

    @property
    def grids(self) -> list[Grid]:
        """
        Lazily extract grid metadata from `self.description` and return a list
        of Grid instances.
        """
        if self._grids is not None:
            return self._grids

        desc = self.description
        pattern = re.compile(r"HD(\d{2})MM(\d{2})(\d{2})")
        info: dict[str, dict] = {}
        current_key = None

        # pull in (or fetch) the grid-data cache
        grid_data = self._load_grid_data()

        def entry_text(e):
            # Handle NumPy arrays
            if isinstance(e, np.ndarray):
                if e.size == 1:
                    return entry_text(e.item())  # recurse into the item
                else:
                    return str(e)  # fallback

            # Handle bytes
            if isinstance(e, bytes):
                try:
                    return e.decode("utf-8")
                except UnicodeDecodeError:
                    return e.decode("latin1")

            # Handle regular string
            if isinstance(e, str):
                return e

            # Fallback for anything else
            try:
                return str(e[0][0])  # often used in nested arrays from .mat
            except Exception:
                return str(e)

        for idx, ent in enumerate(desc):
            txt = entry_text(ent)
            m = pattern.search(txt)
            if m:
                scale, rows, cols = map(int, m.groups())
                key = f"{rows}x{cols}"
                if key not in info:
                    # look up in JSON cache
                    prod = m.group(0).upper()
                    elec = next(
                        (g["electrodes"] for g in grid_data if g["product"].upper() == prod),
                        rows * cols
                    )
                    info[key] = {
                        "rows": rows, "cols": cols, "ied_mm": scale,
                        "electrodes": elec, "indices": [], "refs": [],
                        "req_idx": None, "perf_idx": None
                    }
                info[key]["indices"].append(idx)
                current_key = key
            else:
                if current_key:
                    if "requested path" in txt.lower():
                        info[current_key]["requested_path_idx"] = idx
                    if "performed path" in txt.lower():
                        info[current_key]["performed_path_idx"] = idx
                    info[current_key]["refs"].append((idx, txt))

        # build Grid objects
        self._grids = []
        for key, gi in info.items():
            grid = Grid(
                emg_indices=gi["indices"],
                ref_indices=[i for i, _ in gi["refs"]],
                rows=gi["rows"],
                cols=gi["cols"],
                ied_mm=gi["ied_mm"],
                electrodes=gi["electrodes"],
                grid_key=key,
                requested_path_idx=gi.get("requested_path_idx"),
                performed_path_idx=gi.get("performed_path_idx"),
            )
            self._grids.append(grid)

        return self._grids

    def save(self, save_path: str) -> None:
        if save_path.endswith(".mat"):
            MatFileIO.save(save_path, self.data, self.time, self.description, self.sampling_frequency)
        else:
            file_format = save_path.split('.')[-1].lower()
            raise ValueError(f"Unsupported save format: {file_format!r}")

    @classmethod
    def _load_grid_data(cls) -> list[dict]:
        """
        Load from cache if < 1 week old, else fetch from URL.
        """
        if cls._grid_cache is not None:
            return cls._grid_cache

        os.makedirs(os.path.dirname(cls.CACHE_PATH), exist_ok=True)
        one_week = 7 * 24 * 3600
        try:
            if os.path.exists(cls.CACHE_PATH):
                age = time.time() - os.path.getmtime(cls.CACHE_PATH)
                if age < one_week:
                    with open(cls.CACHE_PATH) as f:
                        cls._grid_cache = json.load(f)
                        return cls._grid_cache
        except Exception:
            pass

        try:
            r = requests.get(cls.GRID_JSON_URL, timeout=10)
            r.raise_for_status()
            cls._grid_cache = r.json()
            with open(cls.CACHE_PATH, "w") as f:
                json.dump(cls._grid_cache, f)
        except Exception:
            cls._grid_cache = []
        return cls._grid_cache

    def get_grid(self, *, grid_key: str = None, grid_uid: str = None) -> Grid | None:
        """
        Searches for a Grid by its key or UID.
        If both are None, returns None.
        """
        if self._grids is None:
            _ = self.grids  # Initialisiere Grids falls noch nicht geschehen
        if grid_key is not None:
            for g in self._grids:
                if g.grid_key == grid_key:
                    return g
        if grid_uid is not None:
            for g in self._grids:
                if g.grid_uid == grid_uid:
                    return g
        return None

    def copy(self):
        """
        Returns a deep copy of the EMGFile instance.
        """
        import copy
        return copy.deepcopy(self)

grids: list[hdsemg_shared.fileio.file_io.Grid] property readonly

Lazily extract grid metadata from self.description and return a list of Grid instances.

copy(self)

Returns a deep copy of the EMGFile instance.

Source code in hdsemg_shared/fileio/file_io.py
def copy(self):
    """
    Returns a deep copy of the EMGFile instance.
    """
    import copy
    return copy.deepcopy(self)

get_grid(self, *, grid_key=None, grid_uid=None)

Searches for a Grid by its key or UID. If both are None, returns None.

Source code in hdsemg_shared/fileio/file_io.py
def get_grid(self, *, grid_key: str = None, grid_uid: str = None) -> Grid | None:
    """
    Searches for a Grid by its key or UID.
    If both are None, returns None.
    """
    if self._grids is None:
        _ = self.grids  # Initialisiere Grids falls noch nicht geschehen
    if grid_key is not None:
        for g in self._grids:
            if g.grid_key == grid_key:
                return g
    if grid_uid is not None:
        for g in self._grids:
            if g.grid_uid == grid_uid:
                return g
    return None

load(filepath) classmethod

Factory: pick the right underlying loader, sanitize, and return EMGFile.

Source code in hdsemg_shared/fileio/file_io.py
@classmethod
def load(cls, filepath: str) -> "EMGFile":
    """Factory: pick the right underlying loader, sanitize, and return EMGFile."""
    suffix = Path(filepath).suffix.lower()
    if suffix == ".mat":
        raw = MatFileIO.load(filepath)
        file_type = "mat"
    elif suffix in {".otb+", ".otb"}:
        raw = load_otb_file(filepath)
        file_type = "otb"
    elif suffix == ".otb4":
        raw = load_otb4_file(filepath)
        file_type = "otb4"
    else:
        raise ValueError(f"Unsupported file type: {suffix!r}")

    data, time, desc, sf, fn, fs = raw

    if data.dtype == np.int16:
        data = data.astype(np.float32)

    data, time = cls._sanitize(data, time)
    return cls(data, time, desc, sf, fn, fs, file_type)

Grid dataclass

Grid(emg_indices: list[int], ref_indices: list[int], rows: int, cols: int, ied_mm: int, electrodes: int, grid_key: str, grid_uid: str = , requested_path_idx: int | None = None, performed_path_idx: int | None = None)

Source code in hdsemg_shared/fileio/file_io.py
@dataclass
class Grid:
    emg_indices: list[int]
    ref_indices: list[int]
    rows: int
    cols: int
    ied_mm: int
    electrodes: int
    grid_key: str
    grid_uid: str = field(default_factory=lambda: str(uuid.uuid4()))
    requested_path_idx: int | None = None
    performed_path_idx: int | None = None