extensions/net.sf.basedb.opengrid/trunk/src/net/sf/basedb/opengrid/engine/DirectEngine.java

Code
Comments
Other
Rev Date Author Line
6613 28 Feb 22 nicklas 1 package net.sf.basedb.opengrid.engine;
6613 28 Feb 22 nicklas 2
6613 28 Feb 22 nicklas 3 import java.util.Arrays;
6613 28 Feb 22 nicklas 4 import java.util.regex.Pattern;
6613 28 Feb 22 nicklas 5
6613 28 Feb 22 nicklas 6 import org.slf4j.LoggerFactory;
6613 28 Feb 22 nicklas 7
6613 28 Feb 22 nicklas 8 import net.sf.basedb.core.Job;
6613 28 Feb 22 nicklas 9 import net.sf.basedb.opengrid.CmdResult;
6613 28 Feb 22 nicklas 10 import net.sf.basedb.opengrid.JobDefinition;
6613 28 Feb 22 nicklas 11 import net.sf.basedb.opengrid.JobIdentifier;
6613 28 Feb 22 nicklas 12 import net.sf.basedb.opengrid.JobStatus;
6614 28 Feb 22 nicklas 13 import net.sf.basedb.opengrid.OpenGrid;
6613 28 Feb 22 nicklas 14 import net.sf.basedb.opengrid.OpenGridSession;
6627 07 Mar 22 nicklas 15 import net.sf.basedb.opengrid.ScriptBuilder;
6672 11 Apr 22 nicklas 16 import net.sf.basedb.opengrid.config.BatchConfig;
6613 28 Feb 22 nicklas 17 import net.sf.basedb.opengrid.config.ClusterConfig;
6639 11 Mar 22 nicklas 18 import net.sf.basedb.opengrid.config.JobConfig;
6614 28 Feb 22 nicklas 19 import net.sf.basedb.opengrid.filetransfer.InputStreamUploadSource;
6613 28 Feb 22 nicklas 20 import net.sf.basedb.opengrid.filetransfer.StringUploadSource;
6613 28 Feb 22 nicklas 21 import net.sf.basedb.opengrid.filetransfer.UploadSource;
7075 27 Mar 23 nicklas 22 import net.sf.basedb.opengrid.service.OpenGridService;
6613 28 Feb 22 nicklas 23 import net.sf.basedb.util.Values;
7075 27 Mar 23 nicklas 24 import net.sf.basedb.util.extensions.logging.ExtensionsLog;
7075 27 Mar 23 nicklas 25 import net.sf.basedb.util.extensions.logging.ExtensionsLogger;
6615 01 Mar 22 nicklas 26
6613 28 Feb 22 nicklas 27 /**
6613 28 Feb 22 nicklas 28   Engine implementation for servers without a job manager. Jobs are stared immediately
6613 28 Feb 22 nicklas 29   when submitted. Intended to be used for development and testing only.
6613 28 Feb 22 nicklas 30   
6613 28 Feb 22 nicklas 31   @author nicklas
6613 28 Feb 22 nicklas 32   @since 1.5
6613 28 Feb 22 nicklas 33 */
6613 28 Feb 22 nicklas 34 public class DirectEngine 
6613 28 Feb 22 nicklas 35   implements ClusterEngine
6613 28 Feb 22 nicklas 36 {
6613 28 Feb 22 nicklas 37
7075 27 Mar 23 nicklas 38   private static final ExtensionsLogger logger = 
7075 27 Mar 23 nicklas 39       ExtensionsLog.getLogger(OpenGridService.ID, true).wrap(LoggerFactory.getLogger(DirectEngine.class));
6613 28 Feb 22 nicklas 40
6613 28 Feb 22 nicklas 41   public DirectEngine() 
6613 28 Feb 22 nicklas 42   {}
6613 28 Feb 22 nicklas 43   
6613 28 Feb 22 nicklas 44   @Override
6613 28 Feb 22 nicklas 45   public void setDefaultConfig(ClusterConfig config)
6613 28 Feb 22 nicklas 46   {
6613 28 Feb 22 nicklas 47     config.setOpenGridInfoCommand("cat /etc/os-release | grep PRETTY_NAME | cut -d '\"' -f 2");
6613 28 Feb 22 nicklas 48   }
6613 28 Feb 22 nicklas 49
6613 28 Feb 22 nicklas 50   @Override
6614 28 Feb 22 nicklas 51   public JobSubmission createJobSubmission(OpenGridSession session, JobDefinition job, String workFolder, String tmpFolder) 
6627 07 Mar 22 nicklas 52   {
6627 07 Mar 22 nicklas 53     UploadSource options = createOptionsFile(session, job, workFolder, tmpFolder);
6629 07 Mar 22 nicklas 54     UploadSource submit = getScript("submit.sh");
6629 07 Mar 22 nicklas 55     UploadSource run = getScript("run.sh");
6614 28 Feb 22 nicklas 56     UploadSource jobScript = new StringUploadSource("job.sh", createJobScript(job, workFolder, tmpFolder));
6629 07 Mar 22 nicklas 57     return new JobSubmission("/bin/bash "+workFolder + "/submit.sh "+workFolder+"/options.sh", Arrays.asList(options, submit, run, jobScript));
6613 28 Feb 22 nicklas 58   }
6613 28 Feb 22 nicklas 59   
6613 28 Feb 22 nicklas 60   @Override
6613 28 Feb 22 nicklas 61   public CmdResult<JobStatus> getStatusInQueue(OpenGridSession session, JobIdentifier jobId, int timeAdjustment) 
6613 28 Feb 22 nicklas 62   {
6614 28 Feb 22 nicklas 63     return session.execute(new StatusCmd(session, jobId, timeAdjustment), 5);
6613 28 Feb 22 nicklas 64   }
6614 28 Feb 22 nicklas 65
6613 28 Feb 22 nicklas 66   @Override
6613 28 Feb 22 nicklas 67   public CmdResult<JobStatus> getStatusIfFinished(OpenGridSession session, JobIdentifier jobId, int timeAdjustment) 
6613 28 Feb 22 nicklas 68   {
6614 28 Feb 22 nicklas 69     return session.execute(new StatusCmd(session, jobId, timeAdjustment), 5);
6613 28 Feb 22 nicklas 70   }
6613 28 Feb 22 nicklas 71
6613 28 Feb 22 nicklas 72   @Override
6613 28 Feb 22 nicklas 73   public CmdResult<String> cancelJob(OpenGridSession session, JobIdentifier jobId) 
6613 28 Feb 22 nicklas 74   {
6615 01 Mar 22 nicklas 75     return session.executeCmd("kill -- -" + jobId.getClusterJobId(), 5);
6613 28 Feb 22 nicklas 76   }
6613 28 Feb 22 nicklas 77
6627 07 Mar 22 nicklas 78   private UploadSource createOptionsFile(OpenGridSession session, JobDefinition job, String workFolder, String tmpFolder)
6614 28 Feb 22 nicklas 79   {
6639 11 Mar 22 nicklas 80     JobConfig config = job.getConfig();
6672 11 Apr 22 nicklas 81     BatchConfig batchConfig = job.getBatchConfig();
6672 11 Apr 22 nicklas 82
6627 07 Mar 22 nicklas 83     ScriptBuilder options = new ScriptBuilder();
6627 07 Mar 22 nicklas 84     options.export("JOB_NAME", job.getName());
6627 07 Mar 22 nicklas 85     if (job.getDebug()) options.export("JOB_DEBUG", "1");
6627 07 Mar 22 nicklas 86     options.export("WD", workFolder);
6627 07 Mar 22 nicklas 87     options.export("SGE_STDERR_PATH", "${WD}/stderr");
6627 07 Mar 22 nicklas 88     options.export("SGE_STDOUT_PATH", "${WD}/stdout");
6627 07 Mar 22 nicklas 89     options.export("TMPDIR", tmpFolder);
6627 07 Mar 22 nicklas 90     options.export("NSLOTS", "`nproc`");
6639 11 Mar 22 nicklas 91     if (config.getPriority() != null)
6639 11 Mar 22 nicklas 92     {
6639 11 Mar 22 nicklas 93       options.export("NICE", Integer.toString(config.getPriority()/-50));
6639 11 Mar 22 nicklas 94     }
6672 11 Apr 22 nicklas 95     if (batchConfig != null)
6672 11 Apr 22 nicklas 96     {
6672 11 Apr 22 nicklas 97       int delay = batchConfig.getDelayForNextJob();
6672 11 Apr 22 nicklas 98       if (delay > 0) 
6672 11 Apr 22 nicklas 99       {
6672 11 Apr 22 nicklas 100         options.export("JOB_DELAY", Integer.toString(delay));
6672 11 Apr 22 nicklas 101       }
6672 11 Apr 22 nicklas 102     }
6672 11 Apr 22 nicklas 103     
6627 07 Mar 22 nicklas 104     options.export("STATUS_FILE", getJobStatusPath(session, "${JOB_ID}"));
6627 07 Mar 22 nicklas 105     return options.toUploadSource("options.sh");
6614 28 Feb 22 nicklas 106   }
6614 28 Feb 22 nicklas 107   
6629 07 Mar 22 nicklas 108   private UploadSource getScript(String name)
6615 01 Mar 22 nicklas 109   {
6629 07 Mar 22 nicklas 110     return new InputStreamUploadSource(name, OpenGrid.class.getResourceAsStream("/net/sf/basedb/opengrid/engine/direct/"+name));
6615 01 Mar 22 nicklas 111   }
6615 01 Mar 22 nicklas 112   
6615 01 Mar 22 nicklas 113   
6613 28 Feb 22 nicklas 114   /**
6614 28 Feb 22 nicklas 115     Get the path to where the status information for a job is saved.
6614 28 Feb 22 nicklas 116   */
6614 28 Feb 22 nicklas 117   private static String getJobStatusPath(OpenGridSession session, String jobId)
6614 28 Feb 22 nicklas 118   {
6614 28 Feb 22 nicklas 119     return session.getHost().getConfig().getJobFolder()+"/status-"+jobId;
6614 28 Feb 22 nicklas 120   }
6614 28 Feb 22 nicklas 121   
6614 28 Feb 22 nicklas 122   /**
6629 07 Mar 22 nicklas 123     Generates a script that executes the job script.
6613 28 Feb 22 nicklas 124   */
6614 28 Feb 22 nicklas 125   public String createJobScript(JobDefinition job, String workFolder, String tmpFolder)
6613 28 Feb 22 nicklas 126   {
6667 05 Apr 22 nicklas 127     ScriptBuilder script = new ScriptBuilder();
6667 05 Apr 22 nicklas 128     script.cmd("#!/bin/bash");
6667 05 Apr 22 nicklas 129     script.cmd(job.getCmd());
6613 28 Feb 22 nicklas 130     return script.toString();
6613 28 Feb 22 nicklas 131   }
6613 28 Feb 22 nicklas 132
6613 28 Feb 22 nicklas 133   /**
6614 28 Feb 22 nicklas 134     Implementation for getting information about a running or
6614 28 Feb 22 nicklas 135     finished job. We simply 'cat' the STATUS_FILE for the job
6614 28 Feb 22 nicklas 136     and expect useful information to be in there.
6613 28 Feb 22 nicklas 137   */
6614 28 Feb 22 nicklas 138   public static class StatusCmd
6613 28 Feb 22 nicklas 139     extends CmdResult<JobStatus>
6613 28 Feb 22 nicklas 140   {
6613 28 Feb 22 nicklas 141     private final JobIdentifier jobId;
6613 28 Feb 22 nicklas 142     private final int timeAdjustment;
6614 28 Feb 22 nicklas 143     
6614 28 Feb 22 nicklas 144     public StatusCmd(OpenGridSession session, JobIdentifier jobId, int timeAdjustment) 
6613 28 Feb 22 nicklas 145     {
6614 28 Feb 22 nicklas 146       super("cat "+getJobStatusPath(session, jobId.getClusterJobId()));
6613 28 Feb 22 nicklas 147       this.jobId = jobId;
6613 28 Feb 22 nicklas 148       this.timeAdjustment = timeAdjustment;
6613 28 Feb 22 nicklas 149     }
6614 28 Feb 22 nicklas 150     
6613 28 Feb 22 nicklas 151     @Override
6613 28 Feb 22 nicklas 152     protected void parseResult() 
6613 28 Feb 22 nicklas 153     {
6613 28 Feb 22 nicklas 154       if (getExitStatus() != 0) return;
6613 28 Feb 22 nicklas 155       
6613 28 Feb 22 nicklas 156       if (logger.isDebugEnabled())
6613 28 Feb 22 nicklas 157       {
6614 28 Feb 22 nicklas 158         logger.debug("Got status information for job: " + jobId);
6613 28 Feb 22 nicklas 159       }
6613 28 Feb 22 nicklas 160       try
6613 28 Feb 22 nicklas 161       {
6614 28 Feb 22 nicklas 162         ProcessJobStatus status = new ProcessJobStatus(jobId);
6614 28 Feb 22 nicklas 163         status.readFromStatusFile(getStdout(), timeAdjustment);
6613 28 Feb 22 nicklas 164         setResult(status);
6613 28 Feb 22 nicklas 165       }
6613 28 Feb 22 nicklas 166       catch (Exception ex)
6613 28 Feb 22 nicklas 167       {
6614 28 Feb 22 nicklas 168         setException(new RuntimeException("Could not parse status output for job: " + jobId, ex));
6614 28 Feb 22 nicklas 169         logger.error("Could not parse status output for job: " + jobId, ex);
6613 28 Feb 22 nicklas 170       }
6614 28 Feb 22 nicklas 171     }
6613 28 Feb 22 nicklas 172   }
6613 28 Feb 22 nicklas 173   
6613 28 Feb 22 nicklas 174   /**
6614 28 Feb 22 nicklas 175     Job status information for direct jobs.
6613 28 Feb 22 nicklas 176   */
6614 28 Feb 22 nicklas 177   public static class ProcessJobStatus
6613 28 Feb 22 nicklas 178     extends JobStatus
6613 28 Feb 22 nicklas 179   {
6614 28 Feb 22 nicklas 180     
6614 28 Feb 22 nicklas 181     public ProcessJobStatus(JobIdentifier jobId)
6613 28 Feb 22 nicklas 182     {
6613 28 Feb 22 nicklas 183       super(jobId);
6613 28 Feb 22 nicklas 184     }
6614 28 Feb 22 nicklas 185         
6613 28 Feb 22 nicklas 186     /**
6614 28 Feb 22 nicklas 187       Parse information from the STATUS_FILE.
6613 28 Feb 22 nicklas 188     */
6614 28 Feb 22 nicklas 189     void readFromStatusFile(String text, int timeAdjustment)
6613 28 Feb 22 nicklas 190     {
6614 28 Feb 22 nicklas 191       String[] lines = text.split("\n");
6614 28 Feb 22 nicklas 192       Pattern p = Pattern.compile(":");
6613 28 Feb 22 nicklas 193       
6614 28 Feb 22 nicklas 194       boolean submitted = false;
6614 28 Feb 22 nicklas 195       boolean started = false;
6614 28 Feb 22 nicklas 196       boolean ended = false;
6613 28 Feb 22 nicklas 197       
6613 28 Feb 22 nicklas 198       for (String line : lines)
6613 28 Feb 22 nicklas 199       {
6613 28 Feb 22 nicklas 200         String[] kv = p.split(line, 2);
6613 28 Feb 22 nicklas 201         if (kv.length == 2)
6613 28 Feb 22 nicklas 202         {
6614 28 Feb 22 nicklas 203           String key = kv[0].trim();
6613 28 Feb 22 nicklas 204           String value = kv[1].trim();
6613 28 Feb 22 nicklas 205           
6614 28 Feb 22 nicklas 206           if ("Name".equals(key))
6613 28 Feb 22 nicklas 207           {
6613 28 Feb 22 nicklas 208             setName(value);
6613 28 Feb 22 nicklas 209           }
6614 28 Feb 22 nicklas 210           else if ("Host".equals(key))
6613 28 Feb 22 nicklas 211           {
6614 28 Feb 22 nicklas 212             setNodeName(value);
6613 28 Feb 22 nicklas 213           }
6614 28 Feb 22 nicklas 214           else if ("Submitted".equals(key))
6613 28 Feb 22 nicklas 215           {
6614 28 Feb 22 nicklas 216             setSubmissionTime(ClusterConfig.DATE_CMD.parseString(value).getTime() + timeAdjustment * 1000);
6614 28 Feb 22 nicklas 217             submitted = true;
6613 28 Feb 22 nicklas 218           }
6614 28 Feb 22 nicklas 219           else if ("Started".equals(key))
6613 28 Feb 22 nicklas 220           {
6614 28 Feb 22 nicklas 221             setStartTime(ClusterConfig.DATE_CMD.parseString(value).getTime() + timeAdjustment * 1000);
6614 28 Feb 22 nicklas 222             started = true;
6613 28 Feb 22 nicklas 223           }
6614 28 Feb 22 nicklas 224           else if ("Ended".equals(key))
6613 28 Feb 22 nicklas 225           {
6614 28 Feb 22 nicklas 226             setEndTime(ClusterConfig.DATE_CMD.parseString(value).getTime() + timeAdjustment * 1000);
6614 28 Feb 22 nicklas 227             ended = true;
6613 28 Feb 22 nicklas 228           }
6614 28 Feb 22 nicklas 229           else if ("ExitCode".equals(key))
6613 28 Feb 22 nicklas 230           {
6613 28 Feb 22 nicklas 231             setExitCode(Values.getInt(value));
6613 28 Feb 22 nicklas 232           }
6613 28 Feb 22 nicklas 233         }
6613 28 Feb 22 nicklas 234       }
6614 28 Feb 22 nicklas 235       
6614 28 Feb 22 nicklas 236       if (ended)
6614 28 Feb 22 nicklas 237       {
6614 28 Feb 22 nicklas 238         setStatus(getExitCode() == 0 ? Job.Status.DONE : Job.Status.ERROR);
6614 28 Feb 22 nicklas 239       }
6614 28 Feb 22 nicklas 240       else if (started)
6614 28 Feb 22 nicklas 241       {
6614 28 Feb 22 nicklas 242         setStatus(Job.Status.EXECUTING);
6614 28 Feb 22 nicklas 243       }
6614 28 Feb 22 nicklas 244       else
6614 28 Feb 22 nicklas 245       {
6614 28 Feb 22 nicklas 246         setStatus(Job.Status.WAITING);
6614 28 Feb 22 nicklas 247       }
6613 28 Feb 22 nicklas 248     }
6613 28 Feb 22 nicklas 249   }
6614 28 Feb 22 nicklas 250
6613 28 Feb 22 nicklas 251 }