Skip to content

Api deseq

omicverse.bulk.pyDEG

Bases: object

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
class pyDEG(object):


    def __init__(self,raw_data:pd.DataFrame) -> None:
        r"""Initialize the pyDEG class.

        Arguments:
            raw_data: The raw data to be processed.

        Returns:
            None
        """
        self.raw_data=raw_data
        self.data=raw_data.copy()

    def drop_duplicates_index(self)->pd.DataFrame:
        r"""Drop the duplicated index of data.

        Returns:
            data: The data after dropping the duplicated index.
        """
        self.data=data_drop_duplicates_index(self.data)
        return self.data

    def normalize(self)->pd.DataFrame:
        r"""Normalize the data using DESeq2 method.

        Returns:
            data: The normalized data.
        """
        self.size_factors=estimateSizeFactors(self.data)
        self.data=deseq2_normalize(self.data)
        return self.data

    def foldchange_set(self,fc_threshold:int=-1,pval_threshold:float=0.05,logp_max:int=6,fold_threshold:int=0):
        r"""Set fold-change and p-value thresholds to classify differentially expressed genes as up-regulated, down-regulated, or not significant.

        Arguments:
            fc_threshold: Absolute fold-change threshold. If set to -1, the threshold is calculated based on the histogram of log2 fold-changes. (-1)
            pval_threshold: p-value threshold for determining significance. (0.05)
            logp_max: Maximum value for log-transformed p-values. (6)
            fold_threshold: Index of the histogram bin corresponding to the fold-change threshold (only applicable if fc_threshold=-1). (0)

        Returns:
            None
        """
        if fc_threshold==-1:
            foldp=np.histogram(self.result['log2FC'].dropna())
            foldchange=(foldp[1][np.where(foldp[1]>0)[0][fold_threshold]]+foldp[1][np.where(foldp[1]>0)[0][fold_threshold+1]])/2
        else:
            foldchange=fc_threshold
        print('... Fold change threshold: %s'%foldchange)
        fc_max,fc_min=foldchange,0-foldchange
        self.fc_max,self.fc_min=fc_max,fc_min
        self.pval_threshold=pval_threshold
        self.result['sig']='normal'
        self.result.loc[((self.result['log2FC']>fc_max)&(self.result['qvalue']<pval_threshold)),'sig']='up'
        self.result.loc[((self.result['log2FC']<fc_min)&(self.result['qvalue']<pval_threshold)),'sig']='down'
        self.result.loc[self.result['-log(qvalue)']>logp_max,'-log(qvalue)']=logp_max
        self.logp_max=logp_max


    def plot_volcano(self,figsize:tuple=(4,4),pval_name='qvalue',fc_name='log2FC',
                     title:str='',titlefont:dict={'weight':'normal','size':14,},
                     up_color:str='#e25d5d',down_color:str='#7388c1',normal_color:str='#d7d7d7',
                     up_fontcolor:str='#e25d5d',down_fontcolor:str='#7388c1',normal_fontcolor:str='#d7d7d7',
                     legend_bbox:tuple=(0.8, -0.2),legend_ncol:int=2,legend_fontsize:int=12,
                     plot_genes:list=None,plot_genes_num:int=10,plot_genes_fontsize:int=10,
                     ticks_fontsize:int=12,ax=None):
        r"""Generate a volcano plot for the differential gene expression analysis results.

        Arguments:
            figsize: The size of the generated figure. ((4,4))
            pval_name: Column name for p-values. ('qvalue')
            fc_name: Column name for fold changes. ('log2FC')
            title: The title of the plot. ('')
            titlefont: A dictionary of font properties for the plot title. ({'weight':'normal','size':14,})
            up_color: The color of the up-regulated genes in the plot. ('#e25d5d')
            down_color: The color of the down-regulated genes in the plot. ('#7388c1')
            normal_color: The color of the non-significant genes in the plot. ('#d7d7d7')
            up_fontcolor: Font color for up-regulated gene labels. ('#e25d5d')
            down_fontcolor: Font color for down-regulated gene labels. ('#7388c1')
            normal_fontcolor: Font color for normal gene labels. ('#d7d7d7')
            legend_bbox: A tuple containing the coordinates of the legend's bounding box. ((0.8, -0.2))
            legend_ncol: The number of columns in the legend. (2)
            legend_fontsize: The font size of the legend. (12)
            plot_genes: A list of genes to be plotted on the volcano plot. (None)
            plot_genes_num: The number of genes to be plotted on the volcano plot. (10)
            plot_genes_fontsize: The font size of the genes to be plotted on the volcano plot. (10)
            ticks_fontsize: The font size of the ticks. (12)
            ax: Matplotlib axis object. (None)

        Returns:
            ax: The generated volcano plot.

        """

        ax=volcano(result=self.result,pval_name=pval_name,fc_name=fc_name,pval_max=self.logp_max,
                       figsize=figsize,title=title,titlefont=titlefont,
                       up_color=up_color,down_color=down_color,normal_color=normal_color,
                       up_fontcolor=up_fontcolor,down_fontcolor=down_fontcolor,normal_fontcolor=normal_fontcolor,
                       legend_bbox=legend_bbox,legend_ncol=legend_ncol,legend_fontsize=legend_fontsize,plot_genes=plot_genes,
                       plot_genes_num=plot_genes_num,plot_genes_fontsize=plot_genes_fontsize,
                       ticks_fontsize=ticks_fontsize,ax=ax,
                       pval_threshold=self.pval_threshold,fc_max=self.fc_max,fc_min=self.fc_min)
        return ax
        '''
        fig, ax = plt.subplots(figsize=figsize)
        result=self.result.copy()
        #首先绘制正常基因
        ax.scatter(x=result[result['sig']=='normal']['log2FC'],
                y=result[result['sig']=='normal']['-log(qvalue)'],
                color=normal_color,#颜色
                alpha=.5,#透明度
                )
        #接着绘制上调基因
        ax.scatter(x=result[result['sig']=='up']['log2FC'],
                y=result[result['sig']=='up']['-log(qvalue)'],
                color=up_color,#选择色卡第15个颜色
                alpha=.5,#透明度
                )
        #绘制下调基因
        ax.scatter(x=result[result['sig']=='down']['log2FC'],
                y=result[result['sig']=='down']['-log(qvalue)'],
                color=down_color,#颜色
                alpha=.5,#透明度
                )

        ax.plot([result['log2FC'].min(),result['log2FC'].max()],#辅助线的x值起点与终点
                [-np.log10(self.pval_threshold),-np.log10(self.pval_threshold)],#辅助线的y值起点与终点
                linewidth=2,#辅助线的宽度
                linestyle="--",#辅助线类型:虚线
                color='black'#辅助线的颜色
        )
        ax.plot([self.fc_max,self.fc_max],
                [result['-log(qvalue)'].min(),result['-log(qvalue)'].max()],
                linewidth=2, 
                linestyle="--",
                color='black')
        ax.plot([self.fc_min,self.fc_min],
                [result['-log(qvalue)'].min(),result['-log(qvalue)'].max()],
                linewidth=2, 
                linestyle="--",
                color='black')
        #设置横标签与纵标签
        ax.set_ylabel(r'$-log_{10}(qvalue)$',titlefont)                                    
        ax.set_xlabel(r'$log_{2}FC$',titlefont)
        #设置标题
        ax.set_title(title,titlefont)

        #绘制图注
        #legend标签列表,上面的color即是颜色列表
        labels = ['up:{0}'.format(len(result[result['sig']=='up'])),
                'down:{0}'.format(len(result[result['sig']=='down']))]  
        #用label和color列表生成mpatches.Patch对象,它将作为句柄来生成legend
        color = [up_color,down_color]
        patches = [mpatches.Patch(color=color[i], label="{:s}".format(labels[i]) ) for i in range(len(color))] 

        ax.legend(handles=patches,
            bbox_to_anchor=legend_bbox, 
            ncol=legend_ncol,
            fontsize=legend_fontsize)

        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        ax.spines['bottom'].set_visible(True)
        ax.spines['left'].set_visible(True)

        from adjustText import adjust_text
        import adjustText

        if plot_genes is not None:
            hub_gene=plot_genes
        else:
            up_result=result.loc[result['sig']=='up']
            down_result=result.loc[result['sig']=='down']
            hub_gene=up_result.sort_values('qvalue').index[:plot_genes_num//2].tolist()+down_result.sort_values('qvalue').index[:plot_genes_num//2].tolist()

        color_dict={
        'up':up_fontcolor,
            'down':down_fontcolor,
            'normal':normal_fontcolor
        }

        texts=[ax.text(result.loc[i,'log2FC'], 
               result.loc[i,'-log(qvalue)'],
               i,
               fontdict={'size':plot_genes_fontsize,'weight':'bold','color':color_dict[result.loc[i,'sig']]}
               ) for i in hub_gene]

        if adjustText.__version__<='0.8':
            adjust_text(texts,only_move={'text': 'xy'},arrowprops=dict(arrowstyle='->', color='red'),)
        else:
            adjust_text(texts,only_move={"text": "xy", "static": "xy", "explode": "xy", "pull": "xy"},
                        arrowprops=dict(arrowstyle='->', color='red'))

        ax.set_xticks([round(i,2) for i in ax.get_xticks()[1:-1]],#获取x坐标轴内容
              [round(i,2) for i in ax.get_xticks()[1:-1]],#更新x坐标轴内容
              fontsize=ticks_fontsize,
              fontweight='normal'
              )
        return fig,ax
        '''

    def plot_boxplot(self,genes:list,treatment_groups:list,control_groups:list,
                     log:bool=True,
                     treatment_name:str='Treatment',control_name:str='Control',
                     figsize:tuple=(4,3),palette:list=["#a64d79","#674ea7"],
                     title:str='Gene Expression',fontsize:int=12,legend_bbox:tuple=(1, 0.55),legend_ncol:int=1,
                     **kwarg)->Tuple[matplotlib.figure.Figure,matplotlib.axes._axes.Axes]:
        r"""
        Plot the boxplot of genes from dds data

        Arguments:
            genes: The genes to plot.
            treatment_groups: The treatment groups.
            control_groups: The control groups.
            figsize: The figure size.
            palette: The color palette.
            title: The title of the plot.
            fontsize: The fontsize of the plot.
            legend_bbox: The bbox of the legend.
            legend_ncol: The number of columns of the legend.
            **kwarg: Other arguments for plot_boxplot function.

        Returns:
            fig: The figure of the plot.
            ax: The axis of the plot.
        """
        p_data=pd.DataFrame(columns=['Value','Gene','Type'])
        if log:
            for gene in genes:
                plot_data1=pd.DataFrame()
                plot_data1['Value']=np.log1p(self.data[treatment_groups].loc[gene].values)
                plot_data1['Gene']=gene
                plot_data1['Type']=treatment_name

                plot_data2=pd.DataFrame()
                plot_data2['Value']=np.log1p(self.data[control_groups].loc[gene].values)
                plot_data2['Gene']=gene
                plot_data2['Type']=control_name

                plot_data=pd.concat([plot_data1,plot_data2],axis=0)
                p_data=pd.concat([p_data,plot_data],axis=0)
        else:
            for gene in genes:
                plot_data1=pd.DataFrame()
                plot_data1['Value']=self.data[treatment_groups].loc[gene].values
                plot_data1['Gene']=gene
                plot_data1['Type']=treatment_name

                plot_data2=pd.DataFrame()
                plot_data2['Value']=self.data[control_groups].loc[gene].values
                plot_data2['Gene']=gene
                plot_data2['Type']=control_name

                plot_data=pd.concat([plot_data1,plot_data2],axis=0)
                p_data=pd.concat([p_data,plot_data],axis=0)

        fig,ax=plot_boxplot(p_data,hue='Type',x_value='Gene',y_value='Value',palette=palette,
                          figsize=figsize,fontsize=fontsize,title=title,
                          legend_bbox=legend_bbox,legend_ncol=legend_ncol, **kwarg)
        return fig,ax

    def ranking2gsea(self,rank_max:int=200,rank_min:int=274)->pd.DataFrame:
        r"""
        Ranking the result of dds data for gsea analysis

        Arguments:
            rank_max: The max rank of the result.
            rank_min: The min rank of the result.

        Returns:
            rnk: The ranking result.

        """


        result=self.result.copy()
        result['fcsign']=np.sign(result['log2FC'])
        result['logp']=-np.log10(result['pvalue'])
        result['metric']=result['logp']/result['fcsign']
        rnk=pd.DataFrame()
        rnk['gene_name']=result.index
        rnk['rnk']=result['metric'].values
        rnk=rnk.sort_values(by=['rnk'],ascending=False)
        k=1
        total=0
        for i in range(len(rnk)):
            if rnk.loc[i,'rnk']==np.inf: 
                total+=1
        #200跟274根据你的数据进行更改,保证inf比你数据最大的大,-inf比数据最小的小就好
        for i in range(len(rnk)):
            if rnk.loc[i,'rnk']==np.inf: 
                rnk.loc[i,'rnk']=rank_max+(total-k)
                k+=1
            elif rnk.loc[i,'rnk']==-np.inf: 
                rnk.loc[i,'rnk']=-(rank_min+k)
                k+=1
        return rnk

    def deg_analysis(self,group1:list,group2:list,
                 method:str='DEseq2',alpha:float=0.05,
                 multipletests_method:str='fdr_bh',n_cpus:int=8,
                 cooks_filter:bool=True, independent_filter:bool=True)->pd.DataFrame:
        r"""
        Differential expression analysis.

        Arguments:
            group1: The first group to be compared.
            group2: The second group to be compared.
            method: The method to be used for differential expression analysis.
                - `DEseq2`: DEseq2
                - `ttest`: ttest
                - `wilcox`: wilconx test
            alpha: The threshold of p-value.
            multipletests_method:
                - `bonferroni` : one-step correction
                - `sidak` : one-step correction
                - `holm-sidak` : step down method using Sidak adjustments
                - `holm` : step-down method using Bonferroni adjustments
                - `simes-hochberg` : step-up method  (independent)
                - `hommel` : closed method based on Simes tests (non-negative)
                - `fdr_bh` : Benjamini/Hochberg  (non-negative)
                - `fdr_by` : Benjamini/Yekutieli (negative)
                - `fdr_tsbh` : two stage fdr correction (non-negative)
                - `fdr_tsbky` : two stage fdr correction (non-negative)

        Returns
            result: The result of differential expression analysis.
        """
        from pydeseq2.dds import DeseqDataSet
        from pydeseq2.ds import DeseqStats
        from scipy.stats import ttest_ind
        from statsmodels.stats.multitest import multipletests
        print(f"⚙️ You are using {method} method for differential expression analysis.")
        if method=='ttest':

            data=self.data

            g1_mean=data[group1].mean(axis=1)
            g2_mean=data[group2].mean(axis=1)
            g=(g2_mean+g1_mean)/2
            g=g.loc[g>0].min()
            fold=(g1_mean+g)/(g2_mean+g)
            #log2fold=np.log2(fold)
            ttest = ttest_ind(data[group1].T.values, data[group2].T.values)
            pvalue=ttest[1]
            print(f"⏰ Start to calculate qvalue...")
            qvalue = multipletests(np.nan_to_num(np.array(pvalue),0), alpha=0.5, 
                               method=multipletests_method, is_sorted=False, returnsorted=False)
            #qvalue=fdrcorrection(np.nan_to_num(np.array(pvalue),0), alpha=0.05, method='indep', is_sorted=False)
            genearray = np.asarray(pvalue)
            result = pd.DataFrame({'pvalue':genearray,'qvalue':qvalue[1],'FoldChange':fold})
            result['MaxBaseMean']=np.max([g1_mean,g2_mean],axis=0)
            result['BaseMean']=(g1_mean+g2_mean)/2
            result['log2(BaseMean)']=np.log2((g1_mean+g2_mean)/2)
            result['log2FC'] = np.log2(result['FoldChange'])
            result['abs(log2FC)'] = abs(np.log2(result['FoldChange']))
            result['size']  =np.abs(result['FoldChange'])/10
            result=result.loc[~result['pvalue'].isnull()]
            result['-log(pvalue)'] = -np.log10(result['pvalue'])
            result['-log(qvalue)'] = -np.log10(result['qvalue'])
            #max mean of between each value in group1 and group2
            #result=result[result['padj']<alpha]
            result['sig']='normal'
            result.loc[result['qvalue']<alpha,'sig']='sig'
            print(f"✅ Differential expression analysis completed.")

            self.result=result
            return result
        elif method=='wilcox':
            raise ValueError('The method is not supported.')
            print(f"⚙️ You are using {method} method for differential expression analysis.")
        elif method=='DEseq2':
            import pydeseq2
            counts_df = self.data[group1+group2].T
            clinical_df = pd.DataFrame(index=group1+group2)

            clinical_df['condition'] = ['Treatment'] * len(group1) + ['Control'] * len(group2)
            print(f"⏰ Start to create DeseqDataSet...")
            # Determine pydeseq2 version and create the DeseqDataSet accordingly
            if pydeseq2.__version__ <= '0.3.5':
                dds = DeseqDataSet(
                    counts=counts_df,
                    clinical=clinical_df,
                    design_factors="condition",  # compare samples based on "condition"
                    ref_level=["condition", "Control"],
                    refit_cooks=True,
                    n_cpus=n_cpus,
                )
            elif pydeseq2.__version__ <= '0.4.1':
                if ad.__version__ > '0.10.8':
                    raise ImportError(
                        'Please install the 0.10.8 version of anndata: `pip install anndata==0.10.8`.'
                    )
                dds = DeseqDataSet(
                    counts=counts_df,
                    metadata=clinical_df,
                    design_factors="condition",
                    refit_cooks=True,
                    n_cpus=n_cpus,
                )
            else:
                from pydeseq2.default_inference import DefaultInference
                inference = DefaultInference(n_cpus=n_cpus)
                dds = DeseqDataSet(
                    counts=counts_df,
                    metadata=clinical_df,
                    design_factors="condition",
                    refit_cooks=True,
                    inference=inference,
                )

            dds.fit_size_factors()
            dds.fit_genewise_dispersions()
            dds.fit_dispersion_trend()
            dds.fit_dispersion_prior()
            print(f"logres_prior={dds.uns['_squared_logres']}, sigma_prior={dds.uns['prior_disp_var']}")
            dds.fit_MAP_dispersions()
            dds.fit_LFC()
            dds.calculate_cooks()
            if dds.refit_cooks:
                dds.refit()

            # Add the 'contrast' parameter here:
        # FIX: Adding version check for DeseqStats constructor
            if pydeseq2.__version__<='0.3.5':
                stat_res = DeseqStats(dds, alpha=alpha, cooks_filter=cooks_filter, independent_filter=independent_filter)
            elif pydeseq2.__version__ <= '0.4.1':
                # For newer PyDESeq2 versions that require the contrast parameter
                stat_res = DeseqStats(dds, contrast=["condition", "Treatment", "Control"], 
                                    alpha=alpha, cooks_filter=cooks_filter, independent_filter=independent_filter)
                stat_res.run_wald_test()
                if stat_res.cooks_filter:
                    stat_res._cooks_filtering()

                if stat_res.independent_filter:
                    stat_res._independent_filtering()
                else:
                    stat_res._p_value_adjustment()
            else:
                stat_res=DeseqStats(
                            dds,
                            contrast=["condition", "Treatment", "Control"], 
                            alpha=alpha,
                            cooks_filter=cooks_filter,
                            independent_filter=independent_filter,
                        )
                stat_res.run_wald_test()
                if stat_res.cooks_filter:
                    stat_res._cooks_filtering()

                if stat_res.independent_filter:
                    stat_res._independent_filtering()
                else:
                    stat_res._p_value_adjustment()


            self.stat_res = stat_res
            stat_res.summary()
            result = stat_res.results_df
            result['qvalue'] = result['padj']
            result['-log(pvalue)'] = -np.log10(result['pvalue'])
            result['-log(qvalue)'] = -np.log10(result['padj'])
            result['BaseMean'] = result['baseMean']
            result['log2(BaseMean)'] = np.log2(result['baseMean'] + 1)
            result['log2FC'] = result['log2FoldChange']
            result['abs(log2FC)'] = abs(result['log2FC'])
            result['sig'] = 'normal'
            result.loc[result['qvalue'] < alpha, 'sig'] = 'sig'
            self.result = result
            print(f"✅ Differential expression analysis completed.")
            return result


        elif method == 'edgepy':
            try:
                from inmoose.data.pasilla import pasilla
                from inmoose.edgepy import DGEList, glmLRT, topTags
                from patsy import dmatrix
            except:
                raise ImportError('Please install inmoose: `pip install inmoose`')
            print(f"⏰ Start to create DGEList...")
            anno1=pd.DataFrame(
                index=group1+group2
            )
            anno1['condition']=['treatment' for i in group1]+['control' for i in group2]
            var=pd.DataFrame(index=self.data.index)
            var.index.name='gene_id'

            # build a DGEList object
            dge_list = DGEList(
                counts=self.data[group1+group2].values, 
                samples=anno1, 
                group_col="condition", 
                genes=var
            )
            design1 = dmatrix("~condition", data=anno1)
            dge_list.estimateGLMCommonDisp(design=design1)
            fit = dge_list.glmFit(design=design1)
            lrt = glmLRT(fit)
            lrt.index=var.index

            #	log2FoldChange	lfcSE	logCPM	stat	pvalue		
            # 
            pvalue=lrt['pvalue'].values.reshape(-1)
            qvalue = multipletests(np.nan_to_num(np.array(pvalue),0), alpha=0.5, 
                               method=multipletests_method, is_sorted=False, returnsorted=False)

            g1_mean=self.data[group1].mean(axis=1)
            g2_mean=self.data[group2].mean(axis=1)
            g=(g2_mean+g1_mean)/2
            g=g.loc[g>0].min()
            fold=(g1_mean+g)/(g2_mean+g)
            print(f"⏰ Start to calculate qvalue...")

            result = pd.DataFrame({'pvalue':pvalue,'qvalue':qvalue[1],'FoldChange':fold})
            result['MaxBaseMean']=np.max([g1_mean,g2_mean],axis=0)
            result['BaseMean']=(g1_mean+g2_mean)/2
            result['log2(BaseMean)']=np.log2((g1_mean+g2_mean)/2)
            result['log2FC'] = np.log2(result['FoldChange'])
            result['abs(log2FC)'] = abs(result['log2FC'])
            result['size']  =np.abs(result['log2FC'])/10
            result=result.loc[~result['pvalue'].isnull()]
            result['-log(pvalue)'] = -np.log10(result['pvalue'])
            result['-log(qvalue)'] = -np.log10(result['qvalue'])
            #max mean of between each value in group1 and group2
            #result=result[result['padj']<alpha]
            result['sig']='normal'
            result.loc[result['qvalue']<alpha,'sig']='sig'
            self.result=result
            print(f"✅ Differential expression analysis completed.")
            return result

        elif method == 'limma':
            try:
                from patsy import dmatrix
                from inmoose.limma import lmFit, makeContrasts, contrasts_fit, eBayes, topTable
            except:
                raise ImportError('Please install inmoose: `pip install inmoose`')
            print(f"⏰ Start to create DGEList...")
            anno1=pd.DataFrame(
                index=group1+group2
            )
            anno1['condition']=['treatment' for i in group1]+['control' for i in group2]

            # 3.1 构建设计矩阵
            design1 = dmatrix("~0 + condition", data=anno1)
            #    列名会是 ['condition[treatment]', 'condition[control]']

            # 3.2 lmFit 拟合线性模型
            #    输入: counts_df 行基因为基因,列为样本
            counts_df = self.data[group1+group2].values
            fit = lmFit(counts_df, design1)
            # 3.3 定义对比——treatment vs control
            contrast_matrix = makeContrasts(
                "condition[treatment] - condition[control]",
                levels=design1
            )

            # 3.4 contrasts_fit 应用对比
            fit_con = contrasts_fit(fit, contrast_matrix)

            # 3.5 经验贝叶斯调整
            print(f"⏰ Start to adjust pvalue...")
            fit_eb = eBayes(fit_con)

            g1_mean=self.data[group1].mean(axis=1)
            g2_mean=self.data[group2].mean(axis=1)
            g=(g2_mean+g1_mean)/2
            g=g.loc[g>0].min()
            fold=(g1_mean+g)/(g2_mean+g)

            pvalue=fit_eb.p_value.values.reshape(-1)
            qvalue = multipletests(np.nan_to_num(np.array(pvalue),0), alpha=0.5, 
                               method=multipletests_method, is_sorted=False, returnsorted=False)

            result = pd.DataFrame({'pvalue':pvalue,'qvalue':qvalue[1],'FoldChange':fold})
            result['MaxBaseMean']=np.max([g1_mean,g2_mean],axis=0)
            result['BaseMean']=(g1_mean+g2_mean)/2
            result['log2(BaseMean)']=np.log2((g1_mean+g2_mean)/2)
            result['log2FC'] = np.log2(result['FoldChange'])
            result['abs(log2FC)'] = abs(result['log2FC'])
            result['size']  =np.abs(result['log2FC'])/10
            result['sig']='normal'
            result=result.loc[~result['pvalue'].isnull()]
            result['-log(pvalue)'] = -np.log10(result['pvalue'])
            result['-log(qvalue)'] = -np.log10(result['qvalue'])
            #max mean of between each value in group1 and group2
            #result=result[result['padj']<alpha]
            result['sig']='normal'
            result.loc[result['qvalue']<alpha,'sig']='sig'

            result['F']=fit_eb.F.reshape(-1)
            result['t']=fit_eb.t.values.reshape(-1)
            self.result=result
            print(f"✅ Differential expression analysis completed.")
            return result

            # 3.6 提取结果




        else:  # This is where the "method" check (not pydeseq2 version check) ends
            raise ValueError('The method is not supported.')

