Source code for pychopmarg.utility.general

"""
General purpose utilities for PyChOpMarg.

Original author: David Banas <capn.freako@gmail.com>

Original date:   March 3, 2024

Copyright (c) 2024 David Banas; all rights reserved World wide.
"""

from pathlib import Path
import re
from typing import Any, TypeVar

import numpy as np  # type: ignore
from scipy.interpolate import interp1d

from pychopmarg.common import Rvec, Cvec, ChnlSet, ChnlGrpName

T = TypeVar('T', Any, Any)


[docs] def all_combs(xss: list[list[T]]) -> list[list[T]]: """ Generate all combinations of input. Args: xss: The lists of candidates for each position in the final output. Returns: All possible combinations of inputs. """ if not xss: return [[]] head, *tail = xss yss = all_combs(tail) return [[x, *ys] for x in head for ys in yss] # type: ignore
[docs] def mk_combs(trips: list[tuple[float, float, float]]) -> list[Rvec]: """ Make all possible combinations of tap weights, given a list of "(min, max, step)" triples. Args: trips: A list of "(min, max, step)" triples, one per weight. Returns: A list of NDArrays of tap weights, including all possible combinations. """ ranges = [] for trip in trips: if trip[2]: # non-zero step? ranges.append(list(np.arange(trip[0], trip[1] + trip[2], trip[2]))) else: ranges.append([0.0]) # type: ignore return list(map(np.array, all_combs(ranges)))
[docs] def from_irfft(x: Rvec, t_irfft: Rvec, t: Rvec, nspui: int) -> Rvec: """ Interpolate ``irfft()`` output to ``t`` and subsample at fBaud. Args: x: ``irfft()`` results to be interpolated and subsampled. t_irfft: Time index vector for ``x``. t: Desired new time index vector (same units as ``t_irfft``). nspui: Number of samples per unit interval in ``t``. Returns: Interpolated and subsampled vector. Raises: IndexError: If length of input doesn't match length of ``t_irfft`` vector. Notes: 1. Input vector is shifted, such that its peak occurs at ``0.1 * max(t)``, before interpolating. This is done to: - ensure that we don't omit any non-causal behavior, which ends up at the end of an IFFT output vector when the peak is very near the beginning, and - to ensure that the majority of our available time span is available for capturing reflections. 2. The sub-sampling phase is adjusted, so as to ensure that we catch the peak. """ assert len(x) == len(t_irfft), IndexError( f"Length of input ({len(x)}) must match length of `t_irfft` vector ({len(t_irfft)})!") t_pk = 0.1 * t[-1] # target peak location time targ_ix = np.where(t_irfft >= t_pk)[0][0] # target peak vector index, in `x` curr_ix = np.argmax(x) # current peak vector index, in `x` _x = np.roll(x, targ_ix - curr_ix) # `x` with peak repositioned krnl = interp1d(t_irfft, _x, bounds_error=False, fill_value="extrapolate", assume_sorted=True) y = krnl(t) _, curs_ofst = divmod(np.argmax(y), nspui) # Ensure that we capture the peak in the next step. return y[curs_ofst::nspui] # Sampled at fBaud, w/ peak captured.
[docs] def fwhm(pr: Rvec) -> float: """ Measure the full width at half maximum of the given pulse response. Args: pr: Pulse response to measure. Returns: fwhm: Full width at half max of largest peak in given signal. Notes: 1. Used to characterize the _bandwidth_ of a given channel. """ pk_loc = np.where(pr == max(pr))[0][0] half_max = 0.5 * pr[pk_loc] left_hm = pk_loc while left_hm > 0 and pr[left_hm] > half_max: left_hm -= 1 right_hm = pk_loc while right_hm < (len(pr) - 1) and pr[right_hm] > half_max: right_hm += 1 return right_hm - left_hm
[docs] def reflectivity(pr: Rvec) -> float: """ Measure the _reflectivity_ of a channel with the given pulse response. Args: pr: Pulse response of channel. Returns: ref: Reflectivity of channel. Notes: 1. Use sum of: delta-x weighted by power at delta-x. """ pk_loc = np.where(pr == max(pr))[0][0] return sum(dn * y**2 for dn, y in enumerate(pr[pk_loc:]))
[docs] def get_channel_sets(path: Path) -> dict[ChnlGrpName, list[ChnlSet]]: """ Return all available groups of channel sets in the given path. Args: path: The folder in which to begin searching. (Assumed to contain some number of sub-directories, in which the actual channel sets are contained.) Returns: Dictionary of channel groups, each containing a list of channel sets. Notes: 1. A "channel set" is a dictionary containing a thru channel and some number of NEXT and FEXT aggressors. """ chnl_groups = list(filter(lambda p: p.is_dir(), path.iterdir())) chnl_groups.sort() channels: dict[ChnlGrpName, list[ChnlSet]] = {} for chnl_grp in chnl_groups: channels[chnl_grp.name] = [] thru_chnls = list(chnl_grp.glob("*[tT][hH][rR][uU]*.[sS]4[pP]")) # No global option for case insens. glob(). thru_chnls.sort() for thru_chnl in thru_chnls: nexts = list(chnl_grp.glob(re.sub("thru", "[nN][eE][xX][tT][0-9]", thru_chnl.name, flags=re.IGNORECASE))) nexts.sort() fexts = list(chnl_grp.glob(re.sub("thru", "[fF][eE][xX][tT][0-9]", thru_chnl.name, flags=re.IGNORECASE))) fexts.sort() channels[chnl_grp.name].append({ # Here we're constructing a "channel set" dictionary. "THRU": [thru_chnl], "NEXT": nexts, "FEXT": fexts }) return channels
[docs] def dBm_Hz(x: Rvec) -> Rvec: "Convert (V^2/Hz) to (dBm/Hz), assuming 100 Ohm system impedance." return 10 * np.log10(1e3 * x / 100)
[docs] def mag_dB(x: Cvec) -> Rvec: "Return the magnitude in dB of a complex amplitude vector." return 20 * np.log10(np.abs(x))