Source code for canml.canmlio

"""
canmlio: Enhanced CAN BLF processing toolkit for production use.
Module: canml/canmlio.py

Features:
  - Merge multiple DBCs with namespace collision avoidance.
  - Stream-decode large BLF files into pandas DataFrame chunks.
  - Full-file loading with uniform timestamp spacing and interpolation.
  - Signal/message filtering by ID or signal name.
  - Automatic injection of expected signals with dtype preservation.
  - Incremental CSV/Parquet export with metadata support.
  - Generic handling for enums and custom signal attributes.
  - Progress bars via tqdm and caching for DBC loading.

Dependencies:
  numpy, pandas, cantools, python-can, tqdm, pyarrow

Usage:
  from canml.canmlio import load_dbc_files, iter_blf_chunks, load_blf, to_csv, to_parquet, CanmlConfig

"""
import logging
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
from contextlib import contextmanager
from dataclasses import dataclass
from functools import lru_cache
from collections import Counter

import numpy as np
import pandas as pd
import cantools
from cantools.database.can import Database as CantoolsDatabase
from can.io.blf import BLFReader
from tqdm import tqdm

__all__ = [
    "CanmlConfig",
    "load_dbc_files",
    "iter_blf_chunks",
    "load_blf",
    "to_csv",
    "to_parquet",
]

# Module logger: single StreamHandler
glogger = logging.getLogger(__name__)
glogger.handlers.clear()
_handler = logging.StreamHandler()
_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
glogger.addHandler(_handler)
glogger.setLevel(logging.INFO)

T = Any

