# Source code for canml.canmlio

"""
canmlio: Enhanced CAN BLF processing toolkit for production use.

This module provides end-to-end functionality for decoding CAN bus logs in BLF
format into pandas DataFrames, handling DBC file loading and merging,
streaming large logs, full-file loading with filtering, timing alignment,
missing-signal injection, and exporting to CSV or Parquet with accompanying
metadata. It also supports enums and custom signal attributes, all configurable
via a single `CanmlConfig` object.

Dependencies:
  - numpy
  - pandas
  - cantools
  - python-can
  - tqdm
  - pyarrow (for Parquet export)

Example:
    from canml.canmlio import load_dbc_files, load_blf, to_csv, CanmlConfig

    # 1️⃣ Load DBC
    db = load_dbc_files("vehicle.dbc", prefix_signals=True)

    # 2️⃣ Configure BLF load
    cfg = CanmlConfig(
        chunk_size=5000,
        progress_bar=True,
        force_uniform_timing=True,
        interval_seconds=0.02,
        interpolate_missing=True,
        dtype_map={"Engine_RPM": "int32"}
    )

    # 3️⃣ Load BLF file
    df = load_blf(
        blf_path="drive.blf",
        db=db,
        config=cfg,
        message_ids={0x100, 0x200},
        expected_signals=["Engine_RPM", "Brake_Active"]
    )

    # 4️⃣ Export
    to_csv(df, "drive.csv", metadata_path="drive_meta.json")
"""
import logging
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
from contextlib import contextmanager
from dataclasses import dataclass
from functools import lru_cache
from collections import Counter

import numpy as np
import pandas as pd
import cantools
from cantools.database.can import Database as CantoolsDatabase
from can.io.blf import BLFReader
from tqdm import tqdm

# Public API: the names exported by `from canml.canmlio import *`.
__all__ = [
    "CanmlConfig",
    "load_dbc_files",
    "iter_blf_chunks",
    "load_blf",
    "to_csv",
    "to_parquet",
]

# ----------------------------------------------------------------------------
# Logger setup
# ----------------------------------------------------------------------------

# Module-level logger, named after this module.
glogger = logging.getLogger(__name__)
# Remove whatever handlers are already attached to this logger before adding
# ours (presumably to avoid duplicated output on re-import — TODO confirm).
glogger.handlers.clear()
# Emit records through a stream handler as "<time> [<LEVEL>] <message>".
_handler = logging.StreamHandler()
_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
glogger.addHandler(_handler)
# INFO and above are emitted; the debug messages in this module are
# suppressed unless a caller lowers the level.
glogger.setLevel(logging.INFO)

# Generic placeholder type alias (plain `Any`).
T = Any

# ----------------------------------------------------------------------------
# Configuration dataclass
# ----------------------------------------------------------------------------
@dataclass
class CanmlConfig:
    """
    Options controlling how a BLF log is decoded and post-processed.

    Args:
        chunk_size (int): Number of decoded messages per streamed chunk.
            Defaults to 10000.
        progress_bar (bool): Wrap the BLF reader in a tqdm progress bar.
            Defaults to True.
        dtype_map (Optional[Dict[str, Any]]): Signal name -> dtype. `load_blf`
            rejects names that are not among the expected signals and uses
            the dtype when it creates a column for a signal that is absent
            from the log. Example: dtype_map={"Speed": "float32"}.
        sort_timestamps (bool): Sort the final DataFrame by timestamp.
            Defaults to False.
        force_uniform_timing (bool): Replace timestamps with evenly spaced
            values (the originals are kept in `raw_timestamp`).
            Defaults to False.
        interval_seconds (float): Spacing used for uniform timing.
            Defaults to 0.01.
        interpolate_missing (bool): Interpolate missing signal values.
            Defaults to False.

    Raises:
        ValueError: If chunk_size <= 0 or interval_seconds <= 0.
    """

    chunk_size: int = 10000
    progress_bar: bool = True
    dtype_map: Optional[Dict[str, Any]] = None
    sort_timestamps: bool = False
    force_uniform_timing: bool = False
    interval_seconds: float = 0.01
    interpolate_missing: bool = False

    def __post_init__(self):
        # Each entry pairs a value that must be strictly positive with the
        # error raised when it is not; checked in declaration order.
        positive_checks = (
            (self.chunk_size, "chunk_size must be positive"),
            (self.interval_seconds, "interval_seconds must be positive"),
        )
        for value, message in positive_checks:
            if value <= 0:
                raise ValueError(message)