__init__(raw_data)

Initialize the pyDEG class.

Parameters:

Name Type Description Default
raw_data pd.DataFrame

The raw data to be processed.

required

Returns:

Type Description
None

None

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def __init__(self,raw_data:pd.DataFrame) -> None:
    r"""Initialize the pyDEG class.

    Arguments:
        raw_data: The raw data to be processed.

    Returns:
        None
    """
    self.raw_data=raw_data
    self.data=raw_data.copy()

drop_duplicates_index()

Drop the duplicated index of data.

Returns:

Name Type Description
data pd.DataFrame

The data after dropping the duplicated index.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def drop_duplicates_index(self)->pd.DataFrame:
    r"""Drop the duplicated index of data.

    Returns:
        data: The data after dropping the duplicated index.
    """
    self.data=data_drop_duplicates_index(self.data)
    return self.data

normalize()

Normalize the data using DESeq2 method.

Returns:

Name Type Description
data pd.DataFrame

The normalized data.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def normalize(self)->pd.DataFrame:
    r"""Normalize the data using DESeq2 method.

    Returns:
        data: The normalized data.
    """
    self.size_factors=estimateSizeFactors(self.data)
    self.data=deseq2_normalize(self.data)
    return self.data

foldchange_set(fc_threshold=-1, pval_threshold=0.05, logp_max=6, fold_threshold=0)

