Source code for allfreqs.allfreqs

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Created by Roberto Preste
from typing import Dict, Optional

from cached_property import cached_property
import pandas as pd
from skbio import TabularMSA, DNA, Sequence

from allfreqs.classes import Reference, MultiAlignment
from allfreqs.constants import AMBIGUOUS_COLS, STANDARD_COLS


[docs]class AlleleFreqs: """Class used to calculate allele frequencies from a multialignment. Input can be either a fasta or csv file with multialigned sequences, which may or may not contain the reference sequence in the first position. In the latter case, an additional reference sequence file is needed, either in fasta or csv format. """ def __init__(self, multialg: MultiAlignment, reference: Reference, ambiguous: bool = False): self.multialg = multialg self.reference = reference self.ambiguous = ambiguous if len(self.multialg.tabmsa.sequence[0]) != len(self.reference): raise ValueError("Reference and aligned sequences must have " "the same length.")
[docs] @classmethod def from_fasta(cls, sequences: str, reference: Optional[str] = None, ambiguous: bool = False): """Read a multialignment from a fasta file. If `reference` is not provided, it is assumed that the first sequence of the multialignment is the reference sequence. Otherwise, an additional fasta file with the reference sequence is needed. Args: sequences: input fasta file with multialignment reference: optional fasta file with reference sequence ambiguous: show frequencies for ambiguous nucleotides too [default: False] """ msa = TabularMSA.read(sequences, constructor=DNA) if not reference: refer = msa[0] multialg = {seq.metadata.get("id"): str(seq) for seq in msa[1:]} else: refer = Sequence.read(reference) multialg = {seq.metadata.get("id"): str(seq) for seq in msa} ref = Reference(refer) alg = MultiAlignment(multialg) return cls(multialg=alg, reference=ref, ambiguous=ambiguous)
[docs] @classmethod def from_csv(cls, sequences: str, reference: Optional[str] = None, ambiguous: bool = False, **kwargs): """Read a multialignment from a csv file. If `reference` is not provided, it is assumed that the first sequence of the multialignment is the reference sequence. Otherwise, an additional csv file with the reference sequence is needed. In both cases, the input csv file must be composed of two columns only, one for sequences ids and the other for the actual sequences; if not, you can provide additional options for pandas to restrict the number of columns read. Args: sequences: input csv file with multialignment reference: optional csv file with reference sequence ambiguous: show frequencies for ambiguous nucleotides too [default: False] **kwargs: additional options for pandas.read_csv() """ msa = pd.read_csv(sequences, **kwargs) if msa.shape[1] != 2: raise ValueError("Please make sure the input only contains two " "columns.") if not reference: refer = msa.iloc[0, 1] msa = msa.iloc[1:, :] multialg = dict(zip(msa.iloc[:, 0], msa.iloc[:, 1])) else: refer = pd.read_csv(reference, **kwargs) if refer.shape[1] != 2: raise ValueError("Please make sure the input only contains " "two columns.") refer = refer.iloc[0, 1] multialg = dict(zip(msa.iloc[:, 0], msa.iloc[:, 1])) ref = Reference(refer) alg = MultiAlignment(multialg) return cls(multialg=alg, reference=ref, ambiguous=ambiguous)
@cached_property def df(self) -> pd.DataFrame: """Convert sequences to the proper dataframe for further allele frequency calculations.""" df = self.multialg.tabmsa["sequence"].str.split("", expand=True) df.drop([0, df.columns[len(df.columns) - 1]], axis=1, inplace=True) df.fillna("-", inplace=True) # avoidable df.index = self.multialg.tabmsa["id"] df.columns = self.reference.indexes return df @cached_property def frequencies(self) -> pd.DataFrame: """Calculate allele frequencies for the 4 basic nucleotides, gaps and other (non-canonical) nucleotides.""" rows = [] cols = AMBIGUOUS_COLS if self.ambiguous else STANDARD_COLS idxs = [] for col in self.df.columns: freqs = self.df[col].value_counts(normalize=True) freq_dict = self._get_frequencies(freqs) rows.append(freq_dict) idxs.append(col) var_df = pd.DataFrame(rows, columns=cols, index=idxs) var_df.reset_index(inplace=True) var_df.rename({"index": "position"}, axis=1, inplace=True) return var_df def _get_frequencies(self, freqs: pd.Series) -> Dict[str, float]: """Get allele frequencies for either standard or ambiguous nucleotides from the value counts returned by Pandas. Args: freqs: normalized value counts Series from pandas Returns: frequencies: dictionary with each nucleotide and its frequency """ if self.ambiguous: freq_A = freqs.get("A", 0.0) freq_C = freqs.get("C", 0.0) freq_G = freqs.get("G", 0.0) freq_T = freqs.get("T", 0.0) freq_R = freqs.get("R", 0.0) freq_Y = freqs.get("Y", 0.0) freq_K = freqs.get("K", 0.0) freq_M = freqs.get("M", 0.0) freq_S = freqs.get("S", 0.0) freq_W = freqs.get("W", 0.0) freq_B = freqs.get("B", 0.0) freq_D = freqs.get("D", 0.0) freq_H = freqs.get("H", 0.0) freq_V = freqs.get("V", 0.0) freq_N = freqs.get("N", 0.0) freq_gap = freqs.get("-", 0.0) frequencies = {"A": freq_A, "C": freq_C, "G": freq_G, "T": freq_T, "R": freq_R, "Y": freq_Y, "K": freq_K, "M": freq_M, "S": freq_S, "W": freq_W, "B": freq_B, "D": freq_D, "H": freq_H, "V": freq_V, "N": freq_N, "gap": freq_gap} else: freq_A = freqs.get("A", 0.0) freq_C = freqs.get("C", 0.0) freq_G = freqs.get("G", 0.0) freq_T = freqs.get("T", 0.0) freq_gap = freqs.get("-", 0.0) freq_oth = 1.0 - (freq_A + freq_C + freq_G + freq_T + freq_gap) frequencies = {"A": freq_A, "C": freq_C, "G": freq_G, "T": freq_T, "gap": freq_gap, "oth": freq_oth} return frequencies
[docs] def to_csv(self, output_file: str = "all_freqs.csv"): """Write the resulting allele frequency dataframe to disk. Args: output_file: output file name """ self.frequencies.to_csv(output_file, index=False)
def __repr__(self): return "<{} ({} sequences, {} positions)>".format( self.__class__.__name__, len(self.multialg), len(self.reference) )