File I/O & Grid Extraction
The hdsemg_shared.fileio
module provides an interface to:
- Load HD-sEMG data from MATLAB (`.mat`), OTB+ (`.otb+`, `.otb`) or OTB4 (`.otb4`) files
- Automatically sanitize and reshape the data/time arrays
- Extract electrode‐grid metadata (rows, columns, IED, reference channels, etc.)
- Cache remote grid‐configuration JSON for one week
- Save back to `.mat` if needed
Core Types
EMGFile
from hdsemg_shared.fileio.file_io import EMGFile
A single class that bundles:
- Raw data & time vectors
- Channel descriptions
- Sampling frequency, file name, file size, file type
- Electrode‐grid metadata via the `.grids` property
Loading
emg = EMGFile.load("session1.mat")
load(filepath: str) -> EMGFile
Detects the extension and dispatches to the appropriate loader (`.mat` → `MatFileIO.load`, `.otb+`/`.otb` → `otb_plus_file_io`, `.otb4` → `otb_4_file_io`), then sanitizes and returns an `EMGFile`. Raises `ValueError` for any other extension.
Attributes
emg.data # np.ndarray, shape (nSamples × nChannels); int16 input is converted to float32 on load, other dtypes are kept
emg.time # np.ndarray, shape (nSamples,)
emg.description # list or array of channel‐description strings
emg.sampling_frequency # float
emg.file_name # str
emg.file_size # int (bytes)
emg.file_type # "mat" | "otb" | "otb4"
emg.channel_count # int, number of channels (= data.shape[1]; 1 if data is not multi-dimensional)
Grid Metadata
from hdsemg_shared.fileio.file_io import Grid
grids: list[Grid] = emg.grids
- `.grids` (lazy‐loaded): a list of `Grid` objects, one per distinct grid key (rows × cols layout) detected in the file.
- `.get_grid(grid_key=…)` or `.get_grid(grid_uid=…)`: retrieve a single `Grid` by its key (e.g. `"8x4"`) or UUID; returns `None` if no grid matches.
Grid
dataclass
@dataclass
class Grid:
emg_indices: list[int] # indices of EMG channels in data/time
ref_indices: list[int] # indices of reference channels
rows: int # number of rows on the grid
cols: int # number of columns on the grid
ied_mm: int # inter‐electrode distance in millimeters
electrodes: int # total electrodes (rows × cols or remote lookup)
grid_key: str # e.g. "8x4"
grid_uid: str # unique UUID string
requested_path_idx: int | None # index of “requested path” entry in description
performed_path_idx: int | None # index of “performed path” entry in description
Saving
emg.save("subset.mat")
.save(save_path: str) -> None
Currently only supports saving to `.mat` via `MatFileIO.save`. Raises `ValueError` for any other extension.
Utility
emg.copy()
.copy() -> EMGFile
Returns a deep copy of the entire `EMGFile` (data, metadata, grids).
Low-Level MATLAB I/O
from hdsemg_shared.fileio.matlab_file_io import MatFileIO
- `MatFileIO.load(file_path: str) -> tuple`
  Loads a `.mat` file and returns exactly `(data, time, description, sampling_frequency, file_name, file_size)`.
- `MatFileIO.save(save_path: str, data, time, description, sampling_frequency)`
  Saves the provided arrays/metadata to a `.mat` file.
Under the Hood
- Format dispatch in `EMGFile.load`:
  - MATLAB (`.mat`) → `MatFileIO.load`
  - OTB+ / OTB (`.otb+`, `.otb`) → `otb_plus_file_io.load_otb_file`
  - OTB4 (`.otb4`) → `otb_4_file_io.load_otb4_file`
- Sanitization: ensures `data` is 2-D (samples × channels) and `time` is 1-D, swapping axes if needed.
- Grid JSON cache: fetched from Google Drive once per week, stored in `~/.hdsemg_cache/`.
Quick Example
# Load and inspect
emg = EMGFile.load("myrecording.otb+")
print(emg.data.shape, emg.sampling_frequency)

# List grids
for grid in emg.grids:
    print(f"{grid.grid_key}: {len(grid.emg_indices)} EMG, {len(grid.ref_indices)} refs")

# Find a specific grid (get_grid returns None when no grid has this key)
g2x8 = emg.get_grid(grid_key="2x8")

