# Source code for canml.canmlio

"""
canmlio: Enhanced CAN BLF processing toolkit for production use.

This module provides end-to-end functionality for decoding CAN bus logs in BLF
format into pandas DataFrames, handling DBC file loading and merging,
streaming large logs, full-file loading with filtering, timing alignment,
missing-signal injection, and exporting to CSV or Parquet with accompanying
metadata. It also supports enums and custom signal attributes, all configurable
via a single `CanmlConfig` object.

Dependencies:
  - numpy
  - pandas
  - cantools
  - python-can
  - tqdm
  - pyarrow (for Parquet export)

Example:
    from canml.canmlio import load_dbc_files, load_blf, to_csv, CanmlConfig

    # 1. Load DBC with safe prefixing
    db = load_dbc_files("vehicle.dbc", prefix_signals=True)

    # 2. Configure BLF loading
    cfg = CanmlConfig(
        chunk_size=5000,
        progress_bar=True,
        sort_timestamps=True,
        force_uniform_timing=True,
        interval_seconds=0.02,
        interpolate_missing=True,
        dtype_map={"Engine_RPM": "int32"}
    )

    # 3. Load BLF file into DataFrame
    df = load_blf(
        blf_path="drive.blf",
        db=db,
        config=cfg,
        message_ids={0x100, 0x200},
        expected_signals=["Engine_RPM", "Brake_Active"]
    )

    # 4. Export results
    to_csv(df, "drive.csv", metadata_path="drive_meta.json")
"""
import logging
import json
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
from contextlib import contextmanager
from dataclasses import dataclass
from functools import lru_cache
from collections import Counter

import numpy as np
import pandas as pd
import cantools
from cantools.database.can import Database as CantoolsDatabase
from can.io.blf import BLFReader
from tqdm import tqdm

# Public API of this module: the names exported by `from canml.canmlio import *`.
__all__ = [
    "CanmlConfig",
    "load_dbc_files",
    "iter_blf_chunks",
    "load_blf",
    "to_csv",
    "to_parquet",
]

# ----------------------------------------------------------------------------
# Logger setup
# ----------------------------------------------------------------------------

# Build the stream handler first, then attach it to the module logger.
_handler = logging.StreamHandler()
_handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s [%(levelname)s] %(message)s")
)

# Module-level logger: any previously attached handlers are discarded so that
# exactly one stream handler is installed, and the level is fixed at INFO.
glogger = logging.getLogger(__name__)
glogger.handlers.clear()
glogger.addHandler(_handler)
glogger.setLevel(logging.INFO)

# ----------------------------------------------------------------------------
# Configuration dataclass
# ----------------------------------------------------------------------------
@dataclass
class CanmlConfig:
    """
    Settings that control how a BLF log is decoded and post-processed.

    Args:
        chunk_size (int): Number of messages per chunk. Defaults to 10000.
        progress_bar (bool): Show a tqdm bar if True. Defaults to True.
        dtype_map (Optional[Dict[str, Any]]): Signal-to-dtype map.
            Defaults to None.
        sort_timestamps (bool): Sort rows by timestamp. Defaults to False.
        force_uniform_timing (bool): Replace timestamps with uniformly
            spaced ones. Defaults to False.
        interval_seconds (float): Spacing used for uniform timing.
            Defaults to 0.01.
        interpolate_missing (bool): Interpolate missing signals.
            Defaults to False.

    Raises:
        ValueError: If chunk_size or interval_seconds is <= 0.
    """

    chunk_size: int = 10000
    progress_bar: bool = True
    dtype_map: Optional[Dict[str, Any]] = None
    sort_timestamps: bool = False
    force_uniform_timing: bool = False
    interval_seconds: float = 0.01
    interpolate_missing: bool = False

    def __post_init__(self):
        # Checked in this order so an invalid chunk_size is reported first.
        positivity_checks = (
            (self.chunk_size, "chunk_size must be positive"),
            (self.interval_seconds, "interval_seconds must be positive"),
        )
        for value, message in positivity_checks:
            if value <= 0:
                raise ValueError(message)
# ----------------------------------------------------------------------------
# DBC loading and merging with safe signal prefixing
# ----------------------------------------------------------------------------

