Source code for elaspic.structure_analysis

# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import absolute_import
from builtins import zip
from builtins import object

import os
import os.path as op
import time
import subprocess
import logging
from collections import OrderedDict

import six
import pandas as pd

from Bio.PDB import PDBIO
from Bio.PDB.PDBParser import PDBParser

from . import errors, helper, structure_tools

logger = logging.getLogger(__name__)


# %% CONSTANTS

# Standard accessibilities for a ALA-X-ALA tripeptide
# (obtained from NACCESS)
STANDARD_DATA = """
STANDARD ACCESSIBILITES FOR PROBE 1.40 AND RADII vdw.radii
ATOM S   2  ALA  107.95   0.0  69.41   0.0   0.00   0.0  69.41   0.0  38.54   0.0  71.38   0.0  36.58   0.0
ATOM S   2  CYS  134.28   0.0  96.75   0.0   0.00   0.0  96.75   0.0  37.53   0.0  97.93   0.0  36.35   0.0
ATOM S   2  ASP  140.39   0.0  48.00   0.0  54.69   0.0 102.69   0.0  37.70   0.0  49.24   0.0  91.15   0.0
ATOM S   2  GLU  172.25   0.0  59.10   0.0  75.64   0.0 134.74   0.0  37.51   0.0  60.29   0.0 111.96   0.0
ATOM S   2  PHE  199.48   0.0 164.11   0.0   0.00   0.0 164.11   0.0  35.37   0.0 165.25   0.0  34.23   0.0
ATOM S   2  GLY   80.10   0.0  32.33   0.0   0.00   0.0  32.33   0.0  47.77   0.0  37.55   0.0  42.55   0.0
ATOM S   2  HIS  182.88   0.0  96.01   0.0  51.07   0.0 147.08   0.0  35.80   0.0  97.15   0.0  85.73   0.0
ATOM S   2  ILE  175.12   0.0 137.96   0.0   0.00   0.0 137.96   0.0  37.16   0.0 139.14   0.0  35.98   0.0
ATOM S   2  LYS  200.81   0.0 115.38   0.0  47.92   0.0 163.30   0.0  37.51   0.0 116.57   0.0  84.24   0.0
ATOM S   2  LEU  178.63   0.0 141.12   0.0   0.00   0.0 141.12   0.0  37.51   0.0 142.31   0.0  36.32   0.0
ATOM S   2  MET  194.15   0.0 156.64   0.0   0.00   0.0 156.64   0.0  37.51   0.0 157.84   0.0  36.32   0.0
ATOM S   2  ASN  143.94   0.0  44.98   0.0  61.26   0.0 106.24   0.0  37.70   0.0  46.23   0.0  97.72   0.0
ATOM S   2  PRO  136.13   0.0 119.90   0.0   0.00   0.0 119.90   0.0  16.23   0.0 120.95   0.0  15.19   0.0
ATOM S   2  GLN  178.50   0.0  51.03   0.0  89.96   0.0 140.99   0.0  37.51   0.0  52.22   0.0 126.28   0.0
ATOM S   2  ARG  238.76   0.0  76.60   0.0 124.65   0.0 201.25   0.0  37.51   0.0  77.80   0.0 160.97   0.0
ATOM S   2  SER  116.50   0.0  46.89   0.0  31.22   0.0  78.11   0.0  38.40   0.0  48.55   0.0  67.95   0.0
ATOM S   2  THR  139.27   0.0  74.54   0.0  27.17   0.0 101.70   0.0  37.57   0.0  75.72   0.0  63.55   0.0
ATOM S   2  VAL  151.44   0.0 114.28   0.0   0.00   0.0 114.28   0.0  37.16   0.0 115.47   0.0  35.97   0.0
ATOM S   2  TRP  249.36   0.0 187.67   0.0  23.60   0.0 211.26   0.0  38.10   0.0 189.67   0.0  59.69   0.0
ATOM S   2  TYR  212.76   0.0 135.35   0.0  42.03   0.0 177.38   0.0  35.38   0.0 136.50   0.0  76.26   0.0
"""
STANDARD_SASA_ALL = [[l.strip() for l in line.split()] for line in STANDARD_DATA.strip().split('\n')[1:]]
STANDARD_SASA = {x[3]: float(x[4]) for x in STANDARD_SASA_ALL}