Set fold-change and p-value thresholds to classify differentially expressed genes as up-regulated, down-regulated, or not significant.

Parameters:

Name Type Description Default
fc_threshold int

Absolute fold-change threshold. If set to -1, the threshold is calculated based on the histogram of log2 fold-changes. (-1)

-1
pval_threshold float

p-value threshold for determining significance. (0.05)

0.05
logp_max int

Maximum value for log-transformed p-values. (6)

6
fold_threshold int

Index of the histogram bin corresponding to the fold-change threshold (only applicable if fc_threshold=-1). (0)

0

Returns:

Type Description

None

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def foldchange_set(self,fc_threshold:int=-1,pval_threshold:float=0.05,logp_max:int=6,fold_threshold:int=0):
    r"""Set fold-change and p-value thresholds to classify differentially expressed genes as up-regulated, down-regulated, or not significant.

    Arguments:
        fc_threshold: Absolute fold-change threshold. If set to -1, the threshold is calculated based on the histogram of log2 fold-changes. (-1)
        pval_threshold: p-value threshold for determining significance. (0.05)
        logp_max: Maximum value for log-transformed p-values. (6)
        fold_threshold: Index of the histogram bin corresponding to the fold-change threshold (only applicable if fc_threshold=-1). (0)

    Returns:
        None
    """
    if fc_threshold==-1:
        foldp=np.histogram(self.result['log2FC'].dropna())
        foldchange=(foldp[1][np.where(foldp[1]>0)[0][fold_threshold]]+foldp[1][np.where(foldp[1]>0)[0][fold_threshold+1]])/2
    else:
        foldchange=fc_threshold
    print('... Fold change threshold: %s'%foldchange)
    fc_max,fc_min=foldchange,0-foldchange
    self.fc_max,self.fc_min=fc_max,fc_min
    self.pval_threshold=pval_threshold
    self.result['sig']='normal'
    self.result.loc[((self.result['log2FC']>fc_max)&(self.result['qvalue']<pval_threshold)),'sig']='up'
    self.result.loc[((self.result['log2FC']<fc_min)&(self.result['qvalue']<pval_threshold)),'sig']='down'
    self.result.loc[self.result['-log(qvalue)']>logp_max,'-log(qvalue)']=logp_max
    self.logp_max=logp_max