@lru_cache(maxsize=32)
def _load_dbc_files_cached(
    dbc_paths: Union[str, Tuple[str, ...]],
    prefix_signals: bool
) -> CantoolsDatabase:
    """
    Internal: Load and merge .dbc files into a single CantoolsDatabase.

    Results are memoized with ``lru_cache`` keyed on the arguments, so repeated
    calls with the same (hashable) arguments return the same Database object.

    Args:
        dbc_paths: One path, or a tuple of paths, to ``.dbc`` files.
        prefix_signals: If False, duplicate signal names across the merged
            messages are rejected. If True, every signal is renamed to
            ``<message>_<signal>``; when any message name occurs more than
            once, ``<message>_<frame id>_<signal>`` is used for all messages.

    Returns:
        The merged database (with renamed signals when ``prefix_signals``).

    Raises:
        ValueError: No paths given, a path without the ``.dbc`` suffix, a file
            that cannot be parsed, or duplicate signal names when
            ``prefix_signals`` is False.
        FileNotFoundError: A path that is not an existing file.
    """
    paths = [dbc_paths] if isinstance(dbc_paths, str) else list(dbc_paths)
    if not paths:
        raise ValueError("At least one DBC file must be provided")

    db = CantoolsDatabase()
    for p in paths:
        pth = Path(p)
        if pth.suffix.lower() != ".dbc":
            raise ValueError(f"File {pth} is not a .dbc file")
        if not pth.is_file():
            raise FileNotFoundError(f"DBC file not found: {pth}")
        glogger.debug(f"Loading DBC: {pth}")
        try:
            db.add_dbc_file(str(pth))
        except cantools.database.errors.ParseError as e:
            raise ValueError(f"Invalid DBC format in {pth}: {e}") from e
        except Exception as e:
            raise ValueError(f"Invalid DBC file {pth}: {e}") from e

    if not prefix_signals:
        # Without prefixing, signal names become column names downstream, so
        # they must be unique across all messages.
        all_signals = [sig.name for msg in db.messages for sig in msg.signals]
        dupes = [n for n, c in Counter(all_signals).items() if c > 1]
        if dupes:
            raise ValueError(
                f"Duplicate signal names: {sorted(dupes)}; use prefix_signals=True"
            )
        return db

    msg_names = [msg.name for msg in db.messages]
    has_dup_msgs = any(c > 1 for c in Counter(msg_names).values())
    for idx, msg in enumerate(db.messages):
        if has_dup_msgs:
            # Message names alone are ambiguous: add an ID to the prefix.
            if hasattr(msg, 'frame_id'):
                key = msg.frame_id
            elif hasattr(msg, 'arbitration_id'):
                key = msg.arbitration_id
            else:
                key = idx
            prefix = f"{msg.name}_{key}"
        else:
            prefix = msg.name

        renamed = {sig.name: f"{prefix}_{sig.name}" for sig in msg.signals}
        for sig in msg.signals:
            # Multiplexed signals refer to their multiplexer by name; keep
            # that reference in sync with the rename.
            parent = getattr(sig, "multiplexer_signal", None)
            if parent in renamed:
                sig.multiplexer_signal = renamed[parent]
            sig.name = renamed[sig.name]

    # cantools requires a refresh after messages are modified so that its
    # internal encode/decode lookup state picks up the new signal names.
    db.refresh()
    return db
def load_dbc_files(
    dbc_paths: Union[str, Path, Iterable[Union[str, Path]]],
    prefix_signals: bool = False
) -> CantoolsDatabase:
    """
    Load and merge DBC files with optional prefixing.

    The paths are normalised into a hashable key (a single string, or a tuple
    of strings) and handed to the cached loader.

    Args:
        dbc_paths: A single path (``str`` or path-like) or any iterable of
            such paths (list, tuple, set, generator, ...).
        prefix_signals: Forwarded to the cached loader. Defaults to False.

    Returns:
        The database object produced by the cached loader.

    Raises:
        TypeError: If an entry is not a string or path-like object.
    """
    import os  # local import: only needed for path normalisation here

    if isinstance(dbc_paths, (str, os.PathLike)):
        key: Union[str, Tuple[str, ...]] = os.fspath(dbc_paths)
    else:
        # lru_cache needs hashable arguments, so collapse iterables to a tuple.
        key = tuple(os.fspath(p) for p in dbc_paths)
    return _load_dbc_files_cached(key, prefix_signals)
# ----------------------------------------------------------------------------
# BLFReader context manager
# ----------------------------------------------------------------------------

@contextmanager
def blf_reader(path: str) -> Iterator[BLFReader]:
    """
    Context manager that opens a BLFReader on ``path`` and stops it on exit.

    Any exception raised while stopping the reader is logged at DEBUG level
    and otherwise ignored.
    """
    handle = BLFReader(str(path))

    def _shutdown() -> None:
        # Best-effort close: a failing stop() is logged, never raised.
        try:
            handle.stop()
        except Exception:
            glogger.debug("Error closing BLF reader", exc_info=True)

    try:
        yield handle
    finally:
        _shutdown()