# %% Init
class AnalyzeStructure(object):
    """
    Runs the program pops to calculate the interface size of the complexes
    This is done by calculating the surface of the complex and the seperated parts.
    The interface is then given by the substracting.
    """

    def __init__(self, pdb_file, working_dir, vdw_distance=5.0, min_contact_distance=4.0):
        self.pdb_file = pdb_file
        #: Folder with all the binaries (i.e. ./analyze_structure)
        self.working_dir = working_dir
        self.vdw_distance = vdw_distance
        self.min_contact_distance = min_contact_distance

        self._prepare_temp_folder(self.working_dir)

        self.sp = structure_tools.StructureParser(pdb_file)
        self.sp.extract()
        self.sp.save_structure(output_dir=self.working_dir)

        self.chain_ids = self.sp.chain_ids


    def _prepare_temp_folder(self, temp_folder):
        # ./analyze_structure
        os.makedirs(temp_folder, exist_ok=True)



    #%%
    def __call__(self, chain_id, mutation, chain_id_other=None):
        """Calculate all properties.
        """
        # Solvent accessibility
        (seasa_by_chain_together, seasa_by_chain_separately,
         seasa_by_residue_together, seasa_by_residue_separately) = self.get_seasa()
        seasa_info = (
            seasa_by_residue_separately[
                (seasa_by_residue_separately['pdb_chain'] == chain_id) &
                (seasa_by_residue_separately['res_num'] == mutation[1:-1])
            ].iloc[0]
        )
        self._validate_mutation(seasa_info['res_name'], mutation)
        solvent_accessibility = seasa_info['rel_sasa']

        # Secondary structure
        secondary_structure_df = self.get_secondary_structure()
        secondary_structure_df = (
            secondary_structure_df[
                (secondary_structure_df.chain == chain_id) &
                (secondary_structure_df.resnum == mutation[1:-1])
            ]
        )
        assert len(secondary_structure_df) == 1
        secondary_structure_df = secondary_structure_df.iloc[0]
        self._validate_mutation(seasa_info['res_name'], mutation)
        secondary_structure = secondary_structure_df.ss_code

        # Contact distance
        contact_distance = None
        if chain_id_other is not None:
            try:
                contact_distance = self.get_interchain_distances(chain_id, mutation)
                contact_distance = contact_distance[chain_id][chain_id_other]
                logger.debug(
                    'The shortest interchain distance between chain {} and chain {} is {}'
                    .format(chain_id, chain_id_other, contact_distance))
                if not contact_distance:
                    raise ValueError()
            except (IndexError, KeyError, ValueError) as e:
                logger.error(
                    'Could not calculate the shortest contact distance between two chains!'
                )
                logger.error(e)
                logger.error(contact_distance)
                raise

        # PhysiChem
        physchem, physchem_ownchain = self.get_physi_chem(chain_id, mutation)

        # Compile results
        results = dict(
            solvent_accessibility = solvent_accessibility,
            secondary_structure = secondary_structure,
            contact_distance = contact_distance,
            physchem = physchem,
            physchem_ownchain = physchem_ownchain,
        )
        return results


