# Source code for canml.canmlio

"""
canmlio: Enhanced CAN BLF processing toolkit for production use.

This module provides end-to-end functionality for decoding CAN bus logs in BLF
format into pandas DataFrames, handling DBC file loading and merging,
streaming large logs, full-file loading with filtering, timing alignment,
missing-signal injection, and exporting to CSV or Parquet with accompanying
metadata. It also supports enums and custom signal attributes, all configurable
via a single `CanmlConfig` object.

Dependencies:
  - numpy
  - pandas
  - cantools
  - python-can
  - tqdm
  - pyarrow (for Parquet export)

Example:
    from canml.canmlio import load_dbc_files, load_blf, to_csv, CanmlConfig

    # 1. Load DBC with safe prefixing
    db = load_dbc_files("vehicle.dbc", prefix_signals=True)

    # 2. Configure BLF loading
    cfg = CanmlConfig(
        chunk_size=5000,
        progress_bar=True,
        sort_timestamps=True,
        force_uniform_timing=True,
        interval_seconds=0.02,
        interpolate_missing=True,
        dtype_map={"Engine_RPM": "int32"}
    )

    # 3. Load BLF file into DataFrame
    df = load_blf(
        blf_path="drive.blf",
        db=db,
        config=cfg,
        message_ids={0x100, 0x200},
        expected_signals=["Engine_RPM", "Brake_Active"]
    )

    # 4. Export results
    to_csv(df, "drive.csv", metadata_path="drive_meta.json")
"""
import logging
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
from contextlib import contextmanager
from dataclasses import dataclass
from functools import lru_cache
from collections import Counter

import numpy as np
import pandas as pd
import cantools
from cantools.database.can import Database as CantoolsDatabase
from can.io.blf import BLFReader
from tqdm import tqdm

# Public API of this module: the names exported by a star-import.
__all__ = [
    "CanmlConfig",
    "load_dbc_files",
    "iter_blf_chunks",
    "load_blf",
    "to_csv",
    "to_parquet",
]

# ----------------------------------------------------------------------------
# Logger setup
# ----------------------------------------------------------------------------

# Module-level logger. Handlers already attached to it are discarded and
# replaced by one stream handler so every record shares a single format.
glogger = logging.getLogger(__name__)
glogger.setLevel(logging.INFO)
glogger.handlers.clear()

_handler = logging.StreamHandler()
_handler.setFormatter(
    logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
)
glogger.addHandler(_handler)

# ----------------------------------------------------------------------------
# Configuration dataclass
# ----------------------------------------------------------------------------
@dataclass
class CanmlConfig:
    """
    Settings that control how a BLF log is decoded and post-processed.

    Args:
        chunk_size (int): Number of messages per chunk. Defaults to 10000.
        progress_bar (bool): Show a tqdm bar if True. Defaults to True.
        dtype_map (Optional[Dict[str, Any]]): Signal-to-dtype map.
            Defaults to None.
        sort_timestamps (bool): Sort rows by timestamp. Defaults to False.
        force_uniform_timing (bool): Replace timestamps with uniformly
            spaced values. Defaults to False.
        interval_seconds (float): Spacing used for uniform timing.
            Defaults to 0.01.
        interpolate_missing (bool): Interpolate missing signals.
            Defaults to False.

    Raises:
        ValueError: If chunk_size or interval_seconds <= 0.
    """

    chunk_size: int = 10000
    progress_bar: bool = True
    dtype_map: Optional[Dict[str, Any]] = None
    sort_timestamps: bool = False
    force_uniform_timing: bool = False
    interval_seconds: float = 0.01
    interpolate_missing: bool = False

    def __post_init__(self):
        # Checked in declaration order so the first offending field wins.
        must_be_positive = (
            (self.chunk_size, "chunk_size must be positive"),
            (self.interval_seconds, "interval_seconds must be positive"),
        )
        for value, message in must_be_positive:
            if value <= 0:
                raise ValueError(message)
# ----------------------------------------------------------------------------
# DBC loading and merging with safe signal prefixing
# ----------------------------------------------------------------------------

