Skip to content

Api single2spatial

omicverse.bulk2single.Single2Spatial

Bases: object

Source code in /Users/fernandozeng/miniforge3/envs/scbasset/lib/python3.8/site-packages/omicverse/bulk2single/_single2spatial.py
class Single2Spatial(object):
    """Predict spatially resolved single-cell data from single-cell and spatial inputs.

    The class prepares its inputs with `create_data_pyomic`, delegates training
    and prediction to `DFRunner`, and wraps the resulting tables in AnnData
    objects.
    """

    def __init__(self,single_data:anndata.AnnData,
                 spatial_data:anndata.AnnData,
                 celltype_key:str,
                 spot_key:list=['xcoord','ycoord'],
                 top_marker_num=500,
                marker_used=True,gpu:Union[int,str]=0) -> None:
        """Init Single2Spatial model

        Arguments:
            single_data: the anndata object of single cell data
            spatial_data: the anndata object of spatial data 
            celltype_key: the key of cell type in `single_data.obs`
            spot_key: the key of spot in `spatial_data.obs` Default: ['xcoord','ycoord']
            top_marker_num: the number of top marker genes used in the model. Default: 500
            marker_used: whether use marker genes in the model. Default: True
            gpu: the device used by the model. An integer selects `cuda:<gpu>`
                (a negative value or missing CUDA falls back to cpu); the
                strings 'mps', 'cuda' and 'cuda:<n>' are used when the backend
                is available; any other string falls back to cpu. Default: 0
        """

        self.single_data = single_data
        self.spatial_data = spatial_data
        self.top_marker_num = top_marker_num
        self.marker_used = marker_used
        self.celltype_key=celltype_key
        self.used_device = self._resolve_device(gpu)
        self.history=[]
        self.input_data = create_data_pyomic(self.single_data,self.spatial_data,
                                             celltype_key,spot_key,)

    @staticmethod
    def _resolve_device(gpu):
        """Translate the `gpu` argument into a `torch.device`, falling back to cpu."""
        if isinstance(gpu, str):
            # Strings are never compared with integers: the previous
            # `gpu >= 0` check raised TypeError for e.g. 'mps' without MPS.
            mps_backend = getattr(torch.backends, 'mps', None)
            if gpu == 'mps' and mps_backend is not None and mps_backend.is_available():
                print('Note that mps may loss will be nan, used it when torch is supported')
                return torch.device("mps")
            if gpu.startswith('cuda') and torch.cuda.is_available():
                return torch.device(gpu)
            return torch.device('cpu')
        if gpu >= 0 and torch.cuda.is_available():
            return torch.device(f"cuda:{gpu}")
        return torch.device('cpu')

    @staticmethod
    def _to_adata(df_meta, df_spot):
        """Wrap the predicted tables returned by `DFRunner` into an AnnData object."""
        sp_adata=anndata.AnnData(df_spot.T)
        sp_adata.obs=df_meta
        sp_adata.obs.set_index(sp_adata.obs['Cell'],inplace=True)
        sp_adata.obsm['X_spatial']=sp_adata.obs[['Cell_xcoord','Cell_ycoord']].values
        return sp_adata

    def train(self,spot_num:int,
                    cell_num:int,
                    df_save_dir:str='save_model',
                    df_save_name:str='df',
                    max_cell_in_diff_spot_ratio=None,
                    k:int=10,
                    random_seed:int=112,
                    mul_train:int=1,save=True,
                    n_jobs:int=1,num_epochs=1000,batch_size=1000,predicted_size=32)->anndata.AnnData:
        """Train the model of single2spatial

        Arguments:
            spot_num: the number of spots in the spatial data predicted
            cell_num: the number of cells in each spot predicted
            df_save_dir: the directory to save the model
            df_save_name: the name of the model
            max_cell_in_diff_spot_ratio: the ratio of max cell number in different spot
            k: the number of nearest neighbors
            random_seed: the random seed
            mul_train: the number of times to train the model
            save: whether to write the trained weights to
                `<df_save_dir>/<df_save_name>.pth` after training
            n_jobs: the number of jobs to run in parallel
            num_epochs: passed to `DFRunner.run` (presumably the number of training epochs)
            batch_size: passed to `DFRunner.run` (presumably the training batch size)
            predicted_size: passed to `DFRunner.run` unchanged

        Returns:
            sp_adata: the anndata object of the predicted spatial data

        """
        # build the training pairs from the prepared inputs
        xtrain, ytrain = create_data(self.input_data['input_sc_meta'], 
                                     self.input_data['input_sc_data'], self.input_data["input_st_data"], 
                                     spot_num, cell_num,
                                     self.top_marker_num,
                                     self.marker_used, mul_train)
        self.df_runner = DFRunner(self.input_data['input_sc_data'], self.input_data['input_sc_meta'], 
                                  self.input_data['input_st_data'], self.input_data['input_st_meta'], 
                                  self.marker_used, self.top_marker_num, random_seed=random_seed,n_jobs=n_jobs,device=self.used_device)
        df_meta, df_spot = self.df_runner.run(xtrain, ytrain, max_cell_in_diff_spot_ratio, k, df_save_dir, 
                                              df_save_name,num_epochs=num_epochs,batch_size=batch_size,predicted_size=predicted_size)

        if save:
            self.save(df_save_dir, df_save_name)

        self.sp_adata = self._to_adata(df_meta, df_spot)
        return self.sp_adata

    def save(self,df_save_dir:str='save_model',
                df_save_name:str='df',):
        """Save the model of single2spatial

        Arguments:
            df_save_dir: the directory to save the model
            df_save_name: the name of the model
        """

        path_save = os.path.join(df_save_dir, f"{df_save_name}.pth")
        # exist_ok avoids the check-then-create race; an empty directory means
        # the current working directory, which needs no creation.
        if df_save_dir:
            os.makedirs(df_save_dir, exist_ok=True)
        torch.save(self.df_runner.model.state_dict(), path_save)
        print(f"...save trained net in {path_save}.")

    def load(self,modelsize,
                    df_load_dir:str='save_model/df',

                    max_cell_in_diff_spot_ratio=None,
                    k:int=10,
                    random_seed:int=112,
                    n_jobs:int=1,predicted_size=32)->anndata.AnnData:
        """Load the model of single2spatial

        Arguments:
            modelsize: passed to `DFRunner.load` (presumably the size of the
                saved network; confirm against `DFRunner`)
            df_load_dir: the location of the saved model, passed to `DFRunner.load`
            max_cell_in_diff_spot_ratio: the ratio of max cell number in different spot
            k: the number of nearest neighbors
            random_seed: the random seed
            n_jobs: the number of jobs to run in parallel
            predicted_size: passed to `DFRunner.load` unchanged

        Returns:
            sp_adata: the anndata object of the predicted spatial data

        """
        # Pass the device chosen in __init__, as train() does, so that a
        # loaded model runs on the same device as a freshly trained one.
        self.df_runner = DFRunner(self.input_data['input_sc_data'], self.input_data['input_sc_meta'], 
                                  self.input_data['input_st_data'], self.input_data['input_st_meta'], 
                                  self.marker_used, self.top_marker_num, random_seed=random_seed,n_jobs=n_jobs,
                                  device=self.used_device)
        df_meta, df_spot = self.df_runner.load(df_load_dir,modelsize,max_cell_in_diff_spot_ratio, k,
                                              predicted_size=predicted_size)

        self.sp_adata = self._to_adata(df_meta, df_spot)
        return self.sp_adata

    def spot_assess(self)->anndata.AnnData:
        """Assess the predicted spatial data

        Requires `self.sp_adata`, which is set by `train` or `load`.

        Returns:
            sp_adata_spot: the anndata object of the predicted spatial data with spot-level information
        """
        obs = self.sp_adata.obs

        # spot-level: fraction of each cell type among the cells of a spot
        counts = obs[['Cell', 'Cell_type', 'Spot']].pivot_table(
            index=['Spot'], columns=['Cell_type'], aggfunc='count',
            values='Cell', fill_value=0)
        prop = counts.div(counts.sum(axis=1), axis=0)
        prop.columns = pd.Index(list(prop.columns))
        # pivot_table's default aggregation (mean) yields one coordinate per spot
        for coord in ('Spot_xcoord', 'Spot_ycoord'):
            prop[coord] = np.array(pd.pivot_table(obs, values=coord, index=['Spot'])[coord].values)

        # aggregate gene expression per spot (mean over the cells of the spot)
        pred_spot_new = self.sp_adata.to_df()
        genes = pred_spot_new.columns
        pred_spot_new['Spot'] = self.sp_adata.obs['Spot']
        pred_spot_mean = pred_spot_new.groupby('Spot')[genes].mean()

        sp_adata_spot=anndata.AnnData(pred_spot_mean)
        sp_adata_spot.obs=prop
        sp_adata_spot.obsm['X_spatial']=sp_adata_spot.obs[['Spot_xcoord','Spot_ycoord']].values
        return sp_adata_spot

__init__(single_data, spatial_data, celltype_key, spot_key=['xcoord', 'ycoord'], top_marker_num=500, marker_used=True, gpu=0)

Init Single2Spatial model

Parameters:

Name Type Description Default
single_data anndata.AnnData

the anndata object of single cell data

required
spatial_data anndata.AnnData

the anndata object of spatial data

required
celltype_key str

the key of cell type in single_data.obs

required
spot_key list

the key of spot in spatial_data.obs Default: ['xcoord','ycoord']

['xcoord', 'ycoord']
top_marker_num

the number of top marker genes used in the model. Default: 500

500
marker_used

whether use marker genes in the model. Default: True

True
gpu Union[int, str]

the device used by the model: a CUDA device index (default 0; falls back to CPU when CUDA is unavailable) or 'mps'

0
Source code in /Users/fernandozeng/miniforge3/envs/scbasset/lib/python3.8/site-packages/omicverse/bulk2single/_single2spatial.py
def __init__(self,single_data:anndata.AnnData,
             spatial_data:anndata.AnnData,
             celltype_key:str,
             spot_key:list=['xcoord','ycoord'],
             top_marker_num=500,
            marker_used=True,gpu:Union[int,str]=0) -> None:
    """Init Single2Spatial model

    Arguments:
        single_data: the anndata object of single cell data
        spatial_data: the anndata object of spatial data 
        celltype_key: the key of cell type in `single_data.obs`
        spot_key: the key of spot in `spatial_data.obs` Default: ['xcoord','ycoord']
        top_marker_num: the number of top marker genes used in the model. Default: 500
        marker_used: whether use marker genes in the model. Default: True
        gpu: the device used by the model. An integer selects `cuda:<gpu>`
            (a negative value or missing CUDA falls back to cpu); the strings
            'mps', 'cuda' and 'cuda:<n>' are used when the backend is
            available; any other string falls back to cpu. Default: 0
    """

    self.single_data = single_data
    self.spatial_data = spatial_data
    self.top_marker_num = top_marker_num
    self.marker_used = marker_used
    self.celltype_key=celltype_key
    if isinstance(gpu, str):
        # Strings are handled separately so they are never compared with an
        # integer: `'mps' >= 0` raised TypeError whenever MPS was unavailable.
        mps_backend = getattr(torch.backends, 'mps', None)
        if gpu == 'mps' and mps_backend is not None and mps_backend.is_available():
            print('Note that mps may loss will be nan, used it when torch is supported')
            self.used_device = torch.device("mps")
        elif gpu.startswith('cuda') and torch.cuda.is_available():
            self.used_device = torch.device(gpu)
        else:
            self.used_device = torch.device('cpu')
    elif gpu >= 0 and torch.cuda.is_available():
        self.used_device = torch.device(f"cuda:{gpu}")
    else:
        self.used_device = torch.device('cpu')
    self.history=[]
    self.input_data = create_data_pyomic(self.single_data,self.spatial_data,
                                         celltype_key,spot_key,)

train(spot_num, cell_num, df_save_dir='save_model', df_save_name='df', max_cell_in_diff_spot_ratio=None, k=10, random_seed=112, mul_train=1, save=True, n_jobs=1, num_epochs=1000, batch_size=1000, predicted_size=32)

Train the model of single2spatial

Parameters:

Name Type Description Default
spot_num int

the number of spots in the spatial data predicted

required
cell_num int

the number of cells in each spot predicted

required
df_save_dir str

the directory to save the model

'save_model'
df_save_name str

the name of the model

'df'
max_cell_in_diff_spot_ratio

the ratio of max cell number in different spot

None
k int

the number of nearest neighbors

10
random_seed int

the random seed

112
mul_train int

the number of times to train the model

1
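n_jobs int

the number of jobs to run in parallel

1
save

whether to save the trained model weights to df_save_dir/df_save_name.pth

True
num_epochs

forwarded to DFRunner.run as num_epochs

1000
batch_size

forwarded to DFRunner.run as batch_size

1000
predicted_size

forwarded to DFRunner.run as predicted_size

32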

Returns:

Name Type Description
sp_adata anndata.AnnData

the anndata object of the predicted spatial data

Source code in /Users/fernandozeng/miniforge3/envs/scbasset/lib/python3.8/site-packages/omicverse/bulk2single/_single2spatial.py
def train(self,spot_num:int,
                cell_num:int,
                df_save_dir:str='save_model',
                df_save_name:str='df',
                max_cell_in_diff_spot_ratio=None,
                k:int=10,
                random_seed:int=112,
                mul_train:int=1,save=True,
                n_jobs:int=1,num_epochs=1000,batch_size=1000,predicted_size=32)->anndata.AnnData:
    """Train the model of single2spatial

    Arguments:
        spot_num: the number of spots in the spatial data predicted
        cell_num: the number of cells in each spot predicted
        df_save_dir: the directory to save the model
        df_save_name: the name of the model
        max_cell_in_diff_spot_ratio: the ratio of max cell number in different spot
        k: the number of nearest neighbors
        random_seed: the random seed
        mul_train: the number of times to train the model
        save: whether to write the trained weights to
            `<df_save_dir>/<df_save_name>.pth` after training
        n_jobs: the number of jobs to run in parallel
        num_epochs: passed to `DFRunner.run` (presumably the number of training epochs)
        batch_size: passed to `DFRunner.run` (presumably the training batch size)
        predicted_size: passed to `DFRunner.run` unchanged

    Returns:
        sp_adata: the anndata object of the predicted spatial data

    """
    # build the training pairs from the prepared inputs
    xtrain, ytrain = create_data(self.input_data['input_sc_meta'], 
                                 self.input_data['input_sc_data'], self.input_data["input_st_data"], 
                                 spot_num, cell_num,
                                 self.top_marker_num,
                                 self.marker_used, mul_train)
    df_runner = DFRunner(self.input_data['input_sc_data'], self.input_data['input_sc_meta'], 
                         self.input_data['input_st_data'], self.input_data['input_st_meta'], 
                         self.marker_used, self.top_marker_num, random_seed=random_seed,n_jobs=n_jobs,device=self.used_device)
    self.df_runner=df_runner
    df_meta, df_spot = self.df_runner.run(xtrain, ytrain, max_cell_in_diff_spot_ratio, k, df_save_dir, 
                                          df_save_name,num_epochs=num_epochs,batch_size=batch_size,predicted_size=predicted_size)

    if save:
        path_save = os.path.join(df_save_dir, f"{df_save_name}.pth")
        # exist_ok avoids the check-then-create race; an empty directory means
        # the current working directory, which needs no creation.
        if df_save_dir:
            os.makedirs(df_save_dir, exist_ok=True)
        torch.save(df_runner.model.state_dict(), path_save)
        print(f"...save trained net in {path_save}.")

    # wrap the predicted tables: one observation per predicted cell
    sp_adata=anndata.AnnData(df_spot.T)
    sp_adata.obs=df_meta
    sp_adata.obs.set_index(sp_adata.obs['Cell'],inplace=True)
    sp_adata.obsm['X_spatial']=sp_adata.obs[['Cell_xcoord','Cell_ycoord']].values
    self.sp_adata=sp_adata
    return sp_adata

load(modelsize, df_load_dir='save_model/df', max_cell_in_diff_spot_ratio=None, k=10, random_seed=112, n_jobs=1, predicted_size=32)

Load the model of single2spatial

Parameters:

Name Type Description Default
modelsize

forwarded to DFRunner.load as the model size

required
df_load_dir str

the location of the saved model to load

'save_model/df'
max_cell_in_diff_spot_ratio

the ratio of max cell number in different spot

None
k int

the number of nearest neighbors

10
random_seed int

the random seed

112
n_jobs int

the number of jobs to run in parallel

1
predicted_size

forwarded to DFRunner.load as predicted_size

32

Returns:

Name Type Description
sp_adata anndata.AnnData

the anndata object of the predicted spatial data

Source code in /Users/fernandozeng/miniforge3/envs/scbasset/lib/python3.8/site-packages/omicverse/bulk2single/_single2spatial.py
def load(self,modelsize,
                df_load_dir:str='save_model/df',

                max_cell_in_diff_spot_ratio=None,
                k:int=10,
                random_seed:int=112,
                n_jobs:int=1,predicted_size=32)->anndata.AnnData:
    """Load the model of single2spatial

    Arguments:
        modelsize: passed to `DFRunner.load` (presumably the size of the saved
            network; confirm against `DFRunner`)
        df_load_dir: the location of the saved model, passed to `DFRunner.load`
        max_cell_in_diff_spot_ratio: the ratio of max cell number in different spot
        k: the number of nearest neighbors
        random_seed: the random seed
        n_jobs: the number of jobs to run in parallel
        predicted_size: passed to `DFRunner.load` unchanged

    Returns:
        sp_adata: the anndata object of the predicted spatial data

    """
    # Pass the device chosen in __init__, as train() does, so that a loaded
    # model runs on the same device as a freshly trained one.
    df_runner = DFRunner(self.input_data['input_sc_data'], self.input_data['input_sc_meta'], 
                         self.input_data['input_st_data'], self.input_data['input_st_meta'], 
                         self.marker_used, self.top_marker_num, random_seed=random_seed,n_jobs=n_jobs,
                         device=self.used_device)
    self.df_runner=df_runner
    df_meta, df_spot = self.df_runner.load(df_load_dir,modelsize,max_cell_in_diff_spot_ratio, k,
                                          predicted_size=predicted_size)

    # wrap the predicted tables: one observation per predicted cell
    sp_adata=anndata.AnnData(df_spot.T)
    sp_adata.obs=df_meta
    sp_adata.obs.set_index(sp_adata.obs['Cell'],inplace=True)
    sp_adata.obsm['X_spatial']=sp_adata.obs[['Cell_xcoord','Cell_ycoord']].values
    self.sp_adata=sp_adata
    return sp_adata

save(df_save_dir='save_model', df_save_name='df')

Save the model of single2spatial

Parameters:

Name Type Description Default
df_save_dir str

the directory to save the model

'save_model'
df_save_name str

the name of the model

'df'
Source code in /Users/fernandozeng/miniforge3/envs/scbasset/lib/python3.8/site-packages/omicverse/bulk2single/_single2spatial.py
def save(self,df_save_dir:str='save_model',
            df_save_name:str='df',):
    """Save the model of single2spatial

    Writes the state dict of `self.df_runner.model` to
    `<df_save_dir>/<df_save_name>.pth`, creating the directory when needed.

    Arguments:
        df_save_dir: the directory to save the model; an empty string means
            the current working directory
        df_save_name: the name of the model
    """

    path_save = os.path.join(df_save_dir, f"{df_save_name}.pth")
    # exist_ok avoids the check-then-create race of a separate exists() test;
    # os.makedirs('') would raise, so an empty directory is skipped.
    if df_save_dir:
        os.makedirs(df_save_dir, exist_ok=True)
    torch.save(self.df_runner.model.state_dict(), path_save)
    print(f"...save trained net in {path_save}.")

spot_assess()

Assess the predicted spatial data

Returns:

Name Type Description
sp_adata_spot anndata.AnnData

the anndata object of the predicted spatial data with spot-level information

Source code in /Users/fernandozeng/miniforge3/envs/scbasset/lib/python3.8/site-packages/omicverse/bulk2single/_single2spatial.py
def spot_assess(self)->anndata.AnnData:
    """Summarise the predicted cells at spot level

    Reads `self.sp_adata` and builds a new AnnData with one observation per
    spot: mean expression as the matrix, cell-type fractions and spot
    coordinates in `obs`, and the coordinates again in `obsm['X_spatial']`.

    Returns:
        sp_adata_spot: the anndata object of the predicted spatial data with spot-level information
    """
    cell_obs = self.sp_adata.obs

    # cell-type composition: count the cells of each type per spot, then
    # normalise every row so the fractions of a spot sum to one
    type_counts = cell_obs[['Cell', 'Cell_type', 'Spot']].pivot_table(
        index=['Spot'],
        columns=['Cell_type'],
        aggfunc='count',
        values='Cell',
        fill_value=0,
    )
    spot_table = type_counts.div(type_counts.sum(axis=1), axis=0)
    spot_table.columns = pd.Index(list(spot_table.columns))

    # one coordinate per spot (pivot_table aggregates with its default, mean)
    for coord_key in ('Spot_xcoord', 'Spot_ycoord'):
        per_spot = pd.pivot_table(cell_obs, values=coord_key, index=['Spot'])
        spot_table[coord_key] = np.array(per_spot[coord_key].values)

    # mean expression of every gene over the cells assigned to each spot
    expression = self.sp_adata.to_df()
    gene_names = expression.columns
    expression['Spot'] = self.sp_adata.obs['Spot']
    spot_expression = expression.groupby('Spot')[gene_names].mean()

    spot_adata = anndata.AnnData(spot_expression)
    spot_adata.obs = spot_table
    spot_adata.obsm['X_spatial'] = spot_adata.obs[['Spot_xcoord','Spot_ycoord']].values
    return spot_adata