# ----------------------------------------------------------------------------
# DBC loading and merging
# ----------------------------------------------------------------------------
@lru_cache(maxsize=32)
def _load_dbc_files_cached(
    dbc_paths: Union[str, Tuple[str, ...]],
    prefix_signals: bool
) -> CantoolsDatabase:
    """
    Internal: parse one or more .dbc files into a single CantoolsDatabase.

    Results are memoised by `lru_cache` on the (dbc_paths, prefix_signals)
    pair, so both arguments must be hashable.

    Args:
        dbc_paths (str or tuple): Path(s) to .dbc file(s).
        prefix_signals (bool): If True, rename every signal to
            "<message>_<signal>", or to "<message>_<frame_id>_<signal>" when
            any message name occurs more than once.

    Returns:
        CantoolsDatabase: Loaded database.

    Raises:
        FileNotFoundError: If any .dbc path is missing.
        ValueError: On an empty path list, invalid extension, parse errors,
            or (without prefixing) duplicate signal names.
    """
    if isinstance(dbc_paths, str):
        paths = [dbc_paths]
    else:
        paths = list(dbc_paths)
    if len(paths) == 0:
        raise ValueError("At least one DBC file must be provided")

    db = CantoolsDatabase()
    for p in paths:
        pth = Path(p)
        if pth.suffix.lower() != ".dbc":
            raise ValueError(f"File {pth} is not a .dbc file")
        if not pth.is_file():
            raise FileNotFoundError(f"DBC file not found: {pth}")
        glogger.debug(f"Loading DBC: {pth}")
        try:
            db.add_dbc_file(str(pth))
        except cantools.database.errors.ParseError as e:
            raise ValueError(f"Invalid DBC format in {pth}: {e}") from e
        except Exception as e:
            raise ValueError(f"Invalid DBC file {pth}: {e}") from e

    if not prefix_signals:
        # Without prefixes every signal name must be unique database-wide.
        name_counts = Counter(
            sig.name for msg in db.messages for sig in msg.signals
        )
        dup = [n for n, c in name_counts.items() if c > 1]
        if dup:
            raise ValueError(f"Duplicate signal names: {sorted(dup)}; use prefix_signals=True")
        return db

    message_counts = Counter(m.name for m in db.messages)
    dup_msg = [n for n, c in message_counts.items() if c > 1]
    if dup_msg:
        glogger.warning(
            f"Duplicate message names {sorted(dup_msg)}; "
            "using <name>_<frame_id> prefix"
        )

    # NOTE(review): signals are renamed after the DBC has been parsed and no
    # refresh() is issued afterwards — confirm that cantools decodes under the
    # new names (Database.refresh() may be required).
    for msg in db.messages:
        # One duplicated message name switches *all* messages to the longer
        # "<name>_<frame_id>" prefix.
        prefix = f"{msg.name}_{msg.frame_id}" if dup_msg else msg.name
        for sig in msg.signals:
            sig.name = f"{prefix}_{sig.name}"
    return db