[docs] def get_structure_file(self, chains, ext='.pdb'): return op.join(self.working_dir, self.sp.pdb_id + chains + ext) #%%
[docs] def get_physi_chem(self, chain_id, mutation): """ Return the atomic contact vector, that is, counting how many interactions between charged, polar or "carbon" residues there are. The "carbon" interactions give you information about the Van der Waals packing of the residues. Comparing the wildtype vs. the mutant values is used in the machine learning algorithm. 'mutation' is of the form: 'A16' where A is the chain identifier and 16 the residue number (in pdb numbering) of the mutation chainIDs is a list of strings with the chain identifiers to be used if more than two chains are given, the chains not containing the mutation are considered as "opposing" chain """ model = self.sp.structure[0] mutated_chain = model[chain_id] opposite_chains = [ chain for chain in model.child_list if chain.id != chain_id ] main_chain_atoms = ['CA', 'C', 'N', 'O'] # Find the mutated residue (assuming mutation is in index coordinates) # mutation_pos = int(mutation[1:-1]) # residue_counter = 0 # for residue in mutated_chain: # if residue.resname in structure_tools.AMINO_ACIDS and not residue.id[0].strip(): # residue_counter += 1 # if residue_counter == mutation_pos: # self._validate_mutation(residue.resname, mutation) # mutated_residue = residue # break # mutated_atoms = [atom for atom in mutated_residue if atom.name not in main_chain_atoms] # Find the mutated residue (assuming mutation is in resnum) for residue in mutated_chain: if residue.resname in structure_tools.AMINO_ACIDS and not residue.id[0].strip(): if str(residue.id[1]) == mutation[1:-1]: self._validate_mutation(residue.resname, mutation) mutated_residue = residue break mutated_atoms = [atom for atom in mutated_residue if atom.name not in main_chain_atoms] # Go through each atom in each residue in each partner chain... opposite_chain_contacts = { 'equal_charge': [], 'opposite_charge': [], 'h_bond': [], 'carbon_contact': []} for opposite_chain in opposite_chains: for partner_residue in opposite_chain: self._increment_vector( mutated_residue, mutated_atoms, partner_residue, opposite_chain_contacts) # Go through each atom in each residue in the mutated chain... same_chain_contacts = { 'equal_charge': [], 'opposite_charge': [], 'h_bond': [], 'carbon_contact': []} for partner_residue in mutated_chain: # Skipping the mutated residue... if partner_residue == mutated_residue: continue self._increment_vector( mutated_residue, mutated_atoms, partner_residue, same_chain_contacts) opposite_chain_contact_vector = [ len(opposite_chain_contacts['equal_charge']), len(opposite_chain_contacts['opposite_charge']), len(opposite_chain_contacts['h_bond']), len(set(opposite_chain_contacts['carbon_contact']))] same_chain_contact_vector = [ len(same_chain_contacts['equal_charge']), len(same_chain_contacts['opposite_charge']), len(same_chain_contacts['h_bond']), len(set(same_chain_contacts['carbon_contact']))] return opposite_chain_contact_vector, same_chain_contact_vector
def _validate_mutation(self, resname, mutation): valid_aa = [mutation[0].upper(), mutation[-1].upper()] if structure_tools.AAA_DICT.get(resname, resname) not in valid_aa: logger.error(resname) logger.error(mutation) logger.error(structure_tools.AAA_DICT[resname]) raise errors.MutationMismatchError() def _increment_vector( self, mutated_residue, mutated_atoms, partner_residue, contact_features_dict): # For each residue each atom of the mutated residue has to be checked for mutated_atom in mutated_atoms: mutated_atom_type = self._get_atom_type(mutated_residue.resname, mutated_atom) # And each partner residue and partner atom for partner_atom in partner_residue: r = structure_tools.calculate_distance( mutated_atom, partner_atom, self.vdw_distance) if r is not None: partner_atom_type = self._get_atom_type(partner_residue.resname, partner_atom) if partner_atom_type == 'ignore': continue if mutated_atom_type == 'carbon' and partner_atom_type == 'carbon': # The Van der Waals packing should be determined # nonredundant. Thus, the atomic coordinates are # used to keep track of which interactions where # already counted. Does not matter as much for others. contact_features_dict['carbon_contact'].append(tuple(partner_atom.coord)) if r <= self.min_contact_distance: if mutated_atom_type == 'charged_plus' and partner_atom_type == 'charged_plus': contact_features_dict['equal_charge'].append(tuple(mutated_atom.coord)) if mutated_atom_type == 'charged_minus' and partner_atom_type == 'charged_plus': contact_features_dict['opposite_charge'].append(tuple(mutated_atom.coord)) if mutated_atom_type == 'charged_plus' and partner_atom_type == 'charged_minus': contact_features_dict['opposite_charge'].append(tuple(mutated_atom.coord)) if mutated_atom_type == 'charged_minus' and partner_atom_type == 'charged_minus': contact_features_dict['equal_charge'].append(tuple(mutated_atom.coord)) if mutated_atom_type == 'charged' and partner_atom_type == 'polar': contact_features_dict['h_bond'].append(tuple(mutated_atom.coord)) if mutated_atom_type == 'polar' and partner_atom_type == 'charged': contact_features_dict['h_bond'].append(tuple(mutated_atom.coord)) if mutated_atom_type == 'polar' and partner_atom_type == 'polar': contact_features_dict['h_bond'].append(tuple(mutated_atom.coord)) def _get_atom_type(self, residue, atom): """ Checks what type of atom it is (i.e. charged, polar, carbon) In order to see what "interaction" type two atoms are forming, check the individual label which every atom in every residue has (see pdb file convention for an explanation of the labels). With this label, one can determine which atom of the residue one is looking at, and hence, one can determine which "interaction" two atoms are forming. """ # This is based on the naming convention for the atoms in crystalography charged_plus = ['NH1', 'NH2', 'NZ'] # Label of negatively charged atoms charged_minus = ['OD1', 'OD2', 'OE1', 'OE2'] # Label of polar atoms polar = ['OG', 'OG1', 'OD1', 'OD2', 'ND1', 'OE1', 'NE', 'NE1', 'NE2', 'ND1', 'ND2', 'SG', 'OH', 'O', 'N'] if residue.upper() in ['ARG', 'R', 'LYS', 'K']: if atom.name in charged_plus: return 'charged_plus' if residue.upper() in ['ASP', 'D', 'GLU', 'E']: if atom.name in charged_minus: return 'charged_minus' if atom.name in polar: return 'polar' if atom.name[0] == 'C' or atom.name == 'SD': return 'carbon' return 'ignore' #%% SASA New
[docs] def get_seasa(self): structure_file = self.get_structure_file(''.join(self.chain_ids)) seasa_by_chain, seasa_by_residue = self._run_msms(structure_file) if len(self.chain_ids) > 1: seasa_by_chain_separately = [] seasa_by_residue_separately = [] for chain_id in self.chain_ids: structure_file = self.get_structure_file(chain_id) seasa_by_chain, seasa_by_residue = self._run_msms(structure_file) seasa_by_chain_separately.append(seasa_by_chain) seasa_by_residue_separately.append(seasa_by_residue) seasa_by_chain_separately = pd.concat(seasa_by_chain_separately, ignore_index=True) seasa_by_residue_separately = pd.concat(seasa_by_residue_separately, ignore_index=True) return [seasa_by_chain, seasa_by_chain_separately, seasa_by_residue, seasa_by_residue_separately] else: return [None, seasa_by_chain, None, seasa_by_residue]
def _run_msms(self, filename): """ In the future, could add an option to measure residue depth using Bio.PDB.ResidueDepth().residue_depth()... """ base_filename = op.splitext(filename)[0] # Convert pdb to xyz coordiates assert(os.path.isfile(op.join(self.working_dir, filename))) system_command = 'pdb_to_xyzrn {0}.pdb'.format(op.join(self.working_dir, base_filename)) logger.debug('msms system command 1: %s' % system_command) result, error_message, return_code = ( helper.subprocess_check_output_locally(self.working_dir, system_command) ) if return_code != 0: logger.debug('msms result 1:') logger.debug(result) logger.debug('msms error message 1:') logger.debug(error_message) logger.debug('naccess rc 1:') logger.debug(return_code) raise errors.MSMSError(error_message) else: with open(op.join(self.working_dir, base_filename + '.xyzrn'), 'w') as ofh: ofh.writelines(result) # Calculate SASA and SESA (excluded) probe_radius = 1.4 system_command_string = ( 'msms ' '-probe_radius {1:.1f} ' '-surface ases ' '-if {0}.xyzrn ' '-af {0}.area') system_command = system_command_string.format(op.join(self.working_dir, base_filename), probe_radius) logger.debug('msms system command 2: %s' % system_command) result, error_message, return_code = ( helper.subprocess_check_output_locally(self.working_dir, system_command) ) number_of_tries = 0 while return_code != 0 and number_of_tries < 5: logger.error('MSMS exited with an error!') probe_radius -= 0.1 logger.debug('Reducing probe radius to {}'.format(probe_radius)) system_command = system_command_string.format(op.join(self.working_dir, base_filename), probe_radius) result, error_message, return_code = ( helper.subprocess_check_output_locally(self.working_dir, system_command) ) number_of_tries += 1 if return_code != 0: logger.debug('msms result 2:') logger.debug(result) logger.debug('msms error message 2:') logger.debug(error_message) logger.debug('naccess rc 2:') logger.debug(return_code) raise errors.MSMSError(error_message) # Read and parse the output with open(op.join(self.working_dir, base_filename + '.area'), 'r') as fh: file_data = fh.readlines() file_data = [ [l.strip() for l in line.split()] for line in file_data] del file_data[0] msms_columns = [ 'atom_num', 'abs_sesa', 'abs_sasa', 'atom_id', 'res_name', 'res_num', 'pdb_chain' ] def msms_parse_row(row): parsed_row = [ int(row[0]), float(row[1]), float(row[2]), row[3].split('_')[0].strip(), row[3].split('_')[1].strip(), row[3].split('_')[2], row[3].split('_')[3] ] return parsed_row file_data = [msms_parse_row(row) for row in file_data if row] seasa_df = pd.DataFrame(data=file_data, columns=msms_columns) seasa_df['atom_num'] = seasa_df['atom_num'].apply(lambda x: x + 1) seasa_df['rel_sasa'] = [ x[0] / STANDARD_SASA.get(x[1], x[0]) * 100 for x in zip(seasa_df['abs_sasa'], seasa_df['res_name']) ] seasa_gp_by_chain = seasa_df.groupby(['pdb_chain']) seasa_gp_by_residue = seasa_df.groupby(['pdb_chain', 'res_name', 'res_num']) seasa_by_chain = seasa_gp_by_chain.sum().reset_index() seasa_by_residue = seasa_gp_by_residue.sum().reset_index() return seasa_by_chain, seasa_by_residue #%% SASA Old
[docs] def get_sasa(self, program_to_use='pops'): """Get Solvent Accessible Surface Area scores. .. note:: deprecated Use python:fn:`get_seasa` instead. """ if program_to_use == 'naccess': run_sasa_atom = self._run_naccess_atom elif program_to_use == 'pops': run_sasa_atom = self._run_pops_atom else: raise Exception('Unknown program specified!') sasa_score_splitchains = {} for chain_id in self.chain_ids: sasa_score_splitchains.update(run_sasa_atom(chain_id + '.pdb')) sasa_score_allchains = run_sasa_atom(''.join(self.chain_ids) + '.pdb') return [sasa_score_splitchains, sasa_score_allchains]
def _run_naccess_atom(self, filename): """ NACESS is not used anymore. It is not reliable enough on a PDB-wide scale. """ # run naccess system_command = ('naccess ' + filename) logger.debug('naccess system command: %s' % system_command) assert(os.path.isfile(self.working_dir + filename)) result, error_message, return_code = ( helper.subprocess_check_output_locally(self.working_dir, system_command) ) logger.debug('naccess result: {}'.format(result)) logger.debug('naccess error: {}'.format(error_message)) logger.debug('naccess rc: {}'.format(return_code)) # Collect results sasa_scores = {} with open(self.working_dir + filename.split('.')[0] + '.rsa') as fh: for line in fh: row = line.split() if row[0] != 'RES': continue try: (line_id, res, chain, num, all_abs, all_rel, sidechain_abs, sidechain_rel, mainchain_abs, mainchain_rel, nonpolar_abs, nonpolar_rel, polar_abs, polar_rel) = row except ValueError as e: print(e) print(line) print(row) sasa_scores.setdefault(chain, []).append(sidechain_rel) # percent sasa on sidechain return sasa_scores def _run_pops_atom(self, chain_id): # Use pops to calculate sasa score for the given chain termination, rc, e = self.__run_pops_atom(chain_id) if termination != 'Clean termination': logger.error('Pops error for pdb: %s, chains: %s: ' % (self.pdb_file, ' '.join(self.chain_ids),) ) logger.error(e) raise errors.PopsError(e, self.pdb_file, self.chain_ids) else: logger.warning('Pops error for pdb: %s, chains: %s: ' % (self.pdb_file, ' '.join(self.chain_ids),) ) logger.warning(e) # Read the sasa scores from a text file sasa_scores = self.__read_pops_atom(chain_id) return sasa_scores def __run_pops_atom(self, chain_id): system_command = ('pops --noHeaderOut --noTotalOut --atomOut --pdb {0}.pdb --popsOut {0}.out'.format(chain_id)) result, error_message, return_code = ( helper.subprocess_check_output_locally(self.working_dir, system_command) ) # The returncode can be non zero even if pops calculated the surface # area. In that case it is indicated by "clean termination" written # to the output. Hence this check: # if output[-1] == 'Clean termination' the run should be OK logger.debug('result: %s' % result) output = [ line for line in result.split('\n') if line != '' ] return output[-1], return_code, error_message def __read_pops_atom(self, chain_id): """ Read pops sasa results atom by atom, ignoring all main chain atoms except for Ca """ # The new way ignore = ['N', 'C', 'O'] per_residue_sasa_scores = [] current_residue_number = None total_sasa, total_sa = None, None # so pylint stops complaining with open(self.working_dir + chain_id + '.out', 'r') as fh: for line in fh: row = line.split() if len(row) != 11: continue atom_number, atom_name, residue_name, chain, residue_number, sasa, __, __, __, __, sa = line.split() atom_number, residue_number, sasa, sa = int(atom_number), int(residue_number), float(sasa), float(sa) if atom_name in ignore: continue if current_residue_number != residue_number: if current_residue_number: per_residue_sasa_scores.append(total_sasa / float(total_sa)) current_residue_number = residue_number total_sasa = 0 total_sa = 0 total_sasa += sasa total_sa += sa per_residue_sasa_scores.append(total_sasa / float(total_sa)) return per_residue_sasa_scores #%% Secondary Structure
[docs] def get_secondary_structure(self): return self.get_stride()
[docs] def get_stride(self): structure_file = self.get_structure_file(''.join(self.chain_ids)) stride_results_file = op.join( op.dirname(structure_file), structure_tools.get_pdb_id(structure_file) + '_stride_results.txt' ) system_command = 'stride {} -f{}'.format(structure_file, stride_results_file) logger.debug('stride system command: %s' % system_command) result, error_message, return_code = ( helper.subprocess_check_output_locally(self.working_dir, system_command) ) logger.debug('stride return code: %i' % return_code) logger.debug('stride result: %s' % result) logger.debug('stride error: %s' % error_message) # collect results with open(stride_results_file) as fh: file_data_df = pd.DataFrame( [[structure_tools.AAA_DICT[row.split()[1]], row.split()[2], row.split()[3], int(row.split()[4]), row.split()[5]] for row in fh.readlines() if row[:3] == 'ASG'], columns=['amino_acid', 'chain', 'resnum', 'idx', 'ss_code']) return file_data_df
[docs] def get_dssp(self): """Not used because crashes on server. """ n_tries = 0 return_code = -1 while return_code != 0 and n_tries < 5: if n_tries > 0: logger.debug('Waiting for 1 minute before trying again...') time.sleep(60) system_command = ('dssp -v -i ' + ''.join(self.chain_ids) + '.pdb' + ' -o ' + 'dssp_results.txt') logger.debug('dssp system command: %s' % system_command) result, error_message, return_code = ( helper.subprocess_check_output_locally(self.working_dir, system_command) ) logger.debug('dssp return code: %i' % return_code) logger.debug('dssp result: %s' % result) logger.debug('dssp error: %s' % error_message) n_tries += 1 if return_code != 0: if 'boost::thread_resource_error' in error_message: system_command = "rsync {0}{1}.pdb /home/kimlab1/strokach/tmp/elaspic_bad_pdbs/" helper.subprocess_check_output_locally(self.working_dir, system_command) raise errors.ResourceError(error_message) # collect results dssp_ss = {} dssp_acc = {} start = False with open(op.join(self.working_dir, 'dssp_results.txt')) as fh: for l in fh: row = l.split() if not row or len(row) < 2: continue if row[1] == "RESIDUE": start = True # Start parsing from here continue if not start: continue if l[9] == ' ': continue # Skip -- missing residue # resseq, icode, chainid, aa, ss = int(l[5:10]), l[10], l[11], l[13], l[16] chainid, ss = l[11], l[16] if ss == ' ': ss = '-' try: acc = int(l[34:38]) #phi = float(l[103:109]) #psi = float(l[109:115]) except ValueError as e: # DSSP output breaks its own format when there are >9999 # residues, since only 4 digits are allocated to the seq num # field. See 3kic chain T res 321, 1vsy chain T res 6077. # Here, look for whitespace to figure out the number of extra # digits, and shift parsing the rest of the line by that amount. if l[34] != ' ': shift = l[34:].find(' ') acc = int((l[34+shift:38+shift])) #phi = float(l[103+shift:109+shift]) #psi = float(l[109+shift:115+shift]) else: raise e dssp_ss.setdefault(chainid, []).append(ss) # percent sasa on sidechain dssp_acc.setdefault(chainid, []).append(acc) for key in list(dssp_ss.keys()): dssp_ss[key] = ''.join(dssp_ss[key]) return dssp_ss, dssp_acc #%%
[docs] def get_interchain_distances(self, pdb_chain=None, pdb_mutation=None, cutoff=None): """ """ model = self.sp.structure[0] shortest_interchain_distances = {} # Chain 1 for i, chain_1_id in enumerate(self.sp.chain_ids): shortest_interchain_distances[chain_1_id] = {} chain_1 = model[chain_1_id] if pdb_chain: if chain_1_id == pdb_chain: continue chain_2_ids = [pdb_chain] else: chain_2_ids = self.sp.chain_ids[i+1:] # Chain 2 for chain_2_id in chain_2_ids: chain_2 = model[chain_2_id] min_r = cutoff # Residue 1 for residue_1 in chain_1: if (residue_1.resname not in structure_tools.AMINO_ACIDS or residue_1.id[0] != ' '): continue # Residue 2 for residue_2_idx, residue_2 in enumerate(chain_2): if (residue_2.resname not in structure_tools.AMINO_ACIDS or residue_2.id[0] != ' '): continue if pdb_mutation: if str(residue_2.id[1]) != pdb_mutation[1:-1]: continue if (structure_tools.convert_aa(residue_2.resname) != pdb_mutation[0] and structure_tools.convert_aa(residue_2.resname) != pdb_mutation[-1]): logger.debug(pdb_mutation) logger.debug(structure_tools.convert_aa(residue_2.resname)) logger.debug(residue_2.id) raise errors.MutationMismatchError() # Atom 1 for atom_1 in residue_1: # Atom 2 for atom_2 in residue_2: r = structure_tools.calculate_distance(atom_1, atom_2, min_r) if min_r is None or (r is not None and r < min_r): min_r = r shortest_interchain_distances[chain_1_id][chain_2_id] = min_r if not shortest_interchain_distances: logger.error( 'get_interchain_distances({pdb_chain}, {pdb_mutation}, {cutoff}) failed!' .format(pdb_chain=pdb_chain, pdb_mutation=pdb_mutation, cutoff=cutoff) ) raise Exception() _shortest_interchain_distances_complement = {} for key in shortest_interchain_distances: for key_2, value in shortest_interchain_distances[key].items(): _shortest_interchain_distances_complement.setdefault(key_2, dict())[key] = value shortest_interchain_distances.update(_shortest_interchain_distances_complement) all_chains = {key for key in shortest_interchain_distances} all_chains.update( {key_2 for key in shortest_interchain_distances for key_2 in shortest_interchain_distances[key]} ) if set(all_chains) != set(self.sp.chain_ids): logger.error( 'get_interchain_distances({pdb_chain}, {pdb_mutation}, {cutoff}) failed!' .format(pdb_chain=pdb_chain, pdb_mutation=pdb_mutation, cutoff=cutoff) ) logger.error('Did not calculate chain distances for all chain pairs!') logger.error('all_chains: {}'.format(all_chains)) logger.error('self.sp.chain_ids: {}'.format(self.sp.chain_ids)) raise Exception() for key_1, value_1 in shortest_interchain_distances.items(): logger.debug( 'Calculated interchain distances between chain {} and chains {}.' .format(key_1, ', '.join(list(value_1.keys())))) return shortest_interchain_distances #%%
[docs] def get_interface_area(self, chain_ids): """ """ assert len(chain_ids) == 2 termination, rc, e = self.__run_pops_area(self.get_structure_file(''.join(chain_ids))) if rc != 0: if termination != 'Clean termination': logger.error('Pops error for pdb: %s:' % self.pdb_file) logger.error(e) return [None, None, None] result = self.__read_pops_area(self.get_structure_file(''.join(chain_ids)) + '.out') # Distinguish the surface area by hydrophobic, hydrophilic, and total for item in result: if item[0] == 'hydrophobic:': hydrophobic = float(item[1]) elif item[0] == 'hydrophilic:': hydrophilic = float(item[1]) elif item[0] == 'total:': total = float(item[1]) sasa_complex = hydrophobic, hydrophilic, total # calculate SASA for chain, i.e. part one of the complex: termination, rc, e = self.__run_pops_area(self.get_structure_file(chain_ids[0])) if rc != 0: if termination != 'Clean termination': logger.error('Error in pops for pdb: %s:' % self.pdb_file) logger.error(e) return [None, None, None] result = self.__read_pops_area(self.get_structure_file(chain_ids[0]) + '.out') # Distinguish the surface area by hydrophobic, hydrophilic, and total for item in result: if item[0] == 'hydrophobic:': hydrophobic = float(item[1]) elif item[0] == 'hydrophilic:': hydrophilic = float(item[1]) elif item[0] == 'total:': total = float(item[1]) sasa_chain = hydrophobic, hydrophilic, total # calculate SASA for oppositeChain, i.e. the second part of the complex: termination, rc, e = self.__run_pops_area(self.get_structure_file(chain_ids[1])) if rc != 0: if termination != 'Clean termination': logger.error('Error in pops for pdb: %s:' % self.pdb_file) logger.error(e) return [None, None, None] result = self.__read_pops_area(self.get_structure_file(chain_ids[1]) + '.out') for item in result: if item[0] == 'hydrophobic:': hydrophobic = float(item[1]) elif item[0] == 'hydrophilic:': hydrophilic = float(item[1]) elif item[0] == 'total:': total = float(item[1]) sasa_oppositeChain = hydrophobic, hydrophilic, total sasa = [ 0, 0, 0 ] # hydrophobic sasa[0] = (sasa_chain[0] + sasa_oppositeChain[0] - sasa_complex[0]) / 2.0 # hydrophilic sasa[1] = (sasa_chain[1] + sasa_oppositeChain[1] - sasa_complex[1]) / 2.0 # total sasa[2] = (sasa_chain[2] + sasa_oppositeChain[2] - sasa_complex[2]) / 2.0 return sasa
def __run_pops_area(self, full_filename): system_command = ('pops --chainOut' ' --pdb ' + full_filename + ' --popsOut ' + full_filename + '.out') result, error_message, return_code = ( helper.subprocess_check_output_locally(self.working_dir, system_command) ) # The returncode can be non zero even if pops calculated the surface # area. In that case it is indicated by "clean termination" written # to the output. Hence this check: # if output[-1] == 'Clean termination' the run should be OK output = [ line for line in result.split('\n') if line != '' ] logger.debug(system_command) # logger.debug('pops result: %s' % result) # Prints the entire POPs output # logger.debug('pops error: %s' % e) error_message_1 = 'Warning: Atom distance too short! Probably incorrect POPS results!' if error_message_1 in error_message: logger.error(error_message_1) logger.debug('pops rc: %s' % return_code) return output[-1], return_code, error_message def __read_pops_area(self, filename): """ This function parses POPS output that looks like this:: === MOLECULE SASAs === hydrophobic: 5267.01 hydrophilic: 4313.68 total: 9580.69 """ keep = ['hydrophobic:', 'hydrophilic:', 'total:'] with open(filename, 'r') as pops: result = [ x.split(' ') for x in pops.readlines() if x != '' and x.split(' ')[0] in keep ] result = [ [ x.strip() for x in item if x != '' ] for item in result ] if not result or len(result) != 3: result = self.__read_pops_area_new(filename) return result def __read_pops_area_new(self, filename): """ This function parses POPS output that looks like this:: === MOLECULE SASAs === Phob/A^2 Phil/A^2 Total/A^2 5267.01 4313.68 9580.69 """ use_next_line = False with open(filename, 'r') as ifh: for line in ifh: if line.strip() == 'Phob/A^2\t\tPhil/A^2\t\tTotal/A^2': use_next_line = True continue if use_next_line: row = line.strip().split() result = [['hydrophobic:', row[0]], ['hydrophilic:', row[1]], ['total:', row[2]]] break return result