File I/O & Grid Extraction
The hdsemg_shared.fileio module provides an interface to:
- Load HD-sEMG data from MATLAB (.mat), OTB+ (.otb+, .otb), or OTB4 (.otb4) files
- Support Novecento+ device files with multi-track recordings and control signals
- Automatically sanitize and reshape the data/time arrays
- Extract electrode‐grid metadata (rows, columns, IED, reference channels, etc.)
- Cache remote grid‐configuration JSON for one week
- Save back to .mat if needed
Core Types
EMGFile
from hdsemg_shared.fileio.file_io import EMGFile
A single class that bundles:
- Raw data & time vectors
- Channel descriptions
- Sampling frequency, file name, file size, file type
- Electrode-grid metadata via the .grids property
Loading
emg = EMGFile.load("session1.mat")
load(filepath: str) -> EMGFile: Detects the extension and dispatches to the appropriate loader (.mat → MatFileIO.load, .otb+/.otb → otb_plus_file_io, .otb4 → otb_4_file_io), then sanitizes the arrays and returns an EMGFile.
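The extension dispatch can be sketched on its own, without the library installed. This is a minimal illustration, not the library's code: the helper name detect_file_type and the lookup table are hypothetical, and only the suffix-to-type mapping is taken from the description above.

```python
from pathlib import Path

# Hypothetical lookup table; the mapping mirrors the dispatch described above.
_SUFFIX_TO_TYPE = {".mat": "mat", ".otb+": "otb", ".otb": "otb", ".otb4": "otb4"}


def detect_file_type(filepath: str) -> str:
    """Return the file-type label for a path, or raise ValueError."""
    # Path.suffix keeps the trailing "+" of ".otb+"; lower() makes it case-insensitive.
    suffix = Path(filepath).suffix.lower()
    try:
        return _SUFFIX_TO_TYPE[suffix]
    except KeyError:
        raise ValueError(f"Unsupported file type: {suffix!r}") from None


print(detect_file_type("session1.MAT"))
print(detect_file_type("myrecording.otb+"))
```

Checking the suffix before calling EMGFile.load can be useful when batch-processing a folder that also contains unrelated files.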
Attributes
emg.data # np.ndarray, shape (nSamples × nChannels), float32
emg.time # np.ndarray, shape (nSamples,)
emg.description # list or array of channel‐description strings
emg.sampling_frequency # float
emg.file_name # str
emg.file_size # int (bytes)
emg.file_type # "mat" | "otb" | "otb4"
emg.channel_count # int, number of channels (= data.shape[1])
Grid Metadata
from hdsemg_shared.fileio.file_io import Grid
grids: list[Grid] = emg.grids
- .grids (lazy-loaded): a list of Grid objects, one per detected grid in the file.
- .get_grid(grid_key=…) or .get_grid(grid_uid=…): retrieve a single Grid by its key (e.g. "8mm_8x4", following the "{ied}mm_{rows}x{cols}" format) or UUID.
Grid dataclass
@dataclass
class Grid:
    emg_indices: list[int]          # column indices of EMG channels in data
    ref_indices: list[int]          # column indices of reference channels
    rows: int                       # number of rows on the grid
    cols: int                       # number of columns on the grid
    ied_mm: int                     # inter-electrode distance in millimeters
    electrodes: int                 # total electrodes (rows × cols or remote lookup)
    grid_key: str                   # e.g. "8mm_8x4" ("{ied}mm_{rows}x{cols}", optional "_N" suffix)
    grid_uid: str                   # unique UUID string
    muscle: str | None              # muscle name, if one was found for the grid
    requested_path_idx: int | None  # index of "requested path" entry in description
    performed_path_idx: int | None  # index of "performed path" entry in description
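The lookup performed by get_grid can be sketched with a stand-in type. GridStub and find_grid below are hypothetical names used only for illustration; the key format "{ied}mm_{rows}x{cols}" with an optional "_N" suffix is the one given in the Grid attribute documentation.

```python
from dataclasses import dataclass, field
from typing import Optional
import uuid


@dataclass
class GridStub:
    """Hypothetical stand-in carrying only the fields needed for lookup."""
    grid_key: str
    emg_indices: list[int]
    grid_uid: str = field(default_factory=lambda: str(uuid.uuid4()))


def find_grid(grids, *, grid_key=None, grid_uid=None) -> Optional[GridStub]:
    """Check the key first, then the UID; return None when nothing matches."""
    if grid_key is not None:
        for g in grids:
            if g.grid_key == grid_key:
                return g
    if grid_uid is not None:
        for g in grids:
            if g.grid_uid == grid_uid:
                return g
    return None


# Two grids with identical specs; the second carries the "_2" suffix.
grids = [
    GridStub("8mm_13x5", list(range(0, 64))),
    GridStub("8mm_13x5_2", list(range(70, 134))),
]
second = find_grid(grids, grid_key="8mm_13x5_2")
print(second.grid_key, len(second.emg_indices))
```

Because both arguments are keyword-only, a call such as find_grid(grids, "8mm_13x5") fails early instead of silently matching the wrong field.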
Saving
emg.save("subset.mat")
.save(save_path: str) -> None: Currently supports saving only to .mat via MatFileIO.save. Raises ValueError for any other extension.
Utility
emg.copy()
.copy() -> EMGFile: Returns a deep copy of the entire EMGFile (data, metadata, grids).
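What "deep copy" means in practice can be shown with the standard library alone. RecordingStub is a hypothetical stand-in for an object with nested, mutable state; the real EMGFile holds NumPy arrays rather than lists.

```python
import copy


class RecordingStub:
    """Hypothetical stand-in holding nested, mutable state."""

    def __init__(self, data, description):
        self.data = data
        self.description = description


original = RecordingStub(data=[[0.1, 0.2], [0.3, 0.4]], description=["ch1", "ch2"])
clone = copy.deepcopy(original)

# Mutating the clone leaves the original untouched, including nested lists.
clone.data[0][0] = 99.0
clone.description.append("extra")

print(original.data[0][0], len(original.description))
print(clone.data[0][0], len(clone.description))
```

This is why a copy is a safe starting point for destructive edits such as dropping channels before saving.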
Low-Level MATLAB I/O
from hdsemg_shared.fileio.matlab_file_io import MatFileIO
- MatFileIO.load(file_path: str) -> tuple: Loads a .mat file and returns exactly (data, time, description, sampling_frequency, file_name, file_size).
- MatFileIO.save(save_path: str, data, time, description, sampling_frequency): Saves the provided arrays/metadata to a .mat file.
Novecento+ Support
The library fully supports Novecento+ (OTBiolab) files in .otb4 format. These files have unique characteristics:
Multi-Track Signal Files
Novecento+ recordings store multiple data tracks in the same signal file (.sig), distinguished by:
- ChannelOffsetInSubPacket: Starting position of each track's channels within the file
- IsControl: Flag indicating control/reference signals (quaternions, buffer, ramp, etc.)
- Grid metadata in XML <Description> elements (<Name>, <NRow>, <NColumn>, <IED>)
Grid Extraction
The loader automatically:
- Parses grid information from the Tracks_000.xml file
- Extracts EMG channels (channels with valid grid patterns like HD08MM1305)
- Identifies reference channels (control signals, quaternions, buffer, ramp)
- Assigns reference channels to the appropriate EMG grid based on their position in the file
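The parsing step can be sketched with the standard-library XML parser. Only the element names (Description, Name, NRow, NColumn, IED, IsControl, ChannelOffsetInSubPacket) come from the text above; the Tracks/Track nesting, the sample values, and the function names are assumptions made for illustration and may not match the real file layout.

```python
import xml.etree.ElementTree as ET

# Hypothetical track file: nesting and values are assumed, not taken from a real recording.
SAMPLE_XML = """
<Tracks>
  <Track>
    <IsControl>false</IsControl>
    <ChannelOffsetInSubPacket>0</ChannelOffsetInSubPacket>
    <Description>
      <Name>HD08MM1305</Name>
      <NRow>13</NRow>
      <NColumn>5</NColumn>
      <IED>8</IED>
    </Description>
  </Track>
  <Track>
    <IsControl>true</IsControl>
    <ChannelOffsetInSubPacket>64</ChannelOffsetInSubPacket>
    <Description><Name>Quaternions</Name></Description>
  </Track>
</Tracks>
"""


def _int_or_none(parent, tag):
    """Read a child element as int, or None when the element is absent."""
    text = parent.findtext(tag)
    return int(text) if text is not None else None


def parse_tracks(xml_text):
    """Collect grid metadata and the control flag for every <Track> element."""
    tracks = []
    for track in ET.fromstring(xml_text.strip()).iter("Track"):
        desc = track.find("Description")
        is_control = track.findtext("IsControl", default="false").strip().lower() == "true"
        tracks.append({
            "name": desc.findtext("Name"),
            "rows": _int_or_none(desc, "NRow"),
            "cols": _int_or_none(desc, "NColumn"),
            "ied_mm": _int_or_none(desc, "IED"),
            "is_control": is_control,
            "offset": _int_or_none(track, "ChannelOffsetInSubPacket"),
        })
    return tracks


tracks = parse_tracks(SAMPLE_XML)
for t in tracks:
    kind = "reference" if t["is_control"] else "EMG"
    print(t["name"], kind, t["offset"])
```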
Channel Organization
For Novecento+ files, channels are organized as:
- EMG channels (emg_indices): Main electrode grid signals with the HD{IED}MM{rows}{cols} pattern
- Reference channels (ref_indices): Control signals, quaternions, buffer, ramp, and auxiliary inputs marked with REF in descriptions
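The HD{IED}MM{rows}{cols} naming can be decoded with a regular expression. The pattern below is the one used in the EMGFile.grids source; the helper names and the sample descriptions are hypothetical, and the split is simplified (the library additionally groups references under the grid that precedes them).

```python
import re

# Two digits each for IED (mm), rows, and columns, e.g. "HD08MM1305".
GRID_PATTERN = re.compile(r"HD(\d{2})MM(\d{2})(\d{2})")


def parse_grid_name(description: str):
    """Return (ied_mm, rows, cols) if the description names a grid, else None."""
    m = GRID_PATTERN.search(description)
    if m is None:
        return None
    ied_mm, rows, cols = map(int, m.groups())
    return ied_mm, rows, cols


def split_channels(descriptions):
    """Simplified split: indices whose description matches the pattern vs. all others."""
    emg, refs = [], []
    for idx, text in enumerate(descriptions):
        (emg if parse_grid_name(text) else refs).append(idx)
    return emg, refs


# Hypothetical channel descriptions.
descriptions = ["HD08MM1305 ch 1", "HD08MM1305 ch 2", "Quaternion W", "Ramp"]
print(parse_grid_name(descriptions[0]))
print(split_channels(descriptions))
```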
Example: Novecento+ File
# Load Novecento+ OTB4 file
emg = EMGFile.load("recording.otb4")
# Main EMG grid (e.g., 5x13 = 65 electrodes, 64 active)
main_grid = emg.grids[0]
print(f"EMG Grid: {main_grid.rows}x{main_grid.cols}, IED={main_grid.ied_mm}mm")
print(f"EMG channels: {len(main_grid.emg_indices)}")
print(f"Reference channels: {len(main_grid.ref_indices)}")
# Access EMG data only
emg_data = emg.data[:, main_grid.emg_indices]
# Access reference signals (quaternions, control signals, etc.)
ref_data = emg.data[:, main_grid.ref_indices]
# Check which channels are references
for idx in main_grid.ref_indices:
    desc = emg.description[idx]
    print(f"Ref channel {idx}: {desc}")
Supported Signal Types
Novecento+ files may contain:
- HD-sEMG grids: Main EMG electrode arrays (e.g., HD08MM1305, HD08MM0513)
- Quaternions: IMU orientation data (4 channels, 2x2 layout)
- Buffer/Ramp: Device control signals (1 channel each)
- AUX inputs: Auxiliary analog inputs (16 channels, 2kHz)
- Control signals: Additional device status channels (8 channels, 8kHz)
- Load cells: Force/torque sensors (1+ channels)
Technical Details
- Data format: int32 (Novecento+), converted to float64 with appropriate scaling
- Conversion: ADC_Range / (2^ADC_Nbits) * 1000 / Gain
- Fortran-order reshaping: Data is reshaped as (channels, samples, order='F')
- Mixed sampling rates: Main EMG at 2kHz, control signals at 8kHz (automatically trimmed to match)
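The scaling formula and the column-major layout can be illustrated in plain Python. This is a sketch under stated assumptions: the function names are hypothetical, the ADC parameters are made-up values, and the list-based de-interleaving stands in for the NumPy reshape with order='F' that the loader is described as using.

```python
def conversion_factor(adc_range: float, adc_nbits: int, gain: float) -> float:
    """ADC_Range / (2^ADC_Nbits) * 1000 / Gain, as given in the list above."""
    return adc_range / (2 ** adc_nbits) * 1000 / gain


def deinterleave(flat, n_channels: int):
    """Column-major split into (channels, samples).

    In Fortran order the first index varies fastest, so sample s of
    channel c sits at flat[c + s * n_channels].
    """
    n_samples = len(flat) // n_channels
    return [
        [flat[c + s * n_channels] for s in range(n_samples)]
        for c in range(n_channels)
    ]


# Interleaved raw integers for two channels and three samples.
raw = [1, 10, 2, 20, 3, 30]
channels = deinterleave(raw, n_channels=2)

# Hypothetical ADC parameters, chosen only to exercise the formula.
factor = conversion_factor(adc_range=4.8, adc_nbits=16, gain=150.0)
scaled = [[value * factor for value in channel] for channel in channels]
print(channels)
```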
Under the Hood
- Format dispatch in EMGFile.load:
  - MATLAB (.mat) → MatFileIO.load
  - OTB+ / OTB (.otb+, .otb) → otb_plus_file_io.load_otb_file
  - OTB4 (.otb4) → otb_4_file_io.load_otb4_file
    - Novecento+ device detection and multi-track loading
    - Grid metadata extraction from XML
- Sanitization: ensures data is 2-D (samples × channels) and time is 1-D, swapping axes if needed.
- Grid JSON cache: fetched from Google Drive once per week, stored in ~/.hdsemg_cache/.
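The one-week cache rule comes down to comparing the cache file's modification time with the current time. The sketch below reproduces that check against a temporary file; read_cache_if_fresh is a hypothetical helper, and the cached entry is illustrative data that only reuses the "product" and "electrodes" keys read by the library source.

```python
import json
import os
import tempfile
import time

ONE_WEEK_S = 7 * 24 * 3600


def read_cache_if_fresh(path, max_age_s=ONE_WEEK_S):
    """Return the parsed JSON if the file exists and is younger than max_age_s, else None."""
    try:
        if os.path.exists(path) and time.time() - os.path.getmtime(path) < max_age_s:
            with open(path) as f:
                return json.load(f)
    except (OSError, json.JSONDecodeError):
        pass  # unreadable or corrupt cache is treated as missing
    return None


with tempfile.TemporaryDirectory() as tmp:
    cache_file = os.path.join(tmp, "grid_data_cache.json")
    with open(cache_file, "w") as f:
        json.dump([{"product": "HD08MM1305", "electrodes": 64}], f)

    fresh = read_cache_if_fresh(cache_file)

    # Backdate the modification time by eight days to simulate a stale cache.
    stale_time = time.time() - 8 * 24 * 3600
    os.utime(cache_file, (stale_time, stale_time))
    stale = read_cache_if_fresh(cache_file)

print(fresh is not None, stale is None)
```

A None result is the point at which a caller would fall back to fetching the JSON again.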
Quick Example
# Load and inspect
emg = EMGFile.load("myrecording.otb+")
print(emg.data.shape, emg.sampling_frequency)
# List grids
for grid in emg.grids:
    print(f"{grid.grid_key}: {len(grid.emg_indices)} EMG, {len(grid.ref_indices)} refs")
# Find a specific grid (keys follow "{ied}mm_{rows}x{cols}"; the IED here is an example value)
g2x8 = emg.get_grid(grid_key="8mm_2x8")
# Save back to .mat (writes the data currently held by emg)
emg.save("selected_subset.mat")
API Documentation
EMGFile
Source code in hdsemg_shared/fileio/file_io.py
class EMGFile:
    GRID_JSON_URL = (
        "https://drive.google.com/uc?export=download&"
        "id=1FqR6-ZlT1U74PluFEjCSeIS7NXJQUT-v"
    )
    CACHE_PATH = os.path.join(
        os.path.expanduser("~"), ".hdsemg_cache", "grid_data_cache.json"
    )
    _grid_cache: list[dict] | None = None

    def __init__(self, data, time, description, sf, file_name, file_size, file_type):
        self.data = data
        self.time = time
        self.description = description
        self.sampling_frequency = sf
        self.file_name = file_name
        self.file_size = file_size
        self.file_type = file_type
        self.channel_count = data.shape[1] if data.ndim > 1 else 1
        # parse out grids *once* on demand
        self._grids: list[Grid] | None = None

    @classmethod
    def load(cls, filepath: str) -> "EMGFile":
        """Factory: pick the right underlying loader, sanitize, and return EMGFile."""
        suffix = Path(filepath).suffix.lower()
        if suffix == ".mat":
            raw = MatFileIO.load(filepath)
            file_type = "mat"
        elif suffix in {".otb+", ".otb"}:
            raw = load_otb_file(filepath)
            file_type = "otb"
        elif suffix == ".otb4":
            raw = load_otb4_file(filepath)
            file_type = "otb4"
        else:
            raise ValueError(f"Unsupported file type: {suffix!r}")
        data, time, desc, sf, fn, fs = raw
        if data.dtype == np.int16:
            data = data.astype(np.float32)
        data, time = cls._sanitize(data, time)
        return cls(data, time, desc, sf, fn, fs, file_type)

    @staticmethod
    def _sanitize(data: np.ndarray, time: np.ndarray):
        data = np.atleast_2d(data)
        if data.shape[0] < data.shape[1]:
            data = data.T
        time = np.squeeze(time)
        if time.ndim == 2:
            time = time[:, 0] if time.shape[1] == 1 else time[0, :]
        if time.ndim == 1 and time.shape[0] != data.shape[0]:
            if time.shape[0] == data.shape[1]:
                time = time.T
            else:
                raise ValueError(f"Incompatible time {time.shape} for data {data.shape}")
        return data, time

    @property
    def grids(self) -> list[Grid]:
        """
        Lazily extract grid metadata from `self.description` and return a list
        of Grid instances. Handles multiple grids with identical specifications
        by detecting non-contiguous channel indices.
        """
        if self._grids is not None:
            return self._grids
        desc = self.description
        pattern = re.compile(r"HD(\d{2})MM(\d{2})(\d{2})")
        muscle_pattern = re.compile(r"\[MUSCLE:(.*?)\]")
        # Instead of dict, use list to allow multiple grids with same specs
        grid_instances: list[dict] = []
        current_grid = None
        # pull in (or fetch) the grid-data cache
        grid_data = self._load_grid_data()

        def entry_text(e):
            # Handle NumPy arrays
            if isinstance(e, np.ndarray):
                if e.size == 1:
                    return entry_text(e.item())  # recurse into the item
                else:
                    return str(e)  # fallback
            # Handle bytes
            if isinstance(e, bytes):
                try:
                    return e.decode("utf-8")
                except UnicodeDecodeError:
                    return e.decode("latin1")
            # Handle regular string
            if isinstance(e, str):
                return e
            # Fallback for anything else
            try:
                return str(e[0][0])  # often used in nested arrays from .mat
            except Exception:
                return str(e)

        def is_contiguous(indices: list[int], new_idx: int, tolerance: int = 5) -> bool:
            """Check if new_idx is contiguous with existing indices."""
            if not indices:
                return True
            # Check if within tolerance of the last index
            return abs(new_idx - indices[-1]) <= tolerance

        def find_or_create_grid(scale: int, rows: int, cols: int, idx: int, muscle: Optional[str] = None) -> dict:
            """Find existing grid with matching specs and contiguous indices, or create new one."""
            # Look for existing grid with same specs
            base_key = f"{scale}mm_{rows}x{cols}"
            for grid_inst in grid_instances:
                # Match by specs, contiguity, AND muscle (if available)
                specs_match = (grid_inst["ied_mm"] == scale and
                               grid_inst["rows"] == rows and
                               grid_inst["cols"] == cols)
                # If muscle info is available, use it to differentiate grids
                if muscle is not None and grid_inst.get("muscle") is not None:
                    muscle_match = grid_inst["muscle"] == muscle
                else:
                    muscle_match = True  # No muscle info, don't use for matching
                if specs_match and muscle_match and is_contiguous(grid_inst["indices"], idx):
                    return grid_inst
            # No contiguous grid found, create new instance
            # Look up electrode count from cache
            prod = f"HD{scale:02d}MM{rows:02d}{cols:02d}".upper()
            # Create transposed pattern
            prod_transposed = f"HD{scale:02d}MM{cols:02d}{rows:02d}".upper()
            elec = None
            for g in grid_data:
                g_prod_upper = g["product"].upper()
                if g_prod_upper == prod or g_prod_upper == prod_transposed:
                    elec = g["electrodes"]
                    break
            if elec is None:
                elec = rows * cols
            # Create unique key with instance counter if needed
            instance_num = sum(1 for g in grid_instances
                               if g["ied_mm"] == scale and g["rows"] == rows and g["cols"] == cols)
            if instance_num > 0:
                grid_key = f"{base_key}_{instance_num + 1}"
            else:
                grid_key = base_key
            new_grid = {
                "rows": rows,
                "cols": cols,
                "ied_mm": scale,
                "electrodes": elec,
                "indices": [],
                "refs": [],
                "req_idx": None,
                "perf_idx": None,
                "grid_key": grid_key,
                "muscle": None
            }
            grid_instances.append(new_grid)
            return new_grid

        for idx, ent in enumerate(desc):
            txt = entry_text(ent)
            m = pattern.search(txt)
            if m:
                scale, rows, cols = map(int, m.groups())
                # Extract muscle information if present (do this BEFORE find_or_create_grid)
                muscle = None
                muscle_match = muscle_pattern.search(txt)
                if muscle_match:
                    muscle = muscle_match.group(1).strip()
                # Pass muscle info to find_or_create_grid for proper differentiation
                current_grid = find_or_create_grid(scale, rows, cols, idx, muscle)
                current_grid["indices"].append(idx)
                # Store muscle info in grid if not already set
                if muscle and current_grid["muscle"] is None:
                    current_grid["muscle"] = muscle
            else:
                if current_grid:
                    # Support both "requested path" and "original path" for the requested/original path index
                    if "requested path" in txt.lower() or "original path" in txt.lower():
                        current_grid["req_idx"] = idx
                    if "performed path" in txt.lower():
                        current_grid["perf_idx"] = idx
                    current_grid["refs"].append((idx, txt))
        # build Grid objects
        self._grids = []
        for gi in grid_instances:
            grid = Grid(
                emg_indices=gi["indices"],
                ref_indices=[i for i, _ in gi["refs"]],
                rows=gi["rows"],
                cols=gi["cols"],
                ied_mm=gi["ied_mm"],
                electrodes=gi["electrodes"],
                grid_key=gi["grid_key"],
                muscle=gi.get("muscle"),
                requested_path_idx=gi.get("req_idx"),
                performed_path_idx=gi.get("perf_idx"),
            )
            self._grids.append(grid)
        return self._grids

    def save(self, save_path: str) -> None:
        if save_path.endswith(".mat"):
            MatFileIO.save(save_path, self.data, self.time, self.description, self.sampling_frequency)
        else:
            file_format = save_path.split('.')[-1].lower()
            raise ValueError(f"Unsupported save format: {file_format!r}")

    @classmethod
    def _load_grid_data(cls) -> list[dict]:
        """
        Load from cache if < 1 week old, else fetch from URL.
        """
        if cls._grid_cache is not None:
            return cls._grid_cache
        os.makedirs(os.path.dirname(cls.CACHE_PATH), exist_ok=True)
        one_week = 7 * 24 * 3600
        try:
            if os.path.exists(cls.CACHE_PATH):
                age = time.time() - os.path.getmtime(cls.CACHE_PATH)
                if age < one_week:
                    with open(cls.CACHE_PATH) as f:
                        cls._grid_cache = json.load(f)
                    return cls._grid_cache
        except Exception:
            pass
        try:
            r = requests.get(cls.GRID_JSON_URL, timeout=10)
            r.raise_for_status()
            cls._grid_cache = r.json()
            with open(cls.CACHE_PATH, "w") as f:
                json.dump(cls._grid_cache, f)
        except Exception:
            cls._grid_cache = []
        return cls._grid_cache

    def get_grid(self, *, grid_key: str = None, grid_uid: str = None) -> Grid | None:
        """
        Searches for a Grid by its key or UID.
        If both are None, returns None.
        """
        if self._grids is None:
            _ = self.grids  # Initialize grids if not already done
        if grid_key is not None:
            for g in self._grids:
                if g.grid_key == grid_key:
                    return g
        if grid_uid is not None:
            for g in self._grids:
                if g.grid_uid == grid_uid:
                    return g
        return None

    def copy(self):
        """
        Returns a deep copy of the EMGFile instance.
        """
        import copy
        return copy.deepcopy(self)
grids: list[hdsemg_shared.fileio.file_io.Grid]
property
readonly
Lazily extract grid metadata from self.description and return a list
of Grid instances. Handles multiple grids with identical specifications
by detecting non-contiguous channel indices.
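The contiguity rule can be tried out in isolation. is_contiguous below follows the helper shown in the class source (default tolerance of 5); group_by_contiguity is a hypothetical simplification that only compares against the most recent group, whereas the library also matches grid specifications and muscle labels.

```python
def is_contiguous(indices, new_idx, tolerance=5):
    """True if new_idx lies within `tolerance` of the last collected index."""
    if not indices:
        return True
    return abs(new_idx - indices[-1]) <= tolerance


def group_by_contiguity(channel_indices, tolerance=5):
    """Start a new group whenever the gap to the previous index exceeds the tolerance."""
    groups = []
    for idx in channel_indices:
        if groups and is_contiguous(groups[-1], idx, tolerance):
            groups[-1].append(idx)
        else:
            groups.append([idx])
    return groups


# Two runs of channels with identical grid specs, separated by a large gap.
groups = group_by_contiguity([0, 1, 2, 3, 70, 71, 72])
print(groups)
```

The tolerance lets a few interleaved non-grid channels sit inside a run without splitting it into two grids.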
copy(self)
Returns a deep copy of the EMGFile instance.
Source code in hdsemg_shared/fileio/file_io.py
def copy(self):
    """
    Returns a deep copy of the EMGFile instance.
    """
    import copy
    return copy.deepcopy(self)
get_grid(self, *, grid_key=None, grid_uid=None)
Searches for a Grid by its key or UID. If both are None, returns None.
Source code in hdsemg_shared/fileio/file_io.py
def get_grid(self, *, grid_key: str = None, grid_uid: str = None) -> Grid | None:
    """
    Searches for a Grid by its key or UID.
    If both are None, returns None.
    """
    if self._grids is None:
        _ = self.grids  # Initialize grids if not already done
    if grid_key is not None:
        for g in self._grids:
            if g.grid_key == grid_key:
                return g
    if grid_uid is not None:
        for g in self._grids:
            if g.grid_uid == grid_uid:
                return g
    return None
load(filepath)
classmethod
Factory: pick the right underlying loader, sanitize, and return EMGFile.
Source code in hdsemg_shared/fileio/file_io.py
@classmethod
def load(cls, filepath: str) -> "EMGFile":
    """Factory: pick the right underlying loader, sanitize, and return EMGFile."""
    suffix = Path(filepath).suffix.lower()
    if suffix == ".mat":
        raw = MatFileIO.load(filepath)
        file_type = "mat"
    elif suffix in {".otb+", ".otb"}:
        raw = load_otb_file(filepath)
        file_type = "otb"
    elif suffix == ".otb4":
        raw = load_otb4_file(filepath)
        file_type = "otb4"
    else:
        raise ValueError(f"Unsupported file type: {suffix!r}")
    data, time, desc, sf, fn, fs = raw
    if data.dtype == np.int16:
        data = data.astype(np.float32)
    data, time = cls._sanitize(data, time)
    return cls(data, time, desc, sf, fn, fs, file_type)
Grid
dataclass
Represents a high-density EMG electrode grid.
Attributes:
| Name | Type | Description |
|---|---|---|
| emg_indices | list[int] | List of channel indices for EMG electrodes |
| ref_indices | list[int] | List of channel indices for reference electrodes |
| rows | int | Number of rows in the grid |
| cols | int | Number of columns in the grid |
| ied_mm | int | Inter-electrode distance in millimeters |
| electrodes | int | Total number of active electrodes |
| grid_key | str | Unique identifier key (format: "{ied}mm_{rows}x{cols}" or with "_N" suffix) |
| grid_uid | str | Unique UUID for this grid instance |
| muscle | Optional[str] | Optional muscle name where grid is placed (extracted from OTB4 XML <Muscle> tag) |
| requested_path_idx | Optional[int] | Optional index of "requested path" description entry |
| performed_path_idx | Optional[int] | Optional index of "performed path" description entry |
Source code in hdsemg_shared/fileio/file_io.py
@dataclass
class Grid:
    """
    Represents a high-density EMG electrode grid.

    Attributes:
        emg_indices: List of channel indices for EMG electrodes
        ref_indices: List of channel indices for reference electrodes
        rows: Number of rows in the grid
        cols: Number of columns in the grid
        ied_mm: Inter-electrode distance in millimeters
        electrodes: Total number of active electrodes
        grid_key: Unique identifier key (format: "{ied}mm_{rows}x{cols}" or with "_N" suffix)
        grid_uid: Unique UUID for this grid instance
        muscle: Optional muscle name where grid is placed (extracted from OTB4 XML <Muscle> tag)
        requested_path_idx: Optional index of "requested path" description entry
        performed_path_idx: Optional index of "performed path" description entry
    """
    emg_indices: list[int]
    ref_indices: list[int]
    rows: int
    cols: int
    ied_mm: int
    electrodes: int
    grid_key: str
    grid_uid: str = field(default_factory=lambda: str(uuid.uuid4()))
    muscle: Optional[str] = None
    requested_path_idx: Optional[int] = None
    performed_path_idx: Optional[int] = None
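The grid_key format, including the "_N" suffix for repeated specifications, can be reproduced in a few lines. make_grid_key is a hypothetical helper that follows the numbering seen in the EMGFile.grids source: the first grid of a given specification gets the bare key, the second gets "_2", and so on.

```python
def make_grid_key(ied_mm: int, rows: int, cols: int, existing) -> str:
    """Build "{ied}mm_{rows}x{cols}", adding "_N" when the spec already exists.

    `existing` is a list of (ied_mm, rows, cols) tuples for grids created so far.
    """
    base_key = f"{ied_mm}mm_{rows}x{cols}"
    same_spec = sum(1 for spec in existing if spec == (ied_mm, rows, cols))
    return base_key if same_spec == 0 else f"{base_key}_{same_spec + 1}"


existing = []
keys = []
# Three grids, two of which share the same specification.
for spec in [(8, 13, 5), (8, 13, 5), (10, 8, 8)]:
    keys.append(make_grid_key(*spec, existing))
    existing.append(spec)
print(keys)
```

Keys built this way are readable but depend on channel order in the file, so grid_uid is the safer handle when a grid has to be tracked across copies.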