def load_dbc_files(
    dbc_paths: Union[str, List[str]],
    prefix_signals: bool = False
) -> CantoolsDatabase:
    """
    Load and merge one or more DBC files, caching results.

    The argument is normalised into a hashable cache key before it is handed
    to the cached loader: a single path becomes a `str`, anything else is
    treated as an iterable of paths and becomes a tuple of `str`. Besides the
    documented `str` / `list` inputs this also accepts `pathlib.Path` objects
    and other iterables (tuples, sets, generators).

    Because the loader is memoised, repeated calls with the same paths and
    `prefix_signals` value return the *same* database object; the files are
    not re-read.

    Args:
        dbc_paths (str or list): Path or list of DBC file paths.
        prefix_signals (bool): If True, prefix signals with message names.

    Returns:
        CantoolsDatabase: Merged database.

    Raises:
        FileNotFoundError: If any .dbc path is missing.
        ValueError: On an empty path list, invalid extension, parse errors,
            or duplicate signal names.

    Example:
        db = load_dbc_files("vehicle.dbc", prefix_signals=True)
    """
    if isinstance(dbc_paths, (str, Path)):
        # A lone Path is not iterable, so it must not reach the tuple branch.
        key: Union[str, Tuple[str, ...]] = str(dbc_paths)
    else:
        # Lists/sets are unhashable and cannot be lru_cache keys as-is.
        key = tuple(str(p) for p in dbc_paths)
    return _load_dbc_files_cached(key, prefix_signals)
# ----------------------------------------------------------------------------
# BLFReader context manager
# ----------------------------------------------------------------------------
@contextmanager
def blf_reader(path: str) -> Iterator[BLFReader]:
    """
    Open a BLF file and guarantee that the reader is stopped afterwards.

    Args:
        path (str): Path to BLF file.

    Yields:
        BLFReader: Active reader.
    """
    blf = BLFReader(str(path))
    try:
        yield blf
    finally:
        # Shutdown is best-effort: a failure while stopping is logged at
        # debug level and never propagated to the caller.
        try:
            blf.stop()
        except Exception:
            glogger.debug("Error closing BLF reader", exc_info=True)


# ----------------------------------------------------------------------------
# Stream-decode BLF in chunks
# ----------------------------------------------------------------------------
def iter_blf_chunks(
    blf_path: str,
    db: CantoolsDatabase,
    config: CanmlConfig,
    filter_ids: Optional[Set[int]] = None,
    filter_signals: Optional[Iterable[Any]] = None,
) -> Iterator[pd.DataFrame]:
    """
    Stream-decode a BLF file into pandas DataFrame chunks.

    This is a generator: the path check and all decoding happen lazily, when
    the first chunk is requested.

    Args:
        blf_path (str): Path to BLF file.
        db (CantoolsDatabase): Database for message definitions.
        config (CanmlConfig): Chunk size, progress bar, etc.
        filter_ids (set[int], optional): Only decode these arbitration IDs.
            `None` disables the filter; an *empty* set matches nothing, so no
            chunk is yielded.
        filter_signals (iterable, optional): Only include these signal names
            (each entry is compared by its `str()` form).

    Yields:
        pd.DataFrame: Decoded signals with a 'timestamp' column. Each chunk
        holds at most `config.chunk_size` rows, and its columns are only the
        signals that occurred in that chunk.

    Raises:
        FileNotFoundError: If `blf_path` does not end in .blf or is not a file.

    Example:
        for chunk in iter_blf_chunks("drive.blf", db, cfg, filter_ids={0x123}):
            print(chunk.head())
    """
    p = Path(blf_path)
    if p.suffix.lower() != ".blf" or not p.is_file():
        raise FileNotFoundError(f"Valid BLF file not found: {p}")

    if filter_ids is not None and not filter_ids:
        # An empty ID filter can never match; skip reading the file at all.
        return

    sig_set: Optional[Set[str]] = None
    if filter_signals is not None:
        sig_set = set()
        for s in filter_signals:
            try:
                sig_set.add(str(s))
            except Exception:
                # Entries that cannot be turned into a name are ignored.
                continue

    buffer: List[Dict[str, Any]] = []
    skipped = 0
    with blf_reader(blf_path) as reader:
        it = tqdm(reader, desc=p.name) if config.progress_bar else reader
        for msg in it:
            if filter_ids is not None and msg.arbitration_id not in filter_ids:
                continue
            try:
                rec = db.decode_message(msg.arbitration_id, msg.data)
            except Exception:
                # Frames the database cannot decode are skipped, but counted
                # so the total can be reported once below.
                skipped += 1
                continue
            if sig_set is not None:
                rec = {k: v for k, v in rec.items() if k in sig_set}
            if not rec:
                continue
            rec["timestamp"] = msg.timestamp
            buffer.append(rec)
            if len(buffer) >= config.chunk_size:
                yield pd.DataFrame(buffer)
                buffer.clear()

    if skipped:
        glogger.debug("Skipped %d frame(s) that could not be decoded", skipped)
    if buffer:
        yield pd.DataFrame(buffer)