deg_analysis(group1, group2, method='DEseq2', alpha=0.05, multipletests_method='fdr_bh', n_cpus=8, cooks_filter=True, independent_filter=True)

Differential expression analysis.

Parameters:

Name Type Description Default
group1 list

The first group to be compared.

required
group2 list

The second group to be compared.

required
method str

The method to be used for differential expression analysis. - DEseq2: DEseq2 - ttest: ttest - wilcox: wilconx test

'DEseq2'
alpha float

The threshold of p-value.

0.05
multipletests_method str
  • bonferroni : one-step correction
  • sidak : one-step correction
  • holm-sidak : step down method using Sidak adjustments
  • holm : step-down method using Bonferroni adjustments
  • simes-hochberg : step-up method (independent)
  • hommel : closed method based on Simes tests (non-negative)
  • fdr_bh : Benjamini/Hochberg (non-negative)
  • fdr_by : Benjamini/Yekutieli (negative)
  • fdr_tsbh : two stage fdr correction (non-negative)
  • fdr_tsbky : two stage fdr correction (non-negative)
'fdr_bh'

Returns result: The result of differential expression analysis.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def deg_analysis(self,group1:list,group2:list,
             method:str='DEseq2',alpha:float=0.05,
             multipletests_method:str='fdr_bh',n_cpus:int=8,
             cooks_filter:bool=True, independent_filter:bool=True)->pd.DataFrame:
    r"""
    Differential expression analysis.

    Arguments:
        group1: The first group to be compared.
        group2: The second group to be compared.
        method: The method to be used for differential expression analysis.
            - `DEseq2`: DEseq2
            - `ttest`: ttest
            - `wilcox`: wilconx test
        alpha: The threshold of p-value.
        multipletests_method:
            - `bonferroni` : one-step correction
            - `sidak` : one-step correction
            - `holm-sidak` : step down method using Sidak adjustments
            - `holm` : step-down method using Bonferroni adjustments
            - `simes-hochberg` : step-up method  (independent)
            - `hommel` : closed method based on Simes tests (non-negative)
            - `fdr_bh` : Benjamini/Hochberg  (non-negative)
            - `fdr_by` : Benjamini/Yekutieli (negative)
            - `fdr_tsbh` : two stage fdr correction (non-negative)
            - `fdr_tsbky` : two stage fdr correction (non-negative)

    Returns
        result: The result of differential expression analysis.
    """
    from pydeseq2.dds import DeseqDataSet
    from pydeseq2.ds import DeseqStats
    from scipy.stats import ttest_ind
    from statsmodels.stats.multitest import multipletests
    print(f"⚙️ You are using {method} method for differential expression analysis.")
    if method=='ttest':

        data=self.data

        g1_mean=data[group1].mean(axis=1)
        g2_mean=data[group2].mean(axis=1)
        g=(g2_mean+g1_mean)/2
        g=g.loc[g>0].min()
        fold=(g1_mean+g)/(g2_mean+g)
        #log2fold=np.log2(fold)
        ttest = ttest_ind(data[group1].T.values, data[group2].T.values)
        pvalue=ttest[1]
        print(f"⏰ Start to calculate qvalue...")
        qvalue = multipletests(np.nan_to_num(np.array(pvalue),0), alpha=0.5, 
                           method=multipletests_method, is_sorted=False, returnsorted=False)
        #qvalue=fdrcorrection(np.nan_to_num(np.array(pvalue),0), alpha=0.05, method='indep', is_sorted=False)
        genearray = np.asarray(pvalue)
        result = pd.DataFrame({'pvalue':genearray,'qvalue':qvalue[1],'FoldChange':fold})
        result['MaxBaseMean']=np.max([g1_mean,g2_mean],axis=0)
        result['BaseMean']=(g1_mean+g2_mean)/2
        result['log2(BaseMean)']=np.log2((g1_mean+g2_mean)/2)
        result['log2FC'] = np.log2(result['FoldChange'])
        result['abs(log2FC)'] = abs(np.log2(result['FoldChange']))
        result['size']  =np.abs(result['FoldChange'])/10
        result=result.loc[~result['pvalue'].isnull()]
        result['-log(pvalue)'] = -np.log10(result['pvalue'])
        result['-log(qvalue)'] = -np.log10(result['qvalue'])
        #max mean of between each value in group1 and group2
        #result=result[result['padj']<alpha]
        result['sig']='normal'
        result.loc[result['qvalue']<alpha,'sig']='sig'
        print(f"✅ Differential expression analysis completed.")

        self.result=result
        return result
    elif method=='wilcox':
        raise ValueError('The method is not supported.')
        print(f"⚙️ You are using {method} method for differential expression analysis.")
    elif method=='DEseq2':
        import pydeseq2
        counts_df = self.data[group1+group2].T
        clinical_df = pd.DataFrame(index=group1+group2)

        clinical_df['condition'] = ['Treatment'] * len(group1) + ['Control'] * len(group2)
        print(f"⏰ Start to create DeseqDataSet...")
        # Determine pydeseq2 version and create the DeseqDataSet accordingly
        if pydeseq2.__version__ <= '0.3.5':
            dds = DeseqDataSet(
                counts=counts_df,
                clinical=clinical_df,
                design_factors="condition",  # compare samples based on "condition"
                ref_level=["condition", "Control"],
                refit_cooks=True,
                n_cpus=n_cpus,
            )
        elif pydeseq2.__version__ <= '0.4.1':
            if ad.__version__ > '0.10.8':
                raise ImportError(
                    'Please install the 0.10.8 version of anndata: `pip install anndata==0.10.8`.'
                )
            dds = DeseqDataSet(
                counts=counts_df,
                metadata=clinical_df,
                design_factors="condition",
                refit_cooks=True,
                n_cpus=n_cpus,
            )
        else:
            from pydeseq2.default_inference import DefaultInference
            inference = DefaultInference(n_cpus=n_cpus)
            dds = DeseqDataSet(
                counts=counts_df,
                metadata=clinical_df,
                design_factors="condition",
                refit_cooks=True,
                inference=inference,
            )

        dds.fit_size_factors()
        dds.fit_genewise_dispersions()
        dds.fit_dispersion_trend()
        dds.fit_dispersion_prior()
        print(f"logres_prior={dds.uns['_squared_logres']}, sigma_prior={dds.uns['prior_disp_var']}")
        dds.fit_MAP_dispersions()
        dds.fit_LFC()
        dds.calculate_cooks()
        if dds.refit_cooks:
            dds.refit()

        # Add the 'contrast' parameter here:
    # FIX: Adding version check for DeseqStats constructor
        if pydeseq2.__version__<='0.3.5':
            stat_res = DeseqStats(dds, alpha=alpha, cooks_filter=cooks_filter, independent_filter=independent_filter)
        elif pydeseq2.__version__ <= '0.4.1':
            # For newer PyDESeq2 versions that require the contrast parameter
            stat_res = DeseqStats(dds, contrast=["condition", "Treatment", "Control"], 
                                alpha=alpha, cooks_filter=cooks_filter, independent_filter=independent_filter)
            stat_res.run_wald_test()
            if stat_res.cooks_filter:
                stat_res._cooks_filtering()

            if stat_res.independent_filter:
                stat_res._independent_filtering()
            else:
                stat_res._p_value_adjustment()
        else:
            stat_res=DeseqStats(
                        dds,
                        contrast=["condition", "Treatment", "Control"], 
                        alpha=alpha,
                        cooks_filter=cooks_filter,
                        independent_filter=independent_filter,
                    )
            stat_res.run_wald_test()
            if stat_res.cooks_filter:
                stat_res._cooks_filtering()

            if stat_res.independent_filter:
                stat_res._independent_filtering()
            else:
                stat_res._p_value_adjustment()


        self.stat_res = stat_res
        stat_res.summary()
        result = stat_res.results_df
        result['qvalue'] = result['padj']
        result['-log(pvalue)'] = -np.log10(result['pvalue'])
        result['-log(qvalue)'] = -np.log10(result['padj'])
        result['BaseMean'] = result['baseMean']
        result['log2(BaseMean)'] = np.log2(result['baseMean'] + 1)
        result['log2FC'] = result['log2FoldChange']
        result['abs(log2FC)'] = abs(result['log2FC'])
        result['sig'] = 'normal'
        result.loc[result['qvalue'] < alpha, 'sig'] = 'sig'
        self.result = result
        print(f"✅ Differential expression analysis completed.")
        return result


    elif method == 'edgepy':
        try:
            from inmoose.data.pasilla import pasilla
            from inmoose.edgepy import DGEList, glmLRT, topTags
            from patsy import dmatrix
        except:
            raise ImportError('Please install inmoose: `pip install inmoose`')
        print(f"⏰ Start to create DGEList...")
        anno1=pd.DataFrame(
            index=group1+group2
        )
        anno1['condition']=['treatment' for i in group1]+['control' for i in group2]
        var=pd.DataFrame(index=self.data.index)
        var.index.name='gene_id'

        # build a DGEList object
        dge_list = DGEList(
            counts=self.data[group1+group2].values, 
            samples=anno1, 
            group_col="condition", 
            genes=var
        )
        design1 = dmatrix("~condition", data=anno1)
        dge_list.estimateGLMCommonDisp(design=design1)
        fit = dge_list.glmFit(design=design1)
        lrt = glmLRT(fit)
        lrt.index=var.index

        #	log2FoldChange	lfcSE	logCPM	stat	pvalue		
        # 
        pvalue=lrt['pvalue'].values.reshape(-1)
        qvalue = multipletests(np.nan_to_num(np.array(pvalue),0), alpha=0.5, 
                           method=multipletests_method, is_sorted=False, returnsorted=False)

        g1_mean=self.data[group1].mean(axis=1)
        g2_mean=self.data[group2].mean(axis=1)
        g=(g2_mean+g1_mean)/2
        g=g.loc[g>0].min()
        fold=(g1_mean+g)/(g2_mean+g)
        print(f"⏰ Start to calculate qvalue...")

        result = pd.DataFrame({'pvalue':pvalue,'qvalue':qvalue[1],'FoldChange':fold})
        result['MaxBaseMean']=np.max([g1_mean,g2_mean],axis=0)
        result['BaseMean']=(g1_mean+g2_mean)/2
        result['log2(BaseMean)']=np.log2((g1_mean+g2_mean)/2)
        result['log2FC'] = np.log2(result['FoldChange'])
        result['abs(log2FC)'] = abs(result['log2FC'])
        result['size']  =np.abs(result['log2FC'])/10
        result=result.loc[~result['pvalue'].isnull()]
        result['-log(pvalue)'] = -np.log10(result['pvalue'])
        result['-log(qvalue)'] = -np.log10(result['qvalue'])
        #max mean of between each value in group1 and group2
        #result=result[result['padj']<alpha]
        result['sig']='normal'
        result.loc[result['qvalue']<alpha,'sig']='sig'
        self.result=result
        print(f"✅ Differential expression analysis completed.")
        return result

    elif method == 'limma':
        try:
            from patsy import dmatrix
            from inmoose.limma import lmFit, makeContrasts, contrasts_fit, eBayes, topTable
        except:
            raise ImportError('Please install inmoose: `pip install inmoose`')
        print(f"⏰ Start to create DGEList...")
        anno1=pd.DataFrame(
            index=group1+group2
        )
        anno1['condition']=['treatment' for i in group1]+['control' for i in group2]

        # 3.1 构建设计矩阵
        design1 = dmatrix("~0 + condition", data=anno1)
        #    列名会是 ['condition[treatment]', 'condition[control]']

        # 3.2 lmFit 拟合线性模型
        #    输入: counts_df 行基因为基因,列为样本
        counts_df = self.data[group1+group2].values
        fit = lmFit(counts_df, design1)
        # 3.3 定义对比——treatment vs control
        contrast_matrix = makeContrasts(
            "condition[treatment] - condition[control]",
            levels=design1
        )

        # 3.4 contrasts_fit 应用对比
        fit_con = contrasts_fit(fit, contrast_matrix)

        # 3.5 经验贝叶斯调整
        print(f"⏰ Start to adjust pvalue...")
        fit_eb = eBayes(fit_con)

        g1_mean=self.data[group1].mean(axis=1)
        g2_mean=self.data[group2].mean(axis=1)
        g=(g2_mean+g1_mean)/2
        g=g.loc[g>0].min()
        fold=(g1_mean+g)/(g2_mean+g)

        pvalue=fit_eb.p_value.values.reshape(-1)
        qvalue = multipletests(np.nan_to_num(np.array(pvalue),0), alpha=0.5, 
                           method=multipletests_method, is_sorted=False, returnsorted=False)

        result = pd.DataFrame({'pvalue':pvalue,'qvalue':qvalue[1],'FoldChange':fold})
        result['MaxBaseMean']=np.max([g1_mean,g2_mean],axis=0)
        result['BaseMean']=(g1_mean+g2_mean)/2
        result['log2(BaseMean)']=np.log2((g1_mean+g2_mean)/2)
        result['log2FC'] = np.log2(result['FoldChange'])
        result['abs(log2FC)'] = abs(result['log2FC'])
        result['size']  =np.abs(result['log2FC'])/10
        result['sig']='normal'
        result=result.loc[~result['pvalue'].isnull()]
        result['-log(pvalue)'] = -np.log10(result['pvalue'])
        result['-log(qvalue)'] = -np.log10(result['qvalue'])
        #max mean of between each value in group1 and group2
        #result=result[result['padj']<alpha]
        result['sig']='normal'
        result.loc[result['qvalue']<alpha,'sig']='sig'

        result['F']=fit_eb.F.reshape(-1)
        result['t']=fit_eb.t.values.reshape(-1)
        self.result=result
        print(f"✅ Differential expression analysis completed.")
        return result

        # 3.6 提取结果




    else:  # This is where the "method" check (not pydeseq2 version check) ends
        raise ValueError('The method is not supported.')