# ----------------------------------------------------------------------------
# Stream-decode BLF in chunks with drop summary
# ----------------------------------------------------------------------------
def iter_blf_chunks(
    blf_path: str,
    db: CantoolsDatabase,
    config: CanmlConfig,
    filter_ids: Optional[Set[int]] = None,
    filter_signals: Optional[Iterable[Any]] = None,
) -> Iterator[pd.DataFrame]:
    """
    Stream-decode a BLF file into pandas DataFrame chunks.

    Each decoded message becomes one row holding its signal values plus a
    ``timestamp`` column. Once all messages are consumed, the total vs dropped
    message counts are logged.

    This is a generator: nothing (including the file check) runs until the
    first chunk is requested.

    Args:
        blf_path: Path to a ``.blf`` file.
        db: Database used to decode each message.
        config: Supplies ``chunk_size`` (rows per yielded DataFrame) and
            ``progress_bar`` (wrap the reader in tqdm).
        filter_ids: If not None, only messages whose arbitration ID is in this
            set are decoded. An empty set therefore decodes nothing.
        filter_signals: If not None, only these signal names (compared as
            strings) are kept in each row.

    Yields:
        DataFrames built from the buffered rows; the final one may be shorter.

    Raises:
        FileNotFoundError: If the path lacks the ``.blf`` suffix or is not an
            existing file.
    """
    p = Path(blf_path)
    if p.suffix.lower() != ".blf" or not p.is_file():
        raise FileNotFoundError(f"Valid BLF file not found: {p}")

    sig_set: Optional[Set[str]] = None
    if filter_signals is not None:
        sig_set = set()
        for s in filter_signals:
            try:
                sig_set.add(str(s))
            except Exception:
                # Entries that cannot be stringified are skipped.
                continue

    total = 0
    dropped = 0
    buffer: List[Dict[str, Any]] = []
    with blf_reader(blf_path) as reader:
        it = tqdm(reader, desc=p.name) if config.progress_bar else reader
        for msg in it:
            total += 1
            # `is not None` (not truthiness): an explicitly empty ID set means
            # "accept no IDs", not "no filter".
            if filter_ids is not None and msg.arbitration_id not in filter_ids:
                dropped += 1
                continue
            try:
                rec = db.decode_message(msg.arbitration_id, msg.data)
            except Exception:
                # Messages the database cannot decode count as dropped.
                dropped += 1
                continue
            if sig_set is not None:
                rec = {k: v for k, v in rec.items() if k in sig_set}
            if rec:
                rec["timestamp"] = msg.timestamp
                buffer.append(rec)
            else:
                # No signal survived the signal filter.
                dropped += 1
            if len(buffer) >= config.chunk_size:
                yield pd.DataFrame(buffer)
                buffer.clear()
        if buffer:
            yield pd.DataFrame(buffer)
    glogger.info(f"Decoded {total-dropped}/{total} messages ({dropped} dropped)")
