extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/grid/JobCompletionHandlerFactory.java

Code
Comments
Other
Rev Date Author Line
4271 16 Dec 16 nicklas 1 package net.sf.basedb.reggie.grid;
4271 16 Dec 16 nicklas 2
7079 27 Mar 23 nicklas 3 import org.slf4j.LoggerFactory;
7079 27 Mar 23 nicklas 4
4325 30 Jan 17 nicklas 5 import net.sf.basedb.core.DbControl;
4325 30 Jan 17 nicklas 6 import net.sf.basedb.core.ItemSubtype;
4271 16 Dec 16 nicklas 7 import net.sf.basedb.core.Job;
4291 10 Jan 17 nicklas 8 import net.sf.basedb.core.SessionControl;
4271 16 Dec 16 nicklas 9 import net.sf.basedb.opengrid.JobStatus;
4291 10 Jan 17 nicklas 10 import net.sf.basedb.opengrid.OpenGridSession;
4271 16 Dec 16 nicklas 11 import net.sf.basedb.opengrid.service.JobCompletionHandler;
4291 10 Jan 17 nicklas 12 import net.sf.basedb.reggie.autoconfirm.AutoConfirmService;
4291 10 Jan 17 nicklas 13 import net.sf.basedb.reggie.counter.CounterService;
5490 13 Jun 19 nicklas 14 import net.sf.basedb.reggie.dao.Pipeline;
4325 30 Jan 17 nicklas 15 import net.sf.basedb.reggie.dao.Subtype;
4286 09 Jan 17 nicklas 16 import net.sf.basedb.reggie.servlet.SequencingRunServlet;
4271 16 Dec 16 nicklas 17 import net.sf.basedb.util.extensions.ActionFactory;
4271 16 Dec 16 nicklas 18 import net.sf.basedb.util.extensions.ClientContext;
4271 16 Dec 16 nicklas 19 import net.sf.basedb.util.extensions.InvokationContext;
7079 27 Mar 23 nicklas 20 import net.sf.basedb.util.extensions.logging.ExtensionsLog;
7079 27 Mar 23 nicklas 21 import net.sf.basedb.util.extensions.logging.ExtensionsLogger;
4271 16 Dec 16 nicklas 22
4271 16 Dec 16 nicklas 23 /**
4271 16 Dec 16 nicklas 24   Handles callbacks from the Open Grid Service when jobs have ended on the cluster.
4271 16 Dec 16 nicklas 25   Note that callbacks are done for both successful and failed jobs and the we get 
4271 16 Dec 16 nicklas 26   notified about ALL jobs (also created by other extensions). We use the 
4271 16 Dec 16 nicklas 27   {@link Job#getPluginVersion()} to check if the job was created by us (=Reggie) so it
4271 16 Dec 16 nicklas 28   is important that all jobs are created with <code>Job.setPluginVersion("reggie-"+Reggie.VERSION)</code>.
4271 16 Dec 16 nicklas 29   Normally we ignore failed jobs. Successful jobs are detected by job type and sent to 
4271 16 Dec 16 nicklas 30   an action that is specific for that type.
4271 16 Dec 16 nicklas 31   
4271 16 Dec 16 nicklas 32 */
4271 16 Dec 16 nicklas 33 public class JobCompletionHandlerFactory 
4271 16 Dec 16 nicklas 34   implements ActionFactory<JobCompletionHandler>
4271 16 Dec 16 nicklas 35 {
4271 16 Dec 16 nicklas 36   
7079 27 Mar 23 nicklas 37   public static final String ID = "net.sf.basedb.reggie.grid.job-complete";
7079 27 Mar 23 nicklas 38
7079 27 Mar 23 nicklas 39   private static final ExtensionsLogger logger = 
7079 27 Mar 23 nicklas 40     ExtensionsLog.getLogger(ID, true).wrap(LoggerFactory.getLogger(JobCompletionHandlerFactory.class));
7079 27 Mar 23 nicklas 41
4271 16 Dec 16 nicklas 42   public JobCompletionHandlerFactory()
4271 16 Dec 16 nicklas 43   {}
4271 16 Dec 16 nicklas 44
4271 16 Dec 16 nicklas 45   @Override
4271 16 Dec 16 nicklas 46   public boolean prepareContext(InvokationContext<? super JobCompletionHandler> context) 
4271 16 Dec 16 nicklas 47   {
4271 16 Dec 16 nicklas 48     // Always true since we do not know anything about the job(s) that have been completed
4271 16 Dec 16 nicklas 49     return true;
4271 16 Dec 16 nicklas 50   }
4271 16 Dec 16 nicklas 51
4271 16 Dec 16 nicklas 52   @Override
4271 16 Dec 16 nicklas 53   public JobCompletionHandler[] getActions(InvokationContext<? super JobCompletionHandler> context) 
4271 16 Dec 16 nicklas 54   {
4271 16 Dec 16 nicklas 55     ClientContext cc = context.getClientContext();
4271 16 Dec 16 nicklas 56     Job job  = (Job)cc.getCurrentItem();
4291 10 Jan 17 nicklas 57     
4271 16 Dec 16 nicklas 58     String pluginVersion = job.getPluginVersion();
4271 16 Dec 16 nicklas 59     if (pluginVersion == null || !pluginVersion.startsWith("reggie"))
4271 16 Dec 16 nicklas 60     {
4271 16 Dec 16 nicklas 61       // This is not our job
4271 16 Dec 16 nicklas 62       return null;
4271 16 Dec 16 nicklas 63     }
4271 16 Dec 16 nicklas 64     
4271 16 Dec 16 nicklas 65     // Note that job.getStatus() has not been updated yet so we
4271 16 Dec 16 nicklas 66     // need to get the status information extracted from the cluster
4271 16 Dec 16 nicklas 67     JobStatus status = (JobStatus)cc.getAttribute("job-status");
4271 16 Dec 16 nicklas 68     
4271 16 Dec 16 nicklas 69     JobCompletionHandler action = null;
4325 30 Jan 17 nicklas 70     ItemSubtype jobType = job.getItemSubtype();
5490 13 Jun 19 nicklas 71     Pipeline pipeline = Pipeline.getByCName(job.getParameterValue("pipeline"));
5036 17 Oct 18 nicklas 72     if (jobType != null && status.getStatus() == Job.Status.DONE)
4271 16 Dec 16 nicklas 73     {
7079 27 Mar 23 nicklas 74       if (logger.isDebugEnabled())
7079 27 Mar 23 nicklas 75       {
7079 27 Mar 23 nicklas 76         logger.debug("Job is completed: " + job.getName()+" (type="+jobType.getName() +"; pipeline="+pipeline+")");
7079 27 Mar 23 nicklas 77       }
4325 30 Jan 17 nicklas 78       DbControl dc = null;
4325 30 Jan 17 nicklas 79       try
4325 30 Jan 17 nicklas 80       {
6599 22 Feb 22 nicklas 81         dc = cc.getSessionControl().newDbControl("Reggie: Completed job handler");
4325 30 Jan 17 nicklas 82         if (jobType.equals(Subtype.SEQUENCING_RUN_JOB.get(dc)))
4325 30 Jan 17 nicklas 83         {
4325 30 Jan 17 nicklas 84           action = new SequencingRunServlet.SequencingRunJobCompletionHandler();
4325 30 Jan 17 nicklas 85         }
4325 30 Jan 17 nicklas 86         else if (jobType.equals(Subtype.DEMUX_MERGE_JOB.get(dc)))
4325 30 Jan 17 nicklas 87         {
5490 13 Jun 19 nicklas 88           if (pipeline == Pipeline.RNA_SEQ)
5490 13 Jun 19 nicklas 89           {
5490 13 Jun 19 nicklas 90             action = new RnaSeqDemuxJobCreator.DemuxJobCompletionHandler();
5490 13 Jun 19 nicklas 91           }
5490 13 Jun 19 nicklas 92           else if (pipeline == Pipeline.MIPS)
5490 13 Jun 19 nicklas 93           {
5490 13 Jun 19 nicklas 94             action = new MipsDemuxJobCreator.DemuxJobCompletionHandler();
5490 13 Jun 19 nicklas 95           }
4325 30 Jan 17 nicklas 96         }
6181 25 Mar 21 nicklas 97         else if (jobType.equals(Subtype.FASTQ_IMPORT_JOB.get(dc)))
6181 25 Mar 21 nicklas 98         {
6181 25 Mar 21 nicklas 99           action = new ImportFastqJobCreator.FastqImportJobCompletionHandler();
6181 25 Mar 21 nicklas 100         }
4590 25 Sep 17 nicklas 101         else if (jobType.equals(Subtype.HISAT_ALIGN_JOB.get(dc)))
4325 30 Jan 17 nicklas 102         {
6814 26 Aug 22 nicklas 103           if (pipeline == Pipeline.RNASEQ_HISAT_STRINGTIE)
6814 26 Aug 22 nicklas 104           {
6814 26 Aug 22 nicklas 105             action = new HisatAlignJobCreator.AlignJobCompletionHandler();
6814 26 Aug 22 nicklas 106           }
6814 26 Aug 22 nicklas 107           else if (pipeline == Pipeline.RNASEQ_HISAT_2023)
6814 26 Aug 22 nicklas 108           {
6814 26 Aug 22 nicklas 109             action = new Hisat2023AlignJobCreator.AlignJobCompletionHandler();
6814 26 Aug 22 nicklas 110           }
4325 30 Jan 17 nicklas 111         }
7089 04 Apr 23 nicklas 112         else if (jobType.equals(Subtype.BWA_MEM2_ALIGN_JOB.get(dc)))
7089 04 Apr 23 nicklas 113         {
7089 04 Apr 23 nicklas 114           action = new BwaMem2AlignJobCreator.AlignJobCompletionHandler();
7089 04 Apr 23 nicklas 115         }
7287 15 Aug 23 nicklas 116         else if (jobType.equals(Subtype.ASCAT_JOB.get(dc)))
7287 15 Aug 23 nicklas 117         {
7287 15 Aug 23 nicklas 118           action = new AscatJobCreator.AscatJobCompletionHandler();
7287 15 Aug 23 nicklas 119         }
5826 18 Feb 20 nicklas 120         else if (jobType.equals(Subtype.MIPS_ALIGN_JOB.get(dc)))
5826 18 Feb 20 nicklas 121         {
5826 18 Feb 20 nicklas 122           action = new MipsAlignJobCreator.AlignJobCompletionHandler();
5826 18 Feb 20 nicklas 123         }
4543 27 Jun 17 nicklas 124         else if (jobType.equals(Subtype.TOPHAT_CUFFLINKS_JOB.get(dc)))
4543 27 Jun 17 nicklas 125         {
4543 27 Jun 17 nicklas 126           action = new CufflinksJobCreator.CufflinksJobCompletionHandler();
4543 27 Jun 17 nicklas 127         }
4664 30 Jan 18 nicklas 128         else if (jobType.equals(Subtype.STRINGTIE_JOB.get(dc)))
4664 30 Jan 18 nicklas 129         {
6821 29 Aug 22 nicklas 130           if (pipeline == Pipeline.RNASEQ_HISAT_STRINGTIE)
6821 29 Aug 22 nicklas 131           {
6821 29 Aug 22 nicklas 132             action = new StringTieJobCreator.StringTieJobCompletionHandler();
6821 29 Aug 22 nicklas 133           }
6821 29 Aug 22 nicklas 134           else if (pipeline == Pipeline.RNASEQ_STRINGTIE_2023)
6821 29 Aug 22 nicklas 135           {
6821 29 Aug 22 nicklas 136             action = new StringTie2023JobCreator.StringTieJobCompletionHandler();
6821 29 Aug 22 nicklas 137           }
4664 30 Jan 18 nicklas 138         }
5033 17 Oct 18 nicklas 139         else if (jobType.equals(Subtype.MBAF_JOB.get(dc)))
5033 17 Oct 18 nicklas 140         {
5033 17 Oct 18 nicklas 141           action = new MBafJobCreator.MBafJobCompletionHandler();
5033 17 Oct 18 nicklas 142         }
6870 16 Nov 22 nicklas 143         else if (jobType.equals(Subtype.METHYLATION_BETA_JOB.get(dc)))
6870 16 Nov 22 nicklas 144         {
6870 16 Nov 22 nicklas 145           action = new MethylationBetaJobCreator.BetaJobCompletionHandler();
6870 16 Nov 22 nicklas 146         }
5684 25 Oct 19 nicklas 147         else if (jobType.equals(Subtype.VARIANT_CALLING_JOB.get(dc)))
5684 25 Oct 19 nicklas 148         {
7388 31 Oct 23 nicklas 149           if (pipeline == Pipeline.RNASEQ_HISAT_VARIANTCALL)
7388 31 Oct 23 nicklas 150           {
7388 31 Oct 23 nicklas 151             action = new VariantCallingJobCreator.VariantCallJobCompletionHandler();
7388 31 Oct 23 nicklas 152           }
7388 31 Oct 23 nicklas 153           else if (pipeline == Pipeline.DNA_NORMAL_WGS)
7388 31 Oct 23 nicklas 154           {
7388 31 Oct 23 nicklas 155             action = new WgsVariantCallJobCreator.PanelOfNormalVariantCallJobCompletionHandler();
7388 31 Oct 23 nicklas 156           }
7411 10 Nov 23 nicklas 157           else if (pipeline == Pipeline.DNA_PAIRED_VARIANTCALL)
7410 10 Nov 23 nicklas 158           {
7410 10 Nov 23 nicklas 159             action = new WgsVariantCallJobCreator.PairedVariantCallJobCompletionHandler();
7410 10 Nov 23 nicklas 160           }
5684 25 Oct 19 nicklas 161         }
6387 15 Sep 21 nicklas 162         else if (jobType.equals(Subtype.TARGETED_GENOTYPING_JOB.get(dc)))
6387 15 Sep 21 nicklas 163         {
6387 15 Sep 21 nicklas 164           action = new TargetedGenotypeJobCreator.TargetedGenotypeJobCompletionHandler();
6387 15 Sep 21 nicklas 165         }
5772 03 Dec 19 nicklas 166         else if (jobType.equals(Subtype.VARIANT_STATISTICS_JOB.get(dc)))
5772 03 Dec 19 nicklas 167         {
7395 06 Nov 23 nicklas 168           if (pipeline == Pipeline.DNA_NORMAL_WGS)
7395 06 Nov 23 nicklas 169           {
7395 06 Nov 23 nicklas 170             action = new WgsVariantCallJobCreator.BuildPanelOfNormalsJobCompletionHandler();
7395 06 Nov 23 nicklas 171           }
7395 06 Nov 23 nicklas 172           else
7395 06 Nov 23 nicklas 173           {
7395 06 Nov 23 nicklas 174             action = new VariantStatisticsJobCreator.VariantStatisticsJobCompletionHandler();
7395 06 Nov 23 nicklas 175           }
5772 03 Dec 19 nicklas 176         }
4325 30 Jan 17 nicklas 177       }
4325 30 Jan 17 nicklas 178       finally
4325 30 Jan 17 nicklas 179       {
4325 30 Jan 17 nicklas 180         if (dc != null) dc.close();
4325 30 Jan 17 nicklas 181       }
7079 27 Mar 23 nicklas 182       if (logger.isDebugEnabled())
7079 27 Mar 23 nicklas 183       {
7079 27 Mar 23 nicklas 184         logger.debug("Job is completed: " + job.getName()+" (action="+action+")");
7079 27 Mar 23 nicklas 185       }
4286 09 Jan 17 nicklas 186     }
4271 16 Dec 16 nicklas 187     
5036 17 Oct 18 nicklas 188     return new JobCompletionHandler[] { new JobCompletionWrapper(action) };
4271 16 Dec 16 nicklas 189   }
4271 16 Dec 16 nicklas 190   
4291 10 Jan 17 nicklas 191   /**
4291 10 Jan 17 nicklas 192     Wrapper around job completion handlers that forces the
4291 10 Jan 17 nicklas 193     auto-confirm and counter services to update the next time.
4291 10 Jan 17 nicklas 194   */
4291 10 Jan 17 nicklas 195   static class JobCompletionWrapper
4291 10 Jan 17 nicklas 196     implements JobCompletionHandler
4291 10 Jan 17 nicklas 197   {
4291 10 Jan 17 nicklas 198
4291 10 Jan 17 nicklas 199     private final JobCompletionHandler realHandler;
4291 10 Jan 17 nicklas 200     
4291 10 Jan 17 nicklas 201     public JobCompletionWrapper(JobCompletionHandler realHandler) 
4291 10 Jan 17 nicklas 202     {
4291 10 Jan 17 nicklas 203       this.realHandler = realHandler;
4291 10 Jan 17 nicklas 204     }
4291 10 Jan 17 nicklas 205     
4291 10 Jan 17 nicklas 206     @Override
4291 10 Jan 17 nicklas 207     public String jobCompleted(SessionControl sc, OpenGridSession session, Job job, JobStatus status)
4291 10 Jan 17 nicklas 208     {
5036 17 Oct 18 nicklas 209       String msg = null;
5036 17 Oct 18 nicklas 210       try
5036 17 Oct 18 nicklas 211       {
5036 17 Oct 18 nicklas 212         if (realHandler != null)
5036 17 Oct 18 nicklas 213         {
5036 17 Oct 18 nicklas 214           msg = realHandler.jobCompleted(sc, session, job, status);
5036 17 Oct 18 nicklas 215         }
5036 17 Oct 18 nicklas 216       }
5036 17 Oct 18 nicklas 217       finally
5036 17 Oct 18 nicklas 218       {
5036 17 Oct 18 nicklas 219         // Force auto-confirmation it should wait a few seconds to make sure
5036 17 Oct 18 nicklas 220         // that the transaction that is handling the job is committed
5036 17 Oct 18 nicklas 221         AutoConfirmService.getInstance().setForceCheck(15);
5036 17 Oct 18 nicklas 222         CounterService.getInstance().setForceCount();
5036 17 Oct 18 nicklas 223       }
4291 10 Jan 17 nicklas 224       return msg;
4291 10 Jan 17 nicklas 225     }
4291 10 Jan 17 nicklas 226     
4291 10 Jan 17 nicklas 227   }
4271 16 Dec 16 nicklas 228   
4271 16 Dec 16 nicklas 229 }