@lru_cache(maxsize=32)
def _load_dbc_files_cached(
    dbc_paths: Union[str, Tuple[str, ...]], prefix_signals: bool
) -> CantoolsDatabase:
    """
    Load one or more DBC files into a single cantools database.

    Results are memoized by ``(dbc_paths, prefix_signals)``, so repeated calls
    with the same arguments return the same ``CantoolsDatabase`` object.

    Args:
        dbc_paths (str or tuple of str): One DBC path or a tuple of paths.
            Must be hashable because it is part of the cache key.
        prefix_signals (bool): If True, rename every signal to
            ``<message>_<signal>``. When message names themselves collide,
            the prefix becomes ``<message>_<frame id>`` for all messages.

    Returns:
        CantoolsDatabase: The merged database.

    Raises:
        ValueError: If no path is given, a path does not end in ``.dbc``,
            a file cannot be parsed, or (with ``prefix_signals=False``)
            signal names are duplicated across messages.
        FileNotFoundError: If a DBC path does not point to a file.
    """
    paths = [dbc_paths] if isinstance(dbc_paths, str) else list(dbc_paths)
    if not paths:
        raise ValueError("At least one DBC file must be provided")

    db = CantoolsDatabase()
    for p in paths:
        pth = Path(p)
        if pth.suffix.lower() != ".dbc":
            raise ValueError(f"File {pth} is not a .dbc file")
        if not pth.is_file():
            raise FileNotFoundError(f"DBC file not found: {pth}")
        glogger.debug(f"Loading DBC: {pth}")
        try:
            db.add_dbc_file(str(pth))
        except cantools.database.errors.ParseError as e:
            raise ValueError(f"Invalid DBC format in {pth}: {e}") from e
        except Exception as e:
            raise ValueError(f"Invalid DBC file {pth}: {e}") from e

    if not prefix_signals:
        # Without prefixes, signal names must already be globally unique.
        all_signals = [sig.name for msg in db.messages for sig in msg.signals]
        dupes = [n for n, c in Counter(all_signals).items() if c > 1]
        if dupes:
            raise ValueError(
                f"Duplicate signal names: {sorted(dupes)}; use prefix_signals=True"
            )
        return db

    # Determine if message names collide
    msg_names = [msg.name for msg in db.messages]
    dup_msgs = [n for n, c in Counter(msg_names).items() if c > 1]
    for idx, msg in enumerate(db.messages):
        # Choose unique prefix key
        if dup_msgs:
            if hasattr(msg, 'frame_id'):
                key = msg.frame_id
            elif hasattr(msg, 'arbitration_id'):
                key = msg.arbitration_id
            else:
                key = idx
            prefix = f"{msg.name}_{key}"
        else:
            prefix = msg.name
        for sig in msg.signals:
            # Multiplexed signals refer to their multiplexer by name, so that
            # reference has to follow the rename.
            mux_name = getattr(sig, "multiplexer_signal", None)
            if mux_name is not None:
                sig.multiplexer_signal = f"{prefix}_{mux_name}"
            sig.name = f"{prefix}_{sig.name}"

    # cantools requires a refresh after messages are modified so that its
    # internal encode/decode lookup tables pick up the new signal names.
    db.refresh()
    return db
def load_dbc_files(
    dbc_paths: Union[str, Path, Iterable[Union[str, Path]]],
    prefix_signals: bool = False
) -> CantoolsDatabase:
    """
    Load and merge one or more DBC files, with optional signal prefixing.

    Args:
        dbc_paths (str, Path, or iterable of them): Path or collection of
            paths to DBC files.
        prefix_signals (bool): Prefix signals to avoid collisions.

    Returns:
        CantoolsDatabase: Merged definitions.
    """
    # The cached loader needs a hashable key made of plain strings: a single
    # path becomes a str, any collection of paths becomes a tuple of str.
    if isinstance(dbc_paths, (str, Path)):
        key = str(dbc_paths)
    else:
        key = tuple(str(p) for p in dbc_paths)
    return _load_dbc_files_cached(key, prefix_signals)
# ----------------------------------------------------------------------------
# BLFReader context manager
# ----------------------------------------------------------------------------

@contextmanager
def blf_reader(path: str) -> Iterator[BLFReader]:
    """
    Yield a ``BLFReader`` for *path* and stop it when the block exits.

    An error raised while stopping the reader is logged at DEBUG level and
    not propagated.
    """
    blf = BLFReader(str(path))

    def _stop_quietly() -> None:
        try:
            blf.stop()
        except Exception:
            glogger.debug("Error closing BLF reader", exc_info=True)

    try:
        yield blf
    finally:
        _stop_quietly()


