Source code for pychopmarg.com

#!/usr/bin/env python

# pylint: disable=too-many-lines

"""
The Channel Operating Margin (COM) model, as per IEEE 802.3-22 Annex 93A/178A.

Original author: David Banas <capn.freako@gmail.com>

Original date:   February 29, 2024

Copyright (c) 2024 David Banas; all rights reserved World wide.

Notes:
    1. Throughout this file, equation numbers refer to Annex 93A of the IEEE 802.3-22 standard.
    2. Throughout this file, reference may be made to the following, via "[n]" syntax:
        [1] - Healey, A., Hegde, R., _Reference receiver framework for 200G/lane electrical interfaces and PHYs_,
            IEEE P802.3dj Task Force, January 2024 (r4).

ToDo:
    1. Provide type hints for imports.
"""

from functools import cache
from pathlib import Path
from typing  import Any, Dict, Optional

import numpy as np  # type: ignore
import skrf  as rf  # type: ignore
from numpy             import array, arange

from pychopmarg.common import (
    PI, TWOPI, Cvec, Rvec, Cmat, OptMode,
    COMChnl, COMNtwk, ChnlSet, ChnlGrpName, ChnlSetName)
from pychopmarg.config.ieee_8023dj import IEEE_8023dj
from pychopmarg.config.template    import COMParams
from pychopmarg.noise    import NoiseCalc
from pychopmarg.optimize import NormMode, mmse, przf
from pychopmarg.utility  import (
    import_s32p, sdd_21, sDieLadderSegment, sPkgTline, sCshunt, filt_pr_samps,
    delta_pmf, mk_combs, calc_Hffe, calc_Hctle, calc_H21, calc_hJ, loc_curs)


