import re
import gzip
import os
import numpy as np
import scipy.sparse as ss
import scipy.sparse.csgraph as ssc
import pandas as pd
from .network import is_symmetric
import logging
logger = logging.getLogger(__name__)
[docs]def get_matching_data_and_network(data, network):
"""Interesct data labels with network node labels."""
matching_genes = (network.labels.index & data.columns).tolist()
data_network = network.get_sub_network(matching_genes)
genes = data_network.labels.sort_values().index
data_filtered = data[genes]
return data_filtered, data_network
[docs]def get_pathway_inducer(network, gene_set, normed=True):
"""Get a laplacian based pathway inducer."""
index = network.labels.loc[network.labels.index.intersection(gene_set)
].values.astype(np.int64)
m = len(index)
laplacian = None
if m > 1:
row = np.concatenate([np.repeat(i, m) for i in index])
col = np.concatenate([index for _ in range(m)])
values = np.ones(m * m)
mask = ss.coo_matrix((values, (row, col)),
shape=network.graph.shape).tocsr()
pathway_graph = mask.multiply(network.graph).tocsr()
if not is_symmetric(pathway_graph):
raise RuntimeError('Error: adjacency matrix is not symmetric')
laplacian = ssc.laplacian(pathway_graph, normed=normed).tocsr()
if len(laplacian.nonzero()[0]) < 1: # invalid inducer
laplacian = None
if laplacian is None:
logger.debug('invalid laplacian')
return laplacian
[docs]def get_pathway_selector(network, gene_set):
"""Get a pathway selector."""
index = network.labels.loc[network.labels.index.intersection(gene_set)
].values.astype(np.int64)
m = len(index)
if m > 1:
values = np.ones(m)
return ss.coo_matrix(
(values, (index, index)), shape=network.graph.shape
).tocsr()
else:
return None
more_than_one_whitespace = re.compile(r'\s+')
[docs]def read_gmt_from_file_pointer(fp):
sets = {}
for line in fp:
splitted = more_than_one_whitespace.split(line.strip().decode())
sets[splitted[0]] = set(splitted[2:])
return sets
[docs]def read_gmt(gmt_file):
"""Read a .gmt file."""
sets = {}
with open(gmt_file, 'rb') as fp:
sets = read_gmt_from_file_pointer(fp)
return sets
[docs]def write_inducer(inducer, filename, sep=','):
"""Write and inducer in COO format."""
inducer = inducer.tocoo()
with gzip.open(filename, 'wt') as fp:
for triplet in zip(inducer.row, inducer.col, inducer.data):
fp.write(sep.join(map(str, triplet)) + os.linesep)
[docs]def read_inducer(filename, size, header=None, sep=','):
"""Read inducer in CSC format."""
inducer = pd.read_csv(filename, header=header, sep=',')
inducer.columns = ['row', 'column', 'data']
return ss.coo_matrix(
(inducer['data'], (inducer['row'], inducer['column'])),
shape=(size, size)
).tocsc()
[docs]def write_inducers(
data,
network,
gene_sets,
data_type,
network_type,
output_dir,
selection_only=False,
gene_set_type=''
):
"""Write inducers and data for a specific data-network combination."""
data_filtered, data_network = get_matching_data_and_network(data, network)
suffix = (
'{}-selector'.format(network_type) if selection_only else network_type
)
# save inducers
for name, gene_set in gene_sets.items():
L = (
get_pathway_selector(data_network, gene_set=gene_set)
if selection_only else
get_pathway_inducer(data_network, gene_set=gene_set)
)
if L is not None:
write_inducer(
L, '{}/{}_{}_{}.csv.gz'.format(
output_dir, name.lower(), data_type, suffix
)
)
if len(gene_set_type) > 0:
suffix = '{}-{}'.format(suffix, gene_set_type)
# save the data
data_filtered.to_csv('{}/{}_{}.csv'.format(output_dir, data_type, suffix))
[docs]def write_preprocessed(
data, data_name, network, network_name, gene_sets, gene_sets_name,
output_dir
):
"""Write inducers and data for a specific data-network combination."""
# save inducers
for name, gene_set in gene_sets.items():
L = (get_pathway_inducer(network, gene_set=gene_set))
if L is not None:
write_inducer(
L, '{}/{}_{}_{}_{}.csv.gz'.format(
output_dir,
'inducer',
data_name,
network_name,
name,
)
)
# save the data
data.to_csv(
'{}/{}_{}_{}_{}.csv'.format(
output_dir, 'data', data_name, network_name, gene_sets_name
)
)