# ----------------------------------------------------------------------------
# Stream-decode BLF in chunks with drop summary
# ----------------------------------------------------------------------------
def iter_blf_chunks(
    blf_path: str,
    db: CantoolsDatabase,
    config: CanmlConfig,
    filter_ids: Optional[Set[int]] = None,
    filter_signals: Optional[Iterable[Any]] = None,
) -> Iterator[pd.DataFrame]:
    """
    Stream-decode a BLF file into pandas DataFrame chunks.

    Logs total vs dropped message counts once the file has been fully read.
    Because this is a generator, the path check below only runs when
    iteration starts.

    Args:
        blf_path (str): Path to a ``.blf`` file.
        db (CantoolsDatabase): Database used to decode each message.
        config (CanmlConfig): Supplies ``chunk_size`` and ``progress_bar``.
        filter_ids (set of int, optional): Arbitration IDs to keep. ``None``
            keeps every ID; an empty set keeps none.
        filter_signals (iterable, optional): Signal names to keep (each is
            converted with ``str``). ``None`` keeps every decoded signal.

    Yields:
        pandas.DataFrame: Up to ``config.chunk_size`` rows, one per decoded
        message, holding the kept signals plus a ``timestamp`` column.

    Raises:
        FileNotFoundError: If ``blf_path`` does not end in ``.blf`` or is
            not an existing file.
    """
    p = Path(blf_path)
    if p.suffix.lower() != ".blf" or not p.is_file():
        raise FileNotFoundError(f"Valid BLF file not found: {p}")

    # Prepare signal filter set
    sig_set = None
    if filter_signals is not None:
        sig_set = set()
        for s in filter_signals:
            try:
                sig_set.add(str(s))
            except Exception:
                # Best effort: an entry that cannot be rendered as a name is
                # skipped rather than aborting the whole decode.
                glogger.debug("Skipping unusable filter signal entry", exc_info=True)
                continue

    total = 0
    dropped = 0
    buffer: List[Dict[str, Any]] = []

    with blf_reader(blf_path) as reader:
        it = tqdm(reader, desc=p.name) if config.progress_bar else reader
        for msg in it:
            total += 1
            # `is not None` (not truthiness): an empty ID set means
            # "keep nothing", only None means "no ID filter".
            if filter_ids is not None and msg.arbitration_id not in filter_ids:
                dropped += 1
                continue
            try:
                rec = db.decode_message(msg.arbitration_id, msg.data)
            except Exception:
                # Messages the database cannot decode are counted and skipped.
                dropped += 1
                continue
            if sig_set is not None:
                rec = {k: v for k, v in rec.items() if k in sig_set}
            if rec:
                rec["timestamp"] = msg.timestamp
                buffer.append(rec)
            else:
                # Nothing left after signal filtering.
                dropped += 1
            if len(buffer) >= config.chunk_size:
                yield pd.DataFrame(buffer)
                buffer.clear()
        if buffer:
            yield pd.DataFrame(buffer)

    glogger.info(f"Decoded {total-dropped}/{total} messages ({dropped} dropped)")
