extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/grid/VariantStatisticsJobCreator.java

Code
Comments
Other
Rev Date Author Line
5765 29 Nov 19 nicklas 1 package net.sf.basedb.reggie.grid;
5765 29 Nov 19 nicklas 2
5766 02 Dec 19 nicklas 3 import java.io.StringWriter;
5765 29 Nov 19 nicklas 4 import java.util.Arrays;
5766 02 Dec 19 nicklas 5 import java.util.List;
5766 02 Dec 19 nicklas 6 import java.util.regex.Matcher;
5766 02 Dec 19 nicklas 7 import java.util.regex.Pattern;
5765 29 Nov 19 nicklas 8
5766 02 Dec 19 nicklas 9 import net.sf.basedb.core.BioSource;
5765 29 Nov 19 nicklas 10 import net.sf.basedb.core.DbControl;
5766 02 Dec 19 nicklas 11 import net.sf.basedb.core.DerivedBioAssay;
5766 02 Dec 19 nicklas 12 import net.sf.basedb.core.InvalidDataException;
5765 29 Nov 19 nicklas 13 import net.sf.basedb.core.ItemList;
5765 29 Nov 19 nicklas 14 import net.sf.basedb.core.ItemNotFoundException;
5765 29 Nov 19 nicklas 15 import net.sf.basedb.core.ItemParameterType;
5766 02 Dec 19 nicklas 16 import net.sf.basedb.core.ItemQuery;
5765 29 Nov 19 nicklas 17 import net.sf.basedb.core.Job;
5766 02 Dec 19 nicklas 18 import net.sf.basedb.core.ProgressReporter;
5765 29 Nov 19 nicklas 19 import net.sf.basedb.core.SessionControl;
5815 24 Jan 20 nicklas 20 import net.sf.basedb.core.StringParameterType;
5766 02 Dec 19 nicklas 21 import net.sf.basedb.core.Type;
5766 02 Dec 19 nicklas 22 import net.sf.basedb.core.query.Expressions;
5766 02 Dec 19 nicklas 23 import net.sf.basedb.core.query.Hql;
5766 02 Dec 19 nicklas 24 import net.sf.basedb.core.query.Restrictions;
5766 02 Dec 19 nicklas 25 import net.sf.basedb.core.snapshot.SnapshotManager;
5765 29 Nov 19 nicklas 26 import net.sf.basedb.opengrid.JobDefinition;
5772 03 Dec 19 nicklas 27 import net.sf.basedb.opengrid.JobStatus;
5765 29 Nov 19 nicklas 28 import net.sf.basedb.opengrid.OpenGridCluster;
5772 03 Dec 19 nicklas 29 import net.sf.basedb.opengrid.OpenGridSession;
5765 29 Nov 19 nicklas 30 import net.sf.basedb.opengrid.ScriptBuilder;
5765 29 Nov 19 nicklas 31 import net.sf.basedb.opengrid.config.ClusterConfig;
5765 29 Nov 19 nicklas 32 import net.sf.basedb.opengrid.config.JobConfig;
5766 02 Dec 19 nicklas 33 import net.sf.basedb.opengrid.filetransfer.StringUploadSource;
5772 03 Dec 19 nicklas 34 import net.sf.basedb.opengrid.service.JobCompletionHandler;
5765 29 Nov 19 nicklas 35 import net.sf.basedb.reggie.Reggie;
5765 29 Nov 19 nicklas 36 import net.sf.basedb.reggie.XmlConfig;
5766 02 Dec 19 nicklas 37 import net.sf.basedb.reggie.dao.Annotationtype;
5766 02 Dec 19 nicklas 38 import net.sf.basedb.reggie.dao.Subtype;
5765 29 Nov 19 nicklas 39 import net.sf.basedb.util.Values;
5766 02 Dec 19 nicklas 40 import net.sf.basedb.util.export.TableWriter;
5765 29 Nov 19 nicklas 41
5765 29 Nov 19 nicklas 42 /**
5765 29 Nov 19 nicklas 43   Helper class for generating variant
5765 29 Nov 19 nicklas 44   statistics script and send it to the cluster for execution.
5765 29 Nov 19 nicklas 45   
5765 29 Nov 19 nicklas 46   @author nicklas
5765 29 Nov 19 nicklas 47   @since 4.25
5765 29 Nov 19 nicklas 48 */
5765 29 Nov 19 nicklas 49 public class VariantStatisticsJobCreator 
6674 11 Apr 22 nicklas 50   extends AbstractJobCreator
5765 29 Nov 19 nicklas 51 {
5765 29 Nov 19 nicklas 52
5815 24 Jan 20 nicklas 53   private String outputBaseName = "scanb-tumors";
5765 29 Nov 19 nicklas 54   
5765 29 Nov 19 nicklas 55   public VariantStatisticsJobCreator()
5765 29 Nov 19 nicklas 56   {}
5765 29 Nov 19 nicklas 57
5765 29 Nov 19 nicklas 58   /**
5815 24 Jan 20 nicklas 59     Set the base name of files that are created.
5815 24 Jan 20 nicklas 60     Typically, this should be "scanb-tumors" or 
5815 24 Jan 20 nicklas 61     "scanb-normals" (.vcf.gz is and other extensions are automatically appended)
5815 24 Jan 20 nicklas 62   */
5815 24 Jan 20 nicklas 63   public void setOutputBaseName(String baseName)
5815 24 Jan 20 nicklas 64   {
5815 24 Jan 20 nicklas 65     this.outputBaseName = baseName;
5815 24 Jan 20 nicklas 66   }
5815 24 Jan 20 nicklas 67   
5815 24 Jan 20 nicklas 68   /**
5765 29 Nov 19 nicklas 69     Schedule a job on the cluster for collecting variant statistics.
5765 29 Nov 19 nicklas 70     @return The corresponding job in BASE
5765 29 Nov 19 nicklas 71   */
5766 02 Dec 19 nicklas 72   public Job createVariantStatisticsJob(DbControl dc, OpenGridCluster cluster, ItemList list, ProgressReporter progress)
5765 29 Nov 19 nicklas 73   {
5765 29 Nov 19 nicklas 74     SessionControl sc = dc.getSessionControl();
5765 29 Nov 19 nicklas 75     
5765 29 Nov 19 nicklas 76     ClusterConfig clusterCfg = cluster.getConfig();
5765 29 Nov 19 nicklas 77     XmlConfig cfg = Reggie.getConfig(cluster.getId());
5765 29 Nov 19 nicklas 78     if (cfg == null)
5765 29 Nov 19 nicklas 79     {
5765 29 Nov 19 nicklas 80       throw new ItemNotFoundException("No configuration in reggie-config.xml for cluster: " + cluster.getId());
5765 29 Nov 19 nicklas 81     }
5765 29 Nov 19 nicklas 82     
5768 02 Dec 19 nicklas 83     String projectRoot = cfg.getRequiredConfig("project-archive", null);
5768 02 Dec 19 nicklas 84     String pipeline_scripts_path = cfg.getRequiredConfig("programs/pipeline-scripts/path", null);
5768 02 Dec 19 nicklas 85     String bedtools_path = cfg.getRequiredConfig("programs/bedtools/path", null);
5768 02 Dec 19 nicklas 86
5765 29 Nov 19 nicklas 87     // Options for the job
5765 29 Nov 19 nicklas 88     JobConfig jobConfig = new JobConfig();
5765 29 Nov 19 nicklas 89     if (priority != null) jobConfig.setPriority(priority);
6981 17 Jan 23 nicklas 90     if (partition != null) jobConfig.setQsubOption("q", ScriptUtil.checkValidScriptParameter(partition));
6981 17 Jan 23 nicklas 91     jobConfig.convertOptionsTo(cluster.getConfig().getType());
5765 29 Nov 19 nicklas 92     
5765 29 Nov 19 nicklas 93     // Create job 
5765 29 Nov 19 nicklas 94     Job statJob = Job.getNew(dc, null, null, null);
5772 03 Dec 19 nicklas 95     statJob.setItemSubtype(Subtype.VARIANT_STATISTICS_JOB.get(dc));
5765 29 Nov 19 nicklas 96     statJob.setPluginVersion("reggie-"+Reggie.VERSION);
5765 29 Nov 19 nicklas 97     statJob.setSendMessage(Values.getBoolean(sc.getUserClientSetting("plugins.sendmessage"), false));
5765 29 Nov 19 nicklas 98     statJob.setName("Build variant statistics");
5765 29 Nov 19 nicklas 99     statJob.setParameterValue("list", new ItemParameterType<ItemList>(ItemList.class, null), list);
5815 24 Jan 20 nicklas 100     statJob.setParameterValue("outputBaseName", new StringParameterType(), outputBaseName);
5765 29 Nov 19 nicklas 101     if (debug) statJob.setName(statJob.getName() + " (debug)");
6981 17 Jan 23 nicklas 102     if (partition != null) statJob.setParameterValue("partition", new StringParameterType(), partition);
5765 29 Nov 19 nicklas 103     dc.saveItem(statJob);
5765 29 Nov 19 nicklas 104
5765 29 Nov 19 nicklas 105     ScriptBuilder script = new ScriptBuilder();
6665 05 Apr 22 nicklas 106     script.cmd(debug ? "set -ex" : "set -e");
5765 29 Nov 19 nicklas 107     script.comment("Setting up scripting environment and copying script to tmp folder");
5768 02 Dec 19 nicklas 108     script.cmd("export ScriptDir=" + pipeline_scripts_path);
5815 24 Jan 20 nicklas 109     script.cmd("OUTNAME="+outputBaseName);
5768 02 Dec 19 nicklas 110     script.newLine();
5768 02 Dec 19 nicklas 111
5768 02 Dec 19 nicklas 112     script.cmd("cd ${TMPDIR}");
5768 02 Dec 19 nicklas 113     script.cmd("cp ${ScriptDir}/mut_stats.py .");
5773 03 Dec 19 nicklas 114     script.cmd("mkdir results");
5773 03 Dec 19 nicklas 115     script.cmd("mkdir tmp");
5768 02 Dec 19 nicklas 116     script.newLine();
5765 29 Nov 19 nicklas 117     
5768 02 Dec 19 nicklas 118     String statCmd = "./mut_stats.py";
5768 02 Dec 19 nicklas 119     statCmd += " ${WD}/vcflist.txt";
5770 03 Dec 19 nicklas 120     statCmd += " ${WD}/progress";
5815 24 Jan 20 nicklas 121     statCmd += " ${WD}/${OUTNAME}.log";
5815 24 Jan 20 nicklas 122     statCmd += " > tmp/${OUTNAME}.vcf";
5768 02 Dec 19 nicklas 123     script.cmd(statCmd);
5772 03 Dec 19 nicklas 124     script.progress(95, "Sorting and indexing...");
5815 24 Jan 20 nicklas 125     script.cmd(bedtools_path+" sort -header -i tmp/${OUTNAME}.vcf | bgzip -c > results/${OUTNAME}.vcf.gz");
5815 24 Jan 20 nicklas 126     script.cmd("tabix results/${OUTNAME}.vcf.gz");
5773 03 Dec 19 nicklas 127     script.cmd("cp results/* ${WD}");
5765 29 Nov 19 nicklas 128     
5766 02 Dec 19 nicklas 129     script.progress(99, "Cleaning up temporary folders");
5766 02 Dec 19 nicklas 130
5766 02 Dec 19 nicklas 131     // Export a list with PATIENT name and path to RAW VCF file
5766 02 Dec 19 nicklas 132     StringWriter vcfList = new StringWriter(list.getSize()*80);
5766 02 Dec 19 nicklas 133     TableWriter tw = new TableWriter(vcfList);
5766 02 Dec 19 nicklas 134     // For each alignment we need to find the patient. We design a query
5766 02 Dec 19 nicklas 135     // to load it from the specimen name which we get by cutting the front
5766 02 Dec 19 nicklas 136     // of the alignment name
5766 02 Dec 19 nicklas 137     ItemQuery<BioSource> findPatBySpecimen = BioSource.getQuery();
5766 02 Dec 19 nicklas 138     Subtype.PATIENT.addFilter(dc, findPatBySpecimen);
5766 02 Dec 19 nicklas 139     findPatBySpecimen.setIncludes(Reggie.INCLUDE_IN_CURRENT_PROJECT);
5766 02 Dec 19 nicklas 140     findPatBySpecimen.join(Hql.innerJoin("childCreationEvents", "cce"));
5766 02 Dec 19 nicklas 141     findPatBySpecimen.join(Hql.innerJoin("cce", "event", "evt"));
5766 02 Dec 19 nicklas 142     findPatBySpecimen.join(Hql.innerJoin("evt", "bioMaterial", "cse")); // 'cse' should now reference a case
5766 02 Dec 19 nicklas 143     findPatBySpecimen.join(Hql.innerJoin("cse", "childCreationEvents", "cce2"));
5766 02 Dec 19 nicklas 144     findPatBySpecimen.join(Hql.innerJoin("cce2", "event", "evt2"));
5766 02 Dec 19 nicklas 145     findPatBySpecimen.join(Hql.innerJoin("evt2", "bioMaterial", "spm")); // 'spm' should now reference a specimen/nospecimen
5766 02 Dec 19 nicklas 146     findPatBySpecimen.restrict(Restrictions.eq(Hql.property("spm", "name"), Expressions.parameter("specimen")));
5765 29 Nov 19 nicklas 147     
6183 26 Mar 21 nicklas 148     ItemQuery<DerivedBioAssay> query = list.getMembers();
5766 02 Dec 19 nicklas 149     query.setIncludes(Reggie.INCLUDE_IN_CURRENT_PROJECT);
5766 02 Dec 19 nicklas 150     List<DerivedBioAssay> alignments = query.list(dc);
5765 29 Nov 19 nicklas 151     
5766 02 Dec 19 nicklas 152     SnapshotManager manager = new SnapshotManager();
5766 02 Dec 19 nicklas 153     int count = 0;
5766 02 Dec 19 nicklas 154     int total = alignments.size();
5766 02 Dec 19 nicklas 155     Pattern specimenName = Pattern.compile("(\\d+\\.\\d+)\\..*"); // Extract specimen name by taking first part of alignment name
5770 03 Dec 19 nicklas 156
5766 02 Dec 19 nicklas 157     for (DerivedBioAssay alignment : alignments)
5766 02 Dec 19 nicklas 158     {
5766 02 Dec 19 nicklas 159       count++;
5766 02 Dec 19 nicklas 160       if (progress != null && count % 100 == 0)
5766 02 Dec 19 nicklas 161       {
5766 02 Dec 19 nicklas 162         progress.display(count * 90 / total, "Exporting Patient and VCF list (" + count + " of " + total + ")...");
5766 02 Dec 19 nicklas 163       }
5766 02 Dec 19 nicklas 164       
5766 02 Dec 19 nicklas 165       Matcher m = specimenName.matcher(alignment.getName());
5766 02 Dec 19 nicklas 166       if (!m.matches())
5766 02 Dec 19 nicklas 167       {
5766 02 Dec 19 nicklas 168         throw new InvalidDataException("Could not get specimen name for alignment: "+ alignment.getName());
5766 02 Dec 19 nicklas 169       }
5766 02 Dec 19 nicklas 170       
5766 02 Dec 19 nicklas 171       String specimen = m.group(1);
5766 02 Dec 19 nicklas 172       findPatBySpecimen.setParameter("specimen", specimen, Type.STRING);
5766 02 Dec 19 nicklas 173       List<BioSource> patients = findPatBySpecimen.list(dc);
5766 02 Dec 19 nicklas 174       if (patients.size() != 1)
5766 02 Dec 19 nicklas 175       {
5766 02 Dec 19 nicklas 176         if (patients.size() == 0) throw new ItemNotFoundException("Could not find a patient item for alignment: " +alignment.getName());
5766 02 Dec 19 nicklas 177         throw new InvalidDataException("Found "+ patients.size() + " patient items for alignment: " + alignment.getName());
5766 02 Dec 19 nicklas 178       }
5766 02 Dec 19 nicklas 179       String patient = patients.get(0).getName();
5768 02 Dec 19 nicklas 180       String dataFolder = ScriptUtil.checkValidPath((String)Annotationtype.DATA_FILES_FOLDER.getAnnotationValue(dc, manager, alignment), true, true);
5770 03 Dec 19 nicklas 181       tw.tablePrintData(patient, projectRoot+dataFolder+"/variants-raw.vcf.gz", alignment.getName());
5766 02 Dec 19 nicklas 182     }
5766 02 Dec 19 nicklas 183     tw.flush();
5766 02 Dec 19 nicklas 184     tw.close();
5766 02 Dec 19 nicklas 185     
6674 11 Apr 22 nicklas 186     JobDefinition jobDef = new JobDefinition("VariantStatistics", jobConfig, batchConfig, statJob);
5765 29 Nov 19 nicklas 187     jobDef.setDebug(debug);
5766 02 Dec 19 nicklas 188     jobDef.addFile(new StringUploadSource("vcflist.txt", vcfList.toString()));
5765 29 Nov 19 nicklas 189     jobDef.setCmd(script.toString());
5765 29 Nov 19 nicklas 190     
5765 29 Nov 19 nicklas 191     ScriptUtil.submitJobs(dc, cluster, Arrays.asList(jobDef));
5765 29 Nov 19 nicklas 192     return statJob;
5765 29 Nov 19 nicklas 193   }
5765 29 Nov 19 nicklas 194
5772 03 Dec 19 nicklas 195   public static class VariantStatisticsJobCompletionHandler
5772 03 Dec 19 nicklas 196     implements JobCompletionHandler
5772 03 Dec 19 nicklas 197   {
5772 03 Dec 19 nicklas 198     
5772 03 Dec 19 nicklas 199     public VariantStatisticsJobCompletionHandler()
5772 03 Dec 19 nicklas 200     {}
5765 29 Nov 19 nicklas 201   
5772 03 Dec 19 nicklas 202     @Override
5772 03 Dec 19 nicklas 203     public String jobCompleted(SessionControl sc, OpenGridSession session, Job job, JobStatus status)
5772 03 Dec 19 nicklas 204     {
5772 03 Dec 19 nicklas 205       String jobName = status.getName();
5815 24 Jan 20 nicklas 206       String baseFileName = job.getParameterValue("outputBaseName");
5815 24 Jan 20 nicklas 207       String log = session.getJobFileAsString(jobName, baseFileName+".log", "UTF-8");
5772 03 Dec 19 nicklas 208       Stats stat = Stats.parse(log);
5772 03 Dec 19 nicklas 209       
5772 03 Dec 19 nicklas 210       return "Parsed " + stat.numVcf + " VCF files for " + stat.numPatients + " patients." + 
5773 03 Dec 19 nicklas 211         " Found "+Reggie.formatCount(stat.numVariants) + " variants." + 
5815 24 Jan 20 nicklas 212         " Result files can be found at: " + session.getHost().getWorkFolder(jobName) + 
5815 24 Jan 20 nicklas 213         "/" + baseFileName + ".*";
5772 03 Dec 19 nicklas 214     }
5772 03 Dec 19 nicklas 215   }
5772 03 Dec 19 nicklas 216
5772 03 Dec 19 nicklas 217   static class Stats
5772 03 Dec 19 nicklas 218   {
5772 03 Dec 19 nicklas 219     int numVcf = 0;
5772 03 Dec 19 nicklas 220     int numPatients = 0;
5772 03 Dec 19 nicklas 221     int numVariants = 0;
5772 03 Dec 19 nicklas 222     
5772 03 Dec 19 nicklas 223     static Stats parse(String statsOut)
5772 03 Dec 19 nicklas 224     {
5772 03 Dec 19 nicklas 225       Stats s = new Stats();
5772 03 Dec 19 nicklas 226       for (String line : statsOut.split("\n"))
5772 03 Dec 19 nicklas 227       {
5772 03 Dec 19 nicklas 228         String[] stat = line.split(":", 2);
5772 03 Dec 19 nicklas 229         String key = stat[0].strip();
5772 03 Dec 19 nicklas 230         int val = Values.getInt(stat[1].strip());
5772 03 Dec 19 nicklas 231         if ("NumberOfVCF".equals(key))
5772 03 Dec 19 nicklas 232         {
5772 03 Dec 19 nicklas 233           s.numVcf = val;
5772 03 Dec 19 nicklas 234         }
5772 03 Dec 19 nicklas 235         else if ("NumberOfPatients".equals(key))
5772 03 Dec 19 nicklas 236         {
5772 03 Dec 19 nicklas 237           s.numPatients = val;
5772 03 Dec 19 nicklas 238         }
5772 03 Dec 19 nicklas 239         else if ("NumberOfVariants".equals(key))
5772 03 Dec 19 nicklas 240         {
5772 03 Dec 19 nicklas 241           s.numVariants = val;
5772 03 Dec 19 nicklas 242         }
5772 03 Dec 19 nicklas 243       }
5772 03 Dec 19 nicklas 244       return s;
5772 03 Dec 19 nicklas 245     }
5772 03 Dec 19 nicklas 246   }
5765 29 Nov 19 nicklas 247 }