extensions/net.sf.basedb.opengrid/trunk/src/net/sf/basedb/opengrid/service/OpenGridSignalHandlerFactory.java

Code
Comments
Other
Rev Date Author Line
4203 02 Nov 16 nicklas 1 package net.sf.basedb.opengrid.service;
4203 02 Nov 16 nicklas 2
4203 02 Nov 16 nicklas 3 import java.net.URI;
4203 02 Nov 16 nicklas 4 import java.util.Arrays;
4203 02 Nov 16 nicklas 5 import java.util.Collection;
4203 02 Nov 16 nicklas 6 import java.util.Collections;
4203 02 Nov 16 nicklas 7
4203 02 Nov 16 nicklas 8 import org.slf4j.LoggerFactory;
4203 02 Nov 16 nicklas 9
4203 02 Nov 16 nicklas 10 import net.sf.basedb.core.Job;
4203 02 Nov 16 nicklas 11 import net.sf.basedb.core.signal.ExtensionSignalTransporter;
4203 02 Nov 16 nicklas 12 import net.sf.basedb.core.signal.Signal;
4203 02 Nov 16 nicklas 13 import net.sf.basedb.core.signal.SignalHandler;
4203 02 Nov 16 nicklas 14 import net.sf.basedb.opengrid.JobIdentifier;
4203 02 Nov 16 nicklas 15 import net.sf.basedb.util.extensions.ActionFactory;
4203 02 Nov 16 nicklas 16 import net.sf.basedb.util.extensions.InvokationContext;
7075 27 Mar 23 nicklas 17 import net.sf.basedb.util.extensions.logging.ExtensionsLog;
7075 27 Mar 23 nicklas 18 import net.sf.basedb.util.extensions.logging.ExtensionsLogger;
4203 02 Nov 16 nicklas 19
4203 02 Nov 16 nicklas 20 /**
4203 02 Nov 16 nicklas 21   Signal handler implementation for Open Grid Scheduler clusters.
4203 02 Nov 16 nicklas 22   Jobs that are started on a cluster should be registered in BASE
4203 02 Nov 16 nicklas 23   with an external id equal to the job id on the cluster,
4203 02 Nov 16 nicklas 24   {@link ExtensionSignalTransporter} as the signal transporter implementation
4203 02 Nov 16 nicklas 25   and the URI from {@link #getSignalUri(JobIdentifier)} as the
4203 02 Nov 16 nicklas 26   signal transporter parameter.
4203 02 Nov 16 nicklas 27   
4203 02 Nov 16 nicklas 28   Signal URI:s have the form:
4203 02 Nov 16 nicklas 29   
4203 02 Nov 16 nicklas 30   ogx://cluster-id/?ABORT,STATUS#job-id
4203 02 Nov 16 nicklas 31
4203 02 Nov 16 nicklas 32   where cluster-id is the username, ip-address and port of the cluster.
4203 02 Nov 16 nicklas 33 */
4203 02 Nov 16 nicklas 34 public class OpenGridSignalHandlerFactory 
4203 02 Nov 16 nicklas 35   implements ActionFactory<SignalHandler>
4203 02 Nov 16 nicklas 36 {
4203 02 Nov 16 nicklas 37
4203 02 Nov 16 nicklas 38   /**
7075 27 Mar 23 nicklas 39     The ID of the signal handler extension.
7075 27 Mar 23 nicklas 40     @since 1.10
7075 27 Mar 23 nicklas 41   */
7075 27 Mar 23 nicklas 42   public static final String ID = "net.sf.basedb.opengrid.job-signal";
7075 27 Mar 23 nicklas 43   
7075 27 Mar 23 nicklas 44   /**
4203 02 Nov 16 nicklas 45     All supported signals by this signal handler.
4203 02 Nov 16 nicklas 46   */
4203 02 Nov 16 nicklas 47   public static final Collection<Signal> SUPPORTED_SIGNALS =
4203 02 Nov 16 nicklas 48     Collections.unmodifiableList(Arrays.asList(Signal.ABORT, Signal.STATUS));
4203 02 Nov 16 nicklas 49
7075 27 Mar 23 nicklas 50   private static final ExtensionsLogger logger =
7075 27 Mar 23 nicklas 51     ExtensionsLog.getLogger(ID, true).wrap(LoggerFactory.getLogger(OpenGridSignalHandlerFactory.class));
4203 02 Nov 16 nicklas 52   
4203 02 Nov 16 nicklas 53   /**
4203 02 Nov 16 nicklas 54     Generate a signal URI that is used to send signals to a given
4203 02 Nov 16 nicklas 55     job on a cluster.
4301 13 Jan 17 nicklas 56     @param jobId The job identifier
4203 02 Nov 16 nicklas 57   */
4203 02 Nov 16 nicklas 58   public static String getSignalUri(JobIdentifier jobId)
4203 02 Nov 16 nicklas 59   {
4203 02 Nov 16 nicklas 60     StringBuilder uri = new StringBuilder();
4203 02 Nov 16 nicklas 61     uri.append("ogx://");
4203 02 Nov 16 nicklas 62     uri.append(jobId.getClusterId());
4203 02 Nov 16 nicklas 63     uri.append("/?");
4203 02 Nov 16 nicklas 64     String delimiter = "";
4203 02 Nov 16 nicklas 65     for (Signal s : SUPPORTED_SIGNALS)
4203 02 Nov 16 nicklas 66     {
4203 02 Nov 16 nicklas 67       uri.append(delimiter);
4203 02 Nov 16 nicklas 68       uri.append(s.getId());
4203 02 Nov 16 nicklas 69       delimiter = ",";
4203 02 Nov 16 nicklas 70     }
4203 02 Nov 16 nicklas 71     uri.append("#");
4203 02 Nov 16 nicklas 72     uri.append(jobId.getClusterJobId());
4203 02 Nov 16 nicklas 73     logger.debug("Created signal URI: " +uri.toString());
4203 02 Nov 16 nicklas 74     return uri.toString();
4203 02 Nov 16 nicklas 75   }
4203 02 Nov 16 nicklas 76
4203 02 Nov 16 nicklas 77   public OpenGridSignalHandlerFactory()
4203 02 Nov 16 nicklas 78   {}
4203 02 Nov 16 nicklas 79
4203 02 Nov 16 nicklas 80   /**
4203 02 Nov 16 nicklas 81     If the signal schema is "ogx" this factory should handle it.
4203 02 Nov 16 nicklas 82   */
4203 02 Nov 16 nicklas 83   @Override
4203 02 Nov 16 nicklas 84   public boolean prepareContext(InvokationContext<? super SignalHandler> context) 
4203 02 Nov 16 nicklas 85   {
4203 02 Nov 16 nicklas 86     URI signalURI = (URI)context.getClientContext().getAttribute("signal-uri");
4203 02 Nov 16 nicklas 87     return "ogx".equals(signalURI.getScheme());
4203 02 Nov 16 nicklas 88   }
4203 02 Nov 16 nicklas 89
4203 02 Nov 16 nicklas 90   
4203 02 Nov 16 nicklas 91   @Override
4203 02 Nov 16 nicklas 92   public SignalHandler[] getActions(InvokationContext<? super SignalHandler> context) 
4203 02 Nov 16 nicklas 93   {
4203 02 Nov 16 nicklas 94     URI signalURI = (URI)context.getClientContext().getAttribute("signal-uri");
4203 02 Nov 16 nicklas 95     Job job = (Job)context.getClientContext().getCurrentItem();
4203 02 Nov 16 nicklas 96     if (logger.isDebugEnabled())
4203 02 Nov 16 nicklas 97     {
4203 02 Nov 16 nicklas 98       logger.debug("Creating signal handler for " + signalURI);
4203 02 Nov 16 nicklas 99     }
4203 02 Nov 16 nicklas 100     String jobId = signalURI.getUserInfo();
4203 02 Nov 16 nicklas 101     return new SignalHandler[] { new OpenGridSignalHandler(signalURI, job, SUPPORTED_SIGNALS) };
4203 02 Nov 16 nicklas 102   }
4203 02 Nov 16 nicklas 103
4203 02 Nov 16 nicklas 104   
4203 02 Nov 16 nicklas 105   /**
4203 02 Nov 16 nicklas 106     Signal handler implementation. Parameters are retreived from
4203 02 Nov 16 nicklas 107     the context by the factory class.
4203 02 Nov 16 nicklas 108   */
4203 02 Nov 16 nicklas 109   static class OpenGridSignalHandler
4203 02 Nov 16 nicklas 110     implements SignalHandler
4203 02 Nov 16 nicklas 111   {
4203 02 Nov 16 nicklas 112   
4203 02 Nov 16 nicklas 113     private final URI signalURI;
4203 02 Nov 16 nicklas 114     private final Job job;
4203 02 Nov 16 nicklas 115     private final Collection<Signal> supportedSignals;
4203 02 Nov 16 nicklas 116     
4203 02 Nov 16 nicklas 117     OpenGridSignalHandler(URI signalURI, Job job, Collection<Signal> signals)
4203 02 Nov 16 nicklas 118     {
4203 02 Nov 16 nicklas 119       this.signalURI = signalURI;
4203 02 Nov 16 nicklas 120       this.job = job;
4203 02 Nov 16 nicklas 121       this.supportedSignals = signals;
4203 02 Nov 16 nicklas 122     }
4203 02 Nov 16 nicklas 123     
4203 02 Nov 16 nicklas 124     @Override
4203 02 Nov 16 nicklas 125     public Collection<Signal> getSupportedSignals() 
4203 02 Nov 16 nicklas 126     {
4203 02 Nov 16 nicklas 127       return supportedSignals;
4203 02 Nov 16 nicklas 128     }
4203 02 Nov 16 nicklas 129     
4203 02 Nov 16 nicklas 130     @Override
4203 02 Nov 16 nicklas 131     public boolean supports(Signal signal) 
4203 02 Nov 16 nicklas 132     {
4203 02 Nov 16 nicklas 133       return supportedSignals.contains(signal);
4203 02 Nov 16 nicklas 134     }
4203 02 Nov 16 nicklas 135
4203 02 Nov 16 nicklas 136     @Override
4203 02 Nov 16 nicklas 137     public void handleSignal(Signal signal) 
4203 02 Nov 16 nicklas 138     {
4203 02 Nov 16 nicklas 139       if (logger.isDebugEnabled())
4203 02 Nov 16 nicklas 140       {
4203 02 Nov 16 nicklas 141         logger.debug("Got signal " + signal.getId() + " for " + signalURI);
4203 02 Nov 16 nicklas 142       }
4203 02 Nov 16 nicklas 143       if (!supports(signal)) return;
4203 02 Nov 16 nicklas 144
4203 02 Nov 16 nicklas 145       JobIdentifier jobId = new JobIdentifier(signalURI.getAuthority(), signalURI.getFragment(), job.getId());
4203 02 Nov 16 nicklas 146       OpenGridService srv = OpenGridService.getInstance();
4203 02 Nov 16 nicklas 147       
4203 02 Nov 16 nicklas 148       if (signal == Signal.ABORT)
4203 02 Nov 16 nicklas 149       {
4203 02 Nov 16 nicklas 150         srv.asyncJobAbort(jobId);
4203 02 Nov 16 nicklas 151       }
4203 02 Nov 16 nicklas 152       else if (signal == Signal.STATUS)
4203 02 Nov 16 nicklas 153       {
4203 02 Nov 16 nicklas 154         srv.asyncJobStatusUpdate(jobId);
4203 02 Nov 16 nicklas 155       }
4203 02 Nov 16 nicklas 156     }
4203 02 Nov 16 nicklas 157
4203 02 Nov 16 nicklas 158   }
4203 02 Nov 16 nicklas 159   
4203 02 Nov 16 nicklas 160 }