
Utilities

Utility functions for scpviz.

This package provides helper and processing functions used throughout scpviz. Import as:

from scpviz import utils as scutils

Submodules (for maintainers): formatting, data, class_filter, id_maps, stats.

Text / formatting

Functions:

Name Description
format_log_prefix

Return standardized log prefixes for messages.

Data access + transformation

Functions:

Name Description
parse_filename_index

Parse sample metadata from filename columns.

get_samplenames

Resolve sample names for given classes from .obs.

get_classlist

Return unique class values for specified .obs columns.

get_adata_layer

Safely extract a matrix from .X or .layers.

get_adata

Retrieve the .prot or .pep AnnData from a pAnnData object.

get_abundance

Extract abundance data from pAnnData or AnnData.

resolve_accessions

Map gene names or accessions to .var_names.

get_pep_prot_mapping

Determine peptide-to-protein mapping column.

update_layer_provenance

Register a matrix layer in adata.uns['layer_provenance'].

resolve_input_layer

Map layer='X' to uns['current_X_layer'] for provenance.

infer_layer_is_log

Infer log-transformed layers via provenance or name heuristic.

Sample selection / set logic

Functions:

Name Description
format_class_filter

Standardize class/value inputs for filtering.

filter

Legacy sample filtering (prefer pAnnData.filter_sample_values).

resolve_class_filter

Resolve class/value pairs and apply filtering.

get_upset_contents

Build contents for UpSet plots from pAnnData.

get_upset_query

Query features present/absent in UpSet contents.

Identifier mappings (UniProt / STRING)

Functions:

Name Description
get_uniprot_fields_worker

Low-level UniProt REST API query function (batch up to 1024).

get_uniprot_fields

High-level UniProt API wrapper with batching.

standardize_uniprot_columns

Normalize UniProt column names for stable downstream use.

get_string_mappings

Map UniProt accessions to STRING IDs (UniProt + STRING fallback).

convert_identifiers

Convert between accession / gene / STRING / organism_id.

Statistics

Functions:

Name Description
pairwise_log2fc

Compute pairwise median log2 fold change between groups.

de_adata

Differential expression helper over AnnData matrices.

get_pca_importance

Identify most important features for PCA components.

get_protein_clusters

Retrieve hierarchical clusters from stored linkage.

Warning

Many functions here are internal helpers. For common workflows (filtering, plotting, enrichment), prefer the corresponding pAnnData methods when available.

convert_identifiers

convert_identifiers(
    ids: list[str],
    from_type: str,
    to_type: str | list[str],
    pdata: pAnnData | None = None,
    use_cache: bool = True,
    return_type: str = "dict",
    verbose: bool = True,
) -> (
    dict[str, dict[str, Any]]
    | pd.DataFrame
    | tuple[dict[str, dict[str, Any]], pd.DataFrame]
)

Convert identifiers between UniProt-compatible types.

Supports mapping between protein accessions, gene names, STRING IDs, and organism IDs. Multiple output types may be requested at once.

Parameters:

Name Type Description Default
ids list of str

Input identifiers.

required
from_type str

Source identifier type ('accession', 'gene'). 'organism_id' cannot be used as a source.

required
to_type str or list of str

Target identifier type(s). May include any of: ['gene', 'string', 'organism_id'].

required
pdata pAnnData

pAnnData object providing cached accession–gene mappings. If provided, use_cache is automatically set to True.

None
use_cache bool

Whether to use cached mappings from pdata. (default: True)

True
return_type str

Output format:

  • 'dict': {input → {to_type → value}}
  • 'df': DataFrame with columns [from_type, *to_type]
  • 'both': (dict, DataFrame)

'dict'
verbose bool

Whether to print progress messages.

True

Returns:

Type Description
dict[str, dict[str, Any]] | DataFrame | tuple[dict[str, dict[str, Any]], DataFrame]

dict, pandas.DataFrame, or tuple: Depending on return_type.

Example

convert_identifiers(["P12345", "Q9XYZ1"], "accession", "gene", pdata=pdata)
convert_identifiers(["P12345"], "accession", ["gene", "string", "organism_id"], return_type="df")
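The lookup order described for pdata and use_cache (cached mappings first, a remote query only for identifiers that are still unresolved) can be sketched without scpviz. This is a minimal illustration under stated assumptions, not the library's implementation: `convert_with_cache`, `cached_map`, and `fake_remote` are hypothetical stand-ins for the mappings held by pdata and for a UniProt query.

```python
from typing import Callable, Optional


def convert_with_cache(
    ids: list[str],
    cached_map: dict[str, str],
    remote_lookup: Callable[[list[str]], dict[str, str]],
) -> tuple[dict[str, Optional[str]], list[str]]:
    """Resolve ids from a local cache first, then query only the leftovers."""
    # Tier 1: local cache lookup (no network access).
    resolved: dict[str, Optional[str]] = {i: cached_map.get(i) for i in ids}

    # Anything still unresolved is sent to the remote service in one batch.
    to_query = [i for i, value in resolved.items() if value is None]
    if to_query:
        fetched = remote_lookup(to_query)
        for i in to_query:
            resolved[i] = fetched.get(i)  # stays None if the service has no hit
    return resolved, to_query


def fake_remote(accessions: list[str]) -> dict[str, str]:
    """Hypothetical stand-in for an API call; knows a single accession."""
    known = {"Q9XYZ1": "GENE_B"}
    return {a: known[a] for a in accessions if a in known}


# Hypothetical data: one accession is cached, one needs a remote lookup,
# and one cannot be resolved anywhere.
cache = {"P12345": "GENE_A"}
result, queried = convert_with_cache(["P12345", "Q9XYZ1", "X00000"], cache, fake_remote)
print(result)
print(queried)
```

Keeping the list of queried identifiers separate from the result makes it easy to report how many conversions were served locally and how many needed the remote service.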

Source code in src/scpviz/utils/id_maps.py
def convert_identifiers(
    ids: list[str],
    from_type: str,
    to_type: str | list[str],
    pdata: pAnnData | None = None,
    use_cache: bool = True,
    return_type: str = "dict",
    verbose: bool = True,
) -> dict[str, dict[str, Any]] | pd.DataFrame | tuple[dict[str, dict[str, Any]], pd.DataFrame]:
    """
    Convert identifiers between UniProt-compatible types.

    Supports mapping between protein accessions, gene names, STRING IDs,
    and organism IDs. Multiple output types may be requested at once.

    Args:
        ids (list of str): Input identifiers.
        from_type (str): Source identifier type ('accession', 'gene').
            'organism_id' cannot be used as a source.
        to_type (str or list of str): Target identifier type(s).
            May include any of: ['gene', 'string', 'organism_id'].
        pdata (pAnnData, optional): pAnnData object providing cached
            accession–gene mappings. If provided, `use_cache` is
            automatically set to True.
        use_cache (bool): Whether to use cached mappings from `pdata`.
            (default: True)
        return_type (str): Output format:
            - 'dict': {input → {to_type → value}}
            - 'df': DataFrame with columns [from_type, *to_type]
            - 'both': (dict, DataFrame)
        verbose (bool): Whether to print progress messages.

    Returns:
        dict, pandas.DataFrame, or tuple: Depending on `return_type`.

    Example:
        >>> convert_identifiers(["P12345", "Q9XYZ1"], "accession", "gene", pdata=pdata)
        >>> convert_identifiers(["P12345"], "accession", ["gene", "string", "organism_id"], return_type="df")
    """
    import pandas as pd
    import numpy as np
    import scpviz.utils as _u

    if not ids:
        empty_df = pd.DataFrame(columns=[from_type] + ([to_type] if isinstance(to_type, str) else list(to_type)))
        return {} if return_type != "df" else empty_df

    if pdata is not None:
        use_cache = True

    from_col, to_cols, search_fields = _map_uniprot_field(from_type, to_type)
    if isinstance(to_type, str):
        to_type = [to_type]

    # canonical UniProt field map (consistent with standardize_uniprot_columns)
    _FIELD_MAP = {
        "accession": "accession",
        "gene": "gene_primary",
        "string": "xref_string",
        "organism_id": "organism_id",
    }

    # --- Logging
    if verbose:
        print(f"{format_log_prefix('search', indent=1)} Converting from '{from_type}' to {to_type} for {len(ids)} identifiers...")
        if pdata is not None:
            cacheable_types = {"accession", "gene"}
            api_needed = [t for t in to_type if t not in cacheable_types]
            if set([from_type] + to_type).issubset(cacheable_types):
                print(f"{format_log_prefix('info_only', indent=2)} Using cached mapping from pdata (no UniProt queries).")
            elif api_needed:
                api_list = ", ".join(api_needed)
                print(f"{format_log_prefix('info_only', indent=2)} Using cached mapping for gene/accession; UniProt lookup required for: {api_list}.")
        else:
            print(f"{format_log_prefix('info_only', indent=2)} No pdata provided — querying UniProt for all target fields.")

    # --- Tier 1: cache lookup (only accession <-> gene)
    resolved = {id_: {t: None for t in to_type} for id_ in ids}
    to_query = list(ids)

    if pdata is not None and use_cache and {"accession", "gene"}.issuperset({from_type, *to_type}):
        if from_type == "accession" and "gene" in to_type:
            _, acc_to_gene = pdata.get_identifier_maps(on="protein")
            for acc in ids:
                if acc in acc_to_gene:
                    resolved[acc]["gene"] = acc_to_gene[acc]
        elif from_type == "gene" and "accession" in to_type:
            gene_to_acc, _ = pdata.get_identifier_maps(on="protein")
            for gene in ids:
                if gene in gene_to_acc:
                    resolved[gene]["accession"] = gene_to_acc[gene]

        # Filter unmapped
        to_query = [x for x, v in resolved.items() if not any(vv for vv in v.values())]

    # --- Tier 3: UniProt API
    df = pd.DataFrame()
    if len(to_query) > 0:
        # Hybrid case: gene → STRING / organism_id
        if from_type == "gene":
            gene_to_acc = convert_identifiers(to_query, "gene", "accession", pdata=pdata, use_cache=use_cache, verbose=False)
            accs = [v.get("accession") for v in gene_to_acc.values() if v.get("accession")]
            if accs:
                df = _u.get_uniprot_fields(accs, search_fields=search_fields, standardize=True)
                df = _u.standardize_uniprot_columns(df)
                df = df.drop_duplicates(subset="accession", keep="first")

                # Build per-target maps
                per_target_maps = {}
                for t in to_type:
                    col = _FIELD_MAP[t]
                    if col in df.columns:
                        per_target_maps[t] = dict(zip(df["accession"], df[col]))
                    else:
                        per_target_maps[t] = {}

                # Assign results
                for g, acc_dict in gene_to_acc.items():
                    acc = acc_dict.get("accession")
                    for t in to_type:
                        resolved[g][t] = per_target_maps[t].get(acc) if acc else None
            else:
                for g in to_query:
                    for t in to_type:
                        resolved[g][t] = None

        else:
            # Direct mapping (accession → X)
            df = _u.get_uniprot_fields(to_query, search_fields=search_fields, standardize=True)

            # --- Clean up STRING results if present
            if not df.empty:
                if "xref_string" in df.columns and isinstance(df["xref_string"], pd.Series):
                    df["xref_string"] = (
                        df["xref_string"]
                        .astype(str)
                        .apply(lambda s: s.replace(";", "").strip() if isinstance(s, str) else np.nan)
                        .replace({"nan": np.nan, "None": np.nan, "": np.nan})
                    )
                elif "string" in to_type and verbose:
                    print(f"{format_log_prefix('warn_only', indent=3)} UniProt did not return 'xref_string' field — possible API schema drift.")

            if not df.empty and from_col in df.columns:
                per_target_maps = {}
                for t in to_type:
                    col = _FIELD_MAP[t]
                    if col in df.columns:
                        per_target_maps[t] = dict(zip(df[from_col], df[col]))
                    else:
                        per_target_maps[t] = {}

                for id_ in to_query:
                    for t in to_type:
                        resolved[id_][t] = per_target_maps[t].get(id_)
            else:
                for id_ in to_query:
                    for t in to_type:
                        resolved[id_][t] = None

    # --- Reporting
    resolved_count = sum(
        any(vv is not None and not pd.isna(vv) for vv in v.values()) for v in resolved.values()
    )
    missing = [k for k, v in resolved.items() if all(vv is None or pd.isna(vv) for vv in v.values())]

    if verbose:
        local_resolved = len(ids) - len(to_query)
        api_resolved = resolved_count - local_resolved
        print(f"{format_log_prefix('result_only', indent=2)} {resolved_count}/{len(ids)} identifiers successfully converted "
            f"({local_resolved} local, {api_resolved} via UniProt).")
        if missing:
            print(f"{format_log_prefix('warn_only', indent=2)} {len(missing)} identifiers could not be resolved:")
            print("        " + ", ".join(missing[:10]) + ("..." if len(missing) > 10 else ""))

    # --- Output
    result_df = pd.DataFrame({from_type: list(resolved.keys())})
    for t in to_type:
        result_df[t] = [resolved[i][t] for i in result_df[from_type]]

    if return_type == "dict":
        return resolved
    elif return_type == "df":
        return result_df
    elif return_type == "both":
        return resolved, result_df
    else:
        raise ValueError("Invalid return_type. Choose from {'dict', 'df', 'both'}.")

de_adata

de_adata(
    adata: AnnData,
    values: list[dict[str, Any] | list[str]] | None = None,
    class_type: str | list[str] | None = None,
    method: str = "ttest",
    fold_change_mode: str = "mean",
    layer: str = "X",
    pval: float = 0.05,
    log2fc: float = 1.0,
    data_is_log: bool = False,
    log_base: float = 2.0,
    pseudocount: float = 1.0,
    gene_col: str | None = None,
) -> pd.DataFrame

Standalone DE analysis for AnnData. Produces a volcano-ready DataFrame in the same format as the output of pdata.de().

Supports
  • Legacy-style: class_type="condition", values=["A","B"]
  • Legacy multi-col: class_type=["cellline","treatment"], values=[["HCT116","DMSO"], ["HCT116","Drug"]]
  • Dictionary-style: values=[{"cellline":"HCT116","treatment":"DMSO"}, {...}]
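The three input styles listed above all describe the same thing: one dictionary of column → value per group. A minimal sketch of that normalization, written independently of scpviz (the helper name `to_group_filter` is hypothetical and only illustrates the idea):

```python
from typing import Any, Optional, Union


def to_group_filter(
    value: Union[dict, list, str],
    class_type: Optional[Union[str, list[str]]] = None,
) -> dict[str, Any]:
    """Normalize one group definition into a {column: value} dictionary."""
    if isinstance(value, dict):          # dictionary-style: already normalized
        return dict(value)
    if isinstance(class_type, str):      # legacy-style, single column
        return {class_type: value}
    if isinstance(class_type, list) and isinstance(value, list):
        if len(class_type) != len(value):
            raise ValueError("class_type and value must have the same length.")
        return dict(zip(class_type, value))  # legacy-style, multiple columns
    raise ValueError("Unsupported group definition.")


print(to_group_filter("A", class_type="condition"))
print(to_group_filter(["HCT116", "DMSO"], class_type=["cellline", "treatment"]))
print(to_group_filter({"cellline": "HCT116", "treatment": "DMSO"}))
```

Because every style reduces to the same dictionary, the dictionary-style input is the least ambiguous way to call the function.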

Parameters:

Name Type Description Default
adata AnnData

AnnData object.

required
values list of dict or list of list

Sample group filters to compare.

  • Dictionary-style (recommended): [{'cellline': 'HCT116', 'treatment': 'DMSO'}, {...}]
  • Legacy-style (if class_type is provided): [['HCT116', 'DMSO'], ['HCT116', 'DrugX']]
None
class_type str or list of str

Legacy-style class label(s) to interpret values.

None
method str

'ttest', 'mannwhitneyu', 'wilcoxon'.

'ttest'
fold_change_mode str

'mean' or 'pairwise_median'.

'mean'
layer str

Layer to use. Default is 'X'.

'X'
pval float

p-value threshold for calling a feature significant.

0.05
log2fc float

Absolute log2 fold change threshold for calling a feature up- or downregulated.

1.0
data_is_log bool

If True, treat layer as log-transformed and un-log to compute fold changes.

False
log_base float

Base of the log used in layer. Default 2.0.

2.0
pseudocount float

If data is log of (x + pseudocount), provide that here (e.g., 1.0 for log2(x+1)).

1.0
gene_col str

Column in adata.var to use for the "Genes" field in the output. Resolution order:

  • adata.var[<gene_col>] if gene_col is provided (a KeyError is raised if the column does not exist),
  • otherwise adata.var['Genes'] if that column exists,
  • otherwise adata.var_names.

None
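How data_is_log, log_base, and pseudocount interact is easiest to see on a few numbers: values stored as log_base(x + pseudocount) are mapped back to x before the ratio of group means is taken. The sketch below uses only the standard library; the function names `unlog` and `log2fc_of_means` and the sample values are illustrative assumptions, not part of scpviz.

```python
import math


def unlog(value: float, log_base: float = 2.0, pseudocount: float = 1.0) -> float:
    """Invert log_base(x + pseudocount); clamp tiny negatives to zero."""
    linear = log_base ** value - pseudocount
    return max(linear, 0.0)


def log2fc_of_means(group1: list[float], group2: list[float]) -> float:
    """log2 of the ratio of group means, computed on linear-scale values."""
    m1 = sum(group1) / len(group1)
    m2 = sum(group2) / len(group2)
    if m1 == 0 or m2 == 0:
        return math.nan  # a zero mean makes the ratio meaningless
    return math.log2(m1 / m2)


# Hypothetical abundances stored as log2(x + 1).
logged_a = [math.log2(7 + 1), math.log2(9 + 1)]   # linear values 7 and 9
logged_b = [math.log2(1 + 1), math.log2(3 + 1)]   # linear values 1 and 3

linear_a = [unlog(v) for v in logged_a]
linear_b = [unlog(v) for v in logged_b]

# Group means are about 8 and 2, so the log2 fold change is about 2.
print(log2fc_of_means(linear_a, linear_b))
```

Passing the wrong pseudocount or log_base shifts every linear value, so the fold change is only meaningful when these arguments match how the layer was actually transformed.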

Returns:

Type Description
DataFrame

pandas.DataFrame: DE results with volcano-ready columns.
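The significance column of the returned DataFrame follows from the pval and log2fc thresholds. A minimal per-feature sketch of that rule, mirroring the four categories used in the source listing below (the function name `classify` is hypothetical):

```python
import math


def classify(
    p_value: float,
    log2fc_value: float,
    pval: float = 0.05,
    log2fc: float = 1.0,
) -> str:
    """Label one feature using the p-value and log2 fold change thresholds."""
    if math.isnan(log2fc_value):
        return "not comparable"      # fold change could not be computed
    if p_value < pval and log2fc_value > log2fc:
        return "upregulated"
    if p_value < pval and log2fc_value < -log2fc:
        return "downregulated"
    return "not significant"


print(classify(0.001, 2.5))
print(classify(0.001, -1.7))
print(classify(0.2, 3.0))
print(classify(0.01, math.nan))
```

Both conditions must hold for a feature to be called up- or downregulated, so a large fold change with a weak p-value stays "not significant".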

Source code in src/scpviz/utils/stats.py
def de_adata(
    adata: ad.AnnData,
    values: list[dict[str, Any] | list[str]] | None = None,
    class_type: str | list[str] | None = None,
    method: str = "ttest",
    fold_change_mode: str = "mean",
    layer: str = "X",
    pval: float = 0.05,
    log2fc: float = 1.0,
    data_is_log: bool = False,
    log_base: float = 2.0,
    pseudocount: float = 1.0,
    gene_col: str | None = None,
) -> pd.DataFrame:
    """
    Standalone DE analysis for AnnData. Produces a volcano-ready DataFrame in the same format as the output of pdata.de().

    Supports:
        - Legacy-style: class_type="condition", values=["A","B"]
        - Legacy multi-col: class_type=["cellline","treatment"],
                            values=[["HCT116","DMSO"], ["HCT116","Drug"]]
        - Dictionary-style: values=[{"cellline":"HCT116","treatment":"DMSO"}, {...}]    

    Args:
        adata (AnnData): AnnData object.
        values (list of dict or list of list): Sample group filters to compare.

            - Dictionary-style (recommended): [{'cellline': 'HCT116', 'treatment': 'DMSO'}, {...}]
            - Legacy-style (if `class_type` is provided): [['HCT116', 'DMSO'], ['HCT116', 'DrugX']]

        class_type (str or list of str, optional): Legacy-style class label(s) to interpret `values`.
        method (str): 'ttest', 'mannwhitneyu', 'wilcoxon'.
        fold_change_mode (str): 'mean' or 'pairwise_median'.
        layer (str): Layer to use. Default is 'X'.
        pval (float): p-value threshold. Default 0.05.
        log2fc (float): Absolute log2 fold change threshold. Default 1.0.
        data_is_log (bool): If True, treat `layer` as log-transformed and
            un-log to compute fold changes.
        log_base (float): Base of the log used in `layer`. Default 2.0.
        pseudocount (float): If data is log of (x + pseudocount), provide that
            here (e.g., 1.0 for log2(x+1)).
        gene_col (str, optional): Column in `adata.var` to use for the "Genes"
            field in the output. Resolution order:
            - `adata.var[<gene_col>]` if provided (raises KeyError if missing),
            - otherwise `adata.var['Genes']` if that column exists,
            - otherwise `adata.var_names`.

    Returns:
        pandas.DataFrame: DE results with volcano-ready columns.
    """

    def to_dict_list(class_type, val):
        """Convert legacy values into a list of dictionary filters."""
        if isinstance(val, dict):
            return [val]

        # if class_type is singular
        if isinstance(class_type, str):
            return [{class_type: val}]

        # if class_type is list (multi-column)
        if isinstance(class_type, list) and isinstance(val, list):
            if len(class_type) != len(val):
                raise ValueError("Length mismatch: class_type and values.")
            return [dict(zip(class_type, val))]

        raise ValueError("Invalid legacy DE input format.")

    def _unlog(data, data_is_log, log_base=2.0, pseudocount=0.0):
        """Convert log-transformed data back to linear scale for FC calc."""
        if not data_is_log:
            return data

        # data are log_base(x + pseudocount)
        with np.errstate(over='ignore', invalid='ignore'):
            if log_base == 2.0:
                lin = np.power(2.0, data) - pseudocount
            elif log_base == np.e:
                lin = np.exp(data) - pseudocount
            else:
                lin = np.power(log_base, data) - pseudocount

        # Clamp small negatives due to numerical noise
        lin[lin < 0] = 0.0
        return lin

    # identify sample indices for each group
    def filter_indices(adata, filters):
        """Return sample indices matching a list of dict filters."""
        mask = np.ones(len(adata), dtype=bool)
        for f in filters:
            for col, val in f.items():
                mask &= (adata.obs[col].astype(str) == str(val))
        return np.where(mask)[0]

    # create readable labels for groups
    def _label_group(filters):
        # filters is a list of dicts; we want one dict describing that group
        d = filters[0] if isinstance(filters, list) else filters
        return "_".join(str(v) for v in d.values())

    if values is None:
        raise ValueError("Please supply `values` (2 groups) for DE.")

    if len(values) != 2:
        raise ValueError("`values` must contain exactly two group definitions.")

    if values[0] == values[1]:
        raise ValueError("Both groups in `values` refer to the same condition. Please provide two distinct groups.")

    # convert values to standardized dict format
    if isinstance(values[0], dict):
        group1_filters = [values[0]]
        group2_filters = [values[1]]
    else:
        if class_type is None:
            raise ValueError("class_type must be provided for legacy DE format.")
        group1_filters = to_dict_list(class_type, values[0])
        group2_filters = to_dict_list(class_type, values[1])


    idx1 = filter_indices(adata, group1_filters)
    idx2 = filter_indices(adata, group2_filters)

    if len(idx1) == 0 or len(idx2) == 0:
        raise ValueError("One of the groups has zero samples.")

    # extract matrices
    if layer == "X":
        X = adata.X
    else:
        if layer not in adata.layers:
            raise KeyError(f"Layer '{layer}' not found in adata.layers.")
        X = adata.layers[layer]

    X = X.toarray() if sparse.issparse(X) else np.asarray(X)
    data1 = X[idx1, :]
    data2 = X[idx2, :]

    data1_fc = _unlog(data1, data_is_log=data_is_log, log_base=log_base, pseudocount=pseudocount)
    data2_fc = _unlog(data2, data_is_log=data_is_log, log_base=log_base, pseudocount=pseudocount)

    # log2FC computation

    if fold_change_mode == 'mean':
        with np.errstate(all='ignore'):
            m1 = np.nanmean(data1_fc, axis=0)
            m2 = np.nanmean(data2_fc, axis=0)
            mask_invalid = (m1 == 0) | (m2 == 0) | np.isnan(m1) | np.isnan(m2)
            log2fc_vals = np.log2(m1 / m2)
            log2fc_vals[mask_invalid] = np.nan

    elif fold_change_mode == 'pairwise_median':
        mask_invalid = ( # Detect invalid features (any 0 or NaN in either group)
            np.any((data1 == 0) | np.isnan(data1), axis=0) |
            np.any((data2 == 0) | np.isnan(data2), axis=0)
        )
        # Compute median pairwise log2FC
        log2fc_vals = pairwise_log2fc(data1, data2)
        log2fc_vals[mask_invalid] = np.nan # Mark invalid features as NaN
        n_invalid = np.sum(mask_invalid)
        if n_invalid > 0:
            print(f"{format_log_prefix('info',2)} {n_invalid} proteins were not comparable (zero or NaN mean in one group).")

    else:
        raise ValueError(f"Unsupported fold_change_mode '{fold_change_mode}'")

    # statistical test

    pvals = []
    stats = []

    for i in range(X.shape[1]):
        x1, x2 = data1[:, i], data2[:, i]
        if method not in {"ttest", "mannwhitneyu", "wilcoxon"}:
            raise ValueError(f"Unsupported method '{method}'")

        try:
            if method == 'ttest':
                res = ttest_ind(x1, x2, nan_policy='omit')
            elif method == 'mannwhitneyu':
                res = mannwhitneyu(x1, x2, alternative='two-sided')
            elif method == 'wilcoxon':
                res = wilcoxon(x1, x2)
            pvals.append(res.pvalue)
            stats.append(res.statistic)
        except Exception:
            pvals.append(np.nan)
            stats.append(np.nan)


    pvals = np.array(pvals)
    neglog10 = -np.log10(np.where(pvals == 0, np.nan, pvals))

    # mean abundance
    mean1 = np.nanmean(data1, axis=0)
    mean2 = np.nanmean(data2, axis=0)

    group1_label = _label_group(group1_filters)
    group2_label = _label_group(group2_filters)

    # assemble DataFrame (pAnnData-compatible)
    df = pd.DataFrame(index=adata.var_names)

    if gene_col is not None:
        # User-specified or default "Genes"
        if gene_col in adata.var.columns:
            df["Genes"] = adata.var[gene_col].astype(str).values
        else:
            raise KeyError(
                f"Requested gene_col='{gene_col}', but this column is not in adata.var.\n"
                f"Available columns: {list(adata.var.columns)}"
            )
    else:
        # Fallback logic: use adata.var['Genes'] if it exists
        if "Genes" in adata.var.columns:
            df["Genes"] = adata.var["Genes"].astype(str).values
        else:
            df["Genes"] = adata.var_names.astype(str)

    df[group1_label] = mean1
    df[group2_label] = mean2
    df["log2fc"] = log2fc_vals
    df["p_value"] = pvals
    df["test_statistic"] = stats
    df["-log10(p_value)"] = neglog10
    df["significance_score"] = df["-log10(p_value)"] * df["log2fc"]

    # significance classification
    df["significance"] = "not significant"
    df.loc[df["log2fc"].isna(), "significance"] = "not comparable"
    df.loc[(df["p_value"] < pval) & (df["log2fc"] > log2fc), "significance"] = "upregulated"
    df.loc[(df["p_value"] < pval) & (df["log2fc"] < -log2fc), "significance"] = "downregulated"

    df["significance"] = pd.Categorical(
        df["significance"],
        categories=["upregulated", "downregulated", "not significant", "not comparable"],
        ordered=True,
    )

    # group labels for plotting annotation
    df.attrs["group1_label"] = group1_label
    df.attrs["group2_label"] = group2_label

    return df

filter

filter(
    pdata: pAnnData | AnnData,
    class_type: str | list[str],
    values: dict[str, Any] | list[Any] | str,
    exact_cases: bool = False,
    debug: bool = False,
) -> pAnnData | ad.AnnData

Legacy-style filtering of samples in pAnnData or AnnData objects.

This function filters samples based on metadata values using the older (class_type, values) interface. For pAnnData objects, it automatically delegates to .filter_sample_values() after converting the input into the recommended dictionary-style format.

Warning

For pAnnData users, prefer .filter_sample_values() with dictionary-style input, as it is more flexible and consistent. The filter() utility is retained primarily for backward compatibility and direct AnnData usage.

Parameters:

Name Type Description Default
pdata pAnnData or AnnData

Input data object to filter.

required
class_type str or list of str

Metadata field(s) in .obs to filter on. Example: "treatment", or ["cell_type", "treatment"].

required
values list, dict, or list of dict

Metadata values to match.

  • If exact_cases=False: Provide a dictionary or list-of-values per class.
  • If exact_cases=True: Provide a list of dictionaries specifying exact combinations across fields.

required
exact_cases bool

Whether to interpret values as exact combinations (AND logic). Defaults to False, which applies OR logic within each class type.

False
debug bool

If True, print the query string used for filtering.

False

Returns:

Name Type Description
filtered pAnnData or AnnData

A filtered object of the same type as pdata.

Raises:

Type Description
ValueError

If input types are invalid, if fields are missing in .obs, or if values format does not match exact_cases.

Example

Filter samples by a single metadata field:

samples = utils.filter(pdata, class_type="treatment", values="kd")

Filter by multiple fields with OR logic:

samples = utils.filter(
    adata,
    class_type=["cell_type", "treatment"],
    values=[["wt", "kd"], ["control", "treatment"]],
)
# returns samples where cell_type is either 'wt' or 'kd' and treatment is either 'control' or 'treatment'

Filter by exact case combinations:

samples = utils.filter(
    adata,
    class_type=["cell_type", "treatment"],
    values=[{"cell_type": "wt", "treatment": "control"},
            {"cell_type": "kd", "treatment": "treatment"}],
    exact_cases=True,
)
# returns samples where cell_type is 'wt' and treatment is 'control', or cell_type is 'kd' and treatment is 'treatment'
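The difference between the two matching modes can be checked on plain records. The sketch below does not use scpviz or AnnData; `select_loose`, `select_exact`, and the sample rows are hypothetical and only illustrate the OR/AND semantics described for exact_cases.

```python
samples = [
    {"cell_type": "wt", "treatment": "control"},
    {"cell_type": "wt", "treatment": "treatment"},
    {"cell_type": "kd", "treatment": "treatment"},
    {"cell_type": "ko", "treatment": "control"},
]


def select_loose(rows, allowed):
    """exact_cases=False: OR within a field, AND across fields."""
    return [
        row for row in rows
        if all(row[field] in values for field, values in allowed.items())
    ]


def select_exact(rows, cases):
    """exact_cases=True: keep rows matching any one listed combination."""
    return [
        row for row in rows
        if any(all(row[k] == v for k, v in case.items()) for case in cases)
    ]


loose = select_loose(
    samples,
    {"cell_type": ["wt", "kd"], "treatment": ["control", "treatment"]},
)
exact = select_exact(
    samples,
    [
        {"cell_type": "wt", "treatment": "control"},
        {"cell_type": "kd", "treatment": "treatment"},
    ],
)
print(len(loose), len(exact))
```

The loose mode keeps every wt or kd sample regardless of how the treatment values pair up, while the exact mode keeps only the two listed combinations.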

Source code in src/scpviz/utils/class_filter.py
def filter(
    pdata: pAnnData | ad.AnnData,
    class_type: str | list[str],
    values: dict[str, Any] | list[Any] | str,
    exact_cases: bool = False,
    debug: bool = False,
) -> pAnnData | ad.AnnData:
    """
    Legacy-style filtering of samples in pAnnData or AnnData objects.

    This function filters samples based on metadata values using the older
    `(class_type, values)` interface. For pAnnData objects, it automatically
    delegates to `.filter_sample_values()` after converting the input into the
    recommended dictionary-style format.

    !!! warning

        For pAnnData users, prefer `.filter_sample_values()` with dictionary-style
        input, as it is more flexible and consistent. The `filter()` utility is
        retained primarily for backward compatibility and direct AnnData usage.


    Args:
        pdata (pAnnData or AnnData): Input data object to filter.
        class_type (str or list of str): Metadata field(s) in `.obs` to filter on.
            Example: `"treatment"`, or `["cell_type", "treatment"]`.
        values (list, dict, or list of dict): Metadata values to match.
            - If `exact_cases=False`: Provide a dictionary or list-of-values per class.
            - If `exact_cases=True`: Provide a list of dictionaries specifying
              exact combinations across fields.
        exact_cases (bool): Whether to interpret `values` as exact combinations (AND logic).
            Defaults to False, which applies OR logic within each class type.
        debug (bool): If True, print the query string used for filtering.

    Returns:
        filtered (pAnnData or AnnData): A filtered object of the same type as `pdata`.


    Raises:
        ValueError: If input types are invalid, if fields are missing in `.obs`,
            or if `values` format does not match `exact_cases`.

    Example:
        Filter samples by a single metadata field:
            ```python
            samples = utils.filter(pdata, class_type="treatment", values="kd")
            ```

        Filter by multiple fields with OR logic: 
            ```python
            samples = utils.filter(
                    adata,
                    class_type=["cell_type", "treatment"],
                    values=[["wt", "kd"], ["control", "treatment"]]
                ) 
            # returns samples where cell_type is either 'wt' or 'kd' and treatment is either 'control' or 'treatment'
            ```

        Filter by exact case combinations:
            ```python 
            samples = utils.filter(
                    adata,
                    class_type=["cell_type", "treatment"],
                    values=[{"cell_type": "wt", "treatment": "control"},
                            {"cell_type": "kd", "treatment": "treatment"}],
                    exact_cases=True
                )
            # returns samples where cell_type is 'wt' and treatment is 'control', or cell_type is 'kd' and treatment is 'treatment'
            ```
    """

    if hasattr(pdata, "filter_sample_values"):
        warnings.warn(
            "You passed a pAnnData object to `filter()`. "
            "It is recommended to use `pdata.filter_sample_values()` directly.",
            UserWarning)

        print("UserWarning: It is recommended to use the class method `.filter_sample_values()` with dictionary-style input for cleaner and more consistent filtering.")

    formatted_values = format_class_filter(class_type, values, exact_cases)

    # pAnnData input
    if hasattr(pdata, "filter_sample_values"):
        return pdata.filter_sample_values(
            values=formatted_values,
            exact_cases=exact_cases,
            debug=debug,
            return_copy=True
        )

    # plain AnnData input
    elif isinstance(pdata, ad.AnnData):
        adata = pdata
        obs_keys = adata.obs.columns

        if exact_cases:
            if not isinstance(formatted_values, list) or not all(isinstance(v, dict) for v in formatted_values):
                raise ValueError("When exact_cases=True, `values` must be a list of dictionaries.")

            for case in formatted_values:
                if not case:
                    raise ValueError("Empty dictionary found in values.")
                for key in case:
                    if key not in obs_keys:
                        raise ValueError(f"Field '{key}' not found in adata.obs.")

            query = " | ".join([
                " & ".join([
                    f"(adata.obs['{k}'] == '{v}')" for k, v in case.items()
                ])
                for case in formatted_values
            ])

        else:
            if not isinstance(formatted_values, dict):
                raise ValueError("When exact_cases=False, `values` must be a dictionary.")

            for key in formatted_values:
                if key not in obs_keys:
                    raise ValueError(f"Field '{key}' not found in adata.obs.")

            query_parts = []
            for k, v in formatted_values.items():
                v_list = v if isinstance(v, list) else [v]
                part = " | ".join([f"(adata.obs['{k}'] == '{val}')" for val in v_list])
                query_parts.append(f"({part})")
            query = " & ".join(query_parts)

        if debug:
            print(f"Filter query: {query}")

        return adata[eval(query)]

    else:
        raise ValueError("Input must be a pAnnData or AnnData object.")

format_class_filter

format_class_filter(
    classes: str | list[str],
    class_value: str | list[str] | list[list[str]],
    exact_cases: bool = False,
) -> dict[str, Any] | list[dict[str, Any]]

Convert legacy-style filter input into dictionary-style format.

This function standardizes (classes, class_value) input into the dictionary format expected by pAnnData.filter_sample_values(). It supports both loose OR-style filtering and exact case matching across multiple metadata fields.

Parameters:

Name Type Description Default
classes str or list of str

Metadata field(s) to filter on. Example: "treatment" or ["cellline", "treatment"].

required
class_value str, list of str, or list of list

Values to filter by.

  • str: May be underscore-joined (e.g. "kd_AS").
  • list of str: Multiple values, interpreted as OR (if exact_cases=False) or split into combinations (if exact_cases=True).
  • list of list: Each inner list defines a full set of values across classes.

required
exact_cases bool

If True, return a list of dictionaries representing exact combinations across fields. If False, return a dictionary with OR logic applied.

False

Returns:

Name Type Description
formatted dict or list of dict

Dictionary-style filter input compatible with .filter_sample_values().

Raises:

Type Description
ValueError

If input shapes are inconsistent with the number of classes, or if class_value entries are not valid strings/lists.

Example

Single class with OR logic:

format_class_filter("treatment", ["kd", "sc"])
{'treatment': ['kd', 'sc']}

Multiple classes with loose matching:

format_class_filter(["cellline", "treatment"], ["AS", "kd"])
{'cellline': 'AS', 'treatment': 'kd'}

Multiple classes with exact cases (underscore-joined strings):

format_class_filter(
    ["cellline", "treatment"],
    ["AS_kd", "BE_sc"],
    exact_cases=True
)
[{'cellline': 'AS', 'treatment': 'kd'},
 {'cellline': 'BE', 'treatment': 'sc'}]

Multiple classes with exact cases (list of lists):

format_class_filter(
    ["cellline", "treatment"],
    [["AS", "kd"], ["BE", "sc"]],
    exact_cases=True
)
[{'cellline': 'AS', 'treatment': 'kd'},
 {'cellline': 'BE', 'treatment': 'sc'}]
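
The ValueError described above is most often triggered by labels that themselves contain an underscore. The following is a minimal standalone sketch of that splitting rule, not the library code: split_exact_case is a hypothetical helper that mirrors only the exact-case branch for string entries.

```python
def split_exact_case(classes, entry):
    """Hypothetical helper: split one underscore-joined entry across classes."""
    values = entry.split("_")
    if len(values) != len(classes):
        raise ValueError(
            f"Entry {entry!r} yields {len(values)} values for {len(classes)} classes."
        )
    return dict(zip(classes, values))


classes = ["cellline", "treatment"]
ok = split_exact_case(classes, "AS_kd")

# A label that itself contains an underscore splits into too many parts.
try:
    split_exact_case(classes, "AS_kd_2")
    failed = False
except ValueError:
    failed = True
```

When labels may contain underscores, the list-of-lists form shown in the last example avoids the split entirely.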

Note

This function is primarily used internally by utils.filter() and pAnnData.filter_sample_values(). End users should generally call .filter_sample_values() directly on pAnnData objects instead of using this helper.

Source code in src/scpviz/utils/class_filter.py
def format_class_filter(
    classes: str | list[str],
    class_value: str | list[str] | list[list[str]],
    exact_cases: bool = False,
) -> dict[str, Any] | list[dict[str, Any]]:
    """
    Convert legacy-style filter input into dictionary-style format.

    This function standardizes `(classes, class_value)` input into the dictionary
    format expected by `pAnnData.filter_sample_values()`. It supports both loose
    OR-style filtering and exact case matching across multiple metadata fields.

    Args:
        classes (str or list of str): Metadata field(s) to filter on.
            Example: `"treatment"` or `["cellline", "treatment"]`.
        class_value (str, list of str, or list of list): Values to filter by.
            - str: May be underscore-joined (e.g. `"kd_AS"`).
            - list of str: Multiple values, interpreted as OR (if `exact_cases=False`)
              or split into combinations (if `exact_cases=True`).
            - list of list: Each inner list defines a full set of values across classes.
        exact_cases (bool): If True, return a list of dictionaries representing
            exact combinations across fields. If False, return a dictionary with
            OR logic applied.

    Returns:
        formatted (dict or list of dict): Dictionary-style filter input compatible
        with `.filter_sample_values()`.

    Raises:
        ValueError: If input shapes are inconsistent with the number of classes,
            or if `class_value` entries are not valid strings/lists.

    Example:
        Single class with OR logic:
            ```python
            format_class_filter("treatment", ["kd", "sc"])
            ```
            ```
            {'treatment': ['kd', 'sc']}
            ```

        Multiple classes with loose matching:
            ```python
            format_class_filter(["cellline", "treatment"], ["AS", "kd"])
            ```
            ```
            {'cellline': 'AS', 'treatment': 'kd'}
            ```

        Multiple classes with exact cases (underscore-joined strings):
            ```python
            format_class_filter(
                ["cellline", "treatment"],
                ["AS_kd", "BE_sc"],
                exact_cases=True
            )
            ```
            ```
            [{'cellline': 'AS', 'treatment': 'kd'},
             {'cellline': 'BE', 'treatment': 'sc'}]
            ```

        Multiple classes with exact cases (list of lists):
            ```python 
            format_class_filter(
                ["cellline", "treatment"],
                [["AS", "kd"], ["BE", "sc"]],
                exact_cases=True
            )
            ```
            ```
            [{'cellline': 'AS', 'treatment': 'kd'},
             {'cellline': 'BE', 'treatment': 'sc'}]
            ```

    !!! warning "Note"

        This function is primarily used internally by `utils.filter()` and
        `pAnnData.filter_sample_values()`. End users should generally call
        `.filter_sample_values()` directly on `pAnnData` objects instead of
        using this helper.
    """

    if isinstance(classes, str):
        # Simple case: one class
        if isinstance(class_value, list) and exact_cases:
            return [{classes: val} for val in class_value]
        else:
            return {classes: class_value}

    elif isinstance(classes, list):
        if exact_cases:
            if isinstance(class_value, str):
                class_value = [class_value]

            formatted = []
            for entry in class_value:
                if isinstance(entry, str):
                    values = entry.split('_')
                elif isinstance(entry, list):
                    values = entry
                else:
                    raise ValueError("Each class_value entry must be a string or a list.")

                if len(values) != len(classes):
                    raise ValueError("Each class_value entry must match the number of classes. Check that group/class labels did not contain unintentional underscores ('_').")
                formatted.append({cls: val for cls, val in zip(classes, values)})

            return formatted

        else:
            # loose match — OR within each class
            if isinstance(class_value, str):
                values = class_value.split('_')
            else:
                values = class_value
            if len(values) != len(classes):
                raise ValueError("class_value must align with the number of classes. Check that group/class labels did not contain unintentional underscores ('_').")
            return {cls: val for cls, val in zip(classes, values)}

    else:
        raise ValueError("Invalid input: `classes` should be a string or list of strings.")

get_abundance

get_abundance(
    pdata: pAnnData | AnnData, *args: Any, **kwargs: Any
) -> pd.DataFrame

Wrapper to extract abundance from either pAnnData or AnnData.

This is a convenience wrapper that dispatches to the appropriate method:

  • If pdata is a pAnnData object, it calls pdata.get_abundance().
  • If pdata is an AnnData object, it falls back to the internal helper _get_abundance_from_adata.
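
The dispatch rule can be sketched with stand-in objects. Everything named here (WithMethod, Plain, _fallback, get_abundance_sketch) is hypothetical and exists only to illustrate the duck-typing check; none of it is part of scpviz.

```python
class WithMethod:
    """Stand-in for a pAnnData-like object (hypothetical)."""

    def get_abundance(self, *args, **kwargs):
        return ("method", args, kwargs)


class Plain:
    """Stand-in for a plain AnnData-like object (hypothetical)."""


def _fallback(obj, *args, **kwargs):
    # Plays the role of an internal helper for objects without the method.
    return ("fallback", args, kwargs)


def get_abundance_sketch(obj, *args, **kwargs):
    # Duck typing: prefer the object's own method when it has one.
    if hasattr(obj, "get_abundance"):
        return obj.get_abundance(*args, **kwargs)
    return _fallback(obj, *args, **kwargs)


via_method = get_abundance_sketch(WithMethod(), layer="X")
via_fallback = get_abundance_sketch(Plain(), layer="X")
```

Checking for the method rather than the class keeps the wrapper independent of the container's import path.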

Parameters:

Name Type Description Default
pdata pAnnData or AnnData

Input object to extract abundance from.

required
*args Any

Positional arguments forwarded to get_abundance.

()
**kwargs Any

Keyword arguments forwarded to get_abundance.

{}
Note

See pAnnData.get_abundance for full parameter documentation. Briefly,

  • namelist (list of str, optional): List of accessions or gene names to extract.
  • layer (str): Data layer name (default = "X").
  • on (str): "protein" or "peptide".
  • classes (str or list of str, optional): Sample-level .obs column(s) to include.
  • log (bool): If True, applies log2 transform to abundance values.
  • x_label (str): Label features by "gene" or "accession".

Returns:

Name Type Description
df DataFrame

Long-form abundance DataFrame, optionally with sample metadata and protein/peptide annotations.

See Also
  • pAnnData.get_abundance (EditingMixin): Full-featured version with detailed docs.
  • get_adata_layer: Helper to access abundance matrices from AnnData layers.
Source code in src/scpviz/utils/data.py
def get_abundance(pdata: pAnnData | ad.AnnData, *args: Any, **kwargs: Any) -> pd.DataFrame:
    """
    Wrapper to extract abundance from either pAnnData or AnnData.

    This is a convenience wrapper that dispatches to the appropriate method:
    - If `pdata` is a `pAnnData` object, it calls `pdata.get_abundance()`.
    - If `pdata` is an `AnnData` object, it falls back to the internal
      helper `_get_abundance_from_adata`.

    Args:
        pdata (pAnnData or anndata.AnnData): Input object to extract abundance from.
        *args: Positional arguments forwarded to `get_abundance`.
        **kwargs: Keyword arguments forwarded to `get_abundance`.

    Note:
        See `pAnnData.get_abundance` for full parameter documentation. Briefly,

            - namelist (list of str, optional): List of accessions or gene names to extract.
            - layer (str): Data layer name (default = "X").
            - on (str): "protein" or "peptide".
            - classes (str or list of str, optional): Sample-level `.obs` column(s) to include.
            - log (bool): If True, applies log2 transform to abundance values.
            - x_label (str): Label features by "gene" or "accession".

    Returns:
        df (pandas.DataFrame): Long-form abundance DataFrame, optionally with
        sample metadata and protein/peptide annotations.

    See Also:
        - :func:`pAnnData.get_abundance` (EditingMixin): Full-featured version with detailed docs.
        - get_adata_layer: Helper to access abundance matrices from AnnData layers.
    """
    if hasattr(pdata, "get_abundance"):
        return pdata.get_abundance(*args, **kwargs)
    import scpviz.utils as _u

    return _u._get_abundance_from_adata(pdata, *args, **kwargs)

get_adata

get_adata(
    pdata: pAnnData, on: str = "protein"
) -> ad.AnnData

Retrieve the protein- or peptide-level AnnData object from a pAnnData container.

Parameters:

Name Type Description Default
pdata pAnnData

The parent pAnnData object containing both protein- and peptide-level data.

required
on str

Which data object to return.
- "protein" (or "prot"): return pdata.prot
- "peptide" (or "pep"): return pdata.pep

'protein'

Returns:

Name Type Description
adata AnnData

The requested AnnData object.

Source code in src/scpviz/utils/data.py
def get_adata(pdata: pAnnData, on: str = "protein") -> ad.AnnData:
    """
    Retrieve the protein- or peptide-level AnnData object from a pAnnData container.

    Args:
        pdata (pAnnData): The parent pAnnData object containing both protein- and peptide-level data.

        on (str): Which data object to return.  
            - `"protein"`: return `pdata.prot`  
            - `"peptide"`: return `pdata.pep`  

    Returns:
        adata (anndata.AnnData): The requested AnnData object.
    """

    if on in ('protein','prot'):
        return pdata.prot
    elif on in ('peptide','pep'):
        return pdata.pep
    else:
        raise ValueError("Invalid value for 'on'. Options are 'protein' or 'peptide'.")

get_adata_layer

get_adata_layer(adata: AnnData, layer: str) -> np.ndarray

Safely extract layer data as dense numpy array.

This helper returns the requested layer as a dense numpy.ndarray, ensuring compatibility for downstream operations. Supports both .X and .layers[...].

Parameters:

Name Type Description Default
adata AnnData

AnnData object containing data matrices.

required
layer str

Layer key.
- "X": return the main data matrix.
- any other str: return the corresponding entry from .layers. E.g. "X_norm"

required

Returns:

Name Type Description
data ndarray

Dense matrix representation of the requested layer.
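
The layer lookup and densifying step can be sketched without AnnData. TinySparse and get_layer_sketch are hypothetical stand-ins used only to show the pattern; real data would be NumPy arrays or SciPy sparse matrices.

```python
class TinySparse:
    """Hypothetical stand-in for a sparse matrix that exposes toarray()."""

    def __init__(self, shape, entries):
        self.shape = shape
        self.entries = entries  # {(row, col): value}

    def toarray(self):
        rows, cols = self.shape
        return [
            [self.entries.get((r, c), 0.0) for c in range(cols)]
            for r in range(rows)
        ]


def get_layer_sketch(x, layers, layer):
    if layer == "X":
        data = x
    elif layer in layers:
        data = layers[layer]
    else:
        raise ValueError(f"Layer '{layer}' not found in .layers and is not 'X'.")
    # Densify only when the object knows how; dense inputs pass through unchanged.
    return data.toarray() if hasattr(data, "toarray") else data


x = [[1.0, 2.0], [3.0, 4.0]]
layers = {"X_norm": TinySparse((2, 2), {(0, 0): 0.5, (1, 1): 1.5})}

dense_x = get_layer_sketch(x, layers, "X")
dense_norm = get_layer_sketch(x, layers, "X_norm")
```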

Source code in src/scpviz/utils/data.py
def get_adata_layer(adata: ad.AnnData, layer: str) -> np.ndarray:
    """
    Safely extract layer data as dense numpy array.

    This helper returns the requested layer as a dense `numpy.ndarray`,
    ensuring compatibility for downstream operations. Supports both
    `.X` and `.layers[...]`.

    Args:
        adata (anndata.AnnData): AnnData object containing data matrices.

        layer (str): Layer key.  
            - `"X"`: return the main data matrix.  
            - any other str: return the corresponding entry from `.layers`. E.g. "X_norm"

    Returns:
        data (numpy.ndarray): Dense matrix representation of the requested layer.
    """
    if layer == "X":
        data = adata.X
    elif layer in adata.layers:
        data = adata.layers[layer]
    else:
        raise ValueError(f"Layer '{layer}' not found in .layers and is not 'X'.")

    return data.toarray() if hasattr(data, 'toarray') else data

get_classlist

get_classlist(
    adata: AnnData,
    classes: str | list[str] | None = None,
    order: list[str] | None = None,
) -> list[str]

Retrieve unique class values for specified metadata columns. Useful for plot legends.

Unlike get_samplenames, which returns one identifier per row/sample, this function extracts the set of unique class values for grouping purposes (e.g., plotting categories). Supports optional reordering.

Parameters:

Name Type Description Default
adata AnnData

AnnData object containing sample metadata.

required
classes str or list of str

Column(s) in .obs to use.

  • None: combine all metadata columns up to the first _quant column.
  • str: return unique values from one column.
  • list of str: return unique combined values across multiple columns.
None
order list of str

Custom order of categories. Must exactly match the unique values; otherwise, a ValueError is raised.

None

Returns:

Name Type Description
class_list list of str

Unique class values in .obs, optionally reordered.

Raises:

Type Description
ValueError

If invalid columns are provided, or if order does not match the unique class list.

Example

Get unique values from one metadata column:

classes = get_classlist(adata, classes="cell_type")

Combine two columns and return unique class labels:

classes = get_classlist(adata, classes=["cell_type", "treatment"])

Reorder categories explicitly:

classes = get_classlist(
    adata, classes="cell_type", order=["A", "B", "C"]
    )
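
How multiple columns are combined and how order is validated can be sketched in plain Python. The rows, unique_classes, and apply_order below are hypothetical illustrations; the "_" separator matches the source listing that follows.

```python
rows = [
    {"cell_type": "A", "treatment": "kd"},
    {"cell_type": "A", "treatment": "sc"},
    {"cell_type": "B", "treatment": "kd"},
    {"cell_type": "A", "treatment": "kd"},  # duplicate combination
]


def unique_classes(rows, columns):
    """Join the chosen columns per row with '_' and keep first occurrences."""
    seen = []
    for row in rows:
        label = "_".join(str(row[c]) for c in columns)
        if label not in seen:
            seen.append(label)
    return seen


def apply_order(class_list, order):
    """Accept `order` only if it contains exactly the same labels."""
    if set(class_list) != set(order):
        raise ValueError("The 'order' list does not match 'classes_list'.")
    return list(order)


combined = unique_classes(rows, ["cell_type", "treatment"])
reordered = apply_order(combined, ["B_kd", "A_sc", "A_kd"])
```

Note that an order for combined columns must use the joined labels (for example "A_kd"), not the individual column values.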

Source code in src/scpviz/utils/data.py
def get_classlist(
    adata: ad.AnnData,
    classes: str | list[str] | None = None,
    order: list[str] | None = None,
) -> list[str]:
    """
    Retrieve unique class values for specified metadata columns. Useful 
    for plot legends.

    Unlike `get_samplenames`, which returns one identifier per row/sample,
    this function extracts the set of unique class values for grouping
    purposes (e.g., plotting categories). Supports optional reordering.

    Args:
        adata (anndata.AnnData): AnnData object containing sample metadata.

        classes (str or list of str, optional): Column(s) in `.obs` to use.

            - None: combine all metadata columns up to the first `_quant` column.  
            - str: return unique values from one column.  
            - list of str: return unique combined values across multiple columns.  

        order (list of str, optional): Custom order of categories. Must exactly
            match the unique values; otherwise, a `ValueError` is raised.

    Returns:
        class_list (list of str): Unique class values in `.obs`, optionally reordered.

    Raises:
        ValueError: If invalid columns are provided, or if `order` does not
        match the unique class list.

    Example:
        Get unique values from one metadata column:
            ```python
            classes = get_classlist(adata, classes="cell_type")
            ```

        Combine two columns and return unique class labels:
            ```python
            classes = get_classlist(adata, classes=["cell_type", "treatment"])
            ```

        Reorder categories explicitly:
            ```python
            classes = get_classlist(
                adata, classes="cell_type", order=["A", "B", "C"]
                )
            ```

    Related Functions:
        get_samplenames: Return per-sample names (not unique class values).
    """

    if classes is None:
        # combine all .obs columns per row into one string
        # NOTE: might break, should use better method to filter out file-related columns
        quant_col_index = adata.obs.columns.get_loc(next(col for col in adata.obs.columns if "_quant" in col))
        selected_columns = adata.obs.iloc[:, :quant_col_index]
        classes_list = selected_columns.apply(lambda x: "_".join(x.astype(str)), axis=1).unique()
        classes = selected_columns.columns.tolist()
    elif isinstance(classes, str):
        # check if classes is one of the columns of adata.obs
        if classes not in adata.obs.columns:
            raise ValueError(f"Invalid value for 'classes'. '{classes}' is not a column in adata.obs.")
        classes_list = adata.obs[classes].unique()
    elif isinstance(classes, list):
        # check that all classes are columns of adata.obs (including single-element lists)
        if not all(c in adata.obs.columns for c in classes):
            raise ValueError(f"Invalid value for 'classes'. Not all elements in '{classes}' are columns in adata.obs.")
        if len(classes) == 1:
            classes_list = adata.obs[classes[0]].unique()
        else:
            classes_list = adata.obs[classes].apply(lambda x: "_".join(x.astype(str)), axis=1).unique()
    else:
        raise ValueError("Invalid value for 'classes'. Must be None, a string or a list of strings.")

    if isinstance(classes_list, str):
        classes_list = [classes_list]
    if isinstance(order, str):
        order = [order]

    if order is not None:
        # check if order list matches classes_list
        missing_elements = set(classes_list) - set(order)
        extra_elements = set(order) - set(classes_list)
        # Print missing and extra elements if any
        if missing_elements or extra_elements:
            if missing_elements:
                print(f"Missing elements in 'order': {missing_elements}")
            if extra_elements:
                print(f"Extra elements in 'order': {extra_elements}")
            raise ValueError("The 'order' list does not match 'classes_list'.")
        # if they match, then reorder classes_list to match order
        classes_list = order

    return classes_list

get_pca_importance

get_pca_importance(
    model: dict[str, Any] | PCA,
    initial_feature_names: list[str],
    n: int = 1,
) -> pd.DataFrame

Identify the most important features for each principal component.

This function ranks features by their absolute PCA loading values and extracts the top contributors for each principal component.

Parameters:

Name Type Description Default
model PCA or dict

Either a fitted PCA model from scikit-learn, or a dictionary with key "PCs" (array-like, shape: (n_components, n_features)).

required
initial_feature_names list of str

Names of the features, typically adata.var_names.

required
n int

Number of top features to return per principal component (default = 1).

1

Returns:

Name Type Description
df DataFrame

DataFrame with one row per principal component, listing the top contributing features.

Example

Retrieve the top 5 features contributing to each PC:

from scpviz import utils as scutils
pdata.pca(n_components=5)
df = scutils.get_pca_importance(
    pdata.prot.uns['pca'],
    pdata.prot.var_names,
    n=5
)
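
The ranking itself is simple enough to sketch in pure Python. top_features is a hypothetical helper and the loadings are made-up numbers; the library version operates on NumPy arrays and returns a DataFrame rather than a dict.

```python
def top_features(pcs, feature_names, n=1):
    """Rank features per component by absolute loading, largest first."""
    result = {}
    for i, loadings in enumerate(pcs):
        ranked = sorted(
            range(len(loadings)),
            key=lambda j: abs(loadings[j]),
            reverse=True,
        )
        result[f"PC{i + 1}"] = [feature_names[j] for j in ranked[:n]]
    return result


# Made-up loadings: 2 components x 3 features.
pcs = [
    [0.1, -0.9, 0.4],
    [0.7, 0.2, -0.3],
]
names = ["P1", "P2", "P3"]

top2 = top_features(pcs, names, n=2)
```

The sign of a loading is ignored here, so a strongly negative loading ranks as high as a strongly positive one.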

Source code in src/scpviz/utils/stats.py
def get_pca_importance(
    model: dict[str, Any] | PCA,
    initial_feature_names: list[str],
    n: int = 1,
) -> pd.DataFrame:
    """
    Identify the most important features for each principal component.

    This function ranks features by their absolute PCA loading values and
    extracts the top contributors for each principal component.

    Args:
        model (sklearn.decomposition.PCA or dict): Either a fitted PCA model
            from scikit-learn, or a dictionary with key `"PCs"`
            (array-like, shape: `(n_components, n_features)`).
        initial_feature_names (list of str): Names of the features, typically
            `adata.var_names`.
        n (int): Number of top features to return per principal component
            (default = 1).

    Returns:
        df (pandas.DataFrame): DataFrame with one row per principal component,
        listing the top contributing features.

    Example:
        Retrieve the top 5 features contributing to each PC:
            ```python
            from scpviz import utils as scutils
            pdata.pca(n_components=5)
            df = scutils.get_pca_importance(
                pdata.prot.uns['pca'],
                pdata.prot.var_names,
                n=5
            )
            ```
    """

    if isinstance(model, dict):
        pcs = np.asarray(model["PCs"])  # shape: n_components x n_features
    else:
        pcs = np.asarray(model.components_)  # shape: n_components x n_features

    n_pcs = pcs.shape[0]

    most_important = [
        np.abs(pcs[i]).argsort()[-n:][::-1] for i in range(n_pcs)
    ]
    most_important_names = [
        [initial_feature_names[idx] for idx in row] for row in most_important
    ]

    result = {
        f"PC{i + 1}": most_important_names[i] for i in range(n_pcs)
    }
    df = pd.DataFrame(result.items(), columns=["Principal Component", "Top Features"])
    return df

get_pep_prot_mapping

get_pep_prot_mapping(
    pdata: pAnnData, return_series: Literal[False] = False
) -> str
get_pep_prot_mapping(
    pdata: pAnnData, return_series: Literal[True]
) -> pd.Series
get_pep_prot_mapping(
    pdata: pAnnData, return_series: bool = False
) -> str | pd.Series

Retrieve the peptide-to-protein mapping column or mapping values.

This function resolves the appropriate .pep.var column for peptide-to-protein mapping based on the data source recorded in pdata.metadata["source"].

Parameters:

Name Type Description Default
pdata pAnnData

The annotated proteomics object containing .metadata and .pep.

required
return_series bool

If True, return a pandas Series of peptide-to-protein mappings. If False (default), return the column name as a string.

False

Returns:

Name Type Description
col str

Column name in .pep.var containing peptide-to-protein mapping, if return_series=False.

mapping Series

Series mapping peptides to proteins, if return_series=True.

Raises:

Type Description
ValueError

If the data source is unrecognized or no valid mapping column is found.

Note

The mapping column depends on the import source:

  • Proteome Discoverer → "Master Protein Accessions"
  • DIA-NN → "Protein.Group"
  • MaxQuant → "Leading razor protein"
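
The source-to-column lookup can be sketched as a dictionary. MAPPING_COLUMNS and mapping_column are hypothetical names; the keys follow the lowercase source strings compared in the listing below.

```python
MAPPING_COLUMNS = {
    "proteomediscoverer": "Master Protein Accessions",
    "diann": "Protein.Group",
    "maxquant": "Leading razor protein",
}


def mapping_column(metadata):
    """Return the mapping column for the source recorded in `metadata`."""
    source = metadata.get("source", "").lower()
    try:
        return MAPPING_COLUMNS[source]
    except KeyError:
        raise ValueError(f"Unknown data source '{source}'") from None


col = mapping_column({"source": "DIANN"})

# A missing or unrecognized source is rejected rather than guessed.
try:
    mapping_column({})
    rejected = False
except ValueError:
    rejected = True
```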
Source code in src/scpviz/utils/data.py
def get_pep_prot_mapping(
    pdata: pAnnData, return_series: bool = False
) -> str | pd.Series:
    """
    Retrieve the peptide-to-protein mapping column or mapping values.

    This function resolves the appropriate `.pep.var` column for peptide-to-protein
    mapping based on the data source recorded in `pdata.metadata["source"]`.

    Args:
        pdata (pAnnData): The annotated proteomics object containing `.metadata` and `.pep`.
        return_series (bool): If True, return a pandas Series of peptide-to-protein
            mappings. If False (default), return the column name as a string.

    Returns:
        col (str): Column name in `.pep.var` containing peptide-to-protein mapping,
        if `return_series=False`.
        mapping (pandas.Series): Series mapping peptides to proteins,
        if `return_series=True`.

    Raises:
        ValueError: If the data source is unrecognized or no valid mapping column is found.

    Note:
        The mapping column depends on the import source:

        - Proteome Discoverer → `"Master Protein Accessions"`
        - DIA-NN → `"Protein.Group"`
        - MaxQuant → `"Leading razor protein"`
    """
    source = pdata.metadata.get("source", "").lower()

    if source == "proteomediscoverer":
        col = "Master Protein Accessions"
    elif source == "diann":
        col = "Protein.Group"
    elif source == "maxquant":
        col = "Leading razor protein"
    else:
        raise ValueError(f"Unknown data source '{source}' — cannot determine peptide-to-protein mapping.")

    if return_series:
        return pdata.pep.var[col]

    return col

get_protein_clusters

get_protein_clusters(
    pdata: pAnnData,
    on: str = "prot",
    layer: str = "X",
    t: int = 5,
    criterion: str = "maxclust",
) -> dict[Any, list[str]] | None

Retrieve hierarchical clusters of proteins from stored linkage.

This function uses linkage information stored in pdata.stats to partition proteins into clusters.

Parameters:

Name Type Description Default
pdata pAnnData

Input object containing .stats with clustering results.

required
on str

Data level to use, "prot" (default) or "pep".

'prot'
layer str

Data layer name used when the linkage was computed (default = "X").

'X'
t int or float

Number of clusters (if criterion="maxclust") or distance threshold for clustering.

5
criterion str

Clustering criterion passed to scipy.cluster.hierarchy.fcluster, e.g. "maxclust" or "distance".

'maxclust'

Returns:

Name Type Description
clusters dict or None

Mapping of cluster_id → list of proteins, or None if no linkage is found in pdata.stats.

Note

Requires that a clustermap has been previously computed and linkage stored under pdata.stats[f"{on}_{layer}_clustermap"].
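
Once flat cluster labels are available, grouping proteins by label is a small step. In this sketch the labels are hypothetical values standing in for the output of scipy.cluster.hierarchy.fcluster, and group_by_label is an illustrative helper, not a scpviz function.

```python
from collections import defaultdict


def group_by_label(labels, names):
    """Collect names under their cluster label, preserving input order."""
    clusters = defaultdict(list)
    for label, name in zip(labels, names):
        clusters[label].append(name)
    return dict(clusters)


labels = [1, 2, 1, 3, 2]  # hypothetical flat cluster assignments
names = ["P1", "P2", "P3", "P4", "P5"]

clusters = group_by_label(labels, names)
```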

Source code in src/scpviz/utils/stats.py
def get_protein_clusters(
    pdata: pAnnData,
    on: str = "prot",
    layer: str = "X",
    t: int = 5,
    criterion: str = "maxclust",
) -> dict[Any, list[str]] | None:
    """
    Retrieve hierarchical clusters of proteins from stored linkage.

    This function uses linkage information stored in `pdata.stats` to
    partition proteins into clusters.

    Args:
        pdata (pAnnData): Input object containing `.stats` with clustering results.
        on (str): Data level to use, `"prot"` (default) or `"pep"`.
        layer (str): Data layer name used when the linkage was computed (default = `"X"`).
        t (int or float): Number of clusters (if `criterion="maxclust"`) or distance
            threshold for clustering.
        criterion (str): Clustering criterion passed to `scipy.cluster.hierarchy.fcluster`,
            e.g. `"maxclust"` or `"distance"`.

    Returns:
        clusters (dict): Mapping of `cluster_id → list of proteins`.
        None: If no linkage is found in `pdata.stats`.

    Note:
        Requires that a clustermap has been previously computed and linkage
        stored under `pdata.stats[f"{on}_{layer}_clustermap"]`.

    Related Functions:
        - plot_clustermap: Generates clustered heatmaps and stores linkage.
    """
    from scipy.cluster.hierarchy import fcluster

    key = f"{on}_{layer}_clustermap"
    stats = pdata.stats.get(key)
    if not stats or "row_linkage" not in stats:
        print(f"No linkage found for {key} in pdata.stats.")
        return None

    linkage = stats["row_linkage"]
    labels = fcluster(linkage, t=t, criterion=criterion)
    order = stats["row_order"]

    from collections import defaultdict
    clusters = defaultdict(list)
    for label, prot in zip(labels, order):
        clusters[label].append(prot)

    return dict(clusters)

get_samplenames

get_samplenames(
    adata: AnnData, classes: str | list[str] | None
) -> list[str] | None

Retrieve sample names for specified class values.

This function resolves .obs metadata into sample-level identifiers (one name per row). It is typically used for plotting functions where sample names are required for labeling or grouping.

Parameters:

Name Type Description Default
adata AnnData

AnnData object containing sample metadata.

required
classes str or list of str

Column(s) in .obs used to build sample names.

  • str: return values from a single column.
  • list of str: combine multiple columns per row with ", ".
required

Returns:

Name Type Description
sample_names list of str

Sample names derived from .obs.

Example

Get sample names from a single metadata column:

samples = get_samplenames(adata, "cell_type")

Combine multiple columns into sample identifiers:

samples = get_samplenames(adata, ["cell_type", "treatment"])
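
The per-row behavior can be sketched without AnnData. The rows and sample_names below are hypothetical; the ", " separator matches the source listing that follows. Unlike get_classlist, duplicates are kept because one name is produced per row.

```python
rows = [
    {"cell_type": "A", "treatment": "kd"},
    {"cell_type": "A", "treatment": "kd"},
    {"cell_type": "B", "treatment": "sc"},
]


def sample_names(rows, classes):
    """Build one name per row from a single column or several columns."""
    if classes is None:
        return None
    if isinstance(classes, str):
        return [row[classes] for row in rows]
    if isinstance(classes, list):
        return [", ".join(str(row[c]) for c in classes) for row in rows]
    raise ValueError("classes should be None, a string, or a list of strings.")


single = sample_names(rows, "cell_type")
combined = sample_names(rows, ["cell_type", "treatment"])
```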

Source code in src/scpviz/utils/data.py
def get_samplenames(
    adata: ad.AnnData, classes: str | list[str] | None
) -> list[str] | None:
    """
    Retrieve sample names for specified class values.

    This function resolves `.obs` metadata into sample-level identifiers
    (one name per row). It is typically used for plotting functions where
    sample names are required for labeling or grouping.

    Args:
        adata (anndata.AnnData): AnnData object containing sample metadata.

        classes (str or list of str): Column(s) in `.obs` used to build sample names.

            - str: return values from a single column.
            - list of str: combine multiple columns per row with `", "`.

    Returns:
        sample_names (list of str): Sample names derived from `.obs`.

    Example:
        Get sample names from a single metadata column:
            ```python
            samples = get_samplenames(adata, "cell_type")
            ```

        Combine multiple columns into sample identifiers:
            ```python
            samples = get_samplenames(adata, ["cell_type", "treatment"])
            ```

    Related Functions:
        get_classlist: Return unique class values (not per-sample names).
    """
    if classes is None:
        return None
    elif isinstance(classes, str):
        return adata.obs[classes].values.tolist()
    elif isinstance(classes, list):
        return adata.obs[classes].apply(lambda row: ', '.join(row.values.astype(str)), axis=1).values.tolist()
    else:
        raise ValueError("Invalid input for 'classes'. It should be None, a string, or a list of strings.")

get_string_mappings

get_string_mappings(
    identifiers: list[str],
    use_uniprot: bool = True,
    use_string: bool = True,
    caller_identity: str = "scpviz",
    batch_size: int = 100,
    debug: bool = False,
) -> pd.DataFrame

Resolve STRING identifiers for a list of UniProt accessions.

This function maps UniProt protein accessions to STRING IDs using a two-step strategy:

  1. UniProt lookup – retrieves STRING cross-references (xref_string) and organism IDs via the UniProt API (fast).
  2. STRING API lookup – queries the STRING get_string_ids endpoint for any identifiers not resolved via UniProt.

Parameters:

Name Type Description Default
identifiers list of str

List of UniProt accession IDs to map.

required
use_uniprot bool

If True (default), attempt mapping via UniProt xref_string and organism_id fields.

True
use_string bool

If True (default), query the STRING API for any identifiers still unresolved after the UniProt step.

True
caller_identity str

Identifier passed to the STRING API (default: "scpviz").

'scpviz'
batch_size int

Number of identifiers per batch when querying external APIs (default=100).

100
debug bool

If True, print progress and debug information.

False

Returns:

Type Description
DataFrame

Mapping table with one row per input identifier and the following columns:

  • input_identifier: UniProt accession provided as input
  • string_identifier: Corresponding STRING ID (if resolved)
  • ncbi_taxon_id: NCBI taxonomy ID inferred from UniProt or STRING

Example

Map a small set of UniProt accessions to STRING IDs:

proteins = ["P40925", "P40926"]
df = get_string_mappings(proteins)
df

Disable the UniProt shortcut and query STRING directly (takes longer than UniProt):

df = get_string_mappings(proteins, use_uniprot=False)
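
The two-step strategy (try a fast source first, then send only the unresolved identifiers to a slower one) can be illustrated without any network access. The sketch below is not the scpviz implementation: `resolve_two_step`, `primary`, `fallback`, and the lookup tables are hypothetical names with made-up values, standing in for the UniProt and STRING steps.

```python
def resolve_two_step(ids, primary, fallback):
    """Resolve ids via `primary`, then pass only the leftovers to `fallback`.

    Both callables are hypothetical: each takes a list of ids and returns
    a dict containing the ids it could resolve.
    """
    found = dict(primary(ids))
    missing = [i for i in ids if i not in found]
    if missing:
        found.update(fallback(missing))
    # One row per input id, in input order; unresolved ids map to None.
    return [(i, found.get(i)) for i in ids]


# Made-up lookup tables (not real STRING identifiers).
primary_table = {"A1": "9606.X1"}
fallback_table = {"B2": "9606.X2"}
fallback_calls = []  # records what the fallback was asked to resolve


def primary(ids):
    return {i: primary_table[i] for i in ids if i in primary_table}


def fallback(ids):
    fallback_calls.append(list(ids))
    return {i: fallback_table[i] for i in ids if i in fallback_table}


rows = resolve_two_step(["A1", "B2", "C3"], primary, fallback)
print(rows)            # [('A1', '9606.X1'), ('B2', '9606.X2'), ('C3', None)]
print(fallback_calls)  # [['B2', 'C3']]
```

Only the identifiers missed by the first step reach the second one, which is why leaving use_uniprot=True keeps the number of STRING requests small.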

Source code in src/scpviz/utils/id_maps.py
def get_string_mappings(
    identifiers: list[str],
    use_uniprot: bool = True,
    use_string: bool = True,
    caller_identity: str = "scpviz",
    batch_size: int = 100,
    debug: bool = False,
) -> pd.DataFrame:
    """
    Resolve STRING identifiers for a list of UniProt accessions.

    This function maps UniProt protein accessions to STRING IDs using a
    two-step strategy:

    1. **UniProt lookup** – retrieves STRING cross-references (`xref_string`)
       and organism IDs via the UniProt API (fast).
    2. **STRING API lookup** – queries the STRING `get_string_ids` endpoint
       for any identifiers not resolved via UniProt.

    Args:
        identifiers (list of str): List of UniProt accession IDs to map.
        use_uniprot (bool): If True (default), attempt mapping via UniProt
            `xref_string` and `organism_id` fields.
        use_string (bool): If True (default), query the STRING API for any
            identifiers still unresolved after the UniProt step.
        caller_identity (str): Identifier passed to the STRING API
            (default: "scpviz").
        batch_size (int): Number of identifiers per batch when querying
            external APIs (default=100).
        debug (bool): If True, print progress and debug information.

    Returns:
        pandas.DataFrame: Mapping table with one row per input identifier and
            the following columns:

            - `input_identifier`: UniProt accession provided as input
            - `string_identifier`: Corresponding STRING ID (if resolved)
            - `ncbi_taxon_id`: NCBI taxonomy ID inferred from UniProt or STRING

    Example:
        Map a small set of UniProt accessions to STRING IDs:
            ```python
            proteins = ["P40925", "P40926"]
            df = get_string_mappings(proteins)
            df
            ```

        Disable the UniProt shortcut and query STRING directly (takes longer than UniProt):
            ```python
            df = get_string_mappings(proteins, use_uniprot=False)
            ```

    Related Functions:
        - get_uniprot_fields: Retrieve UniProt metadata, including STRING cross-references.
        - pAnnData.EnrichmentMixin (enrichment_functional(), enrichment_ppi())
    """
    import scpviz.utils as _u

    ids = [str(x).strip() for x in identifiers if x is not None and str(x).strip()]
    if not ids:
        return pd.DataFrame(columns=["input_identifier", "string_identifier", "ncbi_taxon_id"])

    found: Dict[str, str] = {}
    species_map: Dict[str, object] = {}

    # Step 1: UniProt xref_string
    uni_df = pd.DataFrame(columns=["input_identifier", "string_identifier"])
    if use_uniprot:
        try:
            uni_df, uni_species = _u._uniprot_get_string_ids(
                ids, batch_size=batch_size, standardize=True, debug=debug
            )
            if not uni_df.empty:
                found.update(dict(zip(uni_df["input_identifier"], uni_df["string_identifier"])))
            species_map.update(uni_species)

            print(f"{format_log_prefix('api',2)} UniProt mapped: {len(uni_df)} / {len(ids)}")
        except Exception as e:
            print(f"{format_log_prefix('error')} UniProt stream step failed: {e}") 

    # Missing after UniProt
    missing = [i for i in ids if i not in found]

    # Step 2: STRING get_string_ids
    string_df = pd.DataFrame(columns=["input_identifier", "string_identifier", "ncbi_taxon_id"])
    if use_string and missing:
        try:
            string_df = _u._string_get_string_ids(
                missing, batch_size=batch_size, caller_identity=caller_identity, debug=debug
            )
            if not string_df.empty:
                found.update(dict(zip(string_df["input_identifier"], string_df["string_identifier"])))

            print(f"{format_log_prefix('api',2)} STRING mapped: {len(string_df)} / {len(missing)} (missing after UniProt)")
        except Exception as e:
            print(f"{format_log_prefix('error')} STRING stream step failed: {e}") 

    # Build output table
    out_df = pd.DataFrame({"input_identifier": ids})
    out_df["string_identifier"] = out_df["input_identifier"].map(found)

    # Taxon: prefer UniProt organism_id, then STRING ncbi_taxon_id
    tax_from_uniprot = out_df["input_identifier"].map(lambda a: species_map.get(a, pd.NA))
    tax_from_uniprot = tax_from_uniprot.apply(scalarize_taxon)

    if not string_df.empty and "ncbi_taxon_id" in string_df.columns:
        string_tax_map = dict(zip(string_df["input_identifier"], string_df["ncbi_taxon_id"]))
        tax_from_string = out_df["input_identifier"].map(lambda a: string_tax_map.get(a, pd.NA))
        tax_from_string = tax_from_string.apply(scalarize_taxon)
    else:
        tax_from_string = pd.Series([pd.NA] * len(out_df), index=out_df.index)

    out_df["ncbi_taxon_id"] = tax_from_uniprot.combine_first(tax_from_string)

    return out_df

get_uniprot_fields

get_uniprot_fields(
    prot_list: list[str],
    search_fields: list[str] | None = None,
    batch_size: int = 100,
    verbose: bool = True,
    standardize: bool = True,
    worker_verbose: bool = False,
) -> pd.DataFrame

Retrieve UniProt metadata for a list of protein accessions.

This function wraps get_uniprot_fields_worker to handle batching of protein IDs, returning results as a single DataFrame.

Parameters:

Name Type Description Default
prot_list list of str

List of protein accessions.

required
search_fields list of str

UniProt fields to return. Defaults include accession, gene names, GO terms, and STRING IDs.

None
batch_size int

Number of accessions per batch (max 1024, default=100).

100
verbose bool

If True, print progress messages.

True
standardize bool

If True (default), normalize UniProt column names to canonical lowercase keys (e.g., "gene_primary", "organism_id", "xref_string") for consistent downstream processing.

True
worker_verbose bool

If True, forward verbose=True to get_uniprot_fields_worker so each batch prints its own progress messages and missing accessions.

False

Returns:

Name Type Description
df DataFrame

DataFrame containing UniProt metadata for the input proteins.

Example

Query UniProt for a small set of proteins:

proteins = ["P40925", "P40926"]
df = get_uniprot_fields(proteins)
df[["Entry", "Gene Names", "Organism Id"]].head()

Retrieve raw UniProt field names without renaming:

df_raw = get_uniprot_fields(proteins, standardize=False)
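
The batching step can be sketched on its own to show how batch_size determines the number of requests. The helper name `split_into_batches` and the placeholder accessions are assumptions for illustration; the slicing expression matches the one in the source below, and the size check is an addition.

```python
def split_into_batches(items, batch_size):
    """Split a list into consecutive chunks of at most `batch_size` items."""
    if batch_size < 1:
        # Guard added for this sketch; not part of the scpviz source.
        raise ValueError("batch_size must be at least 1")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


# 250 placeholder accessions (not real UniProt IDs).
accessions = [f"ACC{n}" for n in range(250)]
batches = split_into_batches(accessions, batch_size=100)

print([len(b) for b in batches])  # [100, 100, 50]
```

With the default batch_size of 100, 250 accessions therefore produce three worker calls, and the last batch is shorter than the others.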

Source code in src/scpviz/utils/id_maps.py
def get_uniprot_fields(
    prot_list: list[str],
    search_fields: list[str] | None = None,
    batch_size: int = 100,
    verbose: bool = True,
    standardize: bool = True,
    worker_verbose: bool = False,
) -> pd.DataFrame:
    """
    Retrieve UniProt metadata for a list of protein accessions.

    This function wraps `get_uniprot_fields_worker` to handle batching of
    protein IDs, returning results as a single DataFrame.

    Args:
        prot_list (list of str): List of protein accessions.
        search_fields (list of str): UniProt fields to return.
            Defaults include accession, gene names, GO terms, and STRING IDs.
        batch_size (int): Number of accessions per batch (max 1024, default=100).
        verbose (bool): If True, print progress messages.
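        standardize (bool): If True (default), normalize UniProt column names
            to canonical lowercase keys (e.g., "gene_primary", "organism_id",
            "xref_string") for consistent downstream processing.
        worker_verbose (bool): If True, forward `verbose=True` to
            `get_uniprot_fields_worker` so each batch prints its own progress
            messages and missing accessions. Default is False.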

    Returns:
        df (pandas.DataFrame): DataFrame containing UniProt metadata for the input proteins.

    Example:
        Query UniProt for a small set of proteins:
            ```python
            proteins = ["P40925", "P40926"]
            df = get_uniprot_fields(proteins)
            df[["Entry", "Gene Names", "Organism Id"]].head()
            ```

        Retrieve raw UniProt field names without renaming:
            >>> df_raw = get_uniprot_fields(proteins, standardize=False)

    Related Functions:
        - get_uniprot_fields_worker: Worker function that handles low-level UniProt API queries.
        - standardize_uniprot_columns: Helper used internally for column normalization.
    """
    import scpviz.utils as _u

    if search_fields is None:
        search_fields = [
            "accession",
            "id",
            "protein_name",
            "gene_primary",
            "gene_names",
            "organism_id",
            "go",
            "go_f",
            "go_c",
            "go_p",
            "cc_interaction",
            "xref_string",
        ]

    # --- Ensure 'accession' field comes first (UniProt requirement)
    search_fields = ["accession"] + [f for f in search_fields if f != "accession"]

    # --- Split IDs into batches
    batches = [prot_list[i:i + batch_size] for i in range(0, len(prot_list), batch_size)]
    all_results = []

    for i, batch in enumerate(batches, start=1):
        if verbose:
            print(
                f"{format_log_prefix('api', indent=2)} Querying UniProt for batch {i}/{len(batches)} "
                f"({len(batch)} proteins) [fields: {', '.join(search_fields)}]"
            )

            if len(batches) > 1:
                print(f"{format_log_prefix('info_only', indent=3)} Processing batch {i}/{len(batches)}...")

        try:
            batch_df = get_uniprot_fields_worker(batch, search_fields, verbose=worker_verbose)
            if standardize:
                batch_df = _u.standardize_uniprot_columns(batch_df)
            all_results.append(batch_df)
        except Exception as e:
            print(f"{format_log_prefix('warn')} Failed batch {i}: {e}")
            continue

    if not all_results:
        if verbose:
            print(f"{format_log_prefix('warn')} No results retrieved from UniProt.")
        return pd.DataFrame()

    full_method_df = pd.concat(all_results, ignore_index=True)
    if verbose:
        print(f"{format_log_prefix('result_only', 2)} Retrieved UniProt metadata for {len(full_method_df)} entries.")

    return full_method_df

get_uniprot_fields_worker

get_uniprot_fields_worker(
    prot_list: list[str],
    search_fields: list[str] | None = None,
    verbose: bool = False,
) -> pd.DataFrame

Query UniProt for a batch of protein accessions.

This function sends requests to the UniProt REST API for up to 1024 proteins at a time and returns the requested fields as a DataFrame. It handles isoform accessions, fallback queries, and UniProt ID redirects automatically.

Parameters:

Name Type Description Default
prot_list list of str

List of protein accessions or IDs.

required
search_fields list of str

UniProt return fields. Required in practice: leaving the default of None raises ValueError. See: https://www.uniprot.org/help/return_fields

None
verbose bool

If True, print progress messages and missing accessions.

False

Returns:

Name Type Description
df DataFrame

DataFrame containing UniProt metadata for the input proteins.

Raises:

Type Description
ValueError

If search_fields is None, or if the internal query_type is neither "accession" nor "id".

Info

  • This function is intended as a worker and is usually called by get_uniprot_fields.
  • It automatically resolves canonical vs. isoform accessions and will attempt UniProt ID mapping if some accessions cannot be found.
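
The isoform handling and query construction described above can be sketched offline. The helper names (`split_isoforms`, `base_accessions`, `build_query`) are hypothetical; the logic mirrors the source below but sends no request, and the base accessions are sorted here only to make the output deterministic.

```python
def split_isoforms(accessions):
    """Separate isoform accessions (containing '-') from canonical ones."""
    isoforms = [a for a in accessions if "-" in a]
    canonical = [a for a in accessions if "-" not in a]
    return canonical, isoforms


def base_accessions(isoforms):
    """Strip the isoform suffix, e.g. 'P40926-2' -> 'P40926' (deduplicated)."""
    return sorted({a.split("-")[0] for a in isoforms})


def build_query(ids, query_type="accession"):
    """Build the URL-encoded OR query string used for a batch of ids."""
    parts = [f"%28{query_type}%3A{i}%29" for i in ids]
    return "%28" + "+OR+".join(parts) + "%29"


canonical, isoforms = split_isoforms(["P40925", "P40926-2", "P40926-3"])
print(canonical)                  # ['P40925']
print(base_accessions(isoforms))  # ['P40926']
print(build_query(canonical))     # %28%28accession%3AP40925%29%29
```
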
Source code in src/scpviz/utils/id_maps.py
def get_uniprot_fields_worker(
    prot_list: list[str],
    search_fields: list[str] | None = None,
    verbose: bool = False,
) -> pd.DataFrame:
    """
    Query UniProt for a batch of protein accessions.

    This function sends requests to the UniProt REST API for up to 1024 proteins
    at a time and returns the requested fields as a DataFrame. It handles isoform
    accessions, fallback queries, and UniProt ID redirects automatically.

    Args:
        prot_list (list of str): List of protein accessions or IDs.
        search_fields (list of str): UniProt return fields. Required in
            practice: leaving the default of None raises ValueError.
            See: https://www.uniprot.org/help/return_fields
        verbose (bool): If True, print progress messages and missing accessions.

    Returns:
        df (pandas.DataFrame): DataFrame containing UniProt metadata for the input proteins.

    Raises:
        ValueError: If `search_fields` is None, or if the internal `query_type`
            is neither `"accession"` nor `"id"`.

    !!! info
        - This function is intended as a **worker** and is usually called by
          `get_uniprot_fields`.
        - It automatically resolves canonical vs. isoform accessions and will
          attempt UniProt ID mapping if some accessions cannot be found.

    Related Functions:
        - get_uniprot_fields: High-level batch UniProt query wrapper.
    """

    base_url = 'https://rest.uniprot.org/uniprotkb/stream'
    if search_fields is None:
        raise ValueError("search_fields is required for UniProt queries.")
    fields = "%2C".join(search_fields)
    format_type = 'tsv'

    def query_uniprot_batch(ids, query_type="accession"):
        if not ids:
            return pd.DataFrame()

        if query_type == "accession":
            query_parts = [f"%28accession%3A{id}%29" for id in ids]
        elif query_type == "id":
            query_parts = [f"%28id%3A{id}%29" for id in ids]
        else:
            raise ValueError(f"Unknown query_type: {query_type}")

        query = "+OR+".join(query_parts)
        full_query = f"%28{query}%29"
        url = f'{base_url}?fields={fields}&format={format_type}&query={full_query}'

        if verbose:
            print(f"Querying UniProt ({query_type}, TSV mode) for {len(ids)} proteins")

        results = requests.get(url)
        results.raise_for_status()

        # Handle empty response gracefully
        if not results.text.strip():
            print(f"{format_log_prefix('warn_only', 2)} UniProt returned empty response for {len(ids)} proteins.")
            return pd.DataFrame()

        return pd.read_csv(io.StringIO(results.text), sep="\t")

    if verbose:
        print(f"{format_log_prefix('API', 1)} Querying UniProt for {len(prot_list)} total proteins [TSV mode].")

    def resolve_uniprot_redirects(accessions, from_db='UniProtKB_AC-ID', to_db='UniProtKB'):
        url = 'https://rest.uniprot.org/idmapping/run'
        data = {'from': from_db, 'to': to_db, 'ids': ','.join(accessions)}

        res = requests.post(url, data=data)
        res.raise_for_status()
        job_id = res.json()['jobId']

        # Poll until job is complete
        while True:
            status = requests.get(f"https://rest.uniprot.org/idmapping/status/{job_id}").json()
            if status.get("jobStatus") == "RUNNING":
                time.sleep(1)
            else:
                break

        # Get results
        results = requests.get(f"https://rest.uniprot.org/idmapping/uniprotkb/results/{job_id}").json()
        mapping = {item['from']: item['to']['primaryAccession'] for item in results.get('results', [])}
        return mapping

    # Split isoform vs canonical accessions
    isoform_ids = [acc for acc in prot_list if '-' in acc]
    canonical_ids = [acc for acc in prot_list if '-' not in acc]

    df_canonical = query_uniprot_batch(canonical_ids, query_type="accession")
    df_isoform = query_uniprot_batch(isoform_ids, query_type="accession")

    # Identify any isoforms that weren't found
    found_isoform_ids = set(df_isoform['Entry']) if not df_isoform.empty else set()
    missing_isoforms = [acc for acc in isoform_ids if acc not in found_isoform_ids]

    if missing_isoforms and verbose:
        print(f"{format_log_prefix('info_only', 3)} Attempting fallback query for {len(missing_isoforms)} isoform base IDs")

    # Attempt fallback query using base accessions
    fallback_ids = list(set([id.split('-')[0] for id in missing_isoforms]))
    df_fallback = query_uniprot_batch(fallback_ids, query_type="id")

    # Combine all DataFrames
    df = pd.concat([df_canonical, df_isoform, df_fallback], ignore_index=True)

    # Final pass: insert missing rows if still unresolved
    found_entries = set(df['Entry']) if 'Entry' in df.columns else set()
    still_missing = set(prot_list) - found_entries

    if still_missing:
        if verbose:
            print(f"{format_log_prefix('info_only', 3)} Attempting UniProt ID redirect for {len(still_missing)} unresolved accessions.")
        redirect_map = resolve_uniprot_redirects(list(still_missing))
        if redirect_map:
            redirected_ids = list(redirect_map.values())
            df_redirected = query_uniprot_batch(redirected_ids, query_type="accession")

            # Remap back to original accession
            inv_map = {v: k for k, v in redirect_map.items()}
            if 'Entry' in df_redirected.columns:
                df_redirected['Entry'] = df_redirected['Entry'].apply(lambda x: inv_map.get(x, x))

            df = pd.concat([df, df_redirected], ignore_index=True)

            resolved = set(redirect_map.keys())
            still_missing -= resolved

    # Step 5: Fill in placeholders for totally missing accessions
    if still_missing:
        if verbose:
            print(f"{format_log_prefix('warn_only', 3)} Proteins not found in UniProt: {list(still_missing)[:5]}")
        missing_df = pd.DataFrame({'Entry': list(still_missing)})
        for col in search_fields:
            if col != 'accession' and col not in missing_df.columns:
                missing_df[col] = np.nan
        df = pd.concat([df, missing_df], ignore_index=True)

    if 'STRING' in df.columns:
        # keep first STRING ID (or join all if you prefer)
        df['xref_string'] = df['STRING'].apply(
            lambda s: str(s).split(';')[0].strip() if pd.notna(s) and str(s).strip() else np.nan
        )
        df.drop(columns=['STRING'], inplace=True)

    return df

get_upset_contents

get_upset_contents(
    pdata: pAnnData,
    classes: str | list[str],
    on: str = "protein",
    upsetForm: bool = True,
    debug: bool = False,
) -> pd.DataFrame | dict[str, list[str]]

Construct contents for an UpSet plot from a pAnnData object.

This function extracts feature sets (proteins or peptides) present in specified sample classes and returns them either as a dictionary or in an upsetplot-compatible format.

Parameters:

Name Type Description Default
pdata pAnnData

The pAnnData object containing .prot and .pep.

required
classes str or list of str

Metadata column(s) in .obs to define sample groups. Examples: "cell_type", or ["cell_type", "treatment"].

required
on str

Data level to use. Options are "protein" (default) or "peptide".

'protein'
upsetForm bool

If True, return an UpSet-compatible DataFrame via upsetplot.from_contents. If False, return a raw dictionary.

True
debug bool

If True, print filtering steps and class resolution details.

False

Returns:

Name Type Description
upset_data DataFrame

Binary presence/absence DataFrame for use with upsetplot.UpSet, if upsetForm=True.

upset_dict dict

Mapping of class → list of present features, if upsetForm=False.

Raises:

Type Description
ValueError

If on is not "protein" or "peptide".

Example

Get contents for an UpSet plot of sample classes:

upset_data = get_upset_contents(pdata, classes="treatment")
from upsetplot import UpSet
UpSet(upset_data, subset_size="count").plot()

Retrieve raw dictionary of sets instead:

upset_dict = get_upset_contents(pdata, classes="treatment", upsetForm=False)

Query proteins from a set and highlight them in a plot:

upset_data = scutils.get_upset_contents(pdata, classes="condition")
prot_df = scutils.get_upset_query(upset_data, present=["treated"], absent=["control"])
scplt.plot_rankquant(ax, pdata, classes="condition", cmap=cmaps, color=colors)
scplt.mark_rankquant(ax, pdata, mark_df=prot_df, class_values=["treated"], color="black")
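
The presence rule used to build each set (a feature counts as present in a class if at least one sample has a value that is neither NaN nor 0) can be checked on a small array. The matrix and feature names below are made up; the mask expression is the one used in the source below.

```python
import numpy as np

# Hypothetical abundance matrix: 2 samples x 3 features.
X = np.array([
    [1.5, 0.0, np.nan],
    [2.0, 0.0, 3.0],
])
feature_names = np.array(["protA", "protB", "protC"])

# A value counts as present when it is neither NaN nor zero.
mask_present = (~np.isnan(X)) & (X != 0)

# Keep features that are present in at least one sample.
present = feature_names[mask_present.sum(axis=0) > 0].tolist()
print(present)  # ['protA', 'protC']
```

Here protB is excluded because it is zero in every sample, while protC is kept even though one of its values is NaN.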

Source code in src/scpviz/utils/class_filter.py
def get_upset_contents(
    pdata: pAnnData,
    classes: str | list[str],
    on: str = "protein",
    upsetForm: bool = True,
    debug: bool = False,
) -> pd.DataFrame | dict[str, list[str]]:
    """
    Construct contents for an UpSet plot from a pAnnData object.

    This function extracts feature sets (proteins or peptides) present in
    specified sample classes and returns them either as a dictionary or
    in an `upsetplot`-compatible format.

    Args:
        pdata (pAnnData): The pAnnData object containing `.prot` and `.pep`.
        classes (str or list of str): Metadata column(s) in `.obs` to define sample groups.
            Examples: `"cell_type"`, or `["cell_type", "treatment"]`.
        on (str): Data level to use. Options are `"protein"` (default) or `"peptide"`.
        upsetForm (bool): If True, return an `UpSet`-compatible DataFrame via
            `upsetplot.from_contents`. If False, return a raw dictionary.
        debug (bool): If True, print filtering steps and class resolution details.

    Returns:
        upset_data (pandas.DataFrame): Binary presence/absence DataFrame for use with
            `upsetplot.UpSet`, if `upsetForm=True`.
        upset_dict (dict): Mapping of class → list of present features,
            if `upsetForm=False`.

    Raises:
        ValueError: If `on` is not `"protein"` or `"peptide"`.

    Example:
        Get contents for an UpSet plot of sample classes:
            ```python
            upset_data = get_upset_contents(pdata, classes="treatment")
            from upsetplot import UpSet
            UpSet(upset_data, subset_size="count").plot()
            ```

        Retrieve raw dictionary of sets instead:
            ```python
            upset_dict = get_upset_contents(pdata, classes="treatment", upsetForm=False)
            ```

        Query proteins from a set and highlight them in a plot:
            ```python
            upset_data = scutils.get_upset_contents(pdata, classes="condition")
            prot_df = scutils.get_upset_query(upset_data, present=["treated"], absent=["control"])
            scplt.plot_rankquant(ax, pdata, classes="condition", cmap=cmaps, color=colors)
            scplt.mark_rankquant(ax, pdata, mark_df=prot_df, class_values=["treated"], color="black")
            ```

    Related Functions:
        - plot_upset: Plot UpSet diagrams directly.
        - plot_venn: Plot Venn diagrams for up to 3 sets.
    """
    import scpviz.utils as _u

    if on == 'protein':
        adata = pdata.prot
    elif on == 'peptide':
        adata = pdata.pep
    else:
        raise ValueError("Invalid value for 'on'. Options are 'protein' or 'peptide'.")

    # Common error: if classes is a list with only one element, unpack it
    if isinstance(classes, list) and len(classes) == 1:
        classes = classes[0]

    classes_list = _u.get_classlist(adata, classes)
    upset_dict = {}

    for class_value in classes_list:
        data_filtered = _u.resolve_class_filter(adata, classes, class_value, debug=debug)

        # get proteins that are present in the filtered data (at least one value is not NaN, not 0)
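        # densify sparse matrices; dense arrays pass through unchanged
        X = data_filtered.X
        X = X.toarray() if hasattr(X, "toarray") else np.asarray(X)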
        mask_present = (~np.isnan(X)) & (X != 0)
        prot_present = data_filtered.var_names[mask_present.sum(axis=0) > 0]
        upset_dict[class_value] = prot_present.tolist()

    if upsetForm:
        upset_data = _u.upsetplot.from_contents(upset_dict)
        return upset_data

    else:
        return upset_dict

get_upset_query

get_upset_query(
    upset_content: DataFrame,
    present: list[str],
    absent: list[str],
) -> pd.DataFrame

Query features from UpSet contents given inclusion and exclusion criteria.

This function extracts the set of features (proteins or peptides) that are present in every group listed in present and absent from every group listed in absent. It then queries UniProt metadata for the resulting accessions.

Parameters:

Name Type Description Default
upset_content DataFrame

Output from get_upset_contents with presence/absence encoding of features.

required
present list of str

List of groups in which the features must be present.

required
absent list of str

List of groups in which the features must be absent.

required

Returns:

Name Type Description
prot_query_df DataFrame

DataFrame of features matching the query, annotated with UniProt metadata via get_uniprot_fields.

Example

Query proteins unique to one group and highlight them in a plot:

upset_data = scutils.get_upset_contents(pdata, classes="condition")
prot_df = scutils.get_upset_query(upset_data, present=["treated"], absent=["control"])
scplt.plot_rankquant(ax, pdata, classes="condition", cmap=cmaps, color=colors)
scplt.mark_rankquant(ax, pdata, mark_df=prot_df, class_values=["treated"], color="black")
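
The present/absent selection can be expressed with plain set operations on the raw dictionary form (upsetForm=False). This is a sketch of the selection logic only: `query_sets` is a hypothetical helper, the accessions are placeholders, and it neither calls upsetplot nor fetches UniProt metadata.

```python
def query_sets(contents, present, absent):
    """Return features found in every `present` group and in no `absent` group."""
    if not present:
        raise ValueError("at least one group must be listed in `present`")
    keep = set.intersection(*(set(contents[g]) for g in present))
    for group in absent:
        keep -= set(contents[group])
    return sorted(keep)


# Placeholder contents, shaped like the dict returned with upsetForm=False.
contents = {
    "treated": ["P1", "P2", "P3"],
    "control": ["P2", "P4"],
}

unique_to_treated = query_sets(contents, present=["treated"], absent=["control"])
print(unique_to_treated)  # ['P1', 'P3']
```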

Source code in src/scpviz/utils/class_filter.py
def get_upset_query(
    upset_content: pd.DataFrame, present: list[str], absent: list[str]
) -> pd.DataFrame:
    """
    Query features from UpSet contents given inclusion and exclusion criteria.

    This function extracts the set of features (proteins or peptides) that are
    present in every group listed in `present` and absent from every group
    listed in `absent`. It then queries UniProt metadata for the resulting
    accessions.

    Args:
        upset_content (pandas.DataFrame): Output from `get_upset_contents` with
            presence/absence encoding of features.
        present (list of str): List of groups in which the features must be present.
        absent (list of str): List of groups in which the features must be absent.

    Returns:
        prot_query_df (pandas.DataFrame): DataFrame of features matching the query,
            annotated with UniProt metadata via `get_uniprot_fields`.

    Example:
        Query proteins unique to one group and highlight them in a plot:
            ```python
            upset_data = scutils.get_upset_contents(pdata, classes="condition")
            prot_df = scutils.get_upset_query(upset_data, present=["treated"], absent=["control"])
            scplt.plot_rankquant(ax, pdata, classes="condition", cmap=cmaps, color=colors)
            scplt.mark_rankquant(ax, pdata, mark_df=prot_df, class_values=["treated"], color="black")
            ```

    Related Functions:
        - get_upset_contents: Generate presence/absence sets for UpSet analysis.
        - plot_upset: Plot UpSet diagrams from class-based sets.
    """
    import scpviz.utils as _u

    prot_query = _u.upsetplot.query(upset_content, present=present, absent=absent).data['id'].tolist()
    prot_query_df = _u.get_uniprot_fields(prot_query, verbose=False)

    return prot_query_df

infer_layer_is_log

infer_layer_is_log(
    layer: str, adata: Optional[AnnData] = None
) -> bool

Infer whether a layer contains log-transformed values.

  1. Registry (if adata is given and adata.uns['layer_provenance'] exists): walk ancestors via input_layer (cycle-safe). If any step has op == "log_transform", return True. If layer is registered and no log_transform appears, return False.
  2. Name fallback: "log" in layer.lower().

Standalone AnnData objects (e.g. passed into low-level utils helpers) often have no layer_provenance and no pAnnData .history; only the name heuristic applies unless you populate uns['layer_provenance'] yourself.
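
The interaction between the registry walk and the name fallback is easiest to see on a small example. The sketch below re-implements the lookup on a plain dict instead of an AnnData object, so it runs without scpviz. The function name `infer_is_log`, the layer names, and the `"import"` and `"normalize"` op labels are assumptions; only `"log_transform"`, `op`, and `input_layer` come from the documentation above.

```python
def infer_is_log(layer, registry=None):
    """Dict-based sketch of the registry walk followed by the name fallback."""
    if registry is not None:
        visited = set()
        current = layer
        # Walk ancestors via input_layer, stopping on cycles.
        while current in registry and current not in visited:
            visited.add(current)
            record = registry[current]
            if record.get("op") == "log_transform":
                return True
            nxt = record.get("input_layer", "")
            if not nxt:
                break
            current = nxt
        if layer in registry:
            return False  # registered, and no log step among its ancestors
    return "log" in layer.lower()


# Hypothetical provenance registry.
registry = {
    "raw": {"op": "import", "input_layer": ""},
    "X_log": {"op": "log_transform", "input_layer": "raw"},
    "X_norm": {"op": "normalize", "input_layer": "X_log"},
    "logo_counts": {"op": "import", "input_layer": ""},
}

print(infer_is_log("X_norm", registry))       # True: ancestor X_log was log-transformed
print(infer_is_log("logo_counts", registry))  # False: registered, no log step
print(infer_is_log("logo_counts"))            # True: name heuristic matches "log"
```

The last two calls show why a populated registry matters: the name heuristic alone misclassifies a layer whose name merely contains "log".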

Parameters:

Name Type Description Default
layer str

Layer name to inspect.

required
adata Optional[AnnData]

Optional AnnData carrying layer_provenance.

None

Returns:

Type Description
bool

True if the layer is treated as log-transformed.

Source code in src/scpviz/utils/data.py
def infer_layer_is_log(layer: str, adata: Optional[ad.AnnData] = None) -> bool:
    """
    Infer whether a layer contains log-transformed values.

    1. **Registry** (if ``adata`` is given and ``adata.uns['layer_provenance']`` exists):
       walk ancestors via ``input_layer`` (cycle-safe). If any step has
       ``op == \"log_transform\"``, return True. If ``layer`` is registered and no
       ``log_transform`` appears, return False.
    2. **Name fallback**: ``\"log\" in layer.lower()``.

    Standalone ``AnnData`` objects (e.g. passed into low-level ``utils`` helpers)
    often have no ``layer_provenance`` and no pAnnData ``.history``; only the
    name heuristic applies unless you populate ``uns['layer_provenance']`` yourself.

    Args:
        layer: Layer name to inspect.
        adata: Optional AnnData carrying ``layer_provenance``.

    Returns:
        True if the layer is treated as log-transformed.
    """
    if adata is not None:
        registry = adata.uns.get("layer_provenance", {})
        visited: set[str] = set()
        current: str = layer
        while current in registry and current not in visited:
            visited.add(current)
            record = registry[current]
            if record.get("op") == "log_transform":
                return True
            nxt = record.get("input_layer", "")
            if not nxt:
                break
            current = nxt
        if layer in registry:
            return False

    return "log" in layer.lower()

pairwise_log2fc

pairwise_log2fc(
    data1: ndarray, data2: ndarray
) -> np.ndarray

Compute pairwise median log2 fold change (log2FC) between two groups.

This function computes, for each feature, the log2 ratio of every group 1 sample to every group 2 sample (data1 / data2) and returns the median of those ratios per feature. It is primarily used as a helper for fold-change strategies in pAnnData.de().

Parameters:

Name Type Description Default
data1 ndarray

Array of shape (n_samples_group1, n_features) containing abundance values for group 1.

required
data2 ndarray

Array of shape (n_samples_group2, n_features) containing abundance values for group 2.

required

Returns:

Name Type Description
median_log2fc ndarray

Array of shape (n_features,) containing the median pairwise log2 fold change for each feature.

Note

This is an internal helper for differential expression calculations. End users should call pAnnData.de() instead of using this function directly.
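
A small worked example may make the broadcasting concrete. The arrays are made up, and the sketch repeats the core NumPy steps directly instead of importing scpviz; it omits the all-NaN column handling that the source below includes.

```python
import numpy as np

# Group 1: two samples, two features. Group 2: one sample, two features.
data1 = np.array([[4.0, 8.0], [16.0, 8.0]])
data2 = np.array([[2.0, 8.0]])

# Broadcasting pairs every group 1 sample with every group 2 sample:
# (n1, 1, n_features) / (1, n2, n_features) -> (n1, n2, n_features)
log_ratios = np.log2(data1[:, None, :] / data2[None, :, :])

# Flatten the sample pairs and take the median per feature.
median_log2fc = np.nanmedian(log_ratios.reshape(-1, data1.shape[1]), axis=0)

print(log_ratios.shape)  # (2, 1, 2)
print(median_log2fc)     # [2. 0.]
```

For the first feature the pairwise log2 ratios are 1 and 3, giving a median of 2; for the second feature both ratios are 0.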

Source code in src/scpviz/utils/stats.py
def pairwise_log2fc(data1: np.ndarray, data2: np.ndarray) -> np.ndarray:
    """
    Compute pairwise median log2 fold change (log2FC) between two groups.

    This function computes, for each feature, the log2 ratio of every group 1
    sample to every group 2 sample (`data1 / data2`) and returns the median of
    those ratios per feature. It is primarily used as a helper for fold-change
    strategies in `pAnnData.de()`.

    Args:
        data1 (numpy.ndarray): Array of shape `(n_samples_group1, n_features)`
            containing abundance values for group 1.
        data2 (numpy.ndarray): Array of shape `(n_samples_group2, n_features)`
            containing abundance values for group 2.

    Returns:
        median_log2fc (numpy.ndarray): Array of shape `(n_features,)` containing
            the median pairwise log2 fold change for each feature.

    Note:
        This is an internal helper for differential expression calculations.
        End users should call `pAnnData.de()` instead of using this function directly.

    Related Functions:
        - pAnnData.de: Differential expression analysis with multiple fold change strategies.
    """
    n1, n2 = data1.shape[0], data2.shape[0]

    # data1[:, None, :] has shape (n1, 1, n_features)
    # data2[None, :, :] has shape (1, n2, n_features)
    # The result is an array of shape (n1, n2, n_features)
    with np.errstate(divide='ignore', invalid='ignore'):
        pairwise_ratios = np.log2(data1[:, None, :] / data2[None, :, :])  # (n1, n2, features)
        pairwise_flat = pairwise_ratios.reshape(-1, data1.shape[1])

    # Identify columns that are entirely NaN
    mask_all_nan = np.all(np.isnan(pairwise_flat), axis=0)
    median_fc = np.full(data1.shape[1], np.nan, dtype=float)

    # Compute only on valid columns
    if not np.all(mask_all_nan):
        valid_cols = ~mask_all_nan
        median_fc[valid_cols] = np.nanmedian(pairwise_flat[:, valid_cols], axis=0)

    # # Reshape to (n1*n2, n_features) and compute the median along the first axis.
    # median_fc = np.nanmedian(pairwise_ratios.reshape(-1, data1.shape[1]), axis=0)
    return median_fc

parse_filename_index

parse_filename_index(
    df: DataFrame,
    obs_columns: list[str],
    delimiter: str = "_",
    condition: str | None = None,
) -> pd.DataFrame

Parse DataFrame index (filenames) into metadata columns based on a list of obs_columns. Can label a subset based on condition.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | DataFrame whose index contains delimited filenames. | required |
| obs_columns | list of str | Names of the metadata columns to extract from the filename. | required |
| delimiter | str | Character used to split the filename. Default is "_". | '_' |
| condition | str or None | Optional boolean expression (evaluated with df.eval) that selects a subset of rows for parsing. If None, parse all rows. For example, condition="parsingType == '5-tokens'" | None |

Returns:

Type Description
DataFrame

pd.DataFrame: Copy of df with added metadata columns.
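
The core of the parsing step can be sketched with plain pandas. The filenames, the `parsingType` column, and the `obs_columns` names below are illustrative assumptions, and the sketch omits the validation that the real function performs.

```python
import pandas as pd

# Toy table: the index holds delimited filenames (all values are made up).
df = pd.DataFrame(
    {"parsingType": ["3-tokens", "3-tokens", "other"]},
    index=["ctrl_1h_rep1", "drug_4h_rep2", "blank"],
)
obs_columns = ["treatment", "time", "replicate"]

# Select the rows to parse with a boolean expression, as `condition` does.
mask = df.eval("parsingType == '3-tokens'")

# Split only the selected filenames into one column per token.
parts = df.index[mask.to_numpy()].to_series().str.split("_", expand=True)

parsed = df.copy()
for i, col in enumerate(obs_columns):
    parsed[col] = pd.NA                              # unselected rows stay missing
    parsed.loc[mask, col] = parts.iloc[:, i].values  # fill the selected rows

print(parsed)
```

Rows excluded by the condition (here `blank`) keep missing values in the new columns, so a later call with a different condition and a different `obs_columns` list could fill them in.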

Source code in src/scpviz/utils/data.py
def parse_filename_index(
    df: pd.DataFrame,
    obs_columns: list[str],
    delimiter: str = "_",
    condition: str | None = None,
) -> pd.DataFrame:
    """
    Parse DataFrame index (filenames) into metadata columns based on a list of obs_columns. Can label a subset based on condition.

    Args:
        df (pd.DataFrame):
            DataFrame whose index contains delimited filenames.
        obs_columns (list of str):
            Names of the metadata columns to extract from the filename.
        delimiter (str):
            Character used to split the filename. Default is "_".
        condition (str or None):
            Optional boolean expression (evaluated with df.eval) that selects a
            subset of rows for parsing. If None, parse all rows. For example, `condition="parsingType == '5-tokens'"`

    Returns:
        pd.DataFrame:
            Copy of df with added metadata columns.
    """

    df_parsed = df.copy()

    if condition is None:
        mask = pd.Series(True, index=df.index)
    else:
        try:
            mask = df.eval(condition)
        except Exception as e:
            # Chain the original exception so the underlying eval error stays visible
            raise ValueError(f"Invalid condition '{condition}': {e}") from e

        if mask.dtype != bool:
            raise ValueError(f"Condition '{condition}' did not evaluate to a boolean mask.")

    # Nothing to parse
    if not mask.any():
        raise ValueError(
            f"Condition '{condition}' selected 0 rows. "
            f"Check that the column names and values in the condition are correct."
        )

    # Subset of filenames to parse
    idx_to_parse = df.index[mask]

    # Split index by delimiter
    parts = idx_to_parse.to_series().str.split(delimiter, expand=True)

    # Validate number of parts
    expected = len(obs_columns)
    actual = parts.shape[1]
    if actual != expected:
        raise ValueError(
            f"Expected {expected} parts after splitting index by '{delimiter}', "
            f"but got {actual}. Index example: '{idx_to_parse[0]}'"
        )

    # Assign parsed components
    for i, col in enumerate(obs_columns):
        # Create column if missing
        if col not in df_parsed.columns:
            df_parsed[col] = pd.NA
        # Fill only selected rows
        df_parsed.loc[mask, col] = parts.iloc[:, i].values

    return df_parsed

resolve_accessions

resolve_accessions(
    adata: AnnData | pAnnData,
    namelist: list[str],
    gene_col: str = "Genes",
    gene_map: dict[str, str] | None = None,
) -> list[str] | None

Resolve gene or accession names to accession IDs from .var_names.

This function maps user-specified identifiers (gene names or accession IDs) to the canonical accession IDs in an AnnData or pAnnData object. It first checks .var_names for exact matches, then optionally resolves gene names via a specified column (default "Genes"). Unmatched names are reported.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| adata | AnnData or pAnnData | AnnData-like object containing .var. | required |
| namelist | list of str | Input identifiers to resolve (genes or accessions). | required |
| gene_col | str | Column in .var containing gene names (default: "Genes"). | 'Genes' |
| gene_map | dict | Precomputed mapping of gene → accession. If None, a mapping is constructed from gene_col. | None |

Returns:

Name Type Description
resolved list of str

List of accession IDs corresponding to the input names, or None if namelist is empty.

Raises:

Type Description
ValueError

If none of the provided names can be resolved to .var_names or the gene column.

Example

Resolve gene symbols to accession IDs:

accs = resolve_accessions(adata, namelist=["UBE4B", "GAPDH"])

Resolve accessions directly:

accs = resolve_accessions(adata, namelist=["P12345", "Q67890"])
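
The lookup order (accession first, then gene name) can be sketched without an AnnData object. In this sketch a small DataFrame stands in for `adata.var`, the helper `resolve_names` is hypothetical, and the identifiers are toy values.

```python
import pandas as pd

# Stand-in for adata.var: the index plays the role of .var_names (toy values).
var = pd.DataFrame(
    {"Genes": ["GAPDH", "UBE4B", None]},
    index=["P04406", "O95155", "Q00000"],
)
var_names = var.index.astype(str)

# gene -> accession lookup, skipping rows without a gene name
gene_map = {
    str(gene): acc
    for acc, gene in zip(var_names, var["Genes"])
    if pd.notna(gene)
}

def resolve_names(namelist):
    """Hypothetical helper mirroring the lookup order: accession, then gene."""
    resolved, unmatched = [], []
    for name in map(str, namelist):
        if name in var_names:
            resolved.append(name)            # already an accession
        elif name in gene_map:
            resolved.append(gene_map[name])  # gene name -> accession
        else:
            unmatched.append(name)
    return resolved, unmatched

resolved, unmatched = resolve_names(["GAPDH", "O95155", "NOT_A_GENE"])
print(resolved, unmatched)
```

Because the map is keyed by gene name, a gene that appears for several accessions resolves to the last accession encountered when the map is built.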

Source code in src/scpviz/utils/data.py
def resolve_accessions(
    adata: ad.AnnData | pAnnData,
    namelist: list[str],
    gene_col: str = "Genes",
    gene_map: dict[str, str] | None = None,
) -> list[str] | None:
    """
    Resolve gene or accession names to accession IDs from `.var_names`.

    This function maps user-specified identifiers (gene names or accession IDs)
    to the canonical accession IDs in an AnnData or pAnnData object. It first
    checks `.var_names` for exact matches, then optionally resolves gene names
    via a specified column (default `"Genes"`). Unmatched names are reported.

    Args:
        adata (AnnData or pAnnData): AnnData-like object containing `.var`.
        namelist (list of str): Input identifiers to resolve (genes or accessions).
        gene_col (str): Column in `.var` containing gene names (default: `"Genes"`).
        gene_map (dict, optional): Precomputed mapping of gene → accession. If None,
            a mapping is constructed from `gene_col`.

    Returns:
        resolved (list of str or None): List of accession IDs corresponding to the
            input names, or None if `namelist` is empty.

    Raises:
        ValueError: If none of the provided names can be resolved to `.var_names`
            or the gene column.

    Example:
        Resolve gene symbols to accession IDs:
            ```python
            accs = resolve_accessions(adata, namelist=["UBE4B", "GAPDH"])
            ```

        Resolve accessions directly:
            ```python
            accs = resolve_accessions(adata, namelist=["P12345", "Q67890"])
            ```

    Related Functions:
        - get_gene_maps: Build full accession → gene mapping dictionaries.
        - get_abundance: Extract abundance values by gene or accession.
    """
    import pandas as pd

    if not namelist:
        return None

    var_names = adata.var_names.astype(str)

    # Use passed-in gene_map or build one
    if gene_map is None:
        gene_map = {}
        if gene_col in adata.var.columns:
            for acc, gene in zip(var_names, adata.var[gene_col]):
                if pd.notna(gene):
                    gene_map[str(gene)] = acc

    resolved, unmatched = [], []
    for name in namelist:
        name = str(name)
        if name in var_names:
            resolved.append(name)
        elif name in gene_map:
            resolved.append(gene_map[name])
        else:
            unmatched.append(name)

    if not resolved:
        raise ValueError(
            f"No valid names found in `namelist`: {namelist}.\n"
            f"Check against .var_names or '{gene_col}' column."
        )

    if unmatched:
        print(f"{format_log_prefix('warn')} A match was not found for the following:")
        for u in unmatched:
            print(f"  - {u}")

    return resolved

resolve_class_filter

resolve_class_filter(
    adata: pAnnData | AnnData,
    classes: str | list[str],
    class_value: str | list[str],
    debug: bool = False,
    *,
    filter_func: Callable[..., pAnnData | AnnData] | None = None,
) -> pAnnData | AnnData

Resolve (classes, class_value) inputs and apply filtering.

This helper standardizes class/value pairs into dictionary-style filters and applies them to an AnnData or pAnnData object. It is primarily used internally by plotting and analysis functions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| adata | AnnData or pAnnData | Input data object to filter. | required |
| classes | str or list of str | Metadata field(s) used for filtering. | required |
| class_value | str or list of str | Values corresponding to classes. | required |
| debug | bool | If True, print resolved class/value pairs. | False |
| filter_func | callable | Filtering function to apply. Defaults to filter. | None |

Returns:

Name Type Description
filtered AnnData or pAnnData

Subset of the input object, same type as adata.

Warning

This is an internal helper for use inside functions such as plot_rankquant and plot_raincloud. End users should call pAnnData.filter_sample_values() instead.
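
How a class/value pair is resolved before filtering can be sketched as follows. The helper name `split_class_value` and the example class names are hypothetical; the sketch covers only string inputs and does not call the real filter.

```python
def split_class_value(classes, class_value):
    """Hypothetical helper: turn a combined value string into one value per class."""
    if isinstance(classes, str):
        # A single class keeps its value as-is.
        return class_value
    # Several classes: the combined string carries one token per class.
    return class_value.split("_")

single = split_class_value("treatment", "kd")
multi = split_class_value(["treatment", "time"], "kd_24h")
print(single, multi)
```

Under this scheme a value that itself contains an underscore would be split into two tokens, so combined strings work best when individual values are underscore-free.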

Source code in src/scpviz/utils/class_filter.py
def resolve_class_filter(
    adata: pAnnData | ad.AnnData,
    classes: str | list[str],
    class_value: str | list[str],
    debug: bool = False,
    *,
    filter_func: Callable[..., pAnnData | ad.AnnData] | None = None,
) -> pAnnData | ad.AnnData:
    """
    Resolve `(classes, class_value)` inputs and apply filtering.

    This helper standardizes class/value pairs into dictionary-style filters
    and applies them to an AnnData or pAnnData object. It is primarily used
    internally by plotting and analysis functions.

    Args:
        adata (AnnData or pAnnData): Input data object to filter.
        classes (str or list of str): Metadata field(s) used for filtering.
        class_value (str or list of str): Values corresponding to `classes`.
        debug (bool): If True, print resolved class/value pairs.
        filter_func (callable, optional): Filtering function to apply.
            Defaults to `filter`.

    Returns:
        filtered (AnnData or pAnnData): Subset of the input object, same type as `adata`.

    !!! warning
        This is an internal helper for use inside functions such as
        `plot_rankquant` and `plot_raincloud`. End users should call
        `pAnnData.filter_sample_values()` instead.

    Related Functions:
        - filter: Legacy utility for sample filtering.
        - format_class_filter: Standardizes filter inputs.
        - pAnnData.filter_sample_values: Recommended user-facing filter method.
    """

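    if isinstance(classes, str) or not isinstance(class_value, str):
        # Single class, or values already supplied as a list: pass through unchanged
        values = class_value
    else:
        # Several classes with a combined string such as "a_b": one value per class
        values = class_value.split('_')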

    if debug:
        print(f"Classes: {classes}, Values: {values}")

    if filter_func is None:
        filter_func = filter

    return filter_func(adata, classes, values, debug=debug)

resolve_input_layer

resolve_input_layer(adata: AnnData, layer: str) -> str

Resolve the source layer name for provenance when the user passes layer='X'.

The active matrix .X tracks its logical source in adata.uns['current_X_layer'] (maintained by set_X() and set at import). For any other layer string, return it unchanged.

If current_X_layer is missing (legacy objects), falls back to "X_raw".
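
The resolution rule is small enough to sketch with a plain dict standing in for `adata.uns`. The helper name `resolve_layer` and the layer name `"X_norm"` are assumptions made for illustration.

```python
def resolve_layer(uns: dict, layer: str) -> str:
    """Hypothetical stand-in that takes the uns mapping directly."""
    if layer == "X":
        # .X is an alias: report the layer it was last set from.
        return uns.get("current_X_layer", "X_raw")
    return layer

tracked = {"current_X_layer": "X_norm"}  # object whose .X was set from "X_norm"
legacy = {}                              # older object without the tracking key

print(resolve_layer(tracked, "X"))       # X_norm
print(resolve_layer(legacy, "X"))        # X_raw
print(resolve_layer(tracked, "X_log"))   # X_log
```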

Source code in src/scpviz/utils/data.py
def resolve_input_layer(adata: ad.AnnData, layer: str) -> str:
    """
    Resolve the source layer name for provenance when the user passes ``layer='X'``.

    The active matrix ``.X`` tracks its logical source in ``adata.uns['current_X_layer']``
    (maintained by ``set_X()`` and set at import). For any other ``layer`` string,
    return it unchanged.

    If ``current_X_layer`` is missing (legacy objects), falls back to ``\"X_raw\"``.
    """
    if layer == "X":
        return adata.uns.get("current_X_layer", "X_raw")
    return layer

scalarize_taxon

scalarize_taxon(x: object) -> object

Normalize taxon-id values so they never contain lists or arrays.

Returns:

Type Description
object

Scalar string-like taxon id, or pd.NA.

Source code in src/scpviz/utils/id_maps.py
def scalarize_taxon(x: object) -> object:
    """
    Normalize taxon-id values so they never contain lists or arrays.

    Returns:
        Scalar string-like taxon id, or pd.NA.
    """
    # Handle pandas missing scalar explicitly first
    if x is pd.NA:
        return pd.NA

    # Handle standard missing
    if x is None:
        return pd.NA
    if isinstance(x, float) and np.isnan(x):
        return pd.NA

    # Empty string
    if isinstance(x, str):
        s = x.strip()
        return pd.NA if s == "" else s

    # Empty container / container → first element
    if isinstance(x, (list, tuple, np.ndarray)):
        if len(x) == 0:
            return pd.NA
        return scalarize_taxon(x[0])

    # Everything else → string
    return str(x)

standardize_uniprot_columns

standardize_uniprot_columns(
    df: DataFrame | None,
) -> pd.DataFrame | None

Normalize UniProt DataFrame column names to a consistent lowercase, snake_case schema.

This ensures stability across UniProt REST API version changes while keeping the user informed only when critical fields are affected.

Parameters:

Name Type Description Default
df DataFrame

Raw UniProt metadata table.

required

Returns:

Type Description
DataFrame | None

pd.DataFrame: Copy of the DataFrame with standardized column names. Input that is None, not a DataFrame, or has no columns is returned unchanged.
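
The normalization step can be sketched on its own. The alias table below is a small subset chosen for illustration, the helper `canonical_name` is hypothetical, and the header strings are assumed examples of what a UniProt export might contain.

```python
import re

# Subset of the alias table, for illustration only.
ALIASES = {
    "entry": "accession",
    "entry_name": "id",
    "gene_names_primary": "gene_primary",
    "organism_id": "organism_id",
    "string": "xref_string",
}

def canonical_name(col: str) -> str:
    """Hypothetical helper: lowercase, collapse non-alphanumerics, apply aliases."""
    norm = re.sub(r"[^a-z0-9]+", "_", col.lower()).strip("_")
    return ALIASES.get(norm, norm)  # unknown columns keep the normalized name

headers = ["Entry", "Entry Name", "Gene Names (primary)",
           "Organism (ID)", "STRING", "Length"]
renamed = {h: canonical_name(h) for h in headers}
print(renamed)
```

Columns without an alias, such as `Length` here, fall back to their normalized snake_case form rather than being dropped.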

Source code in src/scpviz/utils/id_maps.py
def standardize_uniprot_columns(df: pd.DataFrame | None) -> pd.DataFrame | None:
    """
    Normalize UniProt DataFrame column names to a consistent lowercase, snake_case schema.

    This ensures stability across UniProt REST API version changes while keeping
    the user informed only when critical fields are affected.

    Args:
        df (pd.DataFrame): Raw UniProt metadata table.

    Returns:
        pd.DataFrame: Copy of the DataFrame with standardized column names.
            Input that is None, not a DataFrame, or has no columns is returned unchanged.
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.shape[1] == 0:
        return df

    rename_map = {}
    aliases = {
        # identifiers
        "entry": "accession",
        "entry_name": "id",
        "accession": "accession",
        "primaryaccession": "accession",

        # gene fields
        "gene_names_primary": "gene_primary",
        "gene_name_primary": "gene_primary",
        "gene_primary_name": "gene_primary",
        "gene_primary": "gene_primary",
        "gene_primaryname": "gene_primary",
        "gene_primary_name_": "gene_primary",
        "gene_primaryname_": "gene_primary",

        # organism fields
        "organism_id": "organism_id",
        "organism_identifier": "organism_id",
        "organismid": "organism_id",

        # STRING / cross-reference
        "cross_reference_string": "xref_string",
        "xref_string_id": "xref_string",
        "crossreference_string": "xref_string",
        "string": "xref_string",
        "string_id": "xref_string",
        "xref_string": "xref_string",
    }

    # critical canonical fields we care about if changed or missing
    critical_fields = {"accession", "gene_primary", "organism_id", "xref_string"}

    # known benign patterns — don't warn if these change
    benign_patterns = {
        "gene_ontology",
        "go",
        "gene_names",      # non-primary gene list
        "protein_name",    # descriptive only
        "cc_interaction",  # crossref metadata
    }

    for col in df.columns:
        norm = (
            re.sub(r"[^a-z0-9]+", "_", col.lower())
            .strip("_")
            .replace("__", "_")
        )

        mapped = aliases.get(norm, None)

        if mapped:
            rename_map[col] = mapped
        else:
            # warn only if this looks like a drifted critical column
            if (
                any(k in norm for k in ["accession", "gene", "organism", "string"])
                and not any(p in norm for p in benign_patterns)
            ):
                warnings.warn(
                    f"[standardize_uniprot_columns] ⚠️ Unrecognized UniProt column '{col}' "
                    f"(normalized='{norm}') — may affect critical mapping.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            rename_map[col] = norm  # keep normalized fallback name

    df = df.rename(columns=rename_map)
    # verify that all critical fields exist at least once
    missing_critical = [c for c in critical_fields if c not in df.columns]
    if missing_critical:
        if _setup.GLOBAL_DEBUG:
            warnings.warn(
                f"[standardize_uniprot_columns] Missing expected UniProt columns: {', '.join(missing_critical)}",
                RuntimeWarning,
                stacklevel=2,
            )

    # Columns were already renamed above; a second rename is redundant
    return df

update_layer_provenance

update_layer_provenance(
    adata: AnnData,
    layer_name: str,
    op: str,
    input_layer: str,
    **kwargs: Any
) -> str

Register a layer in the provenance registry stored in adata.uns.

Preprocessing methods (normalize, impute, log_transform) call this before assigning adata.layers[...]. Chains are reconstructable by following input_layer pointers.

If layer_name already exists with a different op or input_layer, a warning is printed and the record is stored under layer_name_1, layer_name_2, …

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| adata | AnnData | AnnData to update (must not rely on pAnnData .history; registry lives only in adata.uns). | required |
| layer_name | str | Intended output layer key. | required |
| op | str | One of "normalize", "impute", "log_transform". | required |
| input_layer | str | Source layer name, or "X" if read from adata.X. | required |
| **kwargs | Any | Extra metadata (e.g. method=..., base=...). | {} |

Returns:

Type Description
str

Actual layer key to use in adata.layers (may be suffixed on collision).
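
The registry behaviour, including collision suffixing and chain reconstruction, can be sketched with a plain dict in place of `adata.uns`. The helpers `register` and `chain`, the layer names, and the metadata values are hypothetical and only mirror the logic described above.

```python
def register(uns: dict, layer_name: str, op: str, input_layer: str, **kwargs) -> str:
    """Hypothetical stand-in: record a layer, suffixing the key on collision."""
    registry = uns.setdefault("layer_provenance", {})
    record = {"op": op, "input_layer": input_layer, **kwargs}
    existing = registry.get(layer_name)
    if existing is not None and (
        existing.get("op") != op or existing.get("input_layer") != input_layer
    ):
        n = 1
        while f"{layer_name}_{n}" in registry:
            n += 1
        layer_name = f"{layer_name}_{n}"  # keep the earlier record intact
    registry[layer_name] = record
    return layer_name

def chain(uns: dict, layer_name: str) -> list[str]:
    """Follow input_layer pointers back to the first unregistered source."""
    registry = uns.get("layer_provenance", {})
    path, seen = [], set()
    while layer_name in registry and layer_name not in seen:
        seen.add(layer_name)
        path.append(layer_name)
        layer_name = registry[layer_name]["input_layer"]
    path.append(layer_name)
    return path

uns = {}
register(uns, "X_norm", "normalize", "X_raw", method="median")
register(uns, "X_norm_log", "log_transform", "X_norm", base=2)
lineage = chain(uns, "X_norm_log")                    # newest layer first
clash = register(uns, "X_norm", "impute", "X_raw")    # same key, different op
print(lineage, clash)
```

The returned key, not the requested one, is the name to use when writing the matrix, since a collision redirects the record to a suffixed key.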

Source code in src/scpviz/utils/data.py
def update_layer_provenance(
    adata: ad.AnnData,
    layer_name: str,
    op: str,
    input_layer: str,
    **kwargs: Any,
) -> str:
    """
    Register a layer in the provenance registry stored in ``adata.uns``.

    Preprocessing methods (``normalize``, ``impute``, ``log_transform``) call this
    before assigning ``adata.layers[...]``. Chains are reconstructable by following
    ``input_layer`` pointers.

    If ``layer_name`` already exists with a different ``op`` or ``input_layer``,
    a warning is printed and the record is stored under ``layer_name_1``, ``layer_name_2``, …

    Args:
        adata: AnnData to update (must not rely on pAnnData ``.history``; registry
            lives only in ``adata.uns``).
        layer_name: Intended output layer key.
        op: One of ``\"normalize\"``, ``\"impute\"``, ``\"log_transform\"``.
        input_layer: Source layer name, or ``\"X\"`` if read from ``adata.X``.
        **kwargs: Extra metadata (e.g. ``method=...``, ``base=...``).

    Returns:
        Actual layer key to use in ``adata.layers`` (may be suffixed on collision).
    """
    if "layer_provenance" not in adata.uns:
        adata.uns["layer_provenance"] = {}

    registry = adata.uns["layer_provenance"]
    new_record = {"op": op, "input_layer": input_layer, **kwargs}

    if layer_name in registry:
        existing = registry[layer_name]
        collision = (
            existing.get("input_layer") != input_layer or existing.get("op") != op
        )
        if collision:
            suffix_n = 1
            candidate = f"{layer_name}_{suffix_n}"
            while candidate in registry:
                suffix_n += 1
                candidate = f"{layer_name}_{suffix_n}"

            print(
                f"{format_log_prefix('warn')} Layer '{layer_name}' already exists "
                f"in the provenance registry with a different origin:\n"
                f"       existing: {existing}\n"
                f"       new:      {new_record}\n"
                f"     Storing new layer as '{candidate}' to avoid collision.\n"
                f"     Use pdata.show_layer_provenance('{layer_name}') to inspect "
                "the existing chain."
            )
            layer_name = candidate

    registry[layer_name] = new_record
    return layer_name