ranking2gsea(rank_max=200, rank_min=274)

Ranking the result of dds data for gsea analysis

Parameters:

Name Type Description Default
rank_max int

The max rank of the result.

200
rank_min int

The min rank of the result.

274

Returns:

Name Type Description
rnk pd.DataFrame

The ranking result.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def ranking2gsea(self,rank_max:int=200,rank_min:int=274)->pd.DataFrame:
    r"""
    Ranking the result of dds data for gsea analysis

    Arguments:
        rank_max: The max rank of the result.
        rank_min: The min rank of the result.

    Returns:
        rnk: The ranking result.

    """


    result=self.result.copy()
    result['fcsign']=np.sign(result['log2FC'])
    result['logp']=-np.log10(result['pvalue'])
    result['metric']=result['logp']/result['fcsign']
    rnk=pd.DataFrame()
    rnk['gene_name']=result.index
    rnk['rnk']=result['metric'].values
    rnk=rnk.sort_values(by=['rnk'],ascending=False)
    k=1
    total=0
    for i in range(len(rnk)):
        if rnk.loc[i,'rnk']==np.inf: 
            total+=1
    #200跟274根据你的数据进行更改,保证inf比你数据最大的大,-inf比数据最小的小就好
    for i in range(len(rnk)):
        if rnk.loc[i,'rnk']==np.inf: 
            rnk.loc[i,'rnk']=rank_max+(total-k)
            k+=1
        elif rnk.loc[i,'rnk']==-np.inf: 
            rnk.loc[i,'rnk']=-(rank_min+k)
            k+=1
    return rnk