# ----------------------------------------------------------------------------
# Full-file load with filtering, timing, injection, metadata, enums
# ----------------------------------------------------------------------------
def load_blf(
    blf_path: str,
    db: Union[CantoolsDatabase, str, List[str]],
    config: Optional[CanmlConfig] = None,
    message_ids: Optional[Set[int]] = None,
    expected_signals: Optional[Iterable[Any]] = None,
) -> pd.DataFrame:
    """
    Load an entire BLF file into a DataFrame.

    Supports:
      - ID and signal filtering
      - Timestamp sorting and uniform spacing
      - Missing signal injection with dtype preservation
      - Metadata attributes and enum conversion

    Args:
        blf_path (str): Path to the BLF log.
        db (CantoolsDatabase, str, or list of str): A loaded database, or
            DBC path(s) to load with ``load_dbc_files``.
        config (CanmlConfig, optional): Processing options; a default
            ``CanmlConfig()`` is used when omitted.
        message_ids (set of int, optional): Arbitration IDs passed to
            ``iter_blf_chunks`` as its ID filter. A warning is logged when
            the set is empty.
        expected_signals (iterable, optional): Signal names to keep and, if
            absent from the decoded data, inject. Defaults to every signal
            in the database.

    Returns:
        pandas.DataFrame: ``timestamp`` (plus ``raw_timestamp`` when uniform
        timing is forced) and one column per expected signal. Signal
        attributes are stored in ``df.attrs["signal_attributes"]``; signals
        with choices are returned as categoricals.

    Raises:
        ValueError: On duplicate or reserved names in ``expected_signals``,
            an unknown signal or invalid dtype in ``config.dtype_map``, or
            any decoding failure other than a missing file.
        FileNotFoundError: If the BLF file is missing or not a ``.blf``.
    """
    config = config or CanmlConfig()

    # Normalize expected_signals
    exp_list = None
    if expected_signals is not None:
        seen = set()
        exp_list = []
        for s in expected_signals:
            nm = str(s)
            if nm in seen:
                raise ValueError("Duplicate names in expected_signals")
            seen.add(nm)
            exp_list.append(nm)

    # Prevent collision with timing
    if exp_list and ("timestamp" in exp_list or "raw_timestamp" in exp_list):
        raise ValueError("'timestamp' or 'raw_timestamp' cannot be expected_signals")

    # Load or reuse database
    dbobj = db if isinstance(db, CantoolsDatabase) else load_dbc_files(db)
    if message_ids is not None and not message_ids:
        glogger.warning("Empty message_ids provided; no messages will be decoded")

    # Determine signals to include
    all_sigs = [sig.name for msg in dbobj.messages for sig in msg.signals]
    expected = exp_list if exp_list is not None else all_sigs
    # Sets for O(1) membership tests; the lists keep the column order.
    known_signals = set(all_sigs)
    expected_lookup = set(expected)

    # Validate dtype_map
    dtype_map = config.dtype_map or {}
    for sig, dt in dtype_map.items():
        if sig not in expected_lookup:
            raise ValueError(f"dtype_map contains unknown signal: {sig}")
        try:
            pd.Series(dtype=dt)
        except Exception as e:
            raise ValueError(f"Invalid dtype '{dt}' for signal '{sig}'") from e

    # Stream decode
    try:
        chunks = list(iter_blf_chunks(
            blf_path, dbobj, config, message_ids, expected
        ))
    except FileNotFoundError:
        raise
    except Exception as e:
        glogger.error("Failed to process BLF chunks", exc_info=True)
        raise ValueError(f"Failed to process BLF data: {e}") from e

    # Concatenate or create empty
    if not chunks:
        glogger.warning(f"No data decoded from {blf_path}; returning empty DataFrame")
        df = pd.DataFrame({
            "timestamp": pd.Series(dtype=float),
            **{sig: pd.Series(dtype=dtype_map.get(sig, float)) for sig in expected}
        })
    else:
        df = pd.concat(chunks, ignore_index=True)

    # Keep only timestamp + expected signals
    cols = [c for c in ["timestamp"] + expected if c in df.columns]
    df = df[cols]

    # Sort and uniform timing
    if config.sort_timestamps:
        df = df.sort_values("timestamp").reset_index(drop=True)
    if config.force_uniform_timing:
        df["raw_timestamp"] = df["timestamp"]
        df["timestamp"] = np.arange(len(df)) * config.interval_seconds

    # Inject missing signals
    reserved = {"timestamp", "raw_timestamp"}
    for sig in expected:
        if sig in reserved or sig in df.columns:
            continue
        # pandas_dtype accepts the same specs as the validation above,
        # including pandas extension dtypes that np.dtype() rejects.
        dt = pd.api.types.pandas_dtype(dtype_map.get(sig, float))
        is_numpy_dtype = isinstance(dt, np.dtype)
        if is_numpy_dtype and np.issubdtype(dt, np.integer):
            # NumPy integers cannot hold NaN, so this case is handled first
            # (also when interpolation is requested) and filled with zeros.
            df[sig] = np.zeros(len(df), dtype=dt)
            continue
        column = pd.Series(np.nan, index=df.index, dtype=dt)
        if config.interpolate_missing and sig in known_signals and is_numpy_dtype:
            column = column.interpolate(method="linear", limit_direction="both")
        df[sig] = column

    # Metadata attributes
    df.attrs["signal_attributes"] = {
        sig.name: getattr(sig, "attributes", {})
        for msg in dbobj.messages for sig in msg.signals
        if sig.name in df.columns
    }

    # Enum conversion
    for msg in dbobj.messages:
        for sig in msg.signals:
            if sig.name in df.columns and getattr(sig, "choices", None):
                choices = sig.choices
                cats = list(choices.values())

                def _map(x):
                    # Values may be plain numbers or objects exposing the
                    # raw number as `.value`; anything unmapped is kept.
                    raw = getattr(x, 'value', x)
                    return choices.get(raw, x)

                df[sig.name] = df[sig.name].apply(_map)
                df[sig.name] = pd.Categorical(df[sig.name], categories=cats)

    return df
