Source code for elaspic.elaspic_sequence

# -*- coding: utf-8 -*-
import re
import os
import os.path as op
import requests
import psutil
import time
import shutil
import logging
import atexit
import six

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

from . import conf, helper, errors

logger = logging.getLogger(__name__)
configs = conf.Configs()


# %% Sequence tools
[docs]def download_uniport_sequence(uniprot_id, output_dir): """ """ output_file = op.join(output_dir, uniprot_id + '.fasta') # If the file already exists, do nothing... if op.isfile(output_file): logger.debug('Sequence file {} already exists...'.format(output_file)) return output_file logger.debug('Downloading sequence {}...'.format(uniprot_id + '.fasta')) address = 'http://www.uniprot.org/uniprot/{}.fasta'.format(uniprot_id) r = requests.get(address) if r.status_code == 200: with open(output_file, 'w') as ofh: ofh.write(r.text) return output_file
[docs]def convert_basestring_to_seqrecord(sequence, sequence_id='id'): if any([isinstance(sequence, string_type) for string_type in six.string_types]): seqrec = SeqRecord(Seq(sequence), id=str(sequence_id)) elif isinstance(sequence, Seq): seqrec = SeqRecord(sequence, id=str(sequence_id)) elif isinstance(sequence, SeqRecord): seqrec = sequence else: raise Exception("Wrong class type %s for ``sequence``" % str(type(sequence))) return seqrec # %%
class Sequence: """ Class for calculating sequence level features. """ def __init__(self, sequence_file, provean_supset_file=None): """ Parameters ---------- sequence_file : str Full filename of the file containing the protein sequence. Only fasta format is supported. provean_supset_file : str Full path and """ logger.debug('Initialising a Sequence instance with parameters:') logger.debug('sequence_file: {}'.format(sequence_file)) logger.debug('provean_supset_file: {}'.format(provean_supset_file)) self.sequence_file = sequence_file self.seqrecord = SeqIO.read(self.sequence_file, 'fasta') self.protein_id = helper.slugify(self.seqrecord.id) self.sequence = str(self.seqrecord.seq) # Provean supset if provean_supset_file is not None and provean_supset_file != self.provean_supset_file: shutil.copy(provean_supset_file, self.provean_supset_file) shutil.copy(provean_supset_file, self.provean_supset_file + '.fasta') if not self.provean_supset_exists: logger.debug('Calculating provean supset...') self._build_provean_supset() else: logger.debug('Provean supset is already calculated!') self.provean_supset_length = self._get_provean_supset_length() # Mutations self.mutations = {}
[docs] def mutate(self, mutation): if mutation in self.mutations: return self.mutations[mutation] if mutation[0] != self.sequence[int(mutation[1:-1])-1]: logger.error('sequence: {}'.format(self.sequence)) logger.error('mutation: {}'.format(mutation)) raise errors.MutationMismatchError() results = dict( protein_id=self.protein_id, mutation=mutation, provean_score=self._run_provean(mutation), matrix_score=self.score_pairwise(mutation[0], mutation[-1]) ) return results
@property def provean_supset_file(self): return op.join( configs['sequence_dir'], helper.slugify(self.protein_id + '_provean_supset') ) @property def provean_supset_exists(self): return ( op.isfile(self.provean_supset_file) and op.isfile(self.provean_supset_file + '.fasta') ) @property def result(self): result = dict( protein_id=self.protein_id, sequence=self.sequence, sequence_file=self.sequence_file, provean_supset_exists=self.provean_supset_exists, provean_supset_file=self.provean_supset_file, provean_supset_length=self.provean_supset_length, mutations=self.mutations, ) return result def _build_provean_supset(self): """ """ logger.debug('Building Provean supporting set. This might take a while...') atexit.register(_clear_provean_temp) # Get the required parameters any_position = 0 while self.sequence[any_position] not in helper.canonical_amino_acids: any_position += 1 first_aa = self.sequence[any_position] mutation = '{0}{1}{0}'.format(first_aa, any_position+1) # Run provean provean_score = self._run_provean( mutation, save_supporting_set=True, check_mem_usage=True ) return provean_score # provean_supset_length = None # for line in result.split('\n'): # if 'Number of supporting sequences used:' in line: # provean_supset_length = int(line.split()[-1]) # if provean_supset_length is None: # logger.error('Provean supporting set length could not be estimated. # This is an error!') # logger.error('Provean result: {}'.format(result)) # logger.error('Provean error_message: {}'.format(error_message)) # logger.error('Provean return_code: {}'.format(return_code)) # logger.error('Protein sequence: {}'.format(self.sequence)) # logger.error('Protein mutation: {}'.format(mutation)) # # logger.info('Provean supset length: {}'.format(provean_supset_length)) # return provean_supset_length def _get_provean_supset_length(self): provean_supset_length = 0 with open(self.provean_supset_file) as fh: for line in fh: if line and not line.startswith('#'): provean_supset_length += 1 return provean_supset_length def _run_provean(self, mutation, save_supporting_set=False, check_mem_usage=False): """ Provean results look something like this:: #[23:28:34] clustering subject sequences... #[23:28:34] selecting clusters... #[23:28:34] 0 subject sequences in 0 clusters were selected for supporting sequences. #[23:28:34] use the query itself as a supporting sequence #[23:28:34] loading subject sequences from a FASTA file... #[23:28:34] scores were computed based on the query sequence itself. ## Number of clusters: 1 ## Number of supporting sequences used: 1 #[23:28:34] computing delta alignment scores... #[23:28:34] printing PROVEAN scores... ### PROVEAN scores ## ## VARIATION SCORE #M1A -6.000 Parameters ---------- domain_mutation : string Mutation in domain coordinates (i.e. relative to the start of the domain) Returns ------- list [result, error_message, return_code] -- The output from running a provean system command. Raises ------ errors.ProveanError Can raise this exception only if ``check_mem_usage`` is set to ``True``. """ if check_mem_usage: # Get initial measurements of how much virtual memory and disk space is availible disk_space_availible = psutil.disk_usage(configs['provean_temp_dir']).free / (1024**3) logger.debug('Disk space availible: {:.2f} GB'.format(disk_space_availible)) if disk_space_availible < 5: raise errors.ProveanError( 'Not enough disk space ({:.2f} GB) to run provean' .format(disk_space_availible)) memory_availible = psutil.virtual_memory().available / float(1024)**3 logger.debug('Memory availible: {:.2f} GB'.format(memory_availible)) if memory_availible < 0.5: raise errors.ProveanError( 'Not enough memory ({:.2f} GB) to run provean' .format(memory_availible)) # Create a file with mutation mutation_file = op.join(configs['sequence_dir'], '{}.var'.format(mutation)) with open(mutation_file, 'w') as ofh: ofh.write(mutation) # Run provean system_command = ( "provean " + " -q '{}' ".format(self.sequence_file) + " -v '{}' ".format(mutation_file) + " -d " + op.join(configs['blast_db_dir'], 'nr') + " --tmp_dir '{}' ".format(configs['provean_temp_dir']) + " --num_threads {} ".format(configs['n_cores']) + " --psiblast '{}' ".format(helper.get_which('psiblast')) + " --blastdbcmd '{}' ".format(helper.get_which('blastdbcmd')) + " --cdhit '{}' ".format(helper.get_which('cd-hit')) ) if self.provean_supset_exists: # use supporting set system_command += " --supporting_set '{}' ".format(self.provean_supset_file) else: system_command += " --save_supporting_set '{}' ".format(self.provean_supset_file) logger.debug(system_command) child_process = helper.run_subprocess_locally(configs['sequence_dir'], system_command) logger.debug('Parent group id: {}'.format(os.getpgrp())) child_process_group_id = os.getpgid(child_process.pid) logger.debug('Child group id: {}'.format(child_process_group_id)) # Keep an eye on provean to make sure it doesn't do anything crazy time.sleep(5) while check_mem_usage and child_process.poll() is None: disk_space_availible_now = ( psutil.disk_usage(configs['provean_temp_dir']).free / float(1024)**3 ) if disk_space_availible_now < 5: # less than 5 GB of free disk space left raise errors.ProveanResourceError( 'Ran out of disk space and provean had to be terminated ({} GB used)' .format(disk_space_availible-disk_space_availible_now), child_process_group_id) memory_availible_now = ( psutil.virtual_memory().available / float(1024)**3 ) if memory_availible_now < 0.5: raise errors.ProveanResourceError( 'Ran out of RAM and provean had to be terminated ({} GB left)' .format(memory_availible - memory_availible_now), child_process_group_id) time.sleep(60) # Wait for 1 minute before checking again # Collect the results and check for errors result, error_message, return_code = helper.subprocess_communicate(child_process) logger.debug(result) # Extract provean score from the results message provean_score = None result_list = result.split('\n') for i in range(len(result_list)): if re.findall('# VARIATION\s*SCORE', result_list[i]): provean_score = float(result_list[i+1].split()[-1]) break if return_code != 0 or provean_score is None: logger.error('return_code: {}'.format(return_code)) logger.error('provean_score: {}'.format(provean_score)) logger.error('error_message: {}'.format(error_message)) raise errors.ProveanError(error_message) return provean_score # === Other sequence scores ===
[docs] def score_pairwise(self, seq1, seq2, matrix=None, gap_s=None, gap_e=None): """Get the BLOSUM (or what ever matrix is given) score. """ matrix = matrix or configs['matrix'] gap_s = gap_s or configs['gap_start'] gap_e = gap_e or configs['gap_extend'] score = 0 gap = False for i in range(len(seq1)): pair = (seq1[i], seq2[i]) if not gap: if '-' in pair: gap = True score += gap_s else: score += self._score_match(pair, matrix) else: if '-' not in pair: gap = False score += self._score_match(pair, matrix) else: score += gap_e return score
def _score_match(self, pair_match, matrix_match): """ """ if pair_match not in matrix_match: return matrix_match[(tuple(reversed(pair_match)))] else: return matrix_match[pair_match] def _clear_provean_temp(): provean_temp_dir = configs['provean_temp_dir'] logger.info("Clearning provean temporary files from '{}'...".format(provean_temp_dir)) for filename in os.listdir(provean_temp_dir): file_path = os.path.join(provean_temp_dir, filename) try: if os.path.isfile(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: print(e)