plot_volcano(figsize=(4, 4), pval_name='qvalue', fc_name='log2FC', title='', titlefont={'weight': 'normal', 'size': 14}, up_color='#e25d5d', down_color='#7388c1', normal_color='#d7d7d7', up_fontcolor='#e25d5d', down_fontcolor='#7388c1', normal_fontcolor='#d7d7d7', legend_bbox=(0.8, -0.2), legend_ncol=2, legend_fontsize=12, plot_genes=None, plot_genes_num=10, plot_genes_fontsize=10, ticks_fontsize=12, ax=None)

Generate a volcano plot for the differential gene expression analysis results.

Parameters:

Name Type Description Default
figsize tuple

The size of the generated figure. ((4,4))

(4, 4)
pval_name

Column name for p-values. ('qvalue')

'qvalue'
fc_name

Column name for fold changes. ('log2FC')

'log2FC'
title str

The title of the plot. ('')

''
titlefont dict

A dictionary of font properties for the plot title. ({'weight':'normal','size':14,})

{'weight': 'normal', 'size': 14}
up_color str

The color of the up-regulated genes in the plot. ('#e25d5d')

'#e25d5d'
down_color str

The color of the down-regulated genes in the plot. ('#7388c1')

'#7388c1'
normal_color str

The color of the non-significant genes in the plot. ('#d7d7d7')

'#d7d7d7'
up_fontcolor str

Font color for up-regulated gene labels. ('#e25d5d')

'#e25d5d'
down_fontcolor str

Font color for down-regulated gene labels. ('#7388c1')

'#7388c1'
normal_fontcolor str

Font color for normal gene labels. ('#d7d7d7')

'#d7d7d7'
legend_bbox tuple

A tuple containing the coordinates of the legend's bounding box. ((0.8, -0.2))

(0.8, -0.2)
legend_ncol int

The number of columns in the legend. (2)

2
legend_fontsize int

The font size of the legend. (12)

12
plot_genes list

A list of genes to be plotted on the volcano plot. (None)

None
plot_genes_num int

The number of genes to be plotted on the volcano plot. (10)

