Source code for pychopmarg.com

"""
The Channel Operating Margin (COM) model, as per IEEE 802.3-22 Annex 93A.

Original author: David Banas <capn.freako@gmail.com>

Original date:   February 29, 2024

Copyright (c) 2024 David Banas; all rights reserved World wide.

Notes:
    1. Throughout this file, equation numbers refer to Annex 93A of the IEEE 802.3-22 standard.

ToDo:
    1. Provide type hints for imports.
"""

import numpy as np  # type: ignore
import skrf as rf  # type: ignore

from chaco.api import (
    ArrayPlotData,
    GridPlotContainer,
    Plot,
)  # type: ignore
from chaco.tools.api import ZoomTool
from scipy.interpolate import interp1d
from traits.api import (
    Array,
    cached_property,
    Enum,
    File,
    Float,
    HasTraits,
    Int,
    List,
    Property,
    Str,
)  # type: ignore
from typing import Any, Optional, TypeVar

from pychopmarg.common import Rvec, Cvec, COMParams, PI, TWOPI
from pychopmarg.utility import import_s32p, sdd_21

# Globals
# These are used by `calc_Hffe()`, to minimize the size of its cache.
# They are initialized by `COM.__init__()`.
gFreqs: Rvec = None  # type: ignore
gFb: float = None  # type: ignore
gC0min: float = None  # type: ignore
gNtaps: int = None  # type: ignore
PLOT_SPACING = 20

T = TypeVar('T', Any, Any)


def from_dB(x: float) -> float:
    """Convert from (dB) to real, assuming square law applies."""
    return pow(10, x / 20)