[docs] @dataclass class CanmlConfig: """ Configuration for BLF processing. Attributes: chunk_size: number of messages per DataFrame chunk. progress_bar: show tqdm progress bar if True. dtype_map: mapping from signal name to desired pandas dtype. sort_timestamps: sort final DataFrame by timestamp if True. force_uniform_timing: override timestamps with uniform spacing if True. interval_seconds: interval between timestamps when uniform timing enabled. interpolate_missing: interpolate missing signals if True. """ chunk_size: int = 10000 progress_bar: bool = True dtype_map: Optional[Dict[str, Any]] = None sort_timestamps: bool = False force_uniform_timing: bool = False interval_seconds: float = 0.01 interpolate_missing: bool = False def __post_init__(self): if self.chunk_size <= 0: raise ValueError("chunk_size must be positive") if self.interval_seconds <= 0: raise ValueError("interval_seconds must be positive")
@lru_cache(maxsize=32) def _load_dbc_files_cached( dbc_paths: Union[str, Tuple[str, ...]], prefix_signals: bool ) -> CantoolsDatabase: """ Internal cached loader for DBC files. Args: dbc_paths: single path or tuple of paths to .dbc files. prefix_signals: if True, prefix signal names with their message name. Returns: CantoolsDatabase with all DBC definitions loaded. Raises: ValueError: for invalid inputs or parsing errors. FileNotFoundError: if any path does not exist. """ paths = [dbc_paths] if isinstance(dbc_paths, str) else list(dbc_paths) if not paths: raise ValueError("At least one DBC file must be provided") db = CantoolsDatabase() for p in paths: pth = Path(p) if pth.suffix.lower() != ".dbc": raise ValueError(f"File {pth} is not a .dbc file") if not pth.is_file(): raise FileNotFoundError(f"DBC file not found: {pth}") glogger.debug(f"Loading DBC: {pth}") try: db.add_dbc_file(str(pth)) except cantools.database.errors.ParseError as e: raise ValueError(f"Invalid DBC format in {pth}: {e}") from e except Exception as e: raise ValueError(f"Invalid DBC file {pth}: {e}") from e # Detect duplicate signal names names = [sig.name for msg in db.messages for sig in msg.signals] if not prefix_signals: dupes = [n for n, c in Counter(names).items() if c > 1] if dupes: raise ValueError( f"Duplicate signal names: {sorted(dupes)}; use prefix_signals=True" ) else: # Ensure unique message names before prefixing msg_names = [m.name for m in db.messages] if len(msg_names) != len(set(msg_names)): raise ValueError("Duplicate message names found; cannot prefix uniquely") for msg in db.messages: for sig in msg.signals: sig.name = f"{msg.name}_{sig.name}" return db
[docs] def load_dbc_files( dbc_paths: Union[str, List[str]], prefix_signals: bool = False ) -> CantoolsDatabase: """ Load and optionally prefix one or more DBC files into a Cantools database. Args: dbc_paths: path or list of paths to .dbc files. prefix_signals: if True, prefix signal names with their message name. Returns: Cached CantoolsDatabase instance. """ key = tuple(dbc_paths) if isinstance(dbc_paths, list) else dbc_paths return _load_dbc_files_cached(key, prefix_signals)
@contextmanager def blf_reader(path: str) -> Iterator[BLFReader]: """ Context manager for BLFReader to ensure reader.stop() on exit. """ reader = BLFReader(str(path)) try: yield reader finally: try: reader.stop() except Exception: glogger.debug("Error closing BLF reader", exc_info=True)
[docs] def iter_blf_chunks( blf_path: str, db: CantoolsDatabase, config: CanmlConfig, filter_ids: Optional[Set[int]] = None, filter_signals: Optional[Set[str]] = None, ) -> Iterator[pd.DataFrame]: """ Stream-decode BLF file into DataFrame chunks. Args: blf_path: .blf file path. db: loaded CantoolsDatabase. config: CanmlConfig instance. filter_ids: set of arbitration IDs to include. filter_signals: set of signal names to include. Yields: pandas.DataFrame chunks of decoded messages. """ p = Path(blf_path) if p.suffix.lower() != ".blf" or not p.is_file(): raise FileNotFoundError(f"Valid BLF file not found: {p}") buffer: List[Dict[str, T]] = [] with blf_reader(blf_path) as reader: iterator = tqdm(reader, desc=p.name) if config.progress_bar else reader for msg in iterator: if filter_ids and msg.arbitration_id not in filter_ids: continue try: rec = db.decode_message(msg.arbitration_id, msg.data) except Exception: continue if filter_signals: rec = {k: v for k, v in rec.items() if k in filter_signals} if rec: rec["timestamp"] = msg.timestamp buffer.append(rec) if len(buffer) >= config.chunk_size: yield pd.DataFrame(buffer) buffer.clear() if buffer: yield pd.DataFrame(buffer)
[docs] def load_blf( blf_path: str, db: Union[CantoolsDatabase, str, List[str]], config: Optional[CanmlConfig] = None, message_ids: Optional[Set[int]] = None, expected_signals: Optional[Iterable[str]] = None, ) -> pd.DataFrame: """ Load an entire BLF file into a DataFrame, decoding and aligning signals. Args: blf_path: .blf file path. db: CantoolsDatabase or DBC file path(s). config: CanmlConfig instance. message_ids: IDs to include (None=all). expected_signals: signals to include (None=all in DBC). Returns: pandas.DataFrame with columns [timestamp,...signals]. """ config = config or CanmlConfig() # Validate expected_signals duplicates if expected_signals is not None: exp_list = list(expected_signals) if len(exp_list) != len(set(exp_list)): raise ValueError("Duplicate names in expected_signals") else: exp_list = None # Load DB dbobj = db if isinstance(db, CantoolsDatabase) else load_dbc_files(db) # Warn on explicit empty message_ids if message_ids is not None and not message_ids: glogger.warning("Empty message_ids provided; no messages will be decoded") # Determine expected signals all_sigs = [s.name for m in dbobj.messages for s in m.signals] expected = exp_list if exp_list is not None else all_sigs # Validate dtype_map dtype_map = config.dtype_map or {} for sig in dtype_map: if sig not in expected: raise ValueError(f"dtype_map contains unknown signal: {sig}") # Decode chunks try: chunks = list(iter_blf_chunks( blf_path, dbobj, config, message_ids, set(expected) )) except FileNotFoundError: raise except Exception as e: glogger.error("Failed to process BLF chunks", exc_info=True) raise ValueError(f"Failed to process BLF data: {e}") from e # Build DataFrame if not chunks: glogger.warning(f"No data decoded from {blf_path}; returning empty DataFrame") df = pd.DataFrame({ "timestamp": pd.Series(dtype=float), **{sig: pd.Series(dtype=dtype_map.get(sig, float)) for sig in expected} }) else: df = pd.concat(chunks, ignore_index=True) # Retain only timestamp + expected columns cols_keep = [c for c in ["timestamp"] + expected if c in df.columns] df = df[cols_keep] # Sort timestamps if config.sort_timestamps: df = df.sort_values("timestamp").reset_index(drop=True) # Uniform timing if config.force_uniform_timing: df["raw_timestamp"] = df["timestamp"] df["timestamp"] = np.arange(len(df)) * config.interval_seconds # Inject missing signals for sig in expected: if sig not in df.columns: npdt = np.dtype(dtype_map.get(sig, float)) if config.interpolate_missing and sig in all_sigs: df[sig] = df[sig].interpolate(method="linear", limit_direction="both") elif np.issubdtype(npdt, np.integer): df[sig] = np.zeros(len(df), dtype=npdt) else: df[sig] = pd.Series([np.nan] * len(df), dtype=npdt) # Collect metadata and convert enums df.attrs["signal_attributes"] = { s.name: getattr(s, "attributes", {}) for m in dbobj.messages for s in m.signals if s.name in df.columns } for m in dbobj.messages: for s in m.signals: if s.name in df.columns and getattr(s, "choices", None): df[s.name] = pd.Categorical( df[s.name].map(s.choices), categories=list(s.choices.values()) ) # Ensure timestamp first column return df[["timestamp"] + [c for c in df.columns if c != "timestamp"]]
[docs] def to_csv( df_or_iter: Union[pd.DataFrame, Iterable[pd.DataFrame]], output_path: str, mode: str = "w", header: bool = True, pandas_kwargs: Optional[Dict[str, Any]] = None, columns: Optional[List[str]] = None, metadata_path: Optional[str] = None, ) -> None: """ Write DataFrame or iterable of DataFrames to CSV, exporting metadata. Args: df_or_iter: single DataFrame or iterable of chunks. output_path: CSV file path. mode: 'w' or 'a'. header: write header row. pandas_kwargs: extra pandas.to_csv kwargs. columns: subset/order of columns to write. metadata_path: JSON file path to save df.attrs["signal_attributes"]. """ import json p = Path(output_path) pandas_kwargs = pandas_kwargs or {} # Validate columns if columns and len(columns) != len(set(columns)): raise ValueError("Duplicate columns specified") # Ensure CSV dir exists p.parent.mkdir(parents=True, exist_ok=True) def _write(df: pd.DataFrame, mode_: str, header_: bool, write_meta: bool): df.to_csv(p, mode=mode_, header=header_, index=False, columns=columns, **pandas_kwargs) if metadata_path and write_meta: m = Path(metadata_path) m.parent.mkdir(parents=True, exist_ok=True) sig_attrs = df.attrs.get("signal_attributes") or {c: {} for c in df.columns} m.write_text(json.dumps(sig_attrs)) if isinstance(df_or_iter, pd.DataFrame): _write(df_or_iter, mode, header, True) else: first = True for chunk in df_or_iter: _write(chunk, mode if first else "a", header if first else False, first) first = False glogger.info(f"CSV written to {output_path}")
[docs] def to_parquet( df: pd.DataFrame, output_path: str, compression: str = "snappy", pandas_kwargs: Optional[Dict[str, Any]] = None, metadata_path: Optional[str] = None, ) -> None: """ Write DataFrame to Parquet with optional metadata export. Args: df: DataFrame to write. output_path: .parquet file path. compression: Parquet codec. pandas_kwargs: kwargs for pandas.to_parquet. metadata_path: JSON path for signal_attributes metadata. """ import json p = Path(output_path) pandas_kwargs = pandas_kwargs or {} # Ensure dir exists p.parent.mkdir(parents=True, exist_ok=True) try: df.to_parquet(p, engine="pyarrow", compression=compression, **pandas_kwargs) except Exception as e: glogger.error(f"Failed to write Parquet {p}: {e}", exc_info=True) raise ValueError(f"Failed to export Parquet: {e}") from e # Metadata export if metadata_path: m = Path(metadata_path) m.parent.mkdir(parents=True, exist_ok=True) sig_attrs = df.attrs.get("signal_attributes") or {c: {} for c in df.columns} m.write_text(json.dumps(sig_attrs)) glogger.info(f"Metadata written to {m}") glogger.info(f"Parquet written to {p}")