# Save a selection back to .mat
emg.save("selected_subset.mat")
API Documentation
EMGFile
Source code in hdsemg_shared/fileio/file_io.py
class EMGFile:
GRID_JSON_URL = (
"https://drive.google.com/uc?export=download&"
"id=1FqR6-ZlT1U74PluFEjCSeIS7NXJQUT-v"
)
CACHE_PATH = os.path.join(
os.path.expanduser("~"), ".hdsemg_cache", "grid_data_cache.json"
)
_grid_cache: list[dict] | None = None
def __init__(self, data, time, description, sf, file_name, file_size, file_type):
self.data = data
self.time = time
self.description = description
self.sampling_frequency = sf
self.file_name = file_name
self.file_size = file_size
self.file_type = file_type
self.channel_count = data.shape[1] if data.ndim > 1 else 1
# parse out grids *once* on demand
self._grids: list[Grid] | None = None
@classmethod
def load(cls, filepath: str) -> "EMGFile":
"""Factory: pick the right underlying loader, sanitize, and return EMGFile."""
suffix = Path(filepath).suffix.lower()
if suffix == ".mat":
raw = MatFileIO.load(filepath)
file_type = "mat"
elif suffix in {".otb+", ".otb"}:
raw = load_otb_file(filepath)
file_type = "otb"
elif suffix == ".otb4":
raw = load_otb4_file(filepath)
file_type = "otb4"
else:
raise ValueError(f"Unsupported file type: {suffix!r}")
data, time, desc, sf, fn, fs = raw
if data.dtype == np.int16:
data = data.astype(np.float32)
data, time = cls._sanitize(data, time)
return cls(data, time, desc, sf, fn, fs, file_type)
@staticmethod
def _sanitize(data: np.ndarray, time: np.ndarray):
data = np.atleast_2d(data)
if data.shape[0] < data.shape[1]:
data = data.T
time = np.squeeze(time)
if time.ndim == 2:
time = time[:, 0] if time.shape[1] == 1 else time[0, :]
if time.ndim == 1 and time.shape[0] != data.shape[0]:
if time.shape[0] == data.shape[1]:
time = time.T
else:
raise ValueError(f"Incompatible time {time.shape} for data {data.shape}")
return data, time
@property
def grids(self) -> list[Grid]:
"""
Lazily extract grid metadata from `self.description` and return a list
of Grid instances.
"""
if self._grids is not None:
return self._grids
desc = self.description
pattern = re.compile(r"HD(\d{2})MM(\d{2})(\d{2})")
info: dict[str, dict] = {}
current_key = None
# pull in (or fetch) the grid-data cache
grid_data = self._load_grid_data()
def entry_text(e):
# Handle NumPy arrays
if isinstance(e, np.ndarray):
if e.size == 1:
return entry_text(e.item()) # recurse into the item
else:
return str(e) # fallback
# Handle bytes
if isinstance(e, bytes):
try:
return e.decode("utf-8")
except UnicodeDecodeError:
return e.decode("latin1")
# Handle regular string
if isinstance(e, str):
return e
# Fallback for anything else
try:
return str(e[0][0]) # often used in nested arrays from .mat
except Exception:
return str(e)
for idx, ent in enumerate(desc):
txt = entry_text(ent)
m = pattern.search(txt)
if m:
scale, rows, cols = map(int, m.groups())
key = f"{rows}x{cols}"
if key not in info:
# look up in JSON cache
prod = m.group(0).upper()
elec = next(
(g["electrodes"] for g in grid_data if g["product"].upper() == prod),
rows * cols
)
info[key] = {
"rows": rows, "cols": cols, "ied_mm": scale,
"electrodes": elec, "indices": [], "refs": [],
"req_idx": None, "perf_idx": None
}
info[key]["indices"].append(idx)
current_key = key
else:
if current_key:
if "requested path" in txt.lower():
info[current_key]["requested_path_idx"] = idx
if "performed path" in txt.lower():
info[current_key]["performed_path_idx"] = idx
info[current_key]["refs"].append((idx, txt))
# build Grid objects
self._grids = []
for key, gi in info.items():
grid = Grid(
emg_indices=gi["indices"],
ref_indices=[i for i, _ in gi["refs"]],
rows=gi["rows"],
cols=gi["cols"],
ied_mm=gi["ied_mm"],
electrodes=gi["electrodes"],
grid_key=key,
requested_path_idx=gi.get("requested_path_idx"),
performed_path_idx=gi.get("performed_path_idx"),
)
self._grids.append(grid)
return self._grids
def save(self, save_path: str) -> None:
if save_path.endswith(".mat"):
MatFileIO.save(save_path, self.data, self.time, self.description, self.sampling_frequency)
else:
file_format = save_path.split('.')[-1].lower()
raise ValueError(f"Unsupported save format: {file_format!r}")
@classmethod
def _load_grid_data(cls) -> list[dict]:
"""
Load from cache if < 1 week old, else fetch from URL.
"""
if cls._grid_cache is not None:
return cls._grid_cache
os.makedirs(os.path.dirname(cls.CACHE_PATH), exist_ok=True)
one_week = 7 * 24 * 3600
try:
if os.path.exists(cls.CACHE_PATH):
age = time.time() - os.path.getmtime(cls.CACHE_PATH)
if age < one_week:
with open(cls.CACHE_PATH) as f:
cls._grid_cache = json.load(f)
return cls._grid_cache
except Exception:
pass
try:
r = requests.get(cls.GRID_JSON_URL, timeout=10)
r.raise_for_status()
cls._grid_cache = r.json()
with open(cls.CACHE_PATH, "w") as f:
json.dump(cls._grid_cache, f)
except Exception:
cls._grid_cache = []
return cls._grid_cache
def get_grid(self, *, grid_key: str = None, grid_uid: str = None) -> Grid | None:
"""
Searches for a Grid by its key or UID.
If both are None, returns None.
"""
if self._grids is None:
_ = self.grids # Initialisiere Grids falls noch nicht geschehen
if grid_key is not None:
for g in self._grids:
if g.grid_key == grid_key:
return g
if grid_uid is not None:
for g in self._grids:
if g.grid_uid == grid_uid:
return g
return None
def copy(self):
"""
Returns a deep copy of the EMGFile instance.
"""
import copy
return copy.deepcopy(self)
grids: list[hdsemg_shared.fileio.file_io.Grid]
property
readonly
Lazily extract grid metadata from self.description
and return a list
of Grid instances.
copy(self)
Returns a deep copy of the EMGFile instance.
Source code in hdsemg_shared/fileio/file_io.py
def copy(self):
    """Create a fully independent duplicate of this EMGFile instance.

    Everything reachable from the instance is copied recursively.
    """
    from copy import deepcopy

    duplicate = deepcopy(self)
    return duplicate
