"""
MS Excel importing utilities for PyChOpMarg.
Original author: David Banas <capn.freako@gmail.com>
Original date: January 17, 2025
Copyright (c) 2025 David Banas; all rights reserved World wide.
"""
from functools import reduce
from pathlib import Path
import re
from typing import Any, Callable, TypeVar
import numpy as np
from numpy import arange, array, concatenate, ones, zeros
from numpy.typing import NDArray
import pandas as pd
from pychopmarg.config.ieee_8023dj import IEEE_8023dj
from pychopmarg.config.template import COMParams
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
# Field name translation table (XLS => ``COMParams``)
com_fields: dict[str, str] = {
"z_p select": "zp_sel",
"z_p (TX)": "zp_tx",
"z_p (NEXT)": "zp_next",
"z_p (FEXT)": "zp_fext",
"z_p (RX)": "zp_rx",
"z_bp (TX)": "zbp_tx",
"z_bp (NEXT)": "zbp_next",
"z_bp (FEXT)": "zbp_fext",
"z_bp (RX)": "zbp_rx",
"c(0)": "c0_min",
"b_max(1)": "b_max1",
"b_min(1)": "b_min1",
"b_max(2..N_b)": "b_maxN",
"ffe_pre_tap_len": "dw",
"f_b": "fb",
"Delta_f": "fstep",
"g_DC_HP": "g_DC2",
"f_HP_PZ": "f_LF",
}
ignored_fields = [
"Operational control",
"COM Pass threshold",
"Include PCB",
"Table 92–12 parameters",
"Parameter",
"PMD_type",
"Histogram_Window_Weight",
]
[docs]
def first(f: Callable[[T1], T2]) -> Callable[[tuple[T1, T3]], tuple[T2, T3]]:
"Translation of Haskell ``first`` function."
return lambda pr: (f(pr[0]), pr[1])
[docs]
def second(f: Callable[[T1], T2]) -> Callable[[tuple[T3, T1]], tuple[T3, T2]]:
"Translation of Haskell ``second`` function."
return lambda pr: (pr[0], f(pr[1]))
[docs]
def compose(*functions):
"Function composition w/ NO TYPE CHECKING!"
return reduce(lambda f, g: lambda x: f(g(x)), functions)
[docs]
def apply2(
f: Callable[[T1], T2],
g: Callable[[T3], T4]
) -> Callable[[tuple[T1, T3]], tuple[T2, T4]]:
"Translation of Haskell ``***`` operator."
return compose(first(f), second(g))
[docs]
def alternative(
f: Callable[[T1], T2],
g: Callable[[T1], T2],
x: T1
) -> T2:
"""
Try ``f`` and if it fails apply ``g``.
Args:
f: First function to try.
g: Second function to try.
x: Function argument.
Notes:
1. It would be preferable to omit ``x`` from the argument list
and return a function, instead of a value, but we can't do that,
because Python doesn't allow a try/except block inside a lambda.
"""
try:
rslt = f(x)
except: # noqa=E722 pylint: disable=bare-except
rslt = g(x)
return rslt
[docs]
def parse_Mrange(mStr: str) -> NDArray:
"Parse a string containing an M-code range."
start, step, stop = list(map(float, mStr.strip("[] ").split(":")))
if start == stop == 0:
return array([0.0])
return arange(start, stop + step / 2, step)
[docs]
def parse_Mfloat(mStr: str) -> NDArray:
"Parse a string containing a single M-code number."
return array([float(mStr)])
[docs]
def parse_Mfloats(mStr: str) -> NDArray:
"Parse a string containing a repeated M-code number."
pattern = r"([-]?[0-9.]+)\*ones\(1,\s*([0-9]+)\)"
# match = re.search(pattern, mStr, re.DEBUG) # Leave as example.
match = re.search(pattern, mStr)
if match:
return array([float(match.group(1))] * int(match.group(2)))
raise RuntimeError(f"Couldn't parse: '{mStr}'.")
[docs]
def parse_Mlist(mStr: str) -> NDArray:
"Parse a string containing an M-code list of numbers."
tokens = list(map(lambda s: s.strip(","),
mStr.strip("[] ").split()))
return concatenate(list(map(lambda s: alternative(parse_Mfloat, parse_Mfloats, s),
tokens)))
[docs]
def parse_Marray(mStr: str) -> NDArray:
"Parse a string containing either an M-code range or list of numbers."
return array(list(filter(lambda x: x is not None,
alternative(parse_Mrange, parse_Mlist, mStr))))
[docs]
def parse_Mmatrix(mStr: str) -> NDArray:
"Parse a string containing an M-code matrix of numbers."
rslt = array(list(map(parse_Marray, mStr.strip("[]").split(";"))))
if rslt.shape[0] == 1:
return rslt.flatten()
return rslt
[docs]
def match_ignored_field_name_prefix(mName: str) -> bool:
"Match MATLAB field name prefix to any ignored field name."
for nm in ignored_fields:
if mName.startswith(nm):
return True
return False
[docs]
def cfg_trans(com_cfg: NDArray) -> dict[str, Any]:
"Translate/filter the names/values of the given 2D NumPy array."
return dict(map(apply2(lambda s: com_fields[s] if s in com_fields else s, # type: ignore
lambda s: parse_Mmatrix(s) if isinstance(s, str) else s),
filter(lambda pr: not pd.isna(pr[0]) and not match_ignored_field_name_prefix(pr[0]),
com_cfg)))
# Kept global, so I can inspect it later while debugging:
com_params_dict = {} # pylint: disable=global-statement
[docs]
def get_com_params(cfg_file: Path) -> COMParams: # noqa=E501 pylint: disable=too-many-locals,too-many-branches,too-many-statements
"Read a COM configuration XLS file and return an equivalent ``COMParams`` instance."
global com_params_dict # pylint: disable=global-statement
if cfg_file.suffix == "xlsx": # You'll need to manually export to `*.xls` from within Excel.
raise RuntimeError("Currently, *.XLSX files must first be manually converted to *.XLS.")
# com_cfg = pd.read_excel(cfg_file, engine_kwargs={"read_only": True}) # Doesn't work, currently.
com_cfg = pd.read_excel(cfg_file)
_com_cfg = com_cfg.to_numpy()
com_cfg1 = _com_cfg[:, range(0, 2)]
com_cfg2 = _com_cfg[:, range(9, 11)]
com_params_dict = vars(IEEE_8023dj).copy()
com_params_dict.update(cfg_trans(com_cfg1))
com_params_dict.update(cfg_trans(com_cfg2))
# Set the Tx tap ranges/steps.
N_TX_TAPS = 6 # not including the cursor
N_TX_TAPS_2 = N_TX_TAPS // 2
tx_taps_min = [0.] * N_TX_TAPS
tx_taps_max = [0.] * N_TX_TAPS
tx_taps_step = [0.] * N_TX_TAPS
def set_tap(ix, weights):
if isinstance(weights, (list, np.ndarray)):
tx_taps_min[ix] = min(weights)
tx_taps_max[ix] = max(weights)
tx_taps_step[ix] = (tx_taps_max[ix] - tx_taps_min[ix]) / (len(weights) - 1)
else: # Assume scalar.
tx_taps_min[ix] = weights
tx_taps_max[ix] = weights
tx_taps_step[ix] = 0
for n in range(N_TX_TAPS_2):
mKey = f"c(-{n + 1})"
pKey = f"c({n + 1})"
if mKey in com_params_dict:
set_tap(N_TX_TAPS_2 - (n + 1), com_params_dict[mKey])
if pKey in com_params_dict:
set_tap(N_TX_TAPS_2 + n, com_params_dict[pKey])
# Set Rx FFE config.
if "dw" in com_params_dict:
assert "ffe_post_tap_len" in com_params_dict, ValueError(
"Either both or neither of: 'ffe_pre_tap_len' & 'ffe_post_tap_len' must be given in the configuration spreadsheet!") # noqa=E501
nRxPreTaps = com_params_dict["dw"]
nRxPostTaps = com_params_dict["ffe_post_tap_len"]
nRxTaps = nRxPreTaps + 1 + nRxPostTaps # Includes cursor tap.
rx_taps_min = array([-0.7] * nRxTaps)
rx_taps_max = array([0.7] * nRxTaps)
# Set cursor max/min to 1.0 (i.e. - no clipping, because relative).
rx_taps_min[nRxPreTaps] = rx_taps_max[nRxPreTaps] = 1.0
if "ffe_pre_tap1_max" in com_params_dict:
rx_taps_max[nRxPreTaps - 1] = com_params_dict["ffe_pre_tap1_max"]
rx_taps_min[nRxPreTaps - 1] = -com_params_dict["ffe_pre_tap1_max"]
if "ffe_post_tap1_max" in com_params_dict:
rx_taps_max[nRxPreTaps + 1] = com_params_dict["ffe_post_tap1_max"]
rx_taps_min[nRxPreTaps + 1] = -com_params_dict["ffe_post_tap1_max"]
if "ffe_tapn_max" in com_params_dict:
rx_taps_max[:nRxPreTaps - 1] = com_params_dict["ffe_tapn_max"] * ones(nRxPreTaps - 1)
rx_taps_max[nRxPreTaps + 2:] = com_params_dict["ffe_tapn_max"] * ones(nRxTaps - nRxPreTaps - 2)
rx_taps_min[:nRxPreTaps - 1] = -com_params_dict["ffe_tapn_max"] * ones(nRxPreTaps - 1)
rx_taps_min[nRxPreTaps + 2:] = -com_params_dict["ffe_tapn_max"] * ones(nRxTaps - nRxPreTaps - 2)
# Make sure CTLE d.c. gains are both lists.
if not isinstance(com_params_dict["g_DC"], (list, np.ndarray)):
com_params_dict["g_DC"] = [com_params_dict["g_DC"]]
if not isinstance(com_params_dict["g_DC2"], (list, np.ndarray)):
com_params_dict["g_DC2"] = [com_params_dict["g_DC2"]]
# Set Rx DFE min./max.
dfe_max = dfe_min = array([]) # Default is an empty array and used if we don't find `N_b` in our parse results.
if "N_b" in com_params_dict:
N_b = int(com_params_dict["N_b"])
if N_b > 0:
dfe_max = zeros(N_b)
dfe_min = zeros(N_b)
if "b_max1" in com_params_dict:
dfe_max[0] = float(com_params_dict["b_max1"])
if "b_min1" in com_params_dict:
dfe_min[0] = float(com_params_dict["b_min1"])
if N_b > 1:
if "b_maxN" in com_params_dict:
dfe_max[1:] = dfe_min[1:] = float(com_params_dict["b_maxN"])
rslt = COMParams(
com_params_dict["fb"],
com_params_dict["fstep"],
com_params_dict["fmax"],
com_params_dict["L"],
com_params_dict["M"],
com_params_dict["DER_0"],
com_params_dict["T_r"],
com_params_dict["RLM"],
com_params_dict["A_v"],
com_params_dict["A_fe"],
com_params_dict["A_ne"],
com_params_dict["R_0"],
com_params_dict["A_DD"],
com_params_dict["SNR_TX"],
com_params_dict["eta_0"],
com_params_dict["sigma_Rj"],
com_params_dict["f_z"],
com_params_dict["f_p1"],
com_params_dict["f_p2"],
com_params_dict["f_LF"],
com_params_dict["g_DC"],
com_params_dict["g_DC2"],
tx_taps_min,
tx_taps_max,
tx_taps_step,
com_params_dict["c0_min"],
com_params_dict["f_r"],
dfe_min,
dfe_max,
rx_taps_min,
rx_taps_max,
com_params_dict["dw"],
com_params_dict["R_d"],
com_params_dict["C_d"],
com_params_dict["C_b"],
com_params_dict["C_p"],
com_params_dict["L_s"],
com_params_dict["z_c"],
com_params_dict["z_p"],
com_params_dict["gamma0"],
com_params_dict["a1"],
com_params_dict["a2"],
com_params_dict["tau"],
)
if "z_pB" in com_params_dict:
rslt.z_pB = com_params_dict["z_pB"]
return rslt