# ----------------------------------------------------------------------------
# Full-file load with filtering, timing, injection, metadata
# ----------------------------------------------------------------------------
def load_blf(
    blf_path: str,
    db: Union[CantoolsDatabase, str, List[str]],
    config: Optional[CanmlConfig] = None,
    message_ids: Optional[Set[int]] = None,
    expected_signals: Optional[Iterable[Any]] = None,
) -> pd.DataFrame:
    """
    Load a BLF log into a single pandas DataFrame.

    Processing steps, in order: decode all chunks, keep only the expected
    signal columns, optionally sort by timestamp, optionally replace the
    timestamps with a uniform grid (originals kept in `raw_timestamp`), add a
    column for every expected signal that never occurred, apply
    `config.dtype_map`, attach signal attributes to `df.attrs`, and convert
    signals that define choices to Categorical.

    Args:
        blf_path (str): Path to BLF file.
        db (CantoolsDatabase or str/list): Database instance or DBC path(s).
        config (CanmlConfig, optional): Processing options; defaults to
            `CanmlConfig()`.
        message_ids (set[int], optional): Filter by CAN IDs.
            Example: {0x123, 0x200}.
        expected_signals (iterable, optional): Signals to include, compared
            by their `str()` form. Defaults to every signal in the database.
            Example: ["Engine_RPM"].

    Returns:
        pd.DataFrame: 'timestamp' first, followed by the signal columns.
        Signals missing from the log are filled with 0 when their
        `dtype_map` entry is an integer dtype and with NaN otherwise.
        Signals with choices are Categorical; values without a matching
        choice label become NaN.

    Raises:
        ValueError: On duplicate `expected_signals`, a `dtype_map` entry that
            is not an expected signal, or any decoding failure.
        FileNotFoundError: If the BLF file is missing or not a .blf file.

    Example:
        df = load_blf(
            blf_path="drive.blf",
            db="vehicle.dbc",
            config=cfg,
            expected_signals=["Speed", "Engine_RPM"],
        )
    """
    config = config or CanmlConfig()

    # Normalize expected_signals to strings and reject duplicates
    exp_list: Optional[List[str]] = None
    if expected_signals is not None:
        seen: Set[str] = set()
        exp_list = []
        for s in expected_signals:
            nm = str(s)
            if nm in seen:
                raise ValueError("Duplicate names in expected_signals")
            seen.add(nm)
            exp_list.append(nm)

    # Load database
    dbobj = db if isinstance(db, CantoolsDatabase) else load_dbc_files(db)

    if message_ids is not None and not message_ids:
        glogger.warning("Empty message_ids provided; no messages decoded")

    # Determine expected signals. A caller-supplied database may repeat a
    # signal name across messages; de-duplicate (order preserved) so that the
    # column selection below cannot produce duplicate columns.
    all_sigs = [sig.name for msg in dbobj.messages for sig in msg.signals]
    known_sigs = set(all_sigs)
    expected = exp_list if exp_list is not None else list(dict.fromkeys(all_sigs))
    expected_lookup = set(expected)

    # Validate dtype_map, then key it by string so lookups by column name hit
    raw_dtype_map = config.dtype_map or {}
    for sig in raw_dtype_map:
        if str(sig) not in expected_lookup:
            raise ValueError(f"dtype_map contains unknown signal: {sig}")
    dtype_map = {str(name): dtype for name, dtype in raw_dtype_map.items()}

    # Decode chunks
    try:
        chunks = list(iter_blf_chunks(
            blf_path, dbobj, config, message_ids, expected
        ))
    except FileNotFoundError:
        raise
    except Exception as e:
        glogger.error("Failed to process BLF chunks", exc_info=True)
        raise ValueError(f"Failed to process BLF data: {e}") from e

    # Assemble DataFrame
    if not chunks:
        glogger.warning(f"No data decoded from {blf_path}; returning empty DataFrame")
        df = pd.DataFrame({
            "timestamp": pd.Series(dtype=float),
            **{sig: pd.Series(dtype=dtype_map.get(sig, float)) for sig in expected}
        })
    else:
        df = pd.concat(chunks, ignore_index=True)

    # Filter columns; copy so the column assignments below act on an
    # independent frame rather than on a slice of the concatenated data.
    keep = [c for c in ["timestamp"] + expected if c in df.columns]
    df = df[keep].copy()

    # Sort timestamps
    if config.sort_timestamps:
        df = df.sort_values("timestamp").reset_index(drop=True)

    # Uniform timing
    if config.force_uniform_timing:
        df["raw_timestamp"] = df["timestamp"]
        df["timestamp"] = np.arange(len(df)) * config.interval_seconds

    # Inject missing signals
    for sig in expected:
        if sig in df.columns:
            continue
        dt = np.dtype(dtype_map.get(sig, float))
        if np.issubdtype(dt, np.integer):
            # Integer dtypes cannot hold NaN, so this must be tested before
            # any NaN-filled series is built.
            df[sig] = np.zeros(len(df), dtype=dt)
        elif config.interpolate_missing and sig in known_sigs:
            # NOTE(review): the injected column is entirely NaN, so the
            # interpolation has nothing to work from — confirm the intent.
            srs = pd.Series(np.nan, index=df.index, dtype=dt)
            df[sig] = srs.interpolate(method="linear", limit_direction="both")
        else:
            df[sig] = pd.Series(np.nan, index=df.index, dtype=dt)

    # Apply dtype_map to decoded columns as well (injected columns already
    # carry the requested dtype). Best effort: a column that cannot be cast,
    # e.g. NaN gaps with an integer dtype, keeps its dtype and is reported.
    for name, dtype in dtype_map.items():
        if name not in df.columns:
            continue
        try:
            df[name] = df[name].astype(dtype)
        except (TypeError, ValueError, OverflowError) as e:
            glogger.warning("Could not cast signal %s to %s: %s", name, dtype, e)

    # Metadata attributes
    df.attrs["signal_attributes"] = {
        sig.name: getattr(sig, "attributes", {})
        for msg in dbobj.messages
        for sig in msg.signals
        if sig.name in df.columns
    }

    # Enum conversion
    for msg in dbobj.messages:
        for sig in msg.signals:
            if sig.name not in df.columns or not getattr(sig, "choices", None):
                continue
            # Work on plain strings: labels are compared and hashed by pandas,
            # and categories must be unique even if two raw values share a
            # label.
            labels = {str(raw): str(label) for raw, label in sig.choices.items()}
            categories = list(dict.fromkeys(labels.values()))
            as_text = df[sig.name].astype(str).map(lambda v: labels.get(v, v))
            df[sig.name] = pd.Categorical(as_text, categories=categories)

    return df[["timestamp"] + [c for c in df.columns if c != "timestamp"]]