# ----------------------------------------------------------------------------
# CSV export
# ----------------------------------------------------------------------------
def to_csv(
    df_or_iter: Union[pd.DataFrame, Iterable[pd.DataFrame]],
    output_path: str,
    mode: str = "w",
    header: bool = True,
    pandas_kwargs: Optional[Dict[str, Any]] = None,
    columns: Optional[List[str]] = None,
    metadata_path: Optional[str] = None,
) -> None:
    """
    Write DataFrame or chunks to CSV with side-car metadata JSON.

    When an iterable of chunks is given, only the first chunk is written
    with the caller's ``mode`` and ``header``; later chunks are appended
    without a header. If ``columns`` is None, later chunks are reindexed to
    the first chunk's column layout so every row lines up with the header:
    columns a chunk lacks are written empty, and columns the first chunk did
    not have are dropped with a warning.

    Args:
        df_or_iter (DataFrame or iterable): Data to write.
        output_path (str): Destination CSV file path.
        mode (str): Write mode 'w' or 'a'.
        header (bool): Include header in CSV.
        pandas_kwargs (dict): Extra pandas.to_csv args.
        columns (list): Subset of columns to write.
        metadata_path (str): Path to JSON for signal_attributes. Written
            once, from the DataFrame or the first chunk.

    Raises:
        ValueError: If ``columns`` contains duplicates.
    """
    import json

    p = Path(output_path)
    pandas_kwargs = pandas_kwargs or {}
    if columns and len(columns) != len(set(columns)):
        raise ValueError("Duplicate columns specified")
    p.parent.mkdir(parents=True, exist_ok=True)

    def _write(block: pd.DataFrame, m, h):
        block.to_csv(p, mode=m, header=h, index=False, columns=columns, **pandas_kwargs)

    def _write_metadata(block: pd.DataFrame):
        if not metadata_path:
            return
        mpath = Path(metadata_path)
        mpath.parent.mkdir(parents=True, exist_ok=True)
        attrs = block.attrs.get("signal_attributes", {c: {} for c in block.columns})
        mpath.write_text(json.dumps(attrs))

    if isinstance(df_or_iter, pd.DataFrame):
        _write(df_or_iter, mode, header)
        _write_metadata(df_or_iter)
    else:
        layout = None  # column order fixed by the first chunk
        for chunk in df_or_iter:
            if layout is None:
                layout = list(chunk.columns)
                _write(chunk, mode, header)
                _write_metadata(chunk)
                continue
            # Appended chunks carry no header, so a different column set or
            # order would silently put values under the wrong heading.
            if columns is None and list(chunk.columns) != layout:
                known = set(layout)
                extra = [c for c in chunk.columns if c not in known]
                if extra:
                    glogger.warning(
                        f"Dropping columns absent from the first chunk: {extra}"
                    )
                chunk = chunk.reindex(columns=layout)
            _write(chunk, "a", False)

    glogger.info(f"CSV written to {output_path}")
# ----------------------------------------------------------------------------
# Parquet export
# ----------------------------------------------------------------------------
def to_parquet(
    df: pd.DataFrame,
    output_path: str,
    compression: str = "snappy",
    pandas_kwargs: Optional[Dict[str, Any]] = None,
    metadata_path: Optional[str] = None,
) -> None:
    """
    Write DataFrame to Parquet with side-car metadata JSON.

    Args:
        df (DataFrame): Data to write.
        output_path (str): Destination .parquet file path.
        compression (str): Parquet codec.
        pandas_kwargs (dict): Extra pandas.to_parquet args.
        metadata_path (str): JSON path for signal_attributes.

    Raises:
        ValueError: If the Parquet export fails for any reason; the
            underlying exception is attached as ``__cause__``.
    """
    import json

    p = Path(output_path)
    pandas_kwargs = pandas_kwargs or {}
    p.parent.mkdir(parents=True, exist_ok=True)

    try:
        df.to_parquet(p, engine="pyarrow", compression=compression, **pandas_kwargs)
    except Exception as e:
        glogger.error(f"Failed to export Parquet {p}: {e}", exc_info=True)
        # Chain explicitly, matching the other error wrappers in this module.
        raise ValueError(f"Failed to export Parquet: {e}") from e

    if metadata_path:
        mpath = Path(metadata_path)
        mpath.parent.mkdir(parents=True, exist_ok=True)
        # Fall back to an empty attribute dict per column when the frame
        # carries no "signal_attributes" entry.
        attrs = df.attrs.get("signal_attributes", {c: {} for c in df.columns})
        mpath.write_text(json.dumps(attrs))
        glogger.info(f"Metadata written to {mpath}")

    glogger.info(f"Parquet written to {output_path}")