[docs] class COM(): # pylint: disable=too-many-instance-attributes,too-many-public-methods """ Encoding of the IEEE 802.3-22 Annex 93A/178A 'Channel Operating Margin' (COM) specification. """ # General status_str = str("Ready") debug = bool(False) opt_mode = OptMode(OptMode.MMSE) norm_mode = NormMode(NormMode.P8023dj) unit_amp = bool(True) com_params = IEEE_8023dj # Linear EQ tx_ix: int = 0 # Index into list of all possible combinations of Tx FFE tap weights. rx_taps: Rvec = array([1.0]) dfe_taps: Rvec = array([]) nRxTaps: int = 0 nRxPreTaps: int = 0 # `dw` from `com_params` gDC = 0.0 # Choices are in `com_params.g_DC`. gDC2 = 0.0 # Choices are in `com_params.g_DC2`. rx_ffe_phase_matrix: Cmat = array([]) # Channel data vic_chnl_ix = int(1) # Used with s32p file. chnls: list[COMChnl] = [] chnls_noPkg: list[COMChnl] = [] pulse_resps_nopkg: list[Rvec] = [] pulse_resps_noeq: list[Rvec] = [] cursor_ix: int = 0 # Package zp_sel = 0 # package length selector _sPkgTx: list[rf.Network] = [] _sPkgRx: list[rf.Network] = [] def __init__( # noqa=E501 pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-statements self, com_params: COMParams, channels: Path | dict[str, list[Path]], vic_chnl_ix: int = 1, debug: bool = False, do_init: bool = True ) -> None: """ Args: com_params: The COM parameters for this instance. channels: Channel file path(s), either: - A single "*.s32p" containing a 32-port Touchstone model, or - A dictionary containing the following key/value pairs: - "THRU": A singleton list containing the path to the "*.s4p" of the thru-channel. - "FEXT": A list of "*.s4p"s containing the Touchstone models for all far-end aggressors. - "NEXT": A list of "*.s4p"s containing the Touchstone models for all near-end aggressors. Keyword Args: vic_chnl_ix: Index (1-based) of victim path in s32p. Default: 1 debug: Gather/report certain debugging information when ``True``. Default: ``False`` do_init: Skip normal initialization behavior when ``False``. Default: ``True`` """ self.com_params = com_params self.debug = debug self.do_init = do_init if not do_init: return # Process the given channel file names. ntwks: list[COMNtwk] = [] if isinstance(channels, str): # Should be a "*.s32p". assert channels.endswith(".s32p"), ValueError( "When `channels` is a string it must contain a '*.s32p' value!") if channels.exists() and channels.is_file(): ntwks = import_s32p(channels, vic_chnl_ix) else: raise RuntimeError( f"Unable to import '{channels}'!") elif isinstance(channels, dict): # Using s4p files. chnl_types = ["THRU", "FEXT", "NEXT"] assert all(k in channels and isinstance(channels[k], list) for k in chnl_types), ValueError( f"When `channels` is a dictionary it must contain: {chnl_types} keys, which must all refer to lists!") assert len(channels["THRU"]) == 1, ValueError( f"Length of `channels['THRU']` must be 1, not {len(channels['THRU'])}!") for fname, chtype in [(chnl_name, chnl_type) for chnl_type in chnl_types for chnl_name in channels[chnl_type]]: # noqa=E127 ntwks.append((sdd_21(rf.Network(fname)), chtype)) else: raise ValueError(f"`channels` must be of type 'str' or 'dict', not '{type(channels)}'!") # Create the system time & frequency vectors. fb = com_params.fb * 1e9 fstep = com_params.fstep * 1e9 # fstep = 50e6 # Dramatically improves performance. # - time tmax = 1 / fstep # Just enough to cover one full cycle of the fundamental. ui = 1 / fb tstep = ui / com_params.M # Obeying requested samps. per UI. t = arange(0, tmax, tstep) # - freq. fmax = 0.5 / t[1] # Nyquist freq. f = arange(0, fmax + fstep, fstep) # "+ fstep", to include `fmax`. self.t: Rvec = t self.f: Rvec = f # Pre-calculate constant responses. self._Xsinc = int(ui / t[1]) * np.sinc(ui * f) self._Ht = np.exp(-2 * (PI * (f / 1e9) * com_params.T_r / 1.6832)**2) # 93A-46 calls for f in GHz. _f = f / (com_params.f_r * fb) H_Butterworth = 1 / (1 - 3.414214 * _f**2 + _f**4 + 2.613126j * (_f - _f**3)) self._Hr = H_Butterworth Rd = com_params.R_d R0 = com_params.R_0 self._gamma1: Rvec = (Rd - R0) / (Rd + R0) z_pairs = list(zip(com_params.z_c, [com_params.z_p[self.zp_sel], com_params.z_pB])) sPkgTx_SE = rf.network.cascade_list([ self.sDie(False), sPkgTline(self.freqs, self.com_params.R_0, self.com_params.a1, self.com_params.a2, self.com_params.tau, self.com_params.gamma0, z_pairs), self.sC(self.com_params.C_p[0] / 1e9)]) self._sPkgTx = [sdd_21(rf.network.concat_ports([sPkgTx_SE, sPkgTx_SE], port_order='first'))] sPkgRx_SE = rf.network.cascade_list([ self.sC(self.com_params.C_p[1] / 1e9), sPkgTline(self.freqs, self.com_params.R_0, self.com_params.a1, self.com_params.a2, self.com_params.tau, self.com_params.gamma0, z_pairs), self.sDie(True)]) self._sPkgRx = [sdd_21(rf.network.concat_ports([sPkgRx_SE, sPkgRx_SE], port_order='first'))] self.chnls = list(map(self.add_pkg, ntwks)) if debug: self.chnls_noPkg = list( map(lambda ntwk: (ntwk, calc_H21(f, ntwk[0], self.gamma1_Tx, self.gamma2_Rx)), [ntwks[0]])) self.pulse_resps_nopkg = self.gen_pulse_resps(chnls=[self.chnls_noPkg[0]], apply_eq=False) self.pulse_resps_noeq = self.gen_pulse_resps(chnls=[self.chnls[0]], apply_eq=False) # Generate all possible combinations of Tx FFE tap weights. c0_min = com_params.c0_min trips = list(zip(com_params.tx_taps_min, com_params.tx_taps_max, com_params.tx_taps_step)) _tx_combs: list[Rvec] = list(filter( lambda v: (1 - abs(v).sum()) >= c0_min, mk_combs(trips))) self._tx_combs = [np.zeros(len(com_params.tx_taps_min)), *_tx_combs] self._num_tx_combs = len(self._tx_combs) # Set Rx FFE quantities. self.nRxTaps = len(com_params.rx_taps_max) self.nRxPreTaps = com_params.dw self.rx_ffe_phase_matrix = np.exp(np.outer(np.arange(self.nRxTaps), -1j * TWOPI * ui * f)) self.null_rx_ffe: Rvec = array([0] * self.nRxPreTaps + [1.0] + [0] * (self.nRxTaps - self.nRxPreTaps - 1)) self.empty_array: Rvec = array([]) # Misc. self.com_rslts: dict[str, Any] = {} self.fom_rslts: dict[str, Any] = {} self.dbg_dict: Optional[dict[str, Any]] = None self.set_status("Ready") def __call__( # pylint: disable=too-many-arguments,too-many-positional-arguments self, do_opt_eq: bool = True, tx_taps: Optional[Rvec] = None, opt_mode: Optional[OptMode] = None, norm_mode: Optional[NormMode] = None, unit_amp: Optional[bool] = None, dbg_dict: Optional[Dict[str, Any]] = None ) -> float: """ Calculate the COM value. Keyword Args: do_opt_eq: Perform optimization of linear equalization when True. Default: True tx_taps: Used when `do_opt_eq` = False. Default: None opt_mode: Optimization mode. Default: None (i.e. - Use `self.opt_mode`.) norm_mode: The tap weight normalization mode to use. Default: None (i.e. - Use `self.norm_mode`.) unit_amp: Enforce unit pulse response amplitude when True. (For comparing `przf()` results to `mmse()` results.) Default: None (i.e. - Use `self.unit_amp`.) dbg_dict: Optional dictionary into which debugging values may be stashed, for later analysis. Default: None Returns: COM: The calculated COM value (dB). Raises: RuntimeError if throughput channel is missing. RuntimeError if EQ optimization fails. """ # Honor any mode overrides. if opt_mode: self.opt_mode = opt_mode if norm_mode: self.norm_mode = norm_mode if unit_amp: self.unit_amp = unit_amp if dbg_dict is not None: self.dbg_dict = dbg_dict self.set_status("Optimizing EQ...") assert self.opt_eq(do_opt_eq=do_opt_eq, tx_taps=tx_taps), RuntimeError( "EQ optimization failed!") self.set_status("Calculating noise...") As, Ani, self.cursor_ix = self.calc_noise( cursor_ix=self.fom_rslts['cursor_ix']) com = 20 * np.log10(As / Ani) self.com_rslts["COM"] = com self.set_status(f"Ready; COM = {com: 5.1f} dB") return com # Instance Properties @property def fb(self) -> float: "Baud. rate (Hz)" return self.com_params.fb * 1e9 @property def ui(self) -> float: "Unit interval (s)" return 1 / self.fb @property def nspui(self) -> int: "Samples per UI" return self.com_params.M @property def times(self) -> Rvec: "System time vector (s); decoupled from system frequency vector!" return self.t @property def fstep(self) -> float: "Frequency step (Hz)" return self.f[1] @property def fmax(self) -> float: "Maximum frequency (Hz)" return self.f[-1] @property def freqs(self) -> Rvec: "System frequency vector (Hz); decoupled from system time vector!" return self.f @property def Xsinc(self) -> Rvec: """Frequency domain sinc(f) corresponding to Rect(ui) in time domain.""" return self._Xsinc @property def Ht(self) -> Cvec: """ Return the voltage transfer function, H(f), associated w/ the Tx risetime, according to (93A-46). """ return self._Ht @property def num_tx_combs(self) -> int: "Return the number of available Tx FFE tap weight combinations." return self._num_tx_combs @property def Hr(self) -> Cvec: """ Return the voltage transfer function, H(f), of the Rx AFE, according to (93A-20). """ return self._Hr @property def Hctf(self) -> Cvec: """ Return the voltage transfer function, H(f), of the Rx CTLE, according to (93A-22). """ return self.calc_Hctf(self.gDC, self.gDC2)
[docs] def calc_Hctf(self, gDC: Optional[float] = None, gDC2: Optional[float] = None) -> Cvec: """ Return the voltage transfer function, H(f), of the Rx CTLE, according to (93A-22). Keyword Args: gDC: CTLE first stage d.c. gain (dB). Default: None gDC2: CTLE second stage d.c. gain (dB). Default: None Notes: 1. The instance's current value(s) for ``gDC`` and ``gDC2`` are used if not provided. (Necessary, to accommodate sweeping when optimizing EQ.) """ if gDC is None: gDC = self.gDC if gDC2 is None: gDC2 = self.gDC2 return calc_Hctle(self.freqs, self.com_params.f_z * 1e9, self.com_params.f_p1 * 1e9, self.com_params.f_p2 * 1e9, self.com_params.f_LF * 1e9, gDC, gDC2)
@property def gamma1_Tx(self) -> float: "Reflection coefficient looking out of the left end of the channel." return self._gamma1[0] @property def gamma2_Tx(self) -> float: "Reflection coefficient looking out of the right end of the channel." return self._gamma1[0] @property def gamma1_Rx(self) -> float: "Reflection coefficient looking out of the left end of the channel." return self._gamma1[1] @property def gamma2_Rx(self) -> float: "Reflection coefficient looking out of the right end of the channel." return self._gamma1[1]
[docs] def sDie(self, isRx: bool) -> rf.Network: "On-die parasitic capacitance/inductance ladder network, including bump." if isRx: ix = 1 else: ix = 0 Cd = self.com_params.C_d[ix] / 1e9 Ls = self.com_params.L_s[ix] / 1e9 R0 = [self.com_params.R_0] * len(Cd) # type: ignore rslt = rf.network.cascade_list( list(map(lambda trip: sDieLadderSegment(self.freqs, trip), zip(R0, Cd, Ls)))) # type: ignore rslt = rslt ** self.sC(self.com_params.C_b[ix] / 1e9) if isRx: rslt.flip() return rslt
@property def sPkgRx(self) -> rf.Network: "Rx package response." return self._sPkgRx[self.zp_sel] @property def sPkgTx(self) -> rf.Network: "Tx package response." return self._sPkgTx[self.zp_sel] @property def sPkgNEXT(self) -> rf.Network: "NEXT package response." return self._sPkgTx[0] # Package modeling
[docs] def add_pkg(self, ntwk: COMNtwk) -> COMChnl: """Add package response to raw channel and pre-calculate H21.""" # Pre-interpolate channel model to system frequency vector, to prevent noisy non-causal artifacts. freqs = self.freqs sChnl = ntwk[0] sChnlInterp = sChnl.extrapolate_to_dc().interpolate( freqs[freqs <= sChnl.f[-1]], kind='cubic', coords='polar', basis='t', assume_sorted=True) new_s = sChnlInterp.s.tolist() # I couldn't make NumPy array padding work correctly here. pad_len = len(freqs) - len(sChnlInterp.f) new_s.extend( [[[sChnlInterp.s11.s[-1, 0, 0], sChnlInterp.s12.s[-1, 0, 0]], [sChnlInterp.s21.s[-1, 0, 0], sChnlInterp.s22.s[-1, 0, 0]]] ] * pad_len) # noqa=E124 new_s = np.array(new_s) sChnlInterp = rf.Network(s=new_s, z0=sChnlInterp.z0[0], f=freqs) ntype = ntwk[1] if ntype == 'NEXT': _ntwk = self.sPkgNEXT ** sChnlInterp ** self.sPkgRx else: _ntwk = self.sPkgTx ** sChnlInterp ** self.sPkgRx return ((_ntwk, ntype), calc_H21(self.freqs, _ntwk, self.gamma1_Tx, self.gamma2_Rx))
# Logging / Debugging
[docs] def set_status(self, status: str) -> None: "Set the GUI status string and print it if we're debugging." self.status_str = status if self.debug: # print(status, flush=True) pass
# General functions
[docs] def sC(self, c: float) -> rf.Network: """ Return the 2-port network corresponding to a shunt capacitance, according to (93A-8). Args: c: Value of shunt capacitance (F). Returns: 2-port network equivalent to shunt capacitance, calculated at given frequencies. """ return sCshunt(self.freqs, c, self.com_params.R_0)
[docs] @cache # pylint: disable=method-cache-max-size-none def Htx(self, tx_taps_ix: int) -> Cvec: """ Return the complex frequency response of the Tx deemphasis filter. Args: tx_taps_ix: Index into the Tx tap weight combinations list. Returns: Complex frequency response of Tx deemphasis filter. """ assert 0 <= tx_taps_ix < self.num_tx_combs, ValueError( f"tx_taps_ix ({tx_taps_ix}) must be an integer in: [0, {self.num_tx_combs})!") return calc_Hffe(self.freqs, 1 / self.fb, array(self._tx_combs[tx_taps_ix]), 3)
[docs] def Hffe_Rx(self, taps: Optional[Rvec] = None) -> Cvec: """ Return the complex frequency response of the Rx FFE. Keyword Args: taps: Tap weights to use for calculation. Default: None (Means "use self.rx_taps".) Returns: Complex frequency response of Rx FFE. Raises: ValueError: If ``taps`` is supplied and the length is wrong. """ if taps is None: taps = self.rx_taps else: assert len(taps) == self.nRxTaps, ValueError( f"If `taps` is given then its length ({len(taps)}) must equal `nRxTaps` ({self.nRxTaps})!") return taps @ self.rx_ffe_phase_matrix
[docs] def H( # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments self, H21: Cvec, tx_ix: int, Hctf: Optional[Cvec] = None, rx_taps: Optional[Rvec] = None, dfe_taps: Optional[Rvec] = None, passive_RxFFE: bool = False ) -> Cvec: """ Return the voltage transfer function, H(f), of a complete COM signal path, according to (93A-19). Args: H21: Voltage transfer function of channel + package. tx_ix: Tx FFE tap weights index. Keyword Args: Hctf: Complex voltage transfer function of CTLE. Default: None (i.e. - Calculate, using ``self.gDC`` & ``self.gDC2``.) rx_taps: Rx FFE tap weights. Default: None (i.e. - Use ``self.rx_taps``.) dfe_taps: Rx DFE tap weights. Default: None (i.e. - Use ``self.dfe_taps``.) passive_RxFFE: Enforce passivity of Rx FFE when True. Default: True Returns: Complex voltage transfer function of complete path. Raises: ValueError: If the length of the given voltage transfer function differs from the length of the system frequency vector. Notes: 1. It is in this processing step that linear EQ is first applied. 2. Any unprovided EQ values are taken from the ``COM`` instance. If you really want to omit a particular EQ component then call with: - ``tx_taps``: [] - ``rx_taps``: [1.0] - ``Hctle``: ones(len(self.freqs)) """ freqs = self.freqs if rx_taps is None: rx_taps = self.rx_taps if dfe_taps is None: dfe_taps = self.dfe_taps assert len(H21) == len(freqs), ValueError( f"Length of `H21` ({len(H21)}) must match length of frequency vector ({len(freqs)})!") Htx = self.Htx(tx_ix) Hr = self.Hr if Hctf is None: Hctf = self.calc_Hctf(self.gDC, self.gDC2) rslt = Htx * H21 * Hr * Hctf nRxTaps = len(rx_taps) if nRxTaps: Hrx = self.Hffe_Rx(rx_taps) if passive_RxFFE: Hrx /= max(np.abs(Hrx)) rslt *= Hrx return rslt
[docs] def pulse_resp(self, H: Cvec) -> Rvec: """ Return the unit pulse response, p(t), corresponding to the given voltage transfer function, H(f), according to (93A-24). Args: H: The voltage transfer function, H(f). Note: Possitive frequency components only, including fN. Returns: The pulse response corresponding to the given voltage transfer function. Raises: ValueError: If the length of the given voltage transfer function differs from that of the system frequency vector. Notes: 1. The pulse response is trimmed to match the length of the system time vector. """ assert len(H) == len(self.freqs), ValueError( f"Length of given H(f) {len(H)} does not match length of f {len(self.freqs)}!") p = np.fft.irfft(self.Xsinc * H) return p[:len(self.times)]
[docs] def gen_pulse_resps( # pylint: disable=too-many-arguments,too-many-positional-arguments self, chnls: Optional[list[COMChnl]] = None, Hctf: Optional[Cvec] = None, tx_ix: Optional[int] = None, rx_taps: Optional[Rvec] = None, dfe_taps: Optional[Rvec] = None, apply_eq: bool = True ) -> list[Rvec]: """ Generate pulse responses for all networks. Keyword Args: chnls: The list of networks to generate pulse responses for. Default: None (i.e. - Use ``self.chnls``.) Hctf: Complex voltage transfer function of CTLE. Default: None (i.e. - Calculate, using ``self.gDC`` & ``self.gDC2``.) tx_ix: Desired Tx tap weights index. Default: None (i.e. - Use ``self.tx_ix``.) rx_taps: Desired Rx FFE tap weights. Default: None (i.e. - Use ``self.rx_taps``.) dfe_taps: Desired Rx DFE tap weights. Default: None (i.e. - Use ``self.dfe_taps``.) apply_eq: Include linear EQ when True; otherwise, exclude it. (Allows for pulse response generation of terminated, but unequalized, channel.) Default: True Returns: list of pulse responses. Notes: 1. Assumes ``self.gDC``, ``self.gDC2``, ``self.tx_taps``, ``self.rx_taps``, and ``self.dfe_taps`` have been set correctly, if the equivalent function parameters have not been provided. 2. To generate pulse responses that include all linear EQ except the Rx FFE/DFE (i.e. - pulse responses suitable for Rx FFE/DFE tap weight optimization, via either ``optimize.przf()`` or ``optimize.mmse()``), set ``rx_taps`` equal to: ``[1.0]`` and ``dfe_taps`` equal to: ``[]``. """ if chnls is None: chnls = self.chnls if tx_ix is None: tx_ix = self.tx_ix if rx_taps is None: rx_taps = self.rx_taps if dfe_taps is None: dfe_taps = self.dfe_taps if Hctf is None: Hctf = self.calc_Hctf(self.gDC, self.gDC2) pulse_resps = [] for (_, ntype), H21 in chnls: if apply_eq: if ntype == 'NEXT': tx_ix = 0 pr = self.pulse_resp(self.H(H21, tx_ix, Hctf=Hctf, rx_taps=rx_taps, dfe_taps=dfe_taps)) else: pr = self.pulse_resp(H21) if ntype == 'THRU': pr *= self.com_params.A_v elif ntype == 'NEXT': pr *= self.com_params.A_ne else: pr *= self.com_params.A_fe pulse_resps.append(pr) return pulse_resps
# pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-statements
[docs] def calc_fom( self, tx_ix: int, Hctf: Cvec, rx_taps: Optional[Rvec] = None, opt_mode: Optional[OptMode] = None, norm_mode: Optional[NormMode] = None, unit_amp: Optional[bool] = None, n_pre: Optional[int] = None ) -> float: """ Calculate the **figure of merit** (FOM), given the existing linear EQ settings. Optimize Rx FFE taps if they aren't specified by caller. Args: tx_ix: Index into the list of Tx tap weight combinations. Hctf: Complex voltage transfer function of CTLE. Keyword Args: rx_taps: Rx FFE tap weight overrides. Default: None (i.e. - Optimize Rx FFE tap weights.) opt_mode: Optimization mode. Default: None (i.e. - Use ``self.opt_mode``.) norm_mode: The tap weight normalization mode to use. Default: None (i.e. - Use ``self.norm_mode``.) unit_amp: Enforce unit pulse response amplitude when True. (For comparing ``optimize.przf()`` results to ``optimize.mmse()`` results.) Default: None (i.e. - Use ``self.unit_amp``.) n_pre: Number of pre-cursor taps to use in calculating ISI. Default: None (i.e. - Use ``self.nRxPreTaps``.) Returns: The resultant figure of merit. Notes: 1. See: IEEE 802.3-2022 93A.1.6. 2. Unlike other member functions of the ``COM`` class, this function *optimizes* the Rx FFE tap weights when they are not provided. """ # Honor any mode overrides. opt_mode = opt_mode or self.opt_mode norm_mode = norm_mode or self.norm_mode if unit_amp is None: unit_amp = self.unit_amp # Copy instance variables. L = self.com_params.L M = self.nspui times = self.times freqs = self.freqs nDFE = len(self.com_params.dfe_min) nRxTaps = self.nRxTaps nRxPreTaps = self.nRxPreTaps rx_taps_min = self.com_params.rx_taps_min rx_taps_max = self.com_params.rx_taps_max bmin = self.com_params.dfe_min bmax = self.com_params.dfe_max tb = 1 / self.fb # Resolve remaining optionals. n_pre = n_pre or nRxPreTaps pulse_resps_preFFE = self.gen_pulse_resps( # Assumes no Rx FFE/DFE. tx_ix=tx_ix, Hctf=Hctf, rx_taps=self.null_rx_ffe, dfe_taps=self.empty_array) match opt_mode: case OptMode.PRZF: # Step a - Pulse response construction. pulse_resps = pulse_resps_preFFE if nRxTaps: # If we have an Rx FFE... if rx_taps is None: # If we received no explicit override of the Rx FFE rx_taps, _, _ = przf( # tap weight values,then optimize them. pulse_resps_preFFE[0], M, nRxTaps, nRxPreTaps, nDFE, rx_taps_min, rx_taps_max, bmin, bmax, norm_mode=norm_mode, unit_amp=unit_amp) pulse_resps = self.gen_pulse_resps( # Regenerate the pulse responses, tx_ix=tx_ix, Hctf=Hctf, rx_taps=rx_taps, # including the Rx FFE. dfe_taps=self.empty_array) else: # Otherwise, pass the signal through unaltered. rx_taps = self.null_rx_ffe # Step b - Cursor identification. vic_pulse_resp = pulse_resps[0] # Note: Includes any Rx FFE, but not DFE. vic_peak_loc = np.argmax(vic_pulse_resp) cursor_ix = loc_curs(vic_pulse_resp, M, self.com_params.dfe_max, self.com_params.dfe_min) curs_uis, curs_ofst = divmod(cursor_ix, M) pr_samps = vic_pulse_resp[curs_ofst::M] if n_pre > curs_uis: pr_samps = np.pad(pr_samps, (n_pre - curs_uis, 0)) else: pr_samps = pr_samps[curs_uis - n_pre:] # At this point, `pr_samps` contains one sample per UI w/ exactly `n_pre` pre-cursor samples. # Step c - As. vic_curs_val = pr_samps[n_pre] As = self.com_params.RLM * vic_curs_val / (L - 1) # Step d - Tx noise. varX = (L**2 - 1) / (3 * (L - 1)**2) # (93A-29) varTx = vic_curs_val**2 * pow(10, -self.com_params.SNR_TX / 10) # (93A-30) # Step e - ISI. hISI = pr_samps.copy() hISI[n_pre] = 0 # No ISI at cursor. dfe_slice = slice(n_pre + 1, n_pre + 1 + nDFE) dfe_tap_weights = np.maximum( # (93A-26) self.com_params.dfe_min, np.minimum( self.com_params.dfe_max, (hISI[dfe_slice] / vic_curs_val))) hISI[dfe_slice] -= dfe_tap_weights * vic_curs_val # (93A-27) varISI = varX * (hISI**2).sum() # (93A-31) # Step f - Jitter noise. hJ = calc_hJ(vic_pulse_resp, As, cursor_ix, M) varJ = (self.com_params.A_DD**2 + self.com_params.sigma_Rj**2) * varX * (hJ**2).sum() # (93A-32) # Step g - Crosstalk. varXT = 0. for pulse_resp in pulse_resps[1:]: # (93A-34) varXT += max((filt_pr_samps(pulse_resp[m::M], As)**2).sum() for m in range(M)) # noqa=E127,E501 (93A-33) varXT *= varX # Step h - Spectral noise. df = freqs[1] varN = (self.com_params.eta_0 / 1e9) * (abs(self.Hr * Hctf)**2).sum() * df # (93A-35) # Step i - FOM calculation. fom = 10 * np.log10(As**2 / (varTx + varISI + varJ + varXT + varN)) # (93A-36) case OptMode.MMSE: theNoiseCalc = NoiseCalc( L, tb, 0, times, pulse_resps_preFFE[0], pulse_resps_preFFE[1:], freqs, self.Ht, self.chnls[0][1], self.Hr, Hctf, self.com_params.eta_0, self.com_params.A_v, self.com_params.SNR_TX, self.com_params.A_DD, self.com_params.sigma_Rj) rslt = mmse(theNoiseCalc, nRxTaps, nRxPreTaps, len(self.com_params.dfe_min), self.com_params.RLM, self.com_params.L, bmin, bmax, rx_taps_min, rx_taps_max, norm_mode=norm_mode) fom = rslt["fom"] rx_taps = rslt["rx_taps"] dfe_tap_weights = rslt["dfe_tap_weights"] pr_samps = rslt["h"] vic_pulse_resp = rslt["vic_pulse_resp"] # Note: Does not include Rx FFE/DFE! vic_peak_loc = np.argmax(vic_pulse_resp) cursor_ix = rslt["cursor_ix"] As = vic_pulse_resp[cursor_ix] varTx = rslt["varTx"] varISI = rslt["varISI"] varJ = rslt["varJ"] varXT = rslt["varXT"] varN = rslt["varN"] self.fom_rslts['mse'] = rslt['mse'] if 'mse' in rslt else None self.theNoiseCalc = theNoiseCalc # pylint: disable=attribute-defined-outside-init self.mmse_rslt = rslt # pylint: disable=attribute-defined-outside-init case _: raise ValueError(f"Unrecognized optimization mode: {opt_mode}, requested!") # Stash our calculation results. self.fom_rslts['pulse_resps'] = pulse_resps_preFFE self.fom_rslts['vic_pulse_resp'] = vic_pulse_resp self.fom_rslts['vic_peak_loc'] = vic_peak_loc self.fom_rslts['cursor_ix'] = cursor_ix self.fom_rslts['As'] = As self.fom_rslts['varTx'] = varTx self.fom_rslts['varISI'] = varISI self.fom_rslts['varJ'] = varJ self.fom_rslts['varXT'] = varXT self.fom_rslts['varN'] = varN self.fom_rslts['rx_taps'] = rx_taps self.fom_rslts['dfe_tap_weights'] = dfe_tap_weights self.fom_rslts['pr_samps'] = pr_samps return fom
[docs] def opt_eq( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-statements self, do_opt_eq: bool = True, tx_taps: Optional[Rvec] = None, opt_mode: Optional[OptMode] = None, norm_mode: Optional[NormMode] = None, unit_amp: Optional[bool] = None ) -> bool: """ Find and set the optimum values for the linear equalization parameters: ``c[n]``, ``gDC``, ``gDC2``, and ``w[n]`` as per IEEE 802.3-22 93A.1.6 (or, [1] slide 11 if MMSE has been chosen). Keyword Args: do_opt_eq: Perform optimization of linear EQ when True. Default: True tx_taps: Used when ``do_opt_eq`` = False. Default: None opt_mode: Optimization mode. Default: None (i.e. - Use ``self.opt_mode``.) norm_mode: The tap weight normalization mode to use. Default: None (i.e. - Use ``self.norm_mode``.) unit_amp: Enforce unit pulse response amplitude when True. (For comparing ``przf()`` results to ``mmse()`` results.) Default: None (i.e. - Use ``self.unit_amp``.) Returns: True if no errors encountered; False otherwise. Notes: 1. The found optimum equalization values are set for the instance. """ if do_opt_eq: # Honor any mode overrides. opt_mode = opt_mode or self.opt_mode norm_mode = norm_mode or self.norm_mode if unit_amp is None: # Don't try the more Pythonic syntax above; it doesn't work for Boolean values. unit_amp = self.unit_amp # Run the nested optimization loops. fom_max = -1000.0 fom_max_changed = False for _gDC2 in self.com_params.g_DC2: for _gDC in self.com_params.g_DC: _Hctle = self.calc_Hctf(_gDC, _gDC2) for _tx_ix in range(self.num_tx_combs): fom = self.calc_fom( _tx_ix, _Hctle, opt_mode=opt_mode, norm_mode=norm_mode, unit_amp=unit_amp) if fom > fom_max: fom_max_changed = True fom_max = fom gDC2_best = _gDC2 gDC_best = _gDC tx_ix_best = _tx_ix rx_taps_best = self.fom_rslts['rx_taps'] pr_samps_best = self.fom_rslts['pr_samps'] dfe_tap_weights_best = self.fom_rslts['dfe_tap_weights'] cursor_ix_best = self.fom_rslts['cursor_ix'] As_best = self.fom_rslts['As'] varTx_best = self.fom_rslts['varTx'] varISI_best = self.fom_rslts['varISI'] varJ_best = self.fom_rslts['varJ'] varXT_best = self.fom_rslts['varXT'] varN_best = self.fom_rslts['varN'] vic_pulse_resp = self.fom_rslts['vic_pulse_resp'] mse_best = self.fom_rslts['mse'] if 'mse' in self.fom_rslts else 0 else: assert tx_taps, RuntimeError("You must define `tx_taps` when setting `do_opt_eq` False!") fom = self.calc_fom(0, self.calc_Hctf(self.gDC, self.gDC2)) fom_max = fom fom_max_changed = True gDC2_best = self.gDC2 gDC_best = self.gDC tx_ix_best = 0 rx_taps_best = self.rx_taps dfe_tap_weights_best = self.fom_rslts['dfe_tap_weights'] cursor_ix_best = self.fom_rslts['cursor_ix'] As_best = self.fom_rslts['As'] varTx_best = self.fom_rslts['varTx'] varISI_best = self.fom_rslts['varISI'] varJ_best = self.fom_rslts['varJ'] varXT_best = self.fom_rslts['varXT'] varN_best = self.fom_rslts['varN'] vic_pulse_resp = self.fom_rslts['vic_pulse_resp'] mse_best = 0 # Check for error and save the best results. if not fom_max_changed: return False # Flags the caller that the following settings have NOT been made. # Note the normalization of the Rx FFE tap weights, to produce a unit amplitude cursor tap. # This is not dictated by the spec., but is what the MATLAB code does. self.gDC2 = gDC2_best # pylint: disable=possibly-used-before-assignment self.gDC = gDC_best # pylint: disable=possibly-used-before-assignment self.tx_ix = tx_ix_best # pylint: disable=possibly-used-before-assignment self.rx_taps = rx_taps_best / rx_taps_best[self.nRxPreTaps] # pylint: disable=possibly-used-before-assignment self.dfe_taps = dfe_tap_weights_best # pylint: disable=possibly-used-before-assignment self.fom_rslts['FOM'] = fom_max # pylint: disable=possibly-used-before-assignment self.fom_rslts['mse'] = mse_best # pylint: disable=possibly-used-before-assignment self.fom_rslts['pr_samps'] = pr_samps_best # pylint: disable=possibly-used-before-assignment self.fom_rslts['cursor_ix'] = cursor_ix_best # pylint: disable=possibly-used-before-assignment self.fom_rslts['As'] = As_best # pylint: disable=possibly-used-before-assignment self.fom_rslts['sigma_ISI'] = np.sqrt(varISI_best) # pylint: disable=possibly-used-before-assignment self.fom_rslts['sigma_J'] = np.sqrt(varJ_best) # pylint: disable=possibly-used-before-assignment self.fom_rslts['sigma_XT'] = np.sqrt(varXT_best) # pylint: disable=possibly-used-before-assignment self.fom_rslts['sigma_Tx'] = np.sqrt(varTx_best) # pylint: disable=possibly-used-before-assignment self.fom_rslts['sigma_N'] = np.sqrt(varN_best) # pylint: disable=possibly-used-before-assignment self.fom_rslts['vic_pulse_resp'] = vic_pulse_resp # pylint: disable=possibly-used-before-assignment return True
# pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-statements
[docs] def calc_noise( self, cursor_ix: Optional[int] = None, ) -> tuple[float, float, int]: """ Calculate the interference and noise for COM. Keyword Args: cursor_ix: An optional predetermined cursor index, to be used instead of our own estimate. (In support of MMSE.) Default: None Returns: A triplet containing - signal amplitude (V) - noise + interference amplitude (V) - cursor location within victim pulse response vector Notes: 1. Assumes the following instance variables have been set optimally: - ``gDC`` - ``gDC2`` - ``tx_ix`` - ``rx_taps`` (This assumption is embedded into the ``gen_pulse_resps()`` function.) 2. Fills in the ``com_results`` dictionary w/ various useful values for debugging. """ # Copy instance variables. L = self.com_params.L M = self.nspui RLM = self.com_params.RLM freqs = self.freqs nDFE = len(self.com_params.dfe_min) nRxTaps = self.nRxTaps self.set_status("Calculating COM...") pulse_resps = self.gen_pulse_resps(dfe_taps=self.empty_array) # DFE taps are included explicitly, below. vic_pulse_resp = pulse_resps[0] if cursor_ix is None: cursor_ix = loc_curs(vic_pulse_resp, self.nspui, self.com_params.dfe_max, self.com_params.dfe_min) curs_uis, curs_ofst = divmod(cursor_ix, M) vic_curs_val = vic_pulse_resp[cursor_ix] # Missing `2*` is also missing from `Ani` definition; so, they cancel in COM calc. As = RLM * vic_curs_val / (L - 1) ymax = 1.1 * As npts = 2 * min(int(ymax / 0.00001), 1_000) + 1 # Note 1 of 93A.1.7.1; MUST BE ODD! y = np.linspace(-ymax, ymax, npts) ystep = 2 * ymax / (npts - 1) pDelta = np.zeros(npts) pDelta[npts // 2] = 1.0 # Sec. 93A.1.7.2 varX = (L**2 - 1) / (3 * (L - 1)**2) # (93A-29) df = freqs[1] - freqs[0] if nRxTaps: Hrx = self.Hffe_Rx() else: Hrx = np.ones(len(freqs)) varN = self.com_params.eta_0 * ( abs(self.Hr[1:] * self.Hctf[1:] * Hrx[1:])**2).sum() * (df / 1e9) # (93A-35) + Hffe if self.opt_mode == OptMode.PRZF: varTx = vic_curs_val**2 * pow(10, -self.com_params.SNR_TX / 10) # (93A-30) else: Stx = self.theNoiseCalc.Stn(Hrx=Hrx) varTx = sum(Stx) * df # (178A-17) hJ = calc_hJ(vic_pulse_resp, As, cursor_ix, self.nspui) _, pJ = delta_pmf(filt_pr_samps(self.com_params.A_DD * hJ, ymax), L=L, y=y) # (93A-40) varJ = self.com_params.sigma_Rj**2 * varX * (hJ**2).sum() # (93A-31) varG = varTx + varJ + varN # (93A-41) pG = np.exp(-y**2 / (2 * varG)) / np.sqrt(TWOPI * varG) * ystep # (93A-42), but converted to PMF. pN = np.convolve(pG, pJ, mode='same') # (93A-43) pN /= pN.sum() # Enforce a PMF. # Sec. 93A.1.7.3 self.set_status("Sec. 93A.1.7.3") # - ISI (Inconsistent w/ IEEE 802.3-22, but consistent w/ v2.60 of MATLAB code.) n_pre = min(5, curs_uis) # Sample every M points, such that we include our identified cursor sample. isi_sample_slice = slice(curs_ofst, len(vic_pulse_resp), M) isi_select_slice = slice(curs_uis - n_pre, curs_uis + 2048) tISI = self.times[isi_sample_slice][isi_select_slice] hISI = vic_pulse_resp[isi_sample_slice][isi_select_slice].copy() hISI[n_pre] = 0 # No ISI at cursor. dfe_slice = slice(n_pre + 1, n_pre + 1 + nDFE) dfe_tap_weights = np.maximum( # (93A-26) self.com_params.dfe_min, np.minimum( self.com_params.dfe_max, (hISI[dfe_slice] / vic_curs_val))) hISI[dfe_slice] -= dfe_tap_weights * vic_curs_val # (93A-27) varISI = varX * (hISI**2).sum() # (93A-31) _, pISI = delta_pmf(filt_pr_samps(hISI, ymax), L=L, y=y) # (93A-40) # - Crosstalk pXT = pDelta for pulse_resp in pulse_resps[1:]: # (93A-44) i = np.argmax([(pulse_resp[m::M]**2).sum() for m in range(M)]) # (93A-33) samps = pulse_resp[i::M] # [isi_select_slice] _, pk = delta_pmf(filt_pr_samps(samps, ymax), L=L, y=y) # , dbg_dict=dbg_dict) pXT = np.convolve(pXT, pk, mode='same') pXT /= pXT.sum() # Enforce a PMF. varXT = sum(_y**2 * p for (_y, p) in zip(y, pXT)) # Final calculation py = np.convolve(np.convolve(pISI, pN, mode='same'), pXT, mode='same') # (93A-45) Py = np.cumsum(py) Py /= Py[-1] # Enforce cumulative probability distribution. Ani = -y[np.where(Py >= self.com_params.DER_0)[0][0]] # Store some results. self.com_rslts['As'] = As self.com_rslts['Ani'] = Ani self.com_rslts['pulse_resps'] = pulse_resps self.com_rslts['cursor_ix'] = cursor_ix self.com_rslts['sigma_Tx'] = np.sqrt(varTx) self.com_rslts['sigma_G'] = np.sqrt(varG) self.com_rslts['sigma_N'] = np.sqrt(varN) self.com_rslts['sigma_J'] = np.sqrt(varJ) self.com_rslts['sigma_ISI'] = np.sqrt(varISI) self.com_rslts['sigma_XT'] = np.sqrt(varXT) self.com_rslts['tISI'] = tISI self.com_rslts['hISI'] = hISI self.com_rslts['pG'] = pG self.com_rslts['pN'] = pN self.com_rslts['pJ'] = pJ self.com_rslts['pISI'] = pISI self.com_rslts['pXT'] = pXT self.com_rslts['py'] = py self.com_rslts['Py'] = Py self.com_rslts['y'] = y self.com_rslts['dfe_taps'] = dfe_tap_weights return (As, Ani, cursor_ix)
about_str = """ <H2><em>PyChOpMarg</em> - A Python implementation of COM, as per IEEE 802.3-22 Annex 93A/178A.</H2>\n <strong>By:</strong> David Banas <capn.freako@gmail.com><p>\n <strong>On:</strong> January 20, 2025<p>\n <strong>At:</strong> v3.1.0\n <H3>Useful Links</H3>\n (You'll probably need to: right click, select <em>Copy link address</em>, and paste into your browser.) <UL>\n <LI><a href="https://github.com/capn-freako/PyChOpMarg"><em>GitHub</em> Home</a> <LI><a href="https://pypi.org/project/PyChOpMarg/"><em>PyPi</em> Home</a> <LI><a href="https://readthedocs.org/projects/pychopmarg/"><em>Read the Docs</em> Home</a> </UL> """
def run_com( # pylint: disable=too-many-arguments,too-many-positional-arguments chnl_sets: list[tuple[ChnlGrpName, ChnlSet]], com_params: COMParams, opt_mode: OptMode = OptMode.MMSE, norm_mode: NormMode = NormMode.P8023dj, unit_amp: bool = True, dbg_dict: Optional[Dict[str, Any]] = None ) -> dict[ChnlGrpName, dict[ChnlSetName, COM]]: """ Run COM on a list of grouped channel sets. Args: chnl_sets: List of pairs, each consisting of: - ch_grp_name: The group name for this list of channel sets. - ch_sets: List of channel sets to run. params: The COM configuration parameters to use. Keyword Args: opt_mode: The optimization mode desired. Default: OptMode.MMSE norm_mode: The normalization mode desired for Rx FFE tap weights. Default: NormMode.P8023dj dbg_dict: Optional dictionary into which debugging values may be stashed, for later analysis. Default: None Returns: 2D dictionary indexed by channel group name, then by channel set name, containing the completed COM objects. """ theCOMs: dict[ChnlGrpName, dict[ChnlSetName, COM]] = {} for grp, ch_set in chnl_sets: lbl = ch_set['THRU'][0].stem print(f"{grp} : {lbl}") if dbg_dict is not None: theCOM = COM(com_params, ch_set, debug=True) else: theCOM = COM(com_params, ch_set) # Calling the object calculates the COM value, as well as many other intermediate results. theCOM(opt_mode=opt_mode, norm_mode=norm_mode, unit_amp=unit_amp, dbg_dict=dbg_dict) if grp in theCOMs: theCOMs[grp].update({ch_set['THRU'][0].stem: theCOM}) else: theCOMs.update({grp: {ch_set['THRU'][0].stem: theCOM}}) return theCOMs if __name__ == "__main__": # from pychopmarg.cli import cli # cli() print("Sorry, the PyChOpMarg package is currently only usable as a library.") print("It's GUI is currently broken.")