get_grid(self, *, grid_key=None, grid_uid=None)
Searches for a Grid by its key or UID. If both are None, returns None.
Source code in hdsemg_shared/fileio/file_io.py
def get_grid(self, *, grid_key: str = None, grid_uid: str = None) -> Grid | None:
    """
    Look up a single Grid by key or by UID.

    The key is tried first; the UID is only consulted when no grid matched
    the key (or no key was given). Returns None when nothing matches or when
    both arguments are None.
    """
    # Accessing the property parses the description on first use.
    candidates = self.grids if self._grids is None else self._grids

    if grid_key is not None:
        by_key = next((grid for grid in candidates if grid.grid_key == grid_key), None)
        if by_key is not None:
            return by_key

    if grid_uid is not None:
        return next((grid for grid in candidates if grid.grid_uid == grid_uid), None)

    return None
load(filepath)
classmethod
Factory: pick the right underlying loader, sanitize, and return EMGFile.
Source code in hdsemg_shared/fileio/file_io.py
@classmethod
def load(cls, filepath: str) -> "EMGFile":
    """Build an EMGFile from disk, choosing the loader by file extension.

    The extension is compared in lower case. int16 samples are converted to
    float32, then data/time are passed through ``_sanitize``.

    Raises:
        ValueError: If the extension is not .mat, .otb+, .otb or .otb4.
    """
    extension = Path(filepath).suffix.lower()

    if extension == ".mat":
        file_type, raw = "mat", MatFileIO.load(filepath)
    elif extension in (".otb+", ".otb"):
        file_type, raw = "otb", load_otb_file(filepath)
    elif extension == ".otb4":
        file_type, raw = "otb4", load_otb4_file(filepath)
    else:
        raise ValueError(f"Unsupported file type: {extension!r}")

    samples, timestamps, description, sampling_frequency, file_name, file_size = raw
    if samples.dtype == np.int16:
        samples = samples.astype(np.float32)
    samples, timestamps = cls._sanitize(samples, timestamps)

    return cls(
        samples,
        timestamps,
        description,
        sampling_frequency,
        file_name,
        file_size,
        file_type,
    )
Grid
dataclass
Grid(emg_indices: list[int], ref_indices: list[int], rows: int, cols: int, ied_mm: int, electrodes: int, grid_key: str, grid_uid: str = <factory>, requested_path_idx: int | None = None, performed_path_idx: int | None = None)
Source code in hdsemg_shared/fileio/file_io.py
@dataclass
class Grid:
    """Metadata of one electrode grid parsed from a recording's channel descriptions."""

    # Indices of the description entries that matched the grid pattern
    # (the grid's EMG channels).
    emg_indices: list[int]
    # Indices of the non-matching entries recorded for this grid
    # (reference channels).
    ref_indices: list[int]
    # Number of rows on the grid.
    rows: int
    # Number of columns on the grid.
    cols: int
    # Inter-electrode distance in millimeters.
    ied_mm: int
    # Total electrode count: taken from the grid catalogue when the product
    # is listed there, otherwise rows * cols.
    electrodes: int
    # Layout key of the form "<rows>x<cols>", e.g. "8x4".
    grid_key: str
    # Random UUID4 string, generated per instance unless supplied.
    grid_uid: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Index of the "requested path" entry in the description, if any.
    requested_path_idx: int | None = None
    # Index of the "performed path" entry in the description, if any.
    performed_path_idx: int | None = None