# ----------------------------------------------------------------------------
# CSV export
# ----------------------------------------------------------------------------
def to_csv(
    df_or_iter: Union[pd.DataFrame, Iterable[pd.DataFrame]],
    output_path: str,
    mode: str = "w",
    header: bool = True,
    pandas_kwargs: Optional[Dict[str, Any]] = None,
    columns: Optional[List[str]] = None,
    metadata_path: Optional[str] = None,
) -> None:
    """
    Write a DataFrame, or an iterable of DataFrame chunks, to one CSV file.

    When chunks are streamed, `mode` and `header` apply to the first chunk;
    every later chunk is appended without a header. If `columns` is not
    given, the first chunk fixes the column layout: later chunks are
    reindexed to it (absent columns are written empty, columns unknown to the
    layout are dropped with a warning), so that every row lines up with the
    header.

    Args:
        df_or_iter (DataFrame or iterable): Data to write.
        output_path (str): CSV path; parent directories are created.
        mode ("w"/"a"): Write or append mode.
        header (bool): Include header row.
        pandas_kwargs (dict, optional): Extra pandas.to_csv kwargs.
        columns (list, optional): Subset of columns to write.
        metadata_path (str, optional): JSON path for signal_attributes, taken
            from the DataFrame (or first chunk); defaults to an empty entry
            per column when the frame carries none.

    Raises:
        ValueError: If `columns` contains duplicates.

    Example:
        to_csv(df, "out.csv", metadata_path="out_meta.json")
    """
    import json

    p = Path(output_path)
    pandas_kwargs = pandas_kwargs or {}
    if columns and len(columns) != len(set(columns)):
        raise ValueError("Duplicate columns specified")
    p.parent.mkdir(parents=True, exist_ok=True)

    def _write(df: pd.DataFrame, m, h, wmeta):
        # Write one frame; optionally emit the metadata JSON alongside it.
        df.to_csv(p, mode=m, header=h, index=False, columns=columns, **pandas_kwargs)
        if metadata_path and wmeta:
            mpath = Path(metadata_path)
            mpath.parent.mkdir(parents=True, exist_ok=True)
            attrs = df.attrs.get("signal_attributes", {c: {} for c in df.columns})
            mpath.write_text(json.dumps(attrs))
            glogger.info(f"Metadata written to {mpath}")

    if isinstance(df_or_iter, pd.DataFrame):
        _write(df_or_iter, mode, header, True)
        glogger.info(f"CSV written to {output_path}")
        return

    layout: Optional[List[Any]] = None
    wrote_any = False
    for chunk in df_or_iter:
        if columns is None:
            if layout is None:
                layout = list(chunk.columns)
            elif list(chunk.columns) != layout:
                # Appending a chunk with another column set/order under the
                # existing header would silently misalign the data.
                known = set(layout)
                dropped = [c for c in chunk.columns if c not in known]
                if dropped:
                    glogger.warning(
                        "Dropping columns absent from the first chunk: %s", dropped
                    )
                chunk = chunk.reindex(columns=layout)
        if wrote_any:
            _write(chunk, "a", False, False)
        else:
            _write(chunk, mode, header, True)
            wrote_any = True

    if wrote_any:
        glogger.info(f"CSV written to {output_path}")
    else:
        glogger.warning("No chunks received; nothing written to %s", output_path)