10
plot_genes_fontsize int

The font size of the genes to be plotted on the volcano plot. (10)

10
ticks_fontsize int

The font size of the ticks. (12)

12
ax

Matplotlib axis object. (None)

None

Returns:

Name Type Description
ax

The generated volcano plot.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def plot_volcano(self,figsize:tuple=(4,4),pval_name='qvalue',fc_name='log2FC',
                 title:str='',titlefont:dict={'weight':'normal','size':14,},
                 up_color:str='#e25d5d',down_color:str='#7388c1',normal_color:str='#d7d7d7',
                 up_fontcolor:str='#e25d5d',down_fontcolor:str='#7388c1',normal_fontcolor:str='#d7d7d7',
                 legend_bbox:tuple=(0.8, -0.2),legend_ncol:int=2,legend_fontsize:int=12,
                 plot_genes:list=None,plot_genes_num:int=10,plot_genes_fontsize:int=10,
                 ticks_fontsize:int=12,ax=None):
    r"""Generate a volcano plot for the differential gene expression analysis results.

    Arguments:
        figsize: The size of the generated figure. ((4,4))
        pval_name: Column name for p-values. ('qvalue')
        fc_name: Column name for fold changes. ('log2FC')
        title: The title of the plot. ('')
        titlefont: A dictionary of font properties for the plot title. ({'weight':'normal','size':14,})
        up_color: The color of the up-regulated genes in the plot. ('#e25d5d')
        down_color: The color of the down-regulated genes in the plot. ('#7388c1')
        normal_color: The color of the non-significant genes in the plot. ('#d7d7d7')
        up_fontcolor: Font color for up-regulated gene labels. ('#e25d5d')
        down_fontcolor: Font color for down-regulated gene labels. ('#7388c1')
        normal_fontcolor: Font color for normal gene labels. ('#d7d7d7')
        legend_bbox: A tuple containing the coordinates of the legend's bounding box. ((0.8, -0.2))
        legend_ncol: The number of columns in the legend. (2)
        legend_fontsize: The font size of the legend. (12)
        plot_genes: A list of genes to be plotted on the volcano plot. (None)
        plot_genes_num: The number of genes to be plotted on the volcano plot. (10)
        plot_genes_fontsize: The font size of the genes to be plotted on the volcano plot. (10)
        ticks_fontsize: The font size of the ticks. (12)
        ax: Matplotlib axis object. (None)

    Returns:
        ax: The generated volcano plot.

    """

    ax=volcano(result=self.result,pval_name=pval_name,fc_name=fc_name,pval_max=self.logp_max,
                   figsize=figsize,title=title,titlefont=titlefont,
                   up_color=up_color,down_color=down_color,normal_color=normal_color,
                   up_fontcolor=up_fontcolor,down_fontcolor=down_fontcolor,normal_fontcolor=normal_fontcolor,
                   legend_bbox=legend_bbox,legend_ncol=legend_ncol,legend_fontsize=legend_fontsize,plot_genes=plot_genes,
                   plot_genes_num=plot_genes_num,plot_genes_fontsize=plot_genes_fontsize,
                   ticks_fontsize=ticks_fontsize,ax=ax,
                   pval_threshold=self.pval_threshold,fc_max=self.fc_max,fc_min=self.fc_min)
    return ax
    '''
    fig, ax = plt.subplots(figsize=figsize)
    result=self.result.copy()
    #首先绘制正常基因
    ax.scatter(x=result[result['sig']=='normal']['log2FC'],
            y=result[result['sig']=='normal']['-log(qvalue)'],
            color=normal_color,#颜色
            alpha=.5,#透明度
            )
    #接着绘制上调基因
    ax.scatter(x=result[result['sig']=='up']['log2FC'],
            y=result[result['sig']=='up']['-log(qvalue)'],
            color=up_color,#选择色卡第15个颜色
            alpha=.5,#透明度
            )
    #绘制下调基因
    ax.scatter(x=result[result['sig']=='down']['log2FC'],
            y=result[result['sig']=='down']['-log(qvalue)'],
            color=down_color,#颜色
            alpha=.5,#透明度
            )

    ax.plot([result['log2FC'].min(),result['log2FC'].max()],#辅助线的x值起点与终点
            [-np.log10(self.pval_threshold),-np.log10(self.pval_threshold)],#辅助线的y值起点与终点
            linewidth=2,#辅助线的宽度
            linestyle="--",#辅助线类型:虚线
            color='black'#辅助线的颜色
    )
    ax.plot([self.fc_max,self.fc_max],
            [result['-log(qvalue)'].min(),result['-log(qvalue)'].max()],
            linewidth=2, 
            linestyle="--",
            color='black')
    ax.plot([self.fc_min,self.fc_min],
            [result['-log(qvalue)'].min(),result['-log(qvalue)'].max()],
            linewidth=2, 
            linestyle="--",
            color='black')
    #设置横标签与纵标签
    ax.set_ylabel(r'$-log_{10}(qvalue)$',titlefont)                                    
    ax.set_xlabel(r'$log_{2}FC$',titlefont)
    #设置标题
    ax.set_title(title,titlefont)

    #绘制图注
    #legend标签列表,上面的color即是颜色列表
    labels = ['up:{0}'.format(len(result[result['sig']=='up'])),
            'down:{0}'.format(len(result[result['sig']=='down']))]  
    #用label和color列表生成mpatches.Patch对象,它将作为句柄来生成legend
    color = [up_color,down_color]
    patches = [mpatches.Patch(color=color[i], label="{:s}".format(labels[i]) ) for i in range(len(color))] 

    ax.legend(handles=patches,
        bbox_to_anchor=legend_bbox, 
        ncol=legend_ncol,
        fontsize=legend_fontsize)

    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['bottom'].set_visible(True)
    ax.spines['left'].set_visible(True)

    from adjustText import adjust_text
    import adjustText

    if plot_genes is not None:
        hub_gene=plot_genes
    else:
        up_result=result.loc[result['sig']=='up']
        down_result=result.loc[result['sig']=='down']
        hub_gene=up_result.sort_values('qvalue').index[:plot_genes_num//2].tolist()+down_result.sort_values('qvalue').index[:plot_genes_num//2].tolist()

    color_dict={
    'up':up_fontcolor,
        'down':down_fontcolor,
        'normal':normal_fontcolor
    }

    texts=[ax.text(result.loc[i,'log2FC'], 
           result.loc[i,'-log(qvalue)'],
           i,
           fontdict={'size':plot_genes_fontsize,'weight':'bold','color':color_dict[result.loc[i,'sig']]}
           ) for i in hub_gene]

    if adjustText.__version__<='0.8':
        adjust_text(texts,only_move={'text': 'xy'},arrowprops=dict(arrowstyle='->', color='red'),)
    else:
        adjust_text(texts,only_move={"text": "xy", "static": "xy", "explode": "xy", "pull": "xy"},
                    arrowprops=dict(arrowstyle='->', color='red'))

    ax.set_xticks([round(i,2) for i in ax.get_xticks()[1:-1]],#获取x坐标轴内容
          [round(i,2) for i in ax.get_xticks()[1:-1]],#更新x坐标轴内容
          fontsize=ticks_fontsize,
          fontweight='normal'
          )
    return fig,ax
    '''

plot_boxplot(genes, treatment_groups, control_groups, log=True, treatment_name='Treatment', control_name='Control', figsize=(4, 3), palette=['#a64d79', '#674ea7'], title='Gene Expression', fontsize=12, legend_bbox=(1, 0.55), legend_ncol=1, **kwarg)

Plot the boxplot of genes from dds data

Parameters:

Name Type Description Default
genes list

The genes to plot.

required
treatment_groups list

The treatment groups.

required
control_groups list

The control groups.

required
figsize tuple

The figure size.

(4, 3)
palette list

The color palette.

['#a64d79', '#674ea7']
title str

The title of the plot.

'Gene Expression'
fontsize int

The fontsize of the plot.

