Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
author Diane Trout <diane@caltech.edu>
Sat, 12 May 2012 00:49:59 +0000 (17:49 -0700)
committer Diane Trout <diane@caltech.edu>
Sat, 12 May 2012 00:49:59 +0000 (17:49 -0700)
encode_submission/ucsc_gather.py
htsworkflow/pipelines/gerald.py
htsworkflow/pipelines/runfolder.py
htsworkflow/submission/daf.py

index 0ed8f7e34d2885927a966ab11e2d3ac280818890..c45e3820bbc8b9f146684bef9ed8f5443fb03f8e 100644 (file)
@@ -39,11 +39,20 @@ from htsworkflow.submission.condorfastq import CondorFastqExtract
 
 logger = logging.getLogger('ucsc_gather')
 
+TAR = '/bin/tar'
+LFTP = '/usr/bin/lftp'
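+# default tool paths; main() replaces these with the --tar/--lftp option values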
+
 def main(cmdline=None):
     parser = make_parser()
     opts, args = parser.parse_args(cmdline)
     submission_uri = None
 
+    global TAR
+    global LFTP
+    TAR = opts.tar
+    LFTP = opts.lftp
+
     if opts.debug:
         logging.basicConfig(level = logging.DEBUG )
     elif opts.verbose:
@@ -93,6 +101,10 @@ def main(cmdline=None):
         mapper.scan_submission_dirs(results)
 
     if opts.make_ddf:
+        if not os.path.exists(TAR):
+            parser.error("%s does not exist, please specify --tar" % (TAR,))
+        if not os.path.exists(LFTP):
+            parser.error("%s does not exist, please specify --lftp" % (LFTP,))
         make_all_ddfs(mapper, results, opts.daf, force=opts.force)
 
     if opts.zip_ddf:
@@ -120,6 +132,10 @@ def make_parser():
     model.add_option('--sparql', default=None, help="execute sparql query")
     model.add_option('--print-rdf', action="store_true", default=False,
       help="print ending model state")
+    model.add_option('--tar', default=TAR,
+                     help="override path to tar command")
+    model.add_option('--lftp', default=LFTP,
+                     help="override path to lftp command")
     parser.add_option_group(model)
     # commands
     commands = OptionGroup(parser, 'commands')
@@ -301,7 +317,7 @@ def zip_ddfs(view_map, library_result_map, daf_name):
 def make_condor_archive_script(name, files, outdir=None):
     script = """Universe = vanilla
 
-Executable = /bin/tar
+Executable = %(tar)s
 arguments = czvhf ../%(archivename)s %(filelist)s
 
 Error = compress.out.$(Process).log
@@ -323,7 +339,8 @@ queue
     context = {'archivename': make_submission_name(name),
                'filelist': " ".join(files),
                'initialdir': os.path.abspath(outdir),
-               'user': os.getlogin()}
+               'user': os.getlogin(),
+               'tar': TAR}
 
     condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
     condor_stream = open(condor_script,'w')
@@ -332,11 +349,11 @@ queue
     return condor_script
 
 
-def make_condor_upload_script(name, outdir=None):
+def make_condor_upload_script(name, lftp, outdir=None):
     script = """Universe = vanilla
 
-Executable = /usr/bin/lftp
-arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
+Executable = %(lftp)s
+arguments = -c put %(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
 
 Error = upload.out.$(Process).log
 Output = upload.out.$(Process).log
@@ -358,7 +375,8 @@ queue
                'user': os.getlogin(),
                'ftpuser': ftpuser,
                'ftppassword': ftppassword,
-               'ftphost': encodeftp}
+               'ftphost': encodeftp,
+               'lftp': lftp}
 
     condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
     condor_stream = open(condor_script,'w')
index 06fc94b5433cfb00a131a8aee5e6ab78684e0cc4..9d32a5bfd741739edb4d685c7828180f69ed798d 100644 (file)
@@ -4,6 +4,7 @@ Provide access to information stored in the GERALD directory.
 from datetime import datetime, date
 import logging
 import os
+import stat
 import time
 
 from htsworkflow.pipelines.summary import Summary
@@ -91,6 +92,8 @@ class Gerald(object):
             self._lanes = {}
             tree = self._gerald.tree
             analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
+            if analysis is None:
+                return
             # according to the pipeline specs I think their fields
             # are sampleName_laneID, with sampleName defaulting to s
             # since laneIDs are constant lets just try using
@@ -104,6 +107,10 @@ class Gerald(object):
             if self._lane is None:
                 self._initalize_lanes()
             return self._lanes[key]
+        def get(self, key, default):
+            if self._lane is None:
+                self._initalize_lanes()
+            return self._lanes.get(key, default)
         def keys(self):
             if self._lane is None:
                 self._initalize_lanes()
@@ -138,8 +145,14 @@ class Gerald(object):
         if self.tree is None:
             return datetime.today()
         timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
-        epochstamp = time.mktime(time.strptime(timestamp, '%c'))
-        return datetime.fromtimestamp(epochstamp)
+        if timestamp is not None:
+            epochstamp = time.mktime(time.strptime(timestamp, '%c'))
+            return datetime.fromtimestamp(epochstamp)
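+        # fall back to the gerald directory's mtime when the config has no TIME_STAMP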
+        if self.pathname is not None:
+            epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
+            return datetime.fromtimestamp(epochstamp)
+        return datetime.today()
     date = property(_get_date)
 
     def _get_time(self):
@@ -162,10 +174,13 @@ class Gerald(object):
             root = os.path.join(root,'')
 
         experiment_dir = self.tree.findtext('ChipWideRunParameters/EXPT_DIR')
-        if experiment_dir is None:
-            return None
-        experiment_dir = experiment_dir.replace(root, '')
-        if len(experiment_dir) == 0:
+        if experiment_dir is not None:
+            experiment_dir = experiment_dir.replace(root, '')
+        else:
+            experiment_dir = self.tree.findtext('Defaults/EXPT_DIR')
+            if experiment_dir is not None:
+                _, experiment_dir = os.path.split(experiment_dir)
+        if experiment_dir is None or len(experiment_dir) == 0:
             return None
 
         dirnames = experiment_dir.split(os.path.sep)
@@ -229,12 +243,19 @@ def gerald(pathname):
     g.tree = ElementTree.parse(config_pathname).getroot()
 
     # parse Summary.htm file
-    summary_pathname = os.path.join(g.pathname, 'Summary.xml')
-    if os.path.exists(summary_pathname):
+    summary_xml = os.path.join(g.pathname, 'Summary.xml')
+    summary_htm = os.path.join(g.pathname, 'Summary.htm')
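+    # newer runs may only keep a copy of the summary under Data/Status_Files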
+    status_files_summary = os.path.join(g.pathname, '..', 'Data', 'Status_Files', 'Summary.htm')
+    if os.path.exists(summary_xml):
         LOGGER.info("Parsing Summary.xml")
-    else:
+        summary_pathname = summary_xml
+    elif os.path.exists(summary_htm):
-        summary_pathname = os.path.join(g.pathname, 'Summary.htm')
+        summary_pathname = summary_htm
         LOGGER.info("Parsing Summary.htm")
+    else:
+        summary_pathname = status_files_summary
+        LOGGER.info("Parsing %s" % (status_files_summary,))
     g.summary = Summary(summary_pathname)
     # parse eland files
     g.eland_results = eland(g.pathname, g)
index 3389c2d9ab8ad6f3ffc4b2378c0c8fcd7ff8139a..3bfb928d763f442faa4b4b852b8683efabaae93f 100644 (file)
@@ -47,6 +47,7 @@ class PipelineRun(object):
           self.pathname = None
         self._name = None
         self._flowcell_id = flowcell_id
+        self.datadir = None
         self.image_analysis = None
         self.bustard = None
         self.gerald = None
@@ -177,7 +178,7 @@ def get_runs(runfolder, flowcell_id=None):
     from htsworkflow.pipelines import bustard
     from htsworkflow.pipelines import gerald
 
-    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
+    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
         LOGGER.info("Looking for bustard directories in %s" % (pathname,))
         bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
         # RTA BaseCalls looks enough like Bustard.
@@ -192,6 +193,22 @@ def get_runs(runfolder, flowcell_id=None):
                 try:
                     g = gerald.gerald(gerald_pathname)
                     p = PipelineRun(runfolder, flowcell_id)
+                    p.datadir = datadir
+                    p.image_analysis = image_analysis
+                    p.bustard = b
+                    p.gerald = g
+                    runs.append(p)
+                except IOError, e:
+                    LOGGER.error("Ignoring " + str(e))
+
+            aligned_glob = os.path.join(runfolder, 'Aligned*')
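+            # HiSeq-style runfolders may also keep gerald-equivalent output in top-level Aligned* dirs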
+            for aligned in glob(aligned_glob):
+                LOGGER.info("Found aligned directory %s" % (aligned,))
+                try:
+                    g = gerald.gerald(aligned)
+                    p = PipelineRun(runfolder, flowcell_id)
+                    p.datadir = datadir
                     p.image_analysis = image_analysis
                     p.bustard = b
                     p.gerald = g
@@ -228,7 +244,7 @@ def get_runs(runfolder, flowcell_id=None):
             )
         else:
             scan_post_image_analysis(
-                runs, runfolder, image_analysis, ipar_pathname
+                runs, runfolder, datadir, image_analysis, ipar_pathname
             )
 
     return runs
@@ -407,12 +423,18 @@ def save_flowcell_reports(data_dir, cycle_dir):
         os.chdir(cwd)
 
 
-def save_summary_file(gerald_object, cycle_dir):
+def save_summary_file(pipeline, cycle_dir):
     # Copy Summary.htm
-    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
-    if os.path.exists(summary_path):
-        LOGGER.info('Copying %s to %s' % (summary_path, cycle_dir))
-        shutil.copy(summary_path, cycle_dir)
+    gerald_object = pipeline.gerald
+    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
+    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
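+    # some runs only provide the copy under <datadir>/Status_Files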
+    if os.path.exists(gerald_summary):
+        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
+        shutil.copy(gerald_summary, cycle_dir)
+    elif os.path.exists(status_files_summary):
+        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
+        shutil.copy(status_files_summary, cycle_dir)
     else:
-        LOGGER.info('Summary file %s was not found' % (summary_path,))
+        LOGGER.info('Summary file %s or %s was not found' % (gerald_summary, status_files_summary))
 
@@ -551,13 +572,17 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, r
       if site is not None:
         lanes = []
         for lane in range(1, 9):
-          if r.gerald.lanes[lane].analysis != 'none':
+          lane_parameters = r.gerald.lanes.get(lane, None)
+          if lane_parameters is not None and lane_parameters.analysis != 'none':
             lanes.append(lane)
 
         run_name = srf.pathname_to_run_name(r.pathname)
         seq_cmds = []
+        LOGGER.info("Raw Format is: %s" % (raw_format, ))
         if raw_format == 'fastq':
-            srf.copy_hiseq_project_fastqs(run_name, r.bustard.pathname, site, cycle_dir)
+            rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
+            LOGGER.info("raw data = %s" % (rawpath,))
+            srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
         elif raw_format == 'qseq':
             seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
         elif raw_format == 'srf':
@@ -571,7 +596,7 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, r
       g = r.gerald
 
       # save summary file
-      save_summary_file(g, cycle_dir)
+      save_summary_file(r, cycle_dir)
 
       # compress eland result files
       compress_eland_results(g, cycle_dir, num_jobs)
index 96037b9647c93146a9887334c3ec1ebdf9266b0d..a74d71a667d1cb788cbdaab24cdf8661738c9413 100644 (file)
@@ -24,7 +24,7 @@ from htsworkflow.util.rdfhelp import \
      fromTypedNode
 from htsworkflow.util.hashfile import make_md5sum
 
-logger = logging.getLogger(__name__)
+LOGGER = logging.getLogger(__name__)
 
 DAF_VARIABLE_NAMES = ("variables", "extraVariables")
 VARIABLES_TERM_NAME = 'variables'
@@ -130,7 +130,7 @@ def parse_stream(stream):
     if view_name is not None:
         attributes['views'][view_name] = view_attributes
 
-    logger.debug("DAF Attributes" + pformat(attributes))
+    LOGGER.debug("DAF Attributes" + pformat(attributes))
     return attributes
 
 
@@ -254,7 +254,7 @@ class UCSCSubmission(object):
              otherwise specifies model to use
         """
         if daf_file is None and model is None:
-            logger.error("We need a DAF or Model containing a DAF to work")
+            LOGGER.error("We need a DAF or Model containing a DAF to work")
 
         self.name = name
         self.submissionSet = get_submission_uri(self.name)
@@ -297,11 +297,11 @@ class UCSCSubmission(object):
         """Examine files in our result directory
         """
         for lib_id, result_dir in result_map.items():
-            logger.info("Importing %s from %s" % (lib_id, result_dir))
+            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
             try:
                 self.import_submission_dir(result_dir, lib_id)
             except MetadataLookupException, e:
-                logger.error("Skipping %s: %s" % (lib_id, str(e)))
+                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
 
     def import_submission_dir(self, submission_dir, library_id):
         """Import a submission directories and update our model as needed
@@ -324,10 +324,10 @@ class UCSCSubmission(object):
         """
         path, filename = os.path.split(pathname)
 
-        logger.debug("Searching for view")
+        LOGGER.debug("Searching for view")
         view = self.find_view(filename)
         if view is None:
-            logger.warn("Unrecognized file: {0}".format(pathname))
+            LOGGER.warn("Unrecognized file: {0}".format(pathname))
             return None
         if str(view) == str(libraryOntology['ignore']):
             return None
@@ -339,7 +339,7 @@ class UCSCSubmission(object):
                                        dafTermOntology['name']))
         if view_name is None:
             errmsg = 'Could not find view name for {0}'
-            logger.warning(errmsg.format(str(view)))
+            LOGGER.warning(errmsg.format(str(view)))
             return
 
         view_name = str(view_name)
@@ -349,7 +349,7 @@ class UCSCSubmission(object):
             RDF.Statement(self.submissionSet,
                           dafTermOntology['has_submission'],
                           submissionNode))
-        logger.debug("Adding statements to {0}".format(str(submissionNode)))
+        LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
         self.model.add_statement(RDF.Statement(submissionNode,
                                                submissionOntology['has_view'],
                                                submissionView))
@@ -364,7 +364,7 @@ class UCSCSubmission(object):
                                                submissionOntology['library'],
                                                libNode))
 
-        logger.debug("Adding statements to {0}".format(str(submissionView)))
+        LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
         # add track specific information
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['view'], view))
@@ -380,11 +380,11 @@ class UCSCSubmission(object):
         # add file specific information
         self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)
 
-        logger.debug("Done.")
+        LOGGER.debug("Done.")
 
     def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
         # add file specific information
-        logger.debug("Updating file md5sum")
+        LOGGER.debug("Updating file md5sum")
         fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
         submission_pathname = os.path.join(submission_dir, filename)
         self.model.add_statement(
@@ -399,7 +399,7 @@ class UCSCSubmission(object):
         md5 = make_md5sum(submission_pathname)
         if md5 is None:
             errmsg = "Unable to produce md5sum for {0}"
-            logger.warning(errmsg.format(submission_pathname))
+            LOGGER.warning(errmsg.format(submission_pathname))
         else:
             self.model.add_statement(
                 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
@@ -513,7 +513,7 @@ class UCSCSubmission(object):
         else:
             msg = "Found wrong number of view names for {0} len = {1}"
             msg = msg.format(str(view), len(names))
-            logger.error(msg)
+            LOGGER.error(msg)
             raise RuntimeError(msg)
 
     def _get_filename_view_map(self):
@@ -528,11 +528,12 @@
         for s in self.model.find_statements(filename_query):
             view_name = s.subject
             literal_re = s.object.literal_value['string']
-            logger.debug("Found: %s" % (literal_re,))
+            LOGGER.debug("Found: %s" % (literal_re,))
             try:
                 filename_re = re.compile(literal_re)
             except re.error, e:
-                logger.error("Unable to compile: %s" % (literal_re,))
+                LOGGER.error("Unable to compile: %s" % (literal_re,))
+                continue
             patterns[literal_re] = view_name
         return patterns