# ----------------------------------------------------------------------------
# Parquet export
# ----------------------------------------------------------------------------
def to_parquet(
    df: pd.DataFrame,
    output_path: str,
    compression: str = "snappy",
    pandas_kwargs: Optional[Dict[str, Any]] = None,
    metadata_path: Optional[str] = None,
) -> None:
    """
    Export a DataFrame as a Parquet file, optionally with a metadata JSON.

    Args:
        df (DataFrame): Data to write.
        output_path (str): .parquet file path; parent directories are created.
        compression (str): Codec e.g. snappy, gzip.
        pandas_kwargs (dict, optional): Extra pandas.to_parquet kwargs.
        metadata_path (str, optional): JSON path for signal_attributes; when
            the frame carries none, an empty entry per column is written.

    Raises:
        ValueError: If the Parquet export fails for any reason.

    Example:
        to_parquet(df, "data.parquet", metadata_path="data_meta.json")
    """
    import json

    if not pandas_kwargs:
        pandas_kwargs = {}
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)

    try:
        df.to_parquet(p, engine="pyarrow", compression=compression, **pandas_kwargs)
    except Exception as e:
        # Every export failure is reported to callers as a ValueError.
        glogger.error(f"Failed Parquet {p}: {e}", exc_info=True)
        raise ValueError(f"Failed to export Parquet: {e}") from e

    if metadata_path:
        mpath = Path(metadata_path)
        mpath.parent.mkdir(parents=True, exist_ok=True)
        if "signal_attributes" in df.attrs:
            attrs = df.attrs["signal_attributes"]
        else:
            attrs = {c: {} for c in df.columns}
        payload = json.dumps(attrs)
        mpath.write_text(payload)
        glogger.info(f"Metadata written to {mpath}")

    glogger.info(f"Parquet written to {output_path}")