Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 69 additions & 53 deletions classify/blast.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,32 @@
import tools
import tools.samtools
import util.misc

import time
TOOL_NAME = "blastn"

'''
#Creating task.log
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("blast_py.log"),
logging.StreamHandler()
]
)
'''
#Setting up logging, blast_py.log
try:
log_directory = os.getcwd()

# Ensure the directory exists, if not, create it
if not os.path.exists(log_directory):
os.makedirs(log_directory)

#Set up logging directory path
log_file_path = os.path.join(log_directory, 'blast_py.log')

logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file_path),
logging.StreamHandler()
]
)
except Exception as e:
print ("Failed to set up logging:", str(e))
raise e

_log = logging.getLogger(__name__)

class BlastTools(tools.Tool):
Expand Down Expand Up @@ -49,12 +61,15 @@ class BlastnTool(BlastTools):
""" Tool wrapper for blastn """
subtool_name = 'blastn'

def get_hits_pipe(self, inPipe, db, threads=None, task=None, outfmt='6', max_target_seqs=1, output_type="read_id"):
_log.debug(f"Executing get_hits_pipe function. Called with outfmt: {outfmt}")
def get_hits_pipe(self, inPipe, db, threads, outfmt, task=None, max_target_seqs=1, output_type="read_id", taxidlist=None):
start_time = time.time()


#toggle between extracting read IDs only or full blast output (all lines)
if output_type not in ['read_id', 'full_line']:
_log.warning(f"Invalid output_type '{output_type}' specified. Defaulting to 'read_id'.")
output_type = 'read_id'
_log.debug(f"Prior to running cmd, executing get_hits_pipe function. Called with task: {task} ,type: {type(task)},outfmt: {outfmt}")
# run blastn and emit list of read IDs
threads = util.misc.sanitize_thread_count(threads)
cmd = [self.install_and_get_path(),
Expand All @@ -66,56 +81,57 @@ def get_hits_pipe(self, inPipe, db, threads=None, task=None, outfmt='6', max_tar
'-max_target_seqs', str(max_target_seqs),
'-task', str(task) if task else 'blastn',
]
#Add taxidlist if specified by user
if taxidlist:
cmd.extend(['-taxidlist', taxidlist])
_log.info(f"Using taxidlist: {taxidlist} in BLAST command")

cmd = [str(x) for x in cmd]
#Log BLAST command executed
_log.debug('Running blastn command: {}'.format(' '.join(cmd)))
_log.debug('| ' + ' '.join(cmd) + ' |')
blast_pipe = subprocess.Popen(cmd, stdin=inPipe, stdout=subprocess.PIPE)
output, error = blast_pipe.communicate()

#Display error message if BLAST failed
if blast_pipe.returncode!= 0:
_log.error('Error running blastn command: {}'.format(error))
raise subprocess.CalledProcessError(blast_pipe.returncode, cmd)
_log.info('Running blastn command: {}'.format(' '.join(cmd)))

# If read_id is defined, strip tab output to just query read ID names and emit (default)
last_read_id = None
for line in output.decode('UTF-8').splitlines():
if output_type == 'read_id':
#Split line by tab, and take the first element
read_id = line.split('\t')[0]
# Only emit if it is not a duplicate of the previous read ID
if read_id != last_read_id:
last_read_id = read_id
yield read_id
#Yield the full line without stripping whitespace
elif output_type == 'full_line':
yield line

#Display on CMD if BLAST fails
if blast_pipe.returncode!= 0:
_log.error('Error running blastn command: {}'.format(error))
raise subprocess.CalledProcessError(blast_pipe.returncode, cmd)
#Logging configuration written to blast_py.log if BLAST passes/fails
if blast_pipe.returncode == 0:
_log.info("Blastn process completed succesfully.")
else:
_log.error("Blastn process failed with exit code: %s", blast_pipe.returncode)
raise subprocess.CalledProcessError(blast_pipe.returncode, cmd)

#Try/finally block added to ensure resource packages are cleaned up regardless of error raised
try:
with subprocess.Popen(cmd, stdin=inPipe, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as blast_pipe:
output, error = blast_pipe.communicate()

if blast_pipe.returncode != 0:
_log.error(f'Error running blastn command: {error.decode()}')
raise subprocess.CalledProcessError(blast_pipe.returncode, cmd, output=output, stderr=error)

# Process the output line by line in a generator fashion
last_read_id = None
for line in output.decode('UTF-8').splitlines():
if output_type == 'read_id':
read_id = line.split('\t')[0]
if read_id != last_read_id:
last_read_id = read_id
yield read_id
elif output_type == 'full_line':
yield line

finally:
# Ensure resources are cleaned up
_log.info("Cleaning up subprocess resources.")

# Log successful completion and time taken
elapsed_time = time.time() - start_time
_log.debug(f"get_hits_pipe executed in {elapsed_time:.2f} seconds")
def get_hits_bam(self, inBam, db, threads=None):
return self.get_hits_pipe(
tools.samtools.SamtoolsTool().bam2fa_pipe(inBam),
db,
threads=threads)

def get_hits_fasta(self, inFasta, db, threads=None, task=None, outfmt='6', max_target_seqs=1, output_type='read_id'):
_log.debug(f"Executing get_hits_fasta function. Called with outfmt: {outfmt}")
def get_hits_fasta(self, inFasta, db, threads, outfmt, task, max_target_seqs=1, output_type='read_id', taxidlist=None):
start_time = time.time()
_log.debug(f"Executing get_hits_fasta function. Called with outfmt: {outfmt}, taxidlist: {taxidlist}")
with open(inFasta, 'rt') as inf:
for hit in self.get_hits_pipe(inf, db, threads=threads, task=None, outfmt=outfmt, max_target_seqs=max_target_seqs, output_type=output_type):
for hit in self.get_hits_pipe(inf, db=db, threads=threads, outfmt=outfmt, task=task, max_target_seqs=max_target_seqs, output_type=output_type, taxidlist=taxidlist):
yield hit


elapsed_time = time.time() - start_time
_log.debug(f"get_hits_fasta exectued in {elapsed_time:.2f} seconds")

class MakeblastdbTool(BlastTools):
""" Tool wrapper for makeblastdb """
subtool_name = 'makeblastdb'
Expand Down
Loading