# ----------------------------------------------------------------------------
# Full-file load with filtering, timing, injection, metadata, enums
# ----------------------------------------------------------------------------
def load_blf(
    blf_path: str,
    db: Union[CantoolsDatabase, str, List[str]],
    config: Optional[CanmlConfig] = None,
    message_ids: Optional[Set[int]] = None,
    expected_signals: Optional[Iterable[Any]] = None,
) -> pd.DataFrame:
    """
    Load an entire BLF file into a DataFrame.

    Supports:
      - ID and signal filtering
      - Timestamp sorting and uniform spacing
      - Missing signal injection with dtype preservation
      - Metadata attributes and enum conversion

    Args:
        blf_path: Path to the BLF file.
        db: An already loaded database, or DBC path(s) passed to
            ``load_dbc_files``.
        config: Processing options; a default ``CanmlConfig`` is used if None.
        message_ids: Forwarded to ``iter_blf_chunks`` as the ID filter.
        expected_signals: Signal names to keep (converted with ``str``). If
            None, every signal in the database is expected.

    Returns:
        A DataFrame with a ``timestamp`` column followed by signal columns.
        ``raw_timestamp`` is added when ``config.force_uniform_timing`` is
        set. Signals with choices become categorical label columns, and
        ``df.attrs["signal_attributes"]`` maps column names to each signal's
        ``attributes`` value (``{}`` when the signal has none).

    Raises:
        ValueError: Duplicate or reserved names in ``expected_signals``, a
            ``dtype_map`` entry for an unexpected signal or with an invalid
            dtype, or any failure while decoding the BLF data.
        FileNotFoundError: Propagated unchanged from the chunk iterator.
    """
    config = config or CanmlConfig()

    # Normalize expected_signals
    exp_list: Optional[List[str]] = None
    if expected_signals is not None:
        seen: Set[str] = set()
        exp_list = []
        for s in expected_signals:
            nm = str(s)
            if nm in seen:
                raise ValueError("Duplicate names in expected_signals")
            seen.add(nm)
            exp_list.append(nm)

    # Prevent collision with timing
    if exp_list and ("timestamp" in exp_list or "raw_timestamp" in exp_list):
        raise ValueError("'timestamp' or 'raw_timestamp' cannot be expected_signals")

    # Load or reuse database
    dbobj = db if isinstance(db, CantoolsDatabase) else load_dbc_files(db)
    if message_ids is not None and not message_ids:
        glogger.warning("Empty message_ids provided; no messages will be decoded")

    # Determine signals to include
    all_sigs: List[str] = [sig.name for msg in dbobj.messages for sig in msg.signals]
    expected: List[str] = exp_list if exp_list is not None else all_sigs

    # Validate dtype_map
    dtype_map: Dict[str, Any] = config.dtype_map or {}
    for sig, dt in dtype_map.items():
        if sig not in expected:
            raise ValueError(f"dtype_map contains unknown signal: {sig}")
        try:
            pd.Series(dtype=dt)
        except Exception as e:
            raise ValueError(f"Invalid dtype '{dt}' for signal '{sig}'") from e

    # Stream decode
    try:
        chunks = list(iter_blf_chunks(
            blf_path, dbobj, config, message_ids, expected
        ))
    except FileNotFoundError:
        raise
    except Exception as e:
        glogger.error("Failed to process BLF chunks", exc_info=True)
        raise ValueError(f"Failed to process BLF data: {e}") from e

    # Concatenate or create empty
    if not chunks:
        glogger.warning(f"No data decoded from {blf_path}; returning empty DataFrame")
        df = pd.DataFrame({
            "timestamp": pd.Series(dtype=float),
            **{sig: pd.Series(dtype=dtype_map.get(sig, float)) for sig in expected}
        })
    else:
        df = pd.concat(chunks, ignore_index=True)

    # Keep only timestamp + expected signals
    cols_keep = [c for c in ["timestamp"] + expected if c in df.columns]
    df = df[cols_keep]

    # Sort and uniform timing
    if config.sort_timestamps:
        # Stable sort: rows sharing a timestamp keep their order from the log.
        df = df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)
    if config.force_uniform_timing:
        df["raw_timestamp"] = df["timestamp"]
        df["timestamp"] = np.arange(len(df)) * config.interval_seconds

    # Inject missing signals
    reserved = {"timestamp", "raw_timestamp"}
    for sig in expected:
        if sig in reserved or sig in df.columns:
            continue
        dt = np.dtype(dtype_map.get(sig, float))
        if np.issubdtype(dt, np.integer):
            # Integer columns cannot hold NaN, so they are zero-filled. This
            # is checked first so interpolate_missing never builds a NaN
            # series with an integer dtype (which raises).
            df[sig] = np.zeros(len(df), dtype=dt)
        elif config.interpolate_missing and sig in all_sigs:
            srs = pd.Series(np.nan, index=df.index, dtype=dt)
            df[sig] = srs.interpolate(method="linear", limit_direction="both")
        else:
            df[sig] = pd.Series(np.nan, index=df.index, dtype=dt)

    # Metadata attributes
    df.attrs["signal_attributes"] = {
        sig.name: getattr(sig, "attributes", {})
        for msg in dbobj.messages for sig in msg.signals
        if sig.name in df.columns
    }

    # Enum conversion: safely map values to string labels
    for msg in dbobj.messages:
        for sig in msg.signals:
            choices = getattr(sig, "choices", None)  # raw -> label
            if sig.name not in df.columns or not choices:
                continue
            # Categorical categories must be unique; drop repeated labels
            # while keeping first-seen order.
            cats = list(dict.fromkeys(str(lab) for lab in choices.values()))

            def _map_label(x, _choices=choices):
                # Decoded values may be wrapper objects exposing `.value`.
                raw = getattr(x, 'value', x)
                return str(_choices.get(raw, raw))

            # Values whose label is not in `cats` become missing (NaN).
            labels = df[sig.name].apply(_map_label)
            df[sig.name] = pd.Categorical(labels, categories=cats)

    return df