12
legend_bbox tuple

The bbox of the legend.

(1, 0.55)
legend_ncol int

The number of columns of the legend.

1
**kwarg

Other arguments for plot_boxplot function.

{}

Returns:

Name Type Description
fig matplotlib.figure.Figure

The figure of the plot.

ax matplotlib.axes._axes.Axes

The axis of the plot.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def plot_boxplot(self,genes:list,treatment_groups:list,control_groups:list,
                 log:bool=True,
                 treatment_name:str='Treatment',control_name:str='Control',
                 figsize:tuple=(4,3),palette:list=["#a64d79","#674ea7"],
                 title:str='Gene Expression',fontsize:int=12,legend_bbox:tuple=(1, 0.55),legend_ncol:int=1,
                 **kwarg)->Tuple[matplotlib.figure.Figure,matplotlib.axes._axes.Axes]:
    r"""
    Plot the boxplot of genes from dds data

    Arguments:
        genes: The genes to plot.
        treatment_groups: The treatment groups.
        control_groups: The control groups.
        figsize: The figure size.
        palette: The color palette.
        title: The title of the plot.
        fontsize: The fontsize of the plot.
        legend_bbox: The bbox of the legend.
        legend_ncol: The number of columns of the legend.
        **kwarg: Other arguments for plot_boxplot function.

    Returns:
        fig: The figure of the plot.
        ax: The axis of the plot.
    """
    p_data=pd.DataFrame(columns=['Value','Gene','Type'])
    if log:
        for gene in genes:
            plot_data1=pd.DataFrame()
            plot_data1['Value']=np.log1p(self.data[treatment_groups].loc[gene].values)
            plot_data1['Gene']=gene
            plot_data1['Type']=treatment_name

            plot_data2=pd.DataFrame()
            plot_data2['Value']=np.log1p(self.data[control_groups].loc[gene].values)
            plot_data2['Gene']=gene
            plot_data2['Type']=control_name

            plot_data=pd.concat([plot_data1,plot_data2],axis=0)
            p_data=pd.concat([p_data,plot_data],axis=0)
    else:
        for gene in genes:
            plot_data1=pd.DataFrame()
            plot_data1['Value']=self.data[treatment_groups].loc[gene].values
            plot_data1['Gene']=gene
            plot_data1['Type']=treatment_name

            plot_data2=pd.DataFrame()
            plot_data2['Value']=self.data[control_groups].loc[gene].values
            plot_data2['Gene']=gene
            plot_data2['Type']=control_name

            plot_data=pd.concat([plot_data1,plot_data2],axis=0)
            p_data=pd.concat([p_data,plot_data],axis=0)

    fig,ax=plot_boxplot(p_data,hue='Type',x_value='Gene',y_value='Value',palette=palette,
                      figsize=figsize,fontsize=fontsize,title=title,
                      legend_bbox=legend_bbox,legend_ncol=legend_ncol, **kwarg)
    return fig,ax

omicverse.bulk.deseq2_normalize(data)

Normalize the data using DESeq2 method.

Parameters:

Name Type Description Default
data pd.DataFrame

The data to be normalized.

required

Returns:

Name Type Description
data pd.DataFrame

The normalized data.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def deseq2_normalize(data:pd.DataFrame)->pd.DataFrame:
    r"""Normalize the data using DESeq2 method.

    Arguments:
        data: The data to be normalized.

    Returns:
        data: The normalized data.

    """
    avg1=data.apply(np.log,axis=1).mean(axis=1).replace([np.inf,-np.inf],np.nan).dropna()
    data1=data.loc[avg1.index]
    data_log=data1.apply(np.log,axis=1)
    scale=data_log.sub(avg1.values,axis=0).median(axis=0).apply(np.exp)
    return data/scale

omicverse.bulk.estimateSizeFactors(data)

Estimate size factors for data normalization.

Parameters:

Name Type Description Default
data pd.DataFrame

A pandas DataFrame of gene expression data where rows correspond to samples and columns correspond to genes.

required

Returns:

Name Type Description
scale pd.Series

A pandas Series of size factors, one for each sample.

Examples:

>>> import pandas as pd
>>> import numpy as np
>>> import omicverse as ov
>>> data = pd.DataFrame(np.random.rand(100, 10), columns=list('abcdefghij'))
>>> size_factors = ov.bulk.estimateSizeFactors(data)
Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def estimateSizeFactors(data:pd.DataFrame)->pd.Series:
    r"""Estimate size factors for data normalization.

    Arguments:
        data: A pandas DataFrame of gene expression data where rows correspond to samples and columns correspond to genes.

    Returns:
        scale: A pandas Series of size factors, one for each sample.

    Examples:
        >>> import pandas as pd
        >>> import numpy as np
        >>> import omicverse as ov
        >>> data = pd.DataFrame(np.random.rand(100, 10), columns=list('abcdefghij'))
        >>> size_factors = ov.bulk.estimateSizeFactors(data)
    """
    avg1=data.apply(np.log,axis=1).mean(axis=1).replace([np.inf,-np.inf],np.nan).dropna()
    data1=data.loc[avg1.index]
    data_log=data1.apply(np.log,axis=1)
    scale=data_log.sub(avg1.values,axis=0).median(axis=0).apply(np.exp)
    return scale

omicverse.bulk.estimateDispersions(counts)

Estimate the dispersion parameter of the Negative Binomial distribution for each gene in the input count matrix.

Parameters:

Name Type Description Default
counts pd.DataFrame

Input count matrix with shape (n_genes, n_samples).

required

Returns:

Name Type Description
disp pd.Series

Array of dispersion values for each gene in the input count matrix.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def estimateDispersions(counts:pd.DataFrame)->pd.Series:
    r"""Estimate the dispersion parameter of the Negative Binomial distribution for each gene in the input count matrix.

    Arguments:
        counts: Input count matrix with shape (n_genes, n_samples).

    Returns:
        disp: Array of dispersion values for each gene in the input count matrix.
    """
    # Step 1: Calculate mean and variance of counts for each gene
    mean_counts = np.mean(counts, axis=1)
    var_counts = np.var(counts, axis=1)

    # Step 2: Fit trend line to variance-mean relationship using GLM
    mean_expr = sm.add_constant(np.log(mean_counts))
    mod = sm.GLM(np.log(var_counts), mean_expr, family=sm.families.Gamma())
    res = mod.fit()
    fitted_var = np.exp(res.fittedvalues)

    # Step 3: Calculate residual variance for each gene
    disp = fitted_var / var_counts

    return disp

omicverse.bulk.Matrix_ID_mapping(data, gene_ref_path)

Map gene IDs in the input data to gene symbols using a reference table.

Parameters:

Name Type Description Default
data pd.DataFrame

The input data containing gene IDs as index.

required
gene_ref_path str

The path to the reference table containing the mapping from gene IDs to gene symbols.

required

Returns:

Name Type Description
data pd.DataFrame

The input data with gene IDs mapped to gene symbols.

Source code in /Users/fernandozeng/miniforge3/envs/space/lib/python3.10/site-packages/omicverse/bulk/_Deseq2.py
def Matrix_ID_mapping(data:pd.DataFrame,gene_ref_path:str)->pd.DataFrame:
    r"""Map gene IDs in the input data to gene symbols using a reference table.

    Arguments:
        data: The input data containing gene IDs as index.
        gene_ref_path: The path to the reference table containing the mapping from gene IDs to gene symbols.

    Returns:
        data: The input data with gene IDs mapped to gene symbols.

    """

    pair=pd.read_csv(gene_ref_path,sep='\t',index_col=0)
    ret_gene=list(set(data.index.tolist()) & set(pair.index.tolist()))
    data=data.loc[ret_gene]
    #data=data_drop_duplicates_index(data)
    new_index=[]
    for i in ret_gene:
        a=pair.loc[i,'symbol']
        if str(a)=='nan':
            new_index.append(i)
        else:
            new_index.append(a)
    data.index=new_index
    return data