[docs] def all_combs(xss: list[list[T]]) -> list[list[T]]: """ Generate all combinations of input. Args: xss([[T]]): The lists of candidates for each position in the final output. Returns: All possible combinations of input lists. """ if not xss: return [[]] head, *tail = xss yss = all_combs(tail) return [[x] + ys for x in head for ys in yss]
[docs] def calc_Hffe(tap_weights: list[float], n_post: int) -> Cvec: """ Calculate the voltage transfer function, H(f), for the Tx FFE, according to (93A-21). Args: tap_weights: The vector of filter tap weights, excluding the cursor tap. n_post: The number of post-cursor taps. Returns: The complex voltage transfer function, H(f), for the Tx FFE. Raises: RuntimeError: If the global variables above haven't been initialized. ValueError: If the length of the given tap weight vector is incorrect. Notes: 1. This function has been (awkwardly) pulled outside of the `COM` class and made to use global variables, strictly for performance reasons. (Note that `@cached_property` decorated instance functions of `HasTraits` subclasses are not actually memoized, like `@cache` decorated ordinary functions are.) (It is used in the innermost layer of the nested loop structure used to find the optimal EQ solution. And its input argument is repeated often.) (See the `opt_eq()` method of the `COM` class.) 2. Currently, a single post-cursor tap is assumed. """ assert len(gFreqs) and gFb and gC0min and gNtaps, RuntimeError( f"Called before global variables were initialized!\n\tgFreqs: \ {gFreqs}, gFb: {gFb}, gC0min: {gC0min}, gNtaps: {gNtaps}") assert len(tap_weights) == gNtaps, ValueError( f"Length of given tap weight vector is incorrect!\n\tExpected: {gNtaps}; got: {len(tap_weights)}") c0 = 1 - sum(list(map(abs, tap_weights))) if c0 < gC0min: return np.ones(len(gFreqs)) else: cs = tap_weights cs.insert(-n_post, c0) return sum(list(map(lambda n_c: n_c[1] * np.exp(-1j * TWOPI * n_c[0] * gFreqs / gFb), enumerate(cs))))
def print_taps(ws: list[float]) -> str: """Return formatted tap weight values.""" n_ws = len(ws) if n_ws == 0: return "" res = f"{ws[0]:5.2f}" if n_ws > 1: if n_ws > 8: for w in ws[1:8]: res += f" {w:5.2f}" res += f"\n{ws[8]:5.2f}" for w in ws[9:]: res += f" {w:5.2f}" else: for w in ws[1:]: res += f" {w:5.2f}" return res
[docs] class COM(HasTraits): """ Encoding of the IEEE 802.3-22 Annex 93A "Channel Operating Margin" (COM) specification, as a Python class making use of the Enthought Traits/UI machinery, for both calculation efficiency and easy GUI display. """ status_str = Str("Ready") # Independent variable definitions (Defaults from IEEE 802.3by.) # Units are SI, except for: (dB), `zp`, and `eta0`. # - System time/frequency vectors (decoupled!) FB = 25e9 * 66. / 64. fb = Float(0) # Baud. rate. nspui = Int(32) # samples per UI. tmax = Float(10e-9) # system time vector maximum. fstep = Float(10e6) # system frequency vector step. fmax = Float(40e9) # system frequency vector maximum. # - Tx Drive Settings Av = Float(0.4) # victim drive voltage (V). Afe = Float(0.4) # FEXT drive voltage (V). Ane = Float(0.6) # NEXT drive voltage (V). L = Int(2) # number of output levels. RLM = Float(1.0) # ratio level mismatch. # - Linear EQ nTxTaps = 6 tx_n_post = Int(3) tx_taps_pos = Array(shape=(1, (1, nTxTaps)), dtype=int, value=[[-3, -2, -1, 1, 2, 3],]) tx_taps_min = Array(shape=(1, (1, nTxTaps)), dtype=float, value=[[0., 0., 0., 0., 0., 0.],]) tx_taps_max = Array(shape=(1, (1, nTxTaps)), dtype=float, value=[[0., 0., 0., 0., 0., 0.],]) tx_taps_step = Array(shape=(1, (1, nTxTaps)), dtype=float, value=[[0., 0., 0.02, 0.02, 0., 0.],]) tx_taps = Array(shape=(1, (1, nTxTaps)), dtype=float, value=[[0.] * nTxTaps,]) # Tx FFE tap weights. c0_min = Float(0) # minimum allowed Tx FFE main tap value. fr = Float(0.75) # AFE corner frequency (fb) gDC_vals = List([-x for x in range(13)]) # D.C. gain of Rx CTLE first stage (dB). gDC = Enum(0, values="gDC_vals") gDC2_vals = List([0.,]) # D.C. gain of Rx CTLE second stage (dB). gDC2 = Enum(0, values="gDC2_vals") fz = Float(FB / 4.) # CTLE zero frequency. fp1 = Float(FB / 4.) # CTLE first pole frequency. fp2 = Float(FB) # CTLE second pole frequency. fLF = Float(1e6) # CTLE low-f corner frequency. # - DFE N_DFE = 14 # number of DFE taps. nDFE = Int(N_DFE) # number of DFE taps. bmax = List([1.0] * N_DFE) # DFE maximum tap values. bmin = List([-1.0] * N_DFE) # DFE minimum tap values. # - Package & Die Modeling R0 = Float(50.0) # system reference impedance. Rd = Float(55.0) # on-die termination impedance. Cd = Float(0.25e-12) # parasitic die capacitance. Cb = Float(0.00e-12) # parasitic bump capacitance. Cp = Float(0.18e-12) # parasitic ball capacitance. zp_vals = List([12, 30]) # package transmission line lengths (mm). zp = Enum(12, values="zp_vals") zc = Float(78.2) # package transmission line characteristic impedance. # - Noise & DER sigma_Rj = Float(0.01) # random jitter standard deviation (ui). Add = Float(0.05) # deterministic jitter amplitude (ui). eta0 = Float(5.2e-8) # spectral noise density (V^2/GHz). TxSNR = Float(27) # Tx signal-to-noise ratio (dB). DER0 = Float(1e-5) # detector error rate threshold. # - Channel file(s) chnl_s32p = File("", entries=5, filter=["*.s32p"], exists=False) chnl_s4p_thru = File("", entries=5, filter=["*.s4p"], exists=True) chnl_s4p_fext1 = File("", entries=5, filter=["*.s4p"], exists=True) chnl_s4p_fext2 = File("", entries=5, filter=["*.s4p"], exists=True) chnl_s4p_fext3 = File("", entries=5, filter=["*.s4p"], exists=True) chnl_s4p_next1 = File("", entries=5, filter=["*.s4p"], exists=True) chnl_s4p_next2 = File("", entries=5, filter=["*.s4p"], exists=True) chnl_s4p_next3 = File("", entries=5, filter=["*.s4p"], exists=True) chnl_s4p_next4 = File("", entries=5, filter=["*.s4p"], exists=True) vic_chnl_ix = Int(1) # - Results # -- COM com = Float(0.0) com_As = Float(0.0) com_cursor_ix = Int(0) com_sigma_Tx = Float(0.0) com_sigma_G = Float(0.0) com_sigma_N = Float(0.0) com_dfe_taps = Str(print_taps([0.0] * N_DFE)) # -- FOM fom = Float(0.0) fom_As = Float(0.0) Ani = Float(0.0) fom_cursor_ix = Int(0) sigma_ISI = Float(0.0) sigma_J = Float(0.0) sigma_XT = Float(0.0) sigma_Tx = Float(0.0) sigma_N = Float(0.0) fom_tx_taps = Str(print_taps([0.0] * nTxTaps)) fom_dfe_taps = Str(print_taps([0.0] * N_DFE)) # Plots (plot containers, actually). plotdata = ArrayPlotData() plotdata.set_data("t_ns", np.linspace(0., 10., 100)) plotdata.set_data("pulse_raw", np.zeros(100)) plotdata.set_data("pulse_pkg", np.zeros(100)) plotdata.set_data("pulse_eq", np.zeros(100)) # - pulse responses. plot1 = Plot(plotdata, padding_left=75) plot1.plot(("t_ns", "pulse_raw"), name="Raw", type="line", color="red") plot1.plot(("t_ns", "pulse_pkg"), name="w/ Pkg", type="line", color="green") plot1.plot(("t_ns", "pulse_eq"), name="w/ EQ", type="line", color="blue") plot1.title = "Pulse Responses" plot1.index_axis.title = "Time (ns)" plot1.value_axis.title = "p(t) (mV)" plot1.legend.visible = True plot1.legend.align = "ur" zoom1 = ZoomTool(plot1, tool_mode="range", axis="index", always_on=False) plot1.overlays.append(zoom1) cont1 = GridPlotContainer(shape=(1, 1), spacing=(PLOT_SPACING, PLOT_SPACING)) cont1.add(plot1) about_str = """ <H2><em>PyChOpMarg</em> - A Python implementation of COM, as per IEEE 802.3-22 Annex 93A.</H2>\n <strong>By:</strong> David Banas <capn.freako@gmail.com><p>\n <strong>On:</strong> August 12, 2024<p>\n <strong>At:</strong> v1.1.3\n <H3>Useful Links</H3>\n (You'll probably need to: right click, select <em>Copy link address</em>, and paste into your browser.) <UL>\n <LI><a href="https://github.com/capn-freako/PyChOpMarg"><em>GitHub</em> Home</a> <LI><a href="https://pypi.org/project/PyChOpMarg/"><em>PyPi</em> Home</a> <LI><a href="https://readthedocs.org/projects/pychopmarg/"><em>Read the Docs</em> Home</a> </UL> """ def _fb_changed(self): "Keep global version in sync." global gFb gFb = self.fb def _c0_min_changed(self): "Keep global version in sync." global gC0min gC0min = self.c0_min def _tx_taps_min_changed(self): "Keep global version in sync." global gNtaps gNtaps = len(self.tx_taps_min.flatten()) # Dependent variable definitions # - Unit interval (s). ui = Property(Float, depends_on=["fb"]) @cached_property def _get_ui(self): """Unit interval (s).""" return 1 / self.fb # - system time vector; decoupled from system frequency vector! times = Property(Array, depends_on=["fb", "nspui", "tmax"]) @cached_property def _get_times(self): """System times (s).""" tstep = self.ui / self.nspui rslt = np.arange(0, self.tmax + tstep, tstep) self.plotdata.set_data("t_ns", rslt * 1e9) return rslt # - system frequency vector; decoupled from system time vector! freqs = Property(Array, depends_on=["fstep", "fmax"]) @cached_property def _get_freqs(self): """System frequencies (Hz).""" global gFreqs freqs = np.arange(0, self.fmax + self.fstep, self.fstep) gFreqs = freqs return freqs # - `irfft()` time vector; decoupled from system time vector! t_irfft = Property(Array, depends_on=["freqs"]) @cached_property def _get_t_irfft(self): """`irfft()` result time index (s).""" Ts = 0.5 / self.fmax # Sample period satisfying Nyquist criteria. return np.array([n * Ts for n in range(2 * (len(self.freqs) - 1))]) # - Rect(ui) in the frequency domain. Xsinc = Property(Array, depends_on=["ui", "freqs"]) @cached_property def _get_Xsinc(self): """Frequency domain sinc(f) corresponding to Rect(ui).""" ui = self.ui Ts = self.t_irfft[1] w = int(ui / Ts) return w * np.sinc(ui * self.freqs) # - Rx AFE voltage transfer function. Hr = Property(Array, depends_on=['freqs']) @cached_property def _get_Hr(self): """ Return the voltage transfer function, H(f), of the Rx AFE, according to (93A-20). """ f = self.freqs / (self.fr * self.fb) return 1 / (1 - 3.414214 * f**2 + f**4 + 2.613126j * (f - f**3)) # - Rx CTLE voltage transfer function. Hctf = Property(Array, depends_on=['freqs', 'gDC', 'gDC2', 'fz', 'fp1', 'fp2', 'fLF']) @cached_property def _get_Hctf(self): return self.calc_Hctf(self.gDC, self.gDC2)
[docs] def calc_Hctf(self, gDC: Optional[float] = None, gDC2: Optional[float] = None) -> Cvec: """ Return the voltage transfer function, H(f), of the Rx CTLE, according to (93A-22). Keyword Args: gDC: CTLE first stage d.c. gain (dB). Default: None gDC2: CTLE second stage d.c. gain (dB). Default: None Notes: 1. The instance's current value(s) for `gDC` and `gDC2` are used if not provided. (Necessary, to accommodate sweeping when optimizing EQ.) """ if gDC is None: gDC = self.gDC if gDC2 is None: gDC2 = self.gDC2 g1 = from_dB(gDC) g2 = from_dB(gDC2) f = self.freqs fz = self.fz fp1 = self.fp1 fp2 = self.fp2 fLF = self.fLF num = (g1 + 1j * f / fz) * (g2 + 1j * f / fLF) den = (1 + 1j * f / fp1) * (1 + 1j * f / fp2) * (1 + 1j * f / fLF) return num / den
# - All possible Tx tap weight combinations. tx_combs = Property(List, depends_on=['tx_taps_min', 'tx_taps_max', 'tx_taps_step']) @cached_property def _get_tx_combs(self): """ All possible Tx tap weight combinations. """ trips = list(zip(self.tx_taps_min.flatten(), self.tx_taps_max.flatten(), self.tx_taps_step.flatten())) ranges = [] for trip in trips: if trip[2]: # non-zero step? ranges.append(list(np.arange(trip[0], trip[1] + trip[2], trip[2]))) else: ranges.append([0.0]) return all_combs(ranges) # - Reflection coefficients gamma1 = Property(Float, depends_on=['Rd', 'R0']) gamma2 = Property(Float, depends_on=['gamma1']) @cached_property def _get_gamma1(self): """ Reflection coefficient looking out of the left end of the channel. """ Rd = self.Rd R0 = self.R0 return (Rd - R0) / (Rd + R0) @cached_property def _get_gamma2(self): """ Reflection coefficient looking out of the right end of the channel. """ return self.gamma1 # - Package response sPkgRx = Property(depends_on=['zc', 'R0', 'zp', 'freqs', 'Cd', 'Cp']) # noqa E221 sPkgTx = Property(depends_on=['zc', 'R0', 'zp', 'freqs', 'Cd', 'Cp']) # noqa E221 sPkgNEXT = Property(depends_on=['zc', 'R0', 'zp', 'freqs', 'Cd', 'Cp']) # noqa E221 sZp = Property(depends_on=['zc', 'R0', 'zp', 'freqs']) # noqa E221 sZpNEXT = Property(depends_on=['zc', 'R0', 'zp', 'freqs']) # noqa E221 @cached_property def _get_sPkgRx(self) -> rf.Network: """ Rx package response. """ return self.sC(self.Cp) ** self.sZp ** self.sC(self.Cd) @cached_property def _get_sPkgTx(self) -> rf.Network: """ Tx package response. """ return self.sC(self.Cd) ** self.sZp ** self.sC(self.Cp) @cached_property def _get_sPkgNEXT(self) -> rf.Network: """ NEXT package response. """ return self.sC(self.Cd) ** self.sZpNEXT ** self.sC(self.Cp) @cached_property def _get_sZp(self) -> rf.Network: return self.calc_sZp() @cached_property def _get_sZpNEXT(self) -> rf.Network: return self.calc_sZp(NEXT=True)
[docs] def calc_sZp(self, NEXT: bool = False) -> rf.Network: """ Return the 2-port network corresponding to a package transmission line, according to (93A-9:14). Keyword Args: NEXT: Use first package T-line length option when True. Default: False Returns: 2-port network equivalent to package transmission line. """ zc = self.zc r0 = self.R0 if NEXT: zp = self.zp_vals[0] else: zp = self.zp f_GHz = self.freqs / 1e9 # noqa E221 a1 = 1.734e-3 # sqrt(ns)/mm # noqa E221 a2 = 1.455e-4 # ns/mm # noqa E221 tau = 6.141e-3 # ns/mm # noqa E221 rho = (zc - 2 * r0) / (zc + 2 * r0) # noqa E221 gamma0 = 0 # 1/mm gamma1 = a1 * (1 + 1j) def gamma2(f: float) -> complex: "f in GHz!" return a2 * (1 - 1j * (2 / PI) * np.log(f)) + 1j * TWOPI * tau def gamma(f: float) -> complex: "Return complex propagation coefficient at frequency f (GHz)." if f == 0: return gamma0 else: return gamma0 + gamma1 * np.sqrt(f) + gamma2(f) * f g = np.array(list(map(gamma, f_GHz))) s11 = s22 = rho * (1 - np.exp(-g * 2 * zp)) / (1 - rho**2 * np.exp(-g * 2 * zp)) s21 = s12 = (1 - rho**2) * np.exp(-g * zp) / (1 - rho**2 * np.exp(-g * 2 * zp)) s2p = np.array( [[[_s11, _s12], [_s21, _s22]] for _s11, _s12, _s21, _s22 in zip(s11, s12, s21, s22)]) return rf.Network(s=s2p, f=self.freqs, z0=[2 * r0, 2 * r0])
# - Channels
[docs] def get_chnls(self) -> list[tuple[rf.Network, str]]: """Import all channels from Touchstone file(s).""" if self.chnl_s32p: return self.get_chnls_s32p_wPkg() else: return self.get_chnls_s4p_wPkg()
[docs] def get_chnls_s32p_wPkg(self) -> list[tuple[rf.Network, str]]: """Augment imported s32p channels, w/ package response.""" return self.add_pkgs(self.get_chnls_s32p_noPkg())
[docs] def get_chnls_s4p_wPkg(self) -> list[tuple[rf.Network, str]]: """Augment imported s4p channels, w/ package response.""" return self.add_pkgs(self.get_chnls_s4p_noPkg())
[docs] def get_chnls_s32p_noPkg(self) -> list[tuple[rf.Network, str]]: """Import s32p file, w/o package.""" if not self.chnl_s32p: return [] ntwks = import_s32p(self.chnl_s32p, self.vic_chnl_ix) self.fmax = ntwks[0][0].f[-1] # Generate the pulse responses before adding the packages, for reference. pulse_resps_nopkg = self.gen_pulse_resps(ntwks, [], apply_eq=False) self.plotdata.set_data("pulse_raw", pulse_resps_nopkg[0]) self.pulse_resps_nopkg = pulse_resps_nopkg return ntwks
# @cached_property
[docs] def get_chnls_s4p_noPkg(self) -> list[tuple[rf.Network, str]]: """Import s4p files, w/o package.""" if not self.chnl_s4p_thru: return [] ntwks = [(sdd_21(rf.Network(self.chnl_s4p_thru)), 'THRU')] fmax = ntwks[0][0].f[-1] for fname in [self.chnl_s4p_fext1, self.chnl_s4p_fext2, self.chnl_s4p_fext3]: if fname: ntwk = sdd_21(rf.Network(fname)) ntwks.append((ntwk, 'FEXT')) if ntwk.f[-1] < fmax: fmax = ntwk.f[-1] for fname in [self.chnl_s4p_next1, self.chnl_s4p_next2, self.chnl_s4p_next3, self.chnl_s4p_next4]: if fname: ntwk = sdd_21(rf.Network(fname)) ntwks.append((sdd_21(rf.Network(fname)), 'NEXT')) if ntwk.f[-1] < fmax: fmax = ntwk.f[-1] self.fmax = fmax # Generate the pulse responses before adding the packages, for reference. pulse_resps_nopkg = self.gen_pulse_resps(ntwks, [], apply_eq=False) self.plotdata.set_data("pulse_raw", pulse_resps_nopkg[0]) self.pulse_resps_nopkg = pulse_resps_nopkg return ntwks
[docs] def add_pkgs(self, ntwks: list[tuple[rf.Network, str]]) -> list[tuple[rf.Network, str]]: """Add package response to raw channels and generate pulse responses.""" if not ntwks: return [] _ntwks = list(map(self.add_pkg, ntwks)) pulse_resps_noeq = self.gen_pulse_resps(_ntwks, [], apply_eq=False) self.rslts['vic_pulse_pk'] = max(pulse_resps_noeq[0]) * 1_000 # (mV) self.pulse_resps_noeq = pulse_resps_noeq self.plotdata.set_data("pulse_pkg", pulse_resps_noeq[0]) return _ntwks
[docs] def add_pkg(self, ntwk: tuple[rf.Network, str]) -> tuple[rf.Network, str]: """Add package response to raw channel.""" ntype = ntwk[1] if ntype == 'NEXT': return (self.sPkgNEXT ** ntwk[0] ** self.sPkgRx, ntype) else: return (self.sPkgTx ** ntwk[0] ** self.sPkgRx, ntype)
# Reserved functions def __init__(self): """ Instance creation only! Must still initialize. """ # Super-class initialization is ABSOLUTELY NECESSARY, in order # to get all the Traits/UI machinery setup correctly. super().__init__() self.rslts = {} self.fom_rslts = {} self.dbg = {} self.fb = self.FB self.c0_min = 0.62 self.tx_taps_min = [[0., 0., -0.18, -0.38, 0., 0.],]
[docs] @classmethod def init(cls, params: dict, chnl_fnames: list[str], vic_id: int, zp_sel: int = 1, num_ui: int = 500, gui: bool = False): """ Legacy initializer supports my VITA notebook, which was created before PyChOpMarg was altered to support a GUI. """ obj = cls() obj.set_params(params) assert zp_sel > 0 and zp_sel <= len(obj.zp_vals), RuntimeError( "`zp_sel` out of range!") if len(chnl_fnames) == 1: obj.chnl_s32p = chnl_fnames[0] else: obj.chnl_s4p_thru = chnl_fnames[0] if len(chnl_fnames) > 1: obj.chnl_s4p_fext1 = chnl_fnames[1] if len(chnl_fnames) > 2: obj.chnl_s4p_fext2 = chnl_fnames[2] if len(chnl_fnames) > 3: obj.chnl_s4p_next1 = chnl_fnames[3] if len(chnl_fnames) > 4: obj.chnl_s4p_next2 = chnl_fnames[4] if len(chnl_fnames) > 5: obj.chnl_s4p_next3 = chnl_fnames[5] obj.vic_chnl_ix = vic_id obj.zp = obj.zp_vals[zp_sel - 1] obj.num_ui = num_ui obj.gui = gui return obj
def __call__(self, do_opt_eq=True, tx_taps: Rvec = None): """ Calculate the COM value. Keyword Args: opt_eq: Perform optimization of linear equalization when True. Default: True tx_taps: Used when `do_opt_eq` = False. Default: None """ assert self.chnl_s32p or self.chnl_s4p_thru, RuntimeError( "You must, at least, select a thru path channel file, either 32 or 4 port.") self.chnls = self.get_chnls() self.status_str = "Optimizing EQ..." assert self.opt_eq(do_opt_eq=do_opt_eq, tx_taps=tx_taps), RuntimeError( "EQ optimization failed!") self.status_str = "Calculating noise..." As, Ani, self.cursor_ix = self.calc_noise() com = 20 * np.log10(As / Ani) self.As = As self.Ani = Ani self.com = com self.rslts['com'] = com self.rslts['As'] = As * 1e3 self.rslts['Ani'] = Ani * 1e3 self.status_str = f"Ready; COM = {com: 5.1f} dB" return com # Initializers
[docs] def set_params(self, params: COMParams) -> None: """ Set the COM instance parameters, according the the given dictionary. Args: params: Dictionary of COM parameter values. Raises: KeyError: If an expected key is not found in the provided dictionary. """ # Set default parameter values, as necessary. if 'z_c' not in params: params['zc'] = 78.2 if 'C_b' not in params: params['C_b'] = 0.0 if 'f_LF' not in params: params['f_LF'] = 0.001 if 'g_DC2' not in params: params['g_DC2'] = 0.0 # Capture parameters, adjusting units as necessary to keep all but (dB) and `eta0` SI. self.fb = params['fb'] * 1e9 self.nspui = params['M'] self.L = params['L'] self.fstep = params['fstep'] self.tx_taps_min = params['tx_taps_min'] self.tx_taps_max = params['tx_taps_max'] self.tx_taps_step = params['tx_taps_step'] self.fr = params['f_r'] self.fz = params['f_z'] * 1e9 self.fp1 = params['f_p1'] * 1e9 self.fp2 = params['f_p2'] * 1e9 self.fLF = params['f_LF'] * 1e9 self.gDC_vals = params['g_DC'] self.gDC2_vals = params['g_DC2'] self.Rd = params['R_d'] self.R0 = params['R_0'] self.Cd = params['C_d'] / 1e12 self.Cb = params['C_b'] / 1e12 self.Cp = params['C_p'] / 1e12 self.Av = params['A_v'] self.Afe = params['A_fe'] self.Ane = params['A_ne'] self.DER0 = params['DER_0'] self.sigma_Rj = params['sigma_Rj'] self.Add = params['A_DD'] self.eta0 = params['eta_0'] self.TxSNR = params['SNR_TX'] self.zc = params['z_c'] self.zp_vals = params['z_p'] # Stash input parameters, for future reference. self.params = params
# General functions
[docs] def sC(self, c: float) -> rf.Network: """ Return the 2-port network corresponding to a shunt capacitance, according to (93A-8). Args: c: Value of shunt capacitance (F). Returns: 2-port network equivalent to shunt capacitance, calculated at given frequencies. Raises: None """ r0 = self.R0 freqs = self.freqs w = TWOPI * freqs s = 1j * w s2p = np.array( [1 / (2 + _s * c * r0) * np.array( [[-_s * c * r0, 2], [2, -_s * c * r0]]) for _s in s]) return rf.Network(s=s2p, f=freqs, z0=[2 * r0, 2 * r0])
[docs] def H21(self, s2p: rf.Network) -> Cvec: """ Return the voltage transfer function, H21(f), of a terminated two port network, according to (93A-18). Args: s2p: Two port network of interest. Returns: Complex voltage transfer function at given frequencies. Raises: ValueError: If given network is not two port. Notes: 1. It is at this point in the analysis that the "raw" Touchstone data gets interpolated to our system frequency vector. 2. After this step, the package and R0/Rd mismatch have been accounted for, but not the EQ. """ assert s2p.s[0].shape == (2, 2), ValueError("I can only convert 2-port networks.") s2p.interpolate_self(self.freqs) g1 = self.gamma1 g2 = self.gamma2 s11 = s2p.s11.s.flatten() s12 = s2p.s12.s.flatten() s21 = s2p.s21.s.flatten() s22 = s2p.s22.s.flatten() dS = s11 * s22 - s12 * s21 return (s21 * (1 - g1) * (1 + g2)) / (1 - s11 * g1 - s22 * g2 + g1 * g2 * dS)
[docs] def H(self, s2p: rf.Network, tap_weights: Rvec, gDC: Optional[float] = None, gDC2: Optional[float] = None) -> Cvec: """ Return the voltage transfer function, H(f), of a complete COM signal path, according to (93A-19). Args: s2p: Two port network of interest. tap_weights: Tx FFE tap weights. Keyword Args: gDC: CTLE first stage d.c. gain. gDC2: CTLE second stage d.c. gain. Returns: Complex voltage transfer function of complete path. Raises: ValueError: If given network is not two port, or length of `tap_weights` is incorrect. Notes: 1. Assumes `self.gDC` and `self.gDC2` have been set correctly. 2. It is in this processing step that linear EQ is first applied. """ assert s2p.s[0, :, :].shape == (2, 2), ValueError( f"I can only convert 2-port networks. {s2p}") H_tx = calc_Hffe(list(tap_weights.flatten()), self.tx_n_post) H_rx = self.H21(s2p) * self.Hr * self.calc_Hctf(gDC, gDC2) return (H_tx * H_rx)
[docs] def pulse_resp(self, H: Cvec) -> Rvec: """ Return the unit pulse response, p(t), corresponding to the given voltage transfer function, H(f), according to (93A-24). Args: H: The voltage transfer function, H(f). Note: Possitive frequency components only, including fN. Returns: The pulse response corresponding to the given voltage transfer function. Raises: ValueError: If the length of the given voltage transfer function differs from that of the system frequency vector. Notes: 1. It is at this point in the signal processing chain that we change time domains. """ assert len(H) == len(self.freqs), ValueError( "Length of given H(f) does not match length of f!") Xsinc = self.Xsinc Ts = self.t_irfft[1] p = np.fft.irfft(Xsinc * H) p_mag = np.abs(p) p_beg = np.where(p_mag > 0.01 * max(p_mag))[0][0] - int(5 * self.ui / Ts) # Give it some "front porch". spln = interp1d(self.t_irfft, np.roll(p, -p_beg)) # `p` is not yet in our system time domain! return spln(self.times) # Now, it is.
[docs] def gen_pulse_resps(self, ntwks: list[tuple[rf.Network, str]], tx_taps: Rvec, gDC: Optional[float] = None, gDC2: Optional[float] = None, apply_eq: bool = True) -> list[Rvec]: """ Generate pulse responses for all networks. Args: ntwks: The list of networks to generate pulse responses for. tx_taps: Desired Tx tap weights. Keyword Args: apply_eq: Include linear EQ when True; otherwise, exclude it. Default: True Returns: List of pulse responses. Raises: None Notes: 1. Assumes `self.gDC` and `self.gDC2` have been set correctly, if not provided. """ pulse_resps = [] for ntwk, ntype in ntwks: if apply_eq: if ntype == 'NEXT': pr = self.pulse_resp(self.H(ntwk, np.zeros(tx_taps.shape), gDC, gDC2)) else: pr = self.pulse_resp(self.H(ntwk, tx_taps, gDC, gDC2)) else: pr = self.pulse_resp(self.H21(ntwk)) if ntype == 'THRU': pr *= self.Av elif ntype == 'NEXT': pr *= self.Ane else: pr *= self.Afe pulse_resps.append(pr) return pulse_resps
[docs] def filt_pr_samps(self, pr_samps: Rvec, As: float, rel_thresh: float = 0.001) -> Rvec: """ Filter a list of pulse response samples for minimum magnitude. Args: pr_samps: The pulse response samples to filter. As: Signal amplitude, as per 93A.1.6.c. Keyword Args: rel_thresh: Filtration threshold (As). Default: 0.001 (i.e. - 0.1%, as per Note 2 of 93A.1.7.1) Returns: The subset of `pr_samps` passing filtration. """ thresh = As * rel_thresh return np.array(list(filter(lambda x: abs(x) >= thresh, pr_samps)))
[docs] def calc_hJ(self, pulse_resp: Rvec, As: float, cursor_ix: int, rel_thresh: float = 0.001) -> Rvec: """ Calculate the set of slopes for valid pulse response samples. Args: pulse_resp: The pulse response of interest. As: Signal amplitude, as per 93A.1.6.c. cursor_ix: Cursor index. Keyword Args: rel_thresh: Filtration threshold (As). Default: 0.001 (i.e. - 0.1%, as per Note 2 of 93A.1.7.1) Returns: The calculated slopes around the valid samples. """ M = self.nspui thresh = As * rel_thresh valid_pr_samp_ixs = np.array(list(filter(lambda ix: abs(pulse_resp[ix]) >= thresh, range(cursor_ix, len(pulse_resp) - 1, M)))) m1s = pulse_resp[valid_pr_samp_ixs - 1] p1s = pulse_resp[valid_pr_samp_ixs + 1] return (p1s - m1s) / (2 / M) # (93A-28)
[docs] def loc_curs(self, pulse_resp: Rvec, max_range: int = 4) -> int: """ Locate the cursor position for the given pulse response, according to (93A-25) and (93A-26). Args: pulse_resp: The pulse response of interest. Keyword Args: max_range: The search radius, from the peak. Returns: The index in the given pulse response vector of the cursor. Notes: 1. As per v3.70 of the COM MATLAB code, we only minimize the residual of (93A-25); we don't try to solve it exactly. """ M = self.nspui dfe_max = self.bmax dfe_min = self.bmin # Find zero crossings. peak_loc = np.argmax(pulse_resp) peak_val = pulse_resp[peak_loc] search_start = max(0, peak_loc - 4 * M) zxi = np.where(np.diff(np.sign(pulse_resp[search_start:peak_loc] - .01 * peak_val)) >= 1)[0] + search_start assert zxi, RuntimeError("No zero crossings found!") zxi = zxi[-1] # Minimize Muller-Mueller criterion within a 2UI range after zero crossing. ix_best = zxi res_min = 1e6 for ix in range(zxi, zxi + 2 * M): b_1 = min(dfe_max[0], max(dfe_min[0], pulse_resp[ix + M] / pulse_resp[ix])) # (93A-26) res = abs(pulse_resp[ix - M] - (pulse_resp[ix + M] - b_1 * pulse_resp[ix])) # (93A-25) if res < res_min: ix_best = ix res_min = res return ix_best
[docs] def calc_fom(self, tx_taps: Rvec, gDC: Optional[float] = None, gDC2: Optional[float] = None) -> float: """ Calculate the *figure of merit* (FOM), given the existing linear EQ settings. Args: tx_taps: The Tx FFE tap weights, excepting the cursor. (The cursor takes whatever is left.) Keyword Args: gDC: CTLE first stage d.c. gain. gDC2: CTLE second stage d.c. gain. Returns: The resultant figure of merit. Raises: None Notes: 1. See: IEEE 802.3-2022 93A.1.6. """ L = self.L M = self.nspui freqs = self.freqs chnls = self.chnls # Step a - Pulse response construction. pulse_resps = self.gen_pulse_resps(chnls, np.array(tx_taps), gDC=gDC, gDC2=gDC2) # Step b - Cursor identification. try: vic_pulse_resp = np.array(pulse_resps[0]) except Exception: print(len(pulse_resps), len(chnls)) raise vic_peak_loc = np.argmax(vic_pulse_resp) cursor_ix = self.loc_curs(vic_pulse_resp) # Step c - As. vic_curs_val = vic_pulse_resp[cursor_ix] As = self.RLM * vic_curs_val / (L - 1) # Step d - Tx noise. varX = (L**2 - 1) / (3 * (L - 1)**2) # (93A-29) varTx = vic_curs_val**2 * pow(10, -self.TxSNR / 10) # (93A-30) # Step e - ISI. nDFE = len(self.bmin) # This is not compliant to the standaard, but is consistent w/ v2.60 of MATLAB code. n_pre = cursor_ix // M first_pre_ix = cursor_ix - n_pre * M vic_pulse_resp_isi_samps = np.concatenate((vic_pulse_resp[first_pre_ix:cursor_ix:M], vic_pulse_resp[cursor_ix + M::M])) vic_pulse_resp_post_samps = vic_pulse_resp_isi_samps[n_pre:] dfe_tap_weights = np.maximum( # (93A-26) self.bmin, np.minimum( self.bmax, (vic_pulse_resp_post_samps[:nDFE] / vic_curs_val))) hISI = vic_pulse_resp_isi_samps \ - vic_curs_val * np.pad(dfe_tap_weights, # noqa E127 (n_pre, len(vic_pulse_resp_post_samps) - nDFE), mode='constant', constant_values=0) # (93A-27) varISI = varX * sum(hISI**2) # (93A-31) self.dbg['vic_pulse_resp_isi_samps'] = vic_pulse_resp_isi_samps self.dbg['vic_pulse_resp_post_samps'] = vic_pulse_resp_post_samps self.dbg['hISI'] = hISI # Step f - Jitter noise. hJ = self.calc_hJ(vic_pulse_resp, As, cursor_ix) varJ = (self.Add**2 + self.sigma_Rj**2) * varX * sum(hJ**2) # (93A-32) # Step g - Crosstalk. varXT = 0 for pulse_resp in pulse_resps[1:]: # (93A-34) varXT += max([sum(np.array(self.filt_pr_samps(pulse_resp[m::M], As))**2) for m in range(M)]) # (93A-33) varXT *= varX # Step h - Spectral noise. df = freqs[1] varN = self.eta0 * sum(abs(self.Hr * self.calc_Hctf(gDC=gDC, gDC2=gDC2))**2) * (df / 1e9) # (93A-35) # Step i - FOM calculation. fom = 10 * np.log10(As**2 / (varTx + varISI + varJ + varXT + varN)) # (93A-36) # Stash our calculation results. self.fom_rslts['pulse_resps'] = pulse_resps self.fom_rslts['vic_pulse_resp'] = vic_pulse_resp self.fom_rslts['vic_peak_loc'] = vic_peak_loc self.fom_rslts['cursor_ix'] = cursor_ix self.fom_rslts['As'] = As self.fom_rslts['varTx'] = varTx self.fom_rslts['dfe_tap_weights'] = dfe_tap_weights self.fom_rslts['varISI'] = varISI self.fom_rslts['varJ'] = varJ self.fom_rslts['varXT'] = varXT self.fom_rslts['varN'] = varN return fom
[docs] def opt_eq(self, do_opt_eq: bool = True, tx_taps: Rvec = None) -> bool: """ Find the optimum values for the linear equalization parameters: c(-2), c(-1), c(1), gDC, and gDC2, as per IEEE 802.3-22 93A.1.6. Keyword Args: do_opt_eq: Perform optimization of linear EQ when True. Default: True tx_taps: Used when `do_opt_eq` = False. Default: None Returns: True if no errors encountered; False otherwise. """ if do_opt_eq: # Run the nested optimization loops. def check_taps(tx_taps: Rvec) -> bool: if (1 - sum(abs(np.array(tx_taps)))) < self.c0_min: return False else: return True fom_max = -100.0 fom_max_changed = False foms = [] for gDC2 in self.gDC2_vals: for gDC in self.gDC_vals: for n, tx_taps in enumerate(self.tx_combs): if not check_taps(np.array(tx_taps)): continue fom = self.calc_fom(tx_taps, gDC=gDC, gDC2=gDC2) foms.append(fom) if fom > fom_max: fom_max_changed = True fom_max = fom gDC2_best = gDC2 gDC_best = gDC tx_taps_best = tx_taps dfe_tap_weights_best = self.fom_rslts['dfe_tap_weights'] cursor_ix_best = self.fom_rslts['cursor_ix'] As_best = self.fom_rslts['As'] varTx_best = self.fom_rslts['varTx'] varISI_best = self.fom_rslts['varISI'] varJ_best = self.fom_rslts['varJ'] varXT_best = self.fom_rslts['varXT'] varN_best = self.fom_rslts['varN'] vic_pulse_resp = self.fom_rslts['vic_pulse_resp'] else: assert tx_taps, RuntimeError("You must define `tx_taps` when setting `do_opt_eq` False!") fom = self.calc_fom(tx_taps) foms = [fom] fom_max = fom fom_max_changed = True gDC2_best = self.gDC2 gDC_best = self.gDC tx_taps_best = tx_taps dfe_tap_weights_best = self.fom_rslts['dfe_tap_weights'] cursor_ix_best = self.fom_rslts['cursor_ix'] As_best = self.fom_rslts['As'] varTx_best = self.fom_rslts['varTx'] varISI_best = self.fom_rslts['varISI'] varJ_best = self.fom_rslts['varJ'] varXT_best = self.fom_rslts['varXT'] varN_best = self.fom_rslts['varN'] vic_pulse_resp = self.fom_rslts['vic_pulse_resp'] # Check for error and save the best results. if not fom_max_changed: return False self.fom = fom_max self.gDC2 = gDC2_best self.gDC = gDC_best self.tx_taps = [tx_taps_best,] self.fom_tx_taps = print_taps(tx_taps_best) self.fom_dfe_taps = print_taps(dfe_tap_weights_best) self.fom_cursor_ix = cursor_ix_best self.fom_As = As_best self.sigma_ISI = np.sqrt(varISI_best) self.sigma_J = np.sqrt(varJ_best) self.sigma_XT = np.sqrt(varXT_best) # These two are also calculated by `calc_noise()`. self.sigma_Tx = np.sqrt(varTx_best) self.sigma_N = np.sqrt(varN_best) self.foms = foms self.plotdata.set_data("pulse_eq", vic_pulse_resp) self.rslts['fom'] = fom_max self.rslts['gDC'] = gDC_best self.rslts['gDC2'] = gDC2_best self.rslts['tx_taps'] = tx_taps_best[2:4] self.rslts['dfe_taps'] = dfe_tap_weights_best self.rslts['sigma_ISI'] = self.sigma_ISI * 1e3 self.rslts['sigma_XT'] = self.sigma_XT * 1e3 self.rslts['sigma_J'] = self.sigma_J * 1e3 return True
[docs] def calc_noise(self) -> tuple[float, float, int]: """ Calculate the interference and noise for COM. Returns: - signal amplitude - noise + interference amplitude (V) - cursor location within victim pulse response vector Notes: 1. Assumes the following instance variables have been set: - gDC - gDC2 - tx_taps 2. Warns if `2*As/npts` rises above 10 uV, against standard's recommendation. """ L = self.L M = self.nspui freqs = self.freqs nDFE = len(self.bmin) pulse_resps = self.gen_pulse_resps(self.chnls, np.array(self.tx_taps)) vic_pulse_resp = pulse_resps[0] cursor_ix = self.loc_curs(vic_pulse_resp) vic_curs_val = vic_pulse_resp[cursor_ix] As = self.RLM * vic_curs_val / (L - 1) npts = 2 * max(int(As / 0.001), 1_000) + 1 # Note 1 of 93A.1.7.1; MUST BE ODD! y = np.linspace(-As, As, npts) ystep = 2 * As / (npts - 1) delta = np.zeros(npts) delta[npts // 2] = 1 varX = (L**2 - 1) / (3 * (L - 1)**2) # (93A-29) df = freqs[1] def pn(hn: float) -> Rvec: """ (93A-39) """ return 1 / L * sum([np.roll(delta, int((2 * el / (L - 1) - 1) * hn / ystep)) for el in range(L)]) def p(h_samps: Rvec) -> Rvec: """ Calculate the "delta-pmf" for a set of pulse response samples, as per (93A-40). Args: h_samps: Vector of pulse response samples. As: Signal amplitude, as per 93A.1.6.c. Returns: Vector of "deltas" giving amplitude probability distribution. Raises: None Notes: 1. The input set of pulse response samples is filtered, as per Note 2 of 93A.1.7.1. """ pns = [] rslts = [] rslt = delta rslts.append(rslt) for hn in h_samps: _pn = pn(hn) pns.append(_pn) rslt = np.convolve(rslt, _pn, mode='same') rslts.append(rslt) rslt /= sum(rslt) # Enforce a PMF. Commenting out didn't make a difference. self.dbg['pns'] = pns self.dbg['rslts'] = rslts return rslt # Sec. 93A.1.7.2 varN = self.eta0 * sum(abs(self.Hr * self.Hctf)**2) * (df / 1e9) # (93A-35) varTx = vic_curs_val**2 * pow(10, -self.TxSNR / 10) # (93A-30) hJ = self.calc_hJ(vic_pulse_resp, As, cursor_ix) pJ = p(self.Add * hJ) self.dbg['pJ'] = pJ self.dbg['hJ'] = hJ varG = varTx + self.sigma_Rj**2 * varX * sum(hJ**2) + varN # (93A-41) pG = np.exp(-y**2 / (2 * varG)) / np.sqrt(TWOPI * varG) # (93A-42) pN = np.convolve(pG, pJ, mode='same') # (93A-43) # Sec. 93A.1.7.3 # - ISI (Inconsistent w/ IEEE 802.3-22, but consistent w/ v2.60 of MATLAB code.) n_pre = cursor_ix // M first_pre_ix = cursor_ix - n_pre * M vic_pulse_resp_isi_samps = np.concatenate((vic_pulse_resp[first_pre_ix:cursor_ix:M], vic_pulse_resp[cursor_ix + M::M])) vic_pulse_resp_post_samps = vic_pulse_resp_isi_samps[n_pre:] dfe_tap_weights = np.maximum( # (93A-26) self.bmin, np.minimum( self.bmax, (vic_pulse_resp_post_samps[:nDFE] / vic_curs_val))) hISI = vic_pulse_resp_isi_samps \ - vic_curs_val * np.pad(dfe_tap_weights, # noqa E127 (n_pre, len(vic_pulse_resp_post_samps) - nDFE), mode='constant', constant_values=0) # (93A-27) hISI = self.filt_pr_samps(hISI, As) py = p(hISI) # `hISI` from (93A-27); `p(y)` as per (93A-40) # - Crosstalk self.rslts['py0'] = py.copy() # For debugging. xt_samps = [] pks = [] # For debugging. for pulse_resp in pulse_resps[1:]: # (93A-44) i = np.argmax([sum(np.array(self.filt_pr_samps(pulse_resp[m::M], As))**2) for m in range(M)]) # (93A-33) samps = self.filt_pr_samps(pulse_resp[i::M], As) xt_samps.append(samps) pk = p(samps) # For debugging. pks.append(pk) py = np.convolve(py, pk, mode='same') self.rslts['py1'] = py.copy() # For debugging. self.xt_samps = xt_samps self.pks = pks py = np.convolve(py, pN, mode='same') # (93A-45) # Final calculation Py = np.cumsum(py) Py /= Py[-1] # Enforce cumulative probability distribution. # Store some results. self.pulse_resps = pulse_resps self.com_cursor_ix = cursor_ix self.com_sigma_Tx = np.sqrt(varTx) self.com_sigma_G = np.sqrt(varG) self.com_sigma_N = np.sqrt(varN) self.rslts['pG'] = pG self.rslts['pN'] = pN self.rslts['py'] = py self.rslts['Py'] = Py self.rslts['y'] = y self.com_dfe_taps = print_taps(dfe_tap_weights) self.com_As = As self.rslts['sigma_G'] = self.com_sigma_G * 1e3 self.rslts['sigma_Tx'] = self.com_sigma_Tx * 1e3 self.rslts['sigma_N'] = self.com_sigma_N * 1e3 return (As, abs(np.where(Py >= self.DER0)[0][0] - npts // 2) * ystep, cursor_ix)