# ----------------------------------------------------------------------------
# CSV export
# ----------------------------------------------------------------------------
def to_csv(
    df_or_iter: Union[pd.DataFrame, Iterable[pd.DataFrame]],
    output_path: str,
    mode: str = "w",
    header: bool = True,
    pandas_kwargs: Optional[Dict[str, Any]] = None,
    columns: Optional[List[str]] = None,
    metadata_path: Optional[str] = None,
) -> None:
    """
    Write DataFrame or chunks to CSV with side-car metadata JSON.

    When an iterable of chunks is given, the first chunk is written with
    ``mode``/``header`` and every later chunk is appended without a header.
    Later chunks are re-aligned to the first chunk's column layout so that all
    rows match the header; columns absent from a chunk are written empty, and
    columns not present in the layout are dropped with a warning.

    Args:
        df_or_iter (DataFrame or iterable): Data to write.
        output_path (str): Destination CSV file path. Parent directories are
            created if needed.
        mode (str): Write mode 'w' or 'a'.
        header (bool): Include header in CSV.
        pandas_kwargs (dict): Extra pandas.to_csv args.
        columns (list): Subset of columns to write.
        metadata_path (str): Path to JSON for signal_attributes. Written from
            the DataFrame, or from the first chunk only. Falls back to an
            empty dict per column if the attrs entry is missing.

    Raises:
        ValueError: If ``columns`` contains duplicates.
    """
    import json

    p = Path(output_path)
    pandas_kwargs = pandas_kwargs or {}
    if columns and len(columns) != len(set(columns)):
        raise ValueError("Duplicate columns specified")
    p.parent.mkdir(parents=True, exist_ok=True)

    def _write(block: pd.DataFrame, m, h, wmeta):
        block.to_csv(p, mode=m, header=h, index=False, columns=columns, **pandas_kwargs)
        if metadata_path and wmeta:
            mpath = Path(metadata_path)
            mpath.parent.mkdir(parents=True, exist_ok=True)
            attrs = block.attrs.get("signal_attributes", {c: {} for c in block.columns})
            mpath.write_text(json.dumps(attrs))

    if isinstance(df_or_iter, pd.DataFrame):
        _write(df_or_iter, mode, header, True)
    else:
        layout: List[Any] = []
        first = True
        for chunk in df_or_iter:
            if first:
                # The first chunk fixes the column layout of the whole file.
                layout = list(columns) if columns else list(chunk.columns)
                _write(chunk, mode, header, True)
                first = False
                continue
            if list(chunk.columns) != layout:
                known = set(layout)
                extra = [c for c in chunk.columns if c not in known]
                if extra:
                    glogger.warning(
                        f"Dropping columns not present in CSV header: {extra}"
                    )
                # Same columns, same order as the header already on disk.
                chunk = chunk.reindex(columns=layout)
            _write(chunk, "a", False, False)
    glogger.info(f"CSV written to {output_path}")
# ----------------------------------------------------------------------------
# Parquet export
# ----------------------------------------------------------------------------
def to_parquet(
    df: pd.DataFrame,
    output_path: str,
    compression: str = "snappy",
    pandas_kwargs: Optional[Dict[str, Any]] = None,
    metadata_path: Optional[str] = None,
) -> None:
    """
    Write DataFrame to Parquet with side-car metadata JSON.

    The file is always written with the pyarrow engine. Parent directories of
    both output paths are created if needed.

    Args:
        df (DataFrame): Data to write.
        output_path (str): Destination .parquet file path.
        compression (str): Parquet codec.
        pandas_kwargs (dict): Extra pandas.to_parquet args.
        metadata_path (str): JSON path for signal_attributes. Falls back to an
            empty dict per column if the attrs entry is missing.

    Raises:
        ValueError: If the Parquet export fails; the original exception is
            attached as the cause.
    """
    import json

    p = Path(output_path)
    pandas_kwargs = pandas_kwargs or {}
    p.parent.mkdir(parents=True, exist_ok=True)
    try:
        df.to_parquet(p, engine="pyarrow", compression=compression, **pandas_kwargs)
    except Exception as e:
        glogger.error(f"Failed to export Parquet {p}: {e}", exc_info=True)
        # Chain the cause so the underlying failure stays in the traceback.
        raise ValueError(f"Failed to export Parquet: {e}") from e

    if metadata_path:
        mpath = Path(metadata_path)
        mpath.parent.mkdir(parents=True, exist_ok=True)
        attrs = df.attrs.get("signal_attributes", {c: {} for c in df.columns})
        mpath.write_text(json.dumps(attrs))
        glogger.info(f"Metadata written to {mpath}")
    glogger.info(f"Parquet written to {output_path}")