extensions/net.sf.basedb.opengrid/trunk/src/net/sf/basedb/opengrid/service/OpenGridService.java

Code
Comments
Other
Rev Date Author Line
4203 02 Nov 16 nicklas 1 package net.sf.basedb.opengrid.service;
4203 02 Nov 16 nicklas 2
4205 03 Nov 16 nicklas 3 import java.util.ArrayList;
4254 25 Nov 16 nicklas 4 import java.util.Collection;
4229 10 Nov 16 nicklas 5 import java.util.Collections;
4203 02 Nov 16 nicklas 6 import java.util.HashMap;
4255 28 Nov 16 nicklas 7 import java.util.HashSet;
4284 21 Dec 16 nicklas 8 import java.util.LinkedHashMap;
4205 03 Nov 16 nicklas 9 import java.util.LinkedHashSet;
4205 03 Nov 16 nicklas 10 import java.util.List;
4203 02 Nov 16 nicklas 11 import java.util.Map;
4203 02 Nov 16 nicklas 12 import java.util.Set;
4203 02 Nov 16 nicklas 13 import java.util.TimerTask;
4203 02 Nov 16 nicklas 14
4203 02 Nov 16 nicklas 15 import org.slf4j.LoggerFactory;
4203 02 Nov 16 nicklas 16
4203 02 Nov 16 nicklas 17 import net.sf.basedb.clients.web.extensions.service.ServiceControllerAction;
4203 02 Nov 16 nicklas 18 import net.sf.basedb.core.Application;
4205 03 Nov 16 nicklas 19 import net.sf.basedb.core.DbControl;
4255 28 Nov 16 nicklas 20 import net.sf.basedb.core.Include;
4301 13 Jan 17 nicklas 21 import net.sf.basedb.core.ItemNotFoundException;
4255 28 Nov 16 nicklas 22 import net.sf.basedb.core.ItemQuery;
4205 03 Nov 16 nicklas 23 import net.sf.basedb.core.Job;
4255 28 Nov 16 nicklas 24 import net.sf.basedb.core.JobAgent;
4255 28 Nov 16 nicklas 25 import net.sf.basedb.core.Permission;
4255 28 Nov 16 nicklas 26 import net.sf.basedb.core.PermissionDeniedException;
4245 21 Nov 16 nicklas 27 import net.sf.basedb.core.Project;
4203 02 Nov 16 nicklas 28 import net.sf.basedb.core.SessionControl;
4229 10 Nov 16 nicklas 29 import net.sf.basedb.core.StringUtil;
4205 03 Nov 16 nicklas 30 import net.sf.basedb.core.SystemItems;
4205 03 Nov 16 nicklas 31 import net.sf.basedb.core.User;
4205 03 Nov 16 nicklas 32 import net.sf.basedb.opengrid.CmdResult;
4203 02 Nov 16 nicklas 33 import net.sf.basedb.opengrid.JobIdentifier;
4212 08 Nov 16 nicklas 34 import net.sf.basedb.opengrid.JobStatus;
4203 02 Nov 16 nicklas 35 import net.sf.basedb.opengrid.OpenGridCluster;
4205 03 Nov 16 nicklas 36 import net.sf.basedb.opengrid.OpenGridSession;
7380 18 Oct 23 nicklas 37 import net.sf.basedb.opengrid.config.ClusterConfig;
4254 25 Nov 16 nicklas 38 import net.sf.basedb.opengrid.config.XmlConfig;
4205 03 Nov 16 nicklas 39 import net.sf.basedb.util.FileUtil;
4245 21 Nov 16 nicklas 40 import net.sf.basedb.util.Values;
4245 21 Nov 16 nicklas 41 import net.sf.basedb.util.extensions.ActionIterator;
4245 21 Nov 16 nicklas 42 import net.sf.basedb.util.extensions.ClientContext;
4203 02 Nov 16 nicklas 43 import net.sf.basedb.util.extensions.Extension;
4245 21 Nov 16 nicklas 44 import net.sf.basedb.util.extensions.ExtensionsInvoker;
4245 21 Nov 16 nicklas 45 import net.sf.basedb.util.extensions.Registry;
7075 27 Mar 23 nicklas 46 import net.sf.basedb.util.extensions.logging.ExtensionsLog;
7075 27 Mar 23 nicklas 47 import net.sf.basedb.util.extensions.logging.ExtensionsLogger;
4245 21 Nov 16 nicklas 48 import net.sf.basedb.util.extensions.manager.Settings;
5995 21 Aug 20 nicklas 49 import net.sf.basedb.util.filter.Filter;
5995 21 Aug 20 nicklas 50 import net.sf.basedb.util.filter.StaticFilter;
4203 02 Nov 16 nicklas 51
4203 02 Nov 16 nicklas 52 /**
4203 02 Nov 16 nicklas 53   Service class for asynchronous communication with Open Grid
4203 02 Nov 16 nicklas 54   Scheduler clusters. When the service is running it will at regular
4203 02 Nov 16 nicklas 55   intervals query the registered clusters for waiting and running
4296 12 Jan 17 nicklas 56   jobs. The service will also maintain a list of available Open
4296 12 Jan 17 nicklas 57   Grid Clusters that are configured in the opengrid-config.xml file
4296 12 Jan 17 nicklas 58   (should located in the WEB-INF/classes directory).
4203 02 Nov 16 nicklas 59   
4203 02 Nov 16 nicklas 60   
4203 02 Nov 16 nicklas 61   @author nicklas
4203 02 Nov 16 nicklas 62   @since 1.0
4203 02 Nov 16 nicklas 63 */
4203 02 Nov 16 nicklas 64 public class OpenGridService 
4203 02 Nov 16 nicklas 65 {
7075 27 Mar 23 nicklas 66   /**
7075 27 Mar 23 nicklas 67     The ID of the service extension.
7075 27 Mar 23 nicklas 68     @since 1.10
7075 27 Mar 23 nicklas 69   */
7075 27 Mar 23 nicklas 70   public static final String ID = "net.sf.basedb.opengrid.service";
7075 27 Mar 23 nicklas 71   
7075 27 Mar 23 nicklas 72   private static final ExtensionsLogger logger = 
7075 27 Mar 23 nicklas 73     ExtensionsLog.getLogger(ID, true).wrap(LoggerFactory.getLogger(OpenGridService.class));
7075 27 Mar 23 nicklas 74   
4203 02 Nov 16 nicklas 75   // The singleton
4203 02 Nov 16 nicklas 76   private static OpenGridService instance = null;
4203 02 Nov 16 nicklas 77
4203 02 Nov 16 nicklas 78   /**
4203 02 Nov 16 nicklas 79     Get the singleton instance of the service. If the service has
4203 02 Nov 16 nicklas 80     not been created yet it is created at this time.
4203 02 Nov 16 nicklas 81   */
4203 02 Nov 16 nicklas 82   public static final OpenGridService getInstance()
4203 02 Nov 16 nicklas 83   {
4203 02 Nov 16 nicklas 84     if (instance == null)
4203 02 Nov 16 nicklas 85     {
4203 02 Nov 16 nicklas 86       synchronized (OpenGridService.class)
4203 02 Nov 16 nicklas 87       {
4203 02 Nov 16 nicklas 88         if (instance == null)
4203 02 Nov 16 nicklas 89         {
4203 02 Nov 16 nicklas 90           OpenGridService tmp = new OpenGridService();
4203 02 Nov 16 nicklas 91           instance = tmp;
4203 02 Nov 16 nicklas 92         }
4203 02 Nov 16 nicklas 93       }
4203 02 Nov 16 nicklas 94     }
4205 03 Nov 16 nicklas 95     // Keep the session control alive
4205 03 Nov 16 nicklas 96     if (instance.rootSc != null) instance.rootSc.updateLastAccess();
4203 02 Nov 16 nicklas 97     return instance;
4203 02 Nov 16 nicklas 98   }
4203 02 Nov 16 nicklas 99   
4226 10 Nov 16 nicklas 100   // At least 1 minute between status updates (30 seconds when debugging)
7380 18 Oct 23 nicklas 101   private static final long MIN_WAIT_INTERVAL_NORMAL = 60_000;
7380 18 Oct 23 nicklas 102   private static final long MIN_WAIT_INTERVAL_DEBUG = 30_000;
4226 10 Nov 16 nicklas 103   
7380 18 Oct 23 nicklas 104   // At least 1 hour between cleanups (30 seconds when debugging)
7380 18 Oct 23 nicklas 105   private static final long MIN_CLEANUP_INTERVAL_NORMAL = 3_600_000;
7380 18 Oct 23 nicklas 106   private static final long MIN_CLEANUP_INTERVAL_DEBUG = 30_000;
7380 18 Oct 23 nicklas 107   
4203 02 Nov 16 nicklas 108   // Map Cluster ID -> Cluster
4203 02 Nov 16 nicklas 109   private final Map<String, OpenGridCluster> clusters;
4203 02 Nov 16 nicklas 110   
4203 02 Nov 16 nicklas 111   private volatile boolean isRunning;
4203 02 Nov 16 nicklas 112   private TimerTask jobStatusTimer;
7380 18 Oct 23 nicklas 113   private TimerTask cleanupTimer;
4226 10 Nov 16 nicklas 114   private long lastJobStatusUpdate;
7380 18 Oct 23 nicklas 115   private long lastCleanup;
4226 10 Nov 16 nicklas 116   
4203 02 Nov 16 nicklas 117   private SessionControl systemSc;
4205 03 Nov 16 nicklas 118   private SessionControl rootSc;
4203 02 Nov 16 nicklas 119   private Extension<ServiceControllerAction> ext;
7075 27 Mar 23 nicklas 120 //  private ServiceLogger logger;
4203 02 Nov 16 nicklas 121
4254 25 Nov 16 nicklas 122   private final Set<JobIdentifier> jobsToAbort;
4254 25 Nov 16 nicklas 123   private final Set<JobIdentifier> jobsToUpdate;
4284 21 Dec 16 nicklas 124   private final Map<JobIdentifier, JobStatusUpdater> nonGridJobsToUpdate;
4254 25 Nov 16 nicklas 125   private final Set<JobIdentifier> unknownJobs;
4205 03 Nov 16 nicklas 126   
4203 02 Nov 16 nicklas 127   private OpenGridService()
4203 02 Nov 16 nicklas 128   {
6634 08 Mar 22 nicklas 129     clusters = new LinkedHashMap<>();
4205 03 Nov 16 nicklas 130     jobsToAbort = new LinkedHashSet<>();
4205 03 Nov 16 nicklas 131     jobsToUpdate = new LinkedHashSet<>();
4284 21 Dec 16 nicklas 132     nonGridJobsToUpdate = new LinkedHashMap<>();
4222 09 Nov 16 nicklas 133     unknownJobs = new LinkedHashSet<>();
4203 02 Nov 16 nicklas 134   }
4203 02 Nov 16 nicklas 135   
4265 15 Dec 16 nicklas 136   /**
4265 15 Dec 16 nicklas 137     Check if a cluster with the given id has been defined and
4265 15 Dec 16 nicklas 138     registered with this service.
4265 15 Dec 16 nicklas 139   */
4265 15 Dec 16 nicklas 140   public boolean isDefined(String clusterId)
4265 15 Dec 16 nicklas 141   {
4265 15 Dec 16 nicklas 142     return clusters.containsKey(clusterId);
4265 15 Dec 16 nicklas 143   }
4254 25 Nov 16 nicklas 144
4254 25 Nov 16 nicklas 145   /**
4255 28 Nov 16 nicklas 146     Get information about a cluster with known id. If the cluster is
4255 28 Nov 16 nicklas 147     linked to a job agent, a permission check is made to see if the
4255 28 Nov 16 nicklas 148     logged in user has USE permission on the job agent.
4255 28 Nov 16 nicklas 149     
4255 28 Nov 16 nicklas 150     @param dc An open DbControl for permission check against job agents
4255 28 Nov 16 nicklas 151       (null is allowed if the cluster is not linked to a job agent)
4255 28 Nov 16 nicklas 152     @param clusterId The ID of the cluster
4255 28 Nov 16 nicklas 153     
4254 25 Nov 16 nicklas 154     @return A cluster instance or null if not found
4255 28 Nov 16 nicklas 155
4255 28 Nov 16 nicklas 156     @throws ItemNotFoundException If a cluster is found but the job agent it is
4255 28 Nov 16 nicklas 157       linked to is not found
4255 28 Nov 16 nicklas 158     @throws PermissionDeniedException If the cluster is linked to a job agent
4255 28 Nov 16 nicklas 159       that the user doens't have permission to use
4254 25 Nov 16 nicklas 160    */
4255 28 Nov 16 nicklas 161   public OpenGridCluster getClusterById(DbControl dc, String clusterId)
4205 03 Nov 16 nicklas 162   {
4255 28 Nov 16 nicklas 163     OpenGridCluster cluster = clusters.get(clusterId);
4255 28 Nov 16 nicklas 164     if (cluster == null) return null;
4255 28 Nov 16 nicklas 165     
4255 28 Nov 16 nicklas 166     String jobAgentId = cluster.getConfig().getJobAgentExternalId();
4255 28 Nov 16 nicklas 167     if (jobAgentId != null)
4255 28 Nov 16 nicklas 168     {
4255 28 Nov 16 nicklas 169       if (dc == null) throw new PermissionDeniedException(Permission.USE, jobAgentId);
4255 28 Nov 16 nicklas 170       JobAgent agent = JobAgent.getByExternalId(dc, jobAgentId);
4255 28 Nov 16 nicklas 171       agent.checkPermission(Permission.USE);
4255 28 Nov 16 nicklas 172     }
4255 28 Nov 16 nicklas 173     return cluster;
4205 03 Nov 16 nicklas 174   }
4205 03 Nov 16 nicklas 175   
4203 02 Nov 16 nicklas 176   /**
5995 21 Aug 20 nicklas 177     @see #getClusters(DbControl, Collection, Filter)
5995 21 Aug 20 nicklas 178   */
5995 21 Aug 20 nicklas 179   public Collection<OpenGridCluster> getClusters(DbControl dc, Collection<Include> include)
5995 21 Aug 20 nicklas 180   {
5995 21 Aug 20 nicklas 181     return getClusters(dc, include, null);
5995 21 Aug 20 nicklas 182   }
5995 21 Aug 20 nicklas 183   
5995 21 Aug 20 nicklas 184   /**
4255 28 Nov 16 nicklas 185     Get all clusters that the logged in user is allowed to use.
4255 28 Nov 16 nicklas 186     This method uses a query for job agents with the given include
4255 28 Nov 16 nicklas 187     settings (for example, {@link Include#IN_PROJECT}). Clusters that
4255 28 Nov 16 nicklas 188     are open to all are always included.
4255 28 Nov 16 nicklas 189     
4255 28 Nov 16 nicklas 190     @param dc An open DbControl
4255 28 Nov 16 nicklas 191     @param include A set of include options for the job agent query
4255 28 Nov 16 nicklas 192       (if null the default query options are used)
5995 21 Aug 20 nicklas 193     @param filter Optional filter, if not specified all clusters are returned
5995 21 Aug 20 nicklas 194     @since 1.4
4254 25 Nov 16 nicklas 195   */
5995 21 Aug 20 nicklas 196   public Collection<OpenGridCluster> getClusters(DbControl dc, Collection<Include> include, Filter<OpenGridCluster> filter)
4254 25 Nov 16 nicklas 197   {
4255 28 Nov 16 nicklas 198     ItemQuery<JobAgent> query = JobAgent.getQuery();
4255 28 Nov 16 nicklas 199     if (include != null) query.setIncludes(include);
5995 21 Aug 20 nicklas 200     return getClusters(query.list(dc), filter);
4254 25 Nov 16 nicklas 201   }
4254 25 Nov 16 nicklas 202   
4254 25 Nov 16 nicklas 203   /**
5995 21 Aug 20 nicklas 204     @see #getClusters(Collection, Filter)
5995 21 Aug 20 nicklas 205   */
5995 21 Aug 20 nicklas 206   public Collection<OpenGridCluster> getClusters(Collection<JobAgent> jobAgents)
5995 21 Aug 20 nicklas 207   {
5995 21 Aug 20 nicklas 208     return getClusters(jobAgents, null);
5995 21 Aug 20 nicklas 209   }
5995 21 Aug 20 nicklas 210   
5995 21 Aug 20 nicklas 211   /**
4255 28 Nov 16 nicklas 212     Get a collection with all registered clusters that are 
4255 28 Nov 16 nicklas 213     either open to all or referencing one of the listed job agents
4255 28 Nov 16 nicklas 214     where the logged in user has at least USE permission. 
4255 28 Nov 16 nicklas 215     
4255 28 Nov 16 nicklas 216     @param jobAgents A list of job agents (must exist in the database)
4255 28 Nov 16 nicklas 217       If null or empty, only clusters that are open to all are 
4255 28 Nov 16 nicklas 218       returned
5995 21 Aug 20 nicklas 219     @param filter Optional filter, if not specified all clusters are returned
5995 21 Aug 20 nicklas 220     @since 1.4
4255 28 Nov 16 nicklas 221   */
5995 21 Aug 20 nicklas 222   public Collection<OpenGridCluster> getClusters(Collection<JobAgent> jobAgents, Filter<OpenGridCluster> filter)
4255 28 Nov 16 nicklas 223   {
4255 28 Nov 16 nicklas 224     // Collect the external id of the given job agents
4255 28 Nov 16 nicklas 225     Set<String> usableJobAgents = new HashSet<>();
4255 28 Nov 16 nicklas 226     if (jobAgents != null)
4255 28 Nov 16 nicklas 227     {
4255 28 Nov 16 nicklas 228       for (JobAgent agent : jobAgents)
4255 28 Nov 16 nicklas 229       {
4255 28 Nov 16 nicklas 230         if (agent.isInDatabase() && agent.hasPermission(Permission.USE))
4255 28 Nov 16 nicklas 231         {
4255 28 Nov 16 nicklas 232           usableJobAgents.add(agent.getExternalId());
4255 28 Nov 16 nicklas 233         }
4255 28 Nov 16 nicklas 234       }
4255 28 Nov 16 nicklas 235     }
4255 28 Nov 16 nicklas 236     
4255 28 Nov 16 nicklas 237     List<OpenGridCluster> result = new ArrayList<>();
5995 21 Aug 20 nicklas 238     if (filter == null) filter = new StaticFilter<OpenGridCluster>(true);
4255 28 Nov 16 nicklas 239     for (OpenGridCluster cluster : clusters.values())
4255 28 Nov 16 nicklas 240     {
5995 21 Aug 20 nicklas 241       if (filter.evaluate(cluster)) 
4255 28 Nov 16 nicklas 242       {
5995 21 Aug 20 nicklas 243         String jobAgentId = cluster.getConfig().getJobAgentExternalId();
5995 21 Aug 20 nicklas 244         if (jobAgentId == null || usableJobAgents.contains(jobAgentId))
5995 21 Aug 20 nicklas 245         {
5995 21 Aug 20 nicklas 246           result.add(cluster);
5995 21 Aug 20 nicklas 247         }
4255 28 Nov 16 nicklas 248       }
4255 28 Nov 16 nicklas 249     }
4255 28 Nov 16 nicklas 250     return result;
4255 28 Nov 16 nicklas 251   }
4255 28 Nov 16 nicklas 252   
4255 28 Nov 16 nicklas 253   /**
4203 02 Nov 16 nicklas 254     Is the Open Grid service running or not?
4203 02 Nov 16 nicklas 255   */
4203 02 Nov 16 nicklas 256   public boolean isRunning()
4203 02 Nov 16 nicklas 257   {
4203 02 Nov 16 nicklas 258     return isRunning;
4203 02 Nov 16 nicklas 259   }
4203 02 Nov 16 nicklas 260   
4203 02 Nov 16 nicklas 261   /**
4203 02 Nov 16 nicklas 262     Start the service if it is not running.
4203 02 Nov 16 nicklas 263   */
4254 25 Nov 16 nicklas 264   synchronized void start(SessionControl systemSc, Extension<ServiceControllerAction> ext)
4203 02 Nov 16 nicklas 265   {
4222 09 Nov 16 nicklas 266     if (isRunning) return;
4222 09 Nov 16 nicklas 267
4222 09 Nov 16 nicklas 268     logger.debug("Starting Open Grid cluster service");
4254 25 Nov 16 nicklas 269     try
4254 25 Nov 16 nicklas 270     {
4254 25 Nov 16 nicklas 271       this.systemSc = systemSc;
4254 25 Nov 16 nicklas 272       this.rootSc = systemSc.impersonateLogin(SystemItems.getId(User.ROOT), "Open Grid Scheduler Services");
4254 25 Nov 16 nicklas 273       this.ext = ext;
4254 25 Nov 16 nicklas 274       
4254 25 Nov 16 nicklas 275       XmlConfig cfg = new XmlConfig();
4254 25 Nov 16 nicklas 276       cfg.readFromResource("/opengrid-config.xml");
4254 25 Nov 16 nicklas 277       for (OpenGridCluster cluster : cfg.getClusters())
4254 25 Nov 16 nicklas 278       {
4255 28 Nov 16 nicklas 279         if (clusters.containsKey(cluster.getId()))
4255 28 Nov 16 nicklas 280         {
4255 28 Nov 16 nicklas 281           logger.warn("Ignoring duplicate cluster definition: " + cluster);
4255 28 Nov 16 nicklas 282         }
4255 28 Nov 16 nicklas 283         else
4255 28 Nov 16 nicklas 284         {
4255 28 Nov 16 nicklas 285           clusters.put(cluster.getId(), cluster);
4255 28 Nov 16 nicklas 286         }
4254 25 Nov 16 nicklas 287       }
4254 25 Nov 16 nicklas 288       
4254 25 Nov 16 nicklas 289       // Start timer for processing async requests
4254 25 Nov 16 nicklas 290       jobStatusTimer = Application.getScheduler().scheduleAtFixedRate(
4254 25 Nov 16 nicklas 291         new JobStatusTimerTask(), 5000, 5000, false);
7380 18 Oct 23 nicklas 292       
7380 18 Oct 23 nicklas 293       // Start timer for cleaning up on the clusters
7380 18 Oct 23 nicklas 294       cleanupTimer = Application.getScheduler().scheduleAtFixedRate(
7380 18 Oct 23 nicklas 295         new CleanupTimerTask(), 60000, 60000, false);
4254 25 Nov 16 nicklas 296     }
4254 25 Nov 16 nicklas 297     catch (RuntimeException ex)
4254 25 Nov 16 nicklas 298     {
4254 25 Nov 16 nicklas 299       logger.debug("Could not start Open Grid cluster service", ex);
4254 25 Nov 16 nicklas 300       stopInternal();
4254 25 Nov 16 nicklas 301       throw ex;
4254 25 Nov 16 nicklas 302     }
4222 09 Nov 16 nicklas 303     
4222 09 Nov 16 nicklas 304     isRunning = true;
4222 09 Nov 16 nicklas 305     logger.debug("Open Grid cluster service is now running");
4203 02 Nov 16 nicklas 306   }
4203 02 Nov 16 nicklas 307   
4203 02 Nov 16 nicklas 308   /**
4203 02 Nov 16 nicklas 309     Stop the service if it is running.
4203 02 Nov 16 nicklas 310   */
4254 25 Nov 16 nicklas 311   synchronized void stop()
4203 02 Nov 16 nicklas 312   {
4222 09 Nov 16 nicklas 313     if (!isRunning) return;
4254 25 Nov 16 nicklas 314     logger.debug("Stopping Open Grid cluster service");
4254 25 Nov 16 nicklas 315     stopInternal();
4254 25 Nov 16 nicklas 316     logger.debug("Open Grid cluster service has stopped");
4254 25 Nov 16 nicklas 317   }
4222 09 Nov 16 nicklas 318
4254 25 Nov 16 nicklas 319   private void stopInternal()
4254 25 Nov 16 nicklas 320   {
4222 09 Nov 16 nicklas 321     if (jobStatusTimer != null)
4203 02 Nov 16 nicklas 322     {
4222 09 Nov 16 nicklas 323       jobStatusTimer.cancel();
4222 09 Nov 16 nicklas 324       jobStatusTimer = null;
4203 02 Nov 16 nicklas 325     }
7380 18 Oct 23 nicklas 326     if (cleanupTimer != null)
7380 18 Oct 23 nicklas 327     {
7380 18 Oct 23 nicklas 328       cleanupTimer.cancel();
7380 18 Oct 23 nicklas 329       cleanupTimer = null;
7380 18 Oct 23 nicklas 330     }
4254 25 Nov 16 nicklas 331     
4254 25 Nov 16 nicklas 332     jobsToAbort.clear();
4254 25 Nov 16 nicklas 333     jobsToUpdate.clear();
4284 21 Dec 16 nicklas 334     nonGridJobsToUpdate.clear();
4254 25 Nov 16 nicklas 335     unknownJobs.clear();
4254 25 Nov 16 nicklas 336     clusters.clear();
4254 25 Nov 16 nicklas 337     
4222 09 Nov 16 nicklas 338     if (rootSc != null) rootSc.logout();
4222 09 Nov 16 nicklas 339     rootSc = null;
4222 09 Nov 16 nicklas 340     systemSc = null;
4222 09 Nov 16 nicklas 341     ext = null;
4222 09 Nov 16 nicklas 342     isRunning = false;
4203 02 Nov 16 nicklas 343   }
4254 25 Nov 16 nicklas 344   
4205 03 Nov 16 nicklas 345   /**
4205 03 Nov 16 nicklas 346     Register a job that should be aborted. The job
4205 03 Nov 16 nicklas 347     identifier may or may not include a BASE job id.
4205 03 Nov 16 nicklas 348     If a BASE job id is specified the information in
4205 03 Nov 16 nicklas 349     the database is updated once the job has been
4205 03 Nov 16 nicklas 350     aborted on the cluster.
4205 03 Nov 16 nicklas 351     
4205 03 Nov 16 nicklas 352     @param jobId The job identifier
4205 03 Nov 16 nicklas 353   */
4203 02 Nov 16 nicklas 354   public void asyncJobAbort(JobIdentifier jobId)
4203 02 Nov 16 nicklas 355   {
4205 03 Nov 16 nicklas 356     if (jobId == null)
4205 03 Nov 16 nicklas 357     {
4205 03 Nov 16 nicklas 358       throw new NullPointerException("jobId");
4205 03 Nov 16 nicklas 359     }
4203 02 Nov 16 nicklas 360     if (!isRunning) return;
7075 27 Mar 23 nicklas 361     if (logger.isDebugEnabled()) logger.debug("Request abort job: " + jobId);
4205 03 Nov 16 nicklas 362     synchronized (jobsToAbort)
4205 03 Nov 16 nicklas 363     {
4205 03 Nov 16 nicklas 364       jobsToAbort.add(jobId);
4205 03 Nov 16 nicklas 365     }
4203 02 Nov 16 nicklas 366   }
4203 02 Nov 16 nicklas 367   
4205 03 Nov 16 nicklas 368   /**
4205 03 Nov 16 nicklas 369     Register a status update request for a job. The job
4205 03 Nov 16 nicklas 370     identifier may or may not include a BASE job id.
4205 03 Nov 16 nicklas 371     If a BASE job id is specified the information in
4205 03 Nov 16 nicklas 372     the database is updated once the job information has
4205 03 Nov 16 nicklas 373     been retrieved from the cluster.
4205 03 Nov 16 nicklas 374     
4205 03 Nov 16 nicklas 375     @param jobId The job identifier
4205 03 Nov 16 nicklas 376   */
4203 02 Nov 16 nicklas 377   public void asyncJobStatusUpdate(JobIdentifier jobId)
4203 02 Nov 16 nicklas 378   {
4205 03 Nov 16 nicklas 379     if (jobId == null)
4205 03 Nov 16 nicklas 380     {
4205 03 Nov 16 nicklas 381       throw new NullPointerException("jobId");
4205 03 Nov 16 nicklas 382     }
4203 02 Nov 16 nicklas 383     if (!isRunning) return;
4205 03 Nov 16 nicklas 384     if (logger.isDebugEnabled()) logger.debug("Request status update: " + jobId);
4205 03 Nov 16 nicklas 385     synchronized (jobsToUpdate)
4205 03 Nov 16 nicklas 386     {
4205 03 Nov 16 nicklas 387       jobsToUpdate.add(jobId);
4205 03 Nov 16 nicklas 388     }
4205 03 Nov 16 nicklas 389   }
4205 03 Nov 16 nicklas 390   
4226 10 Nov 16 nicklas 391   /**
4284 21 Dec 16 nicklas 392     Register a status update request for a job that is is 
4284 21 Dec 16 nicklas 393     not an Open Grid job. This exists as a service to other
4284 21 Dec 16 nicklas 394     extensions that need to connect to a remote host and
4284 21 Dec 16 nicklas 395     make some checks via SSH.
4284 21 Dec 16 nicklas 396     
4309 17 Jan 17 nicklas 397     If a cluster that matches the {@link JobIdentifier#getClusterId()} 
4309 17 Jan 17 nicklas 398     is found the {@link JobStatusUpdater#getJobStatus(OpenGridSession, JobIdentifier)}
4309 17 Jan 17 nicklas 399     is provided with a connected session, otherwise that parameter is null.
4309 17 Jan 17 nicklas 400     
4309 17 Jan 17 nicklas 401     The {@link JobIdentifier#getClusterJobId()} can always be substituted 
4309 17 Jan 17 nicklas 402     with some other ID.
4309 17 Jan 17 nicklas 403     
4309 17 Jan 17 nicklas 404     @param jobId A job identifier with exceptions as noted above
4301 13 Jan 17 nicklas 405     @param updater The actual instance that performs the update check
4284 21 Dec 16 nicklas 406   */
4284 21 Dec 16 nicklas 407   public void asyncJobStatusUpdate(JobIdentifier jobId, JobStatusUpdater updater)
4284 21 Dec 16 nicklas 408   {
4284 21 Dec 16 nicklas 409     if (jobId == null) throw new NullPointerException("jobId");
4284 21 Dec 16 nicklas 410     if (updater == null) throw new NullPointerException("updater["+jobId + "]");
4284 21 Dec 16 nicklas 411     if (!isRunning) return;
4284 21 Dec 16 nicklas 412     if (logger.isDebugEnabled()) logger.debug("Request status update: " + jobId);
4284 21 Dec 16 nicklas 413     synchronized (nonGridJobsToUpdate)
4284 21 Dec 16 nicklas 414     {
4284 21 Dec 16 nicklas 415       nonGridJobsToUpdate.put(jobId, updater);
4284 21 Dec 16 nicklas 416     }
4284 21 Dec 16 nicklas 417   }
4284 21 Dec 16 nicklas 418   
4284 21 Dec 16 nicklas 419   /**
4226 10 Nov 16 nicklas 420     Process async requests since the last time. ABORT requests are
4226 10 Nov 16 nicklas 421     always processed. STATUS requests are only processed if it was 
4226 10 Nov 16 nicklas 422     more the MIN_WAIT_INTERVAL milliseconds since that last update.
4226 10 Nov 16 nicklas 423    */
4205 03 Nov 16 nicklas 424   void processAsyncRequests()
4205 03 Nov 16 nicklas 425   {
4226 10 Nov 16 nicklas 426     long now = System.currentTimeMillis();
4226 10 Nov 16 nicklas 427     long timeSinceLastUpdate = now - lastJobStatusUpdate;
7075 27 Mar 23 nicklas 428     long minWaitInterval = logger.isDebugEnabled() ? MIN_WAIT_INTERVAL_DEBUG : MIN_WAIT_INTERVAL_NORMAL;
7075 27 Mar 23 nicklas 429     boolean doStatusUpdate = timeSinceLastUpdate > minWaitInterval;
4284 21 Dec 16 nicklas 430
7075 27 Mar 23 nicklas 431     logger.debug("Processing async requests");
7075 27 Mar 23 nicklas 432     if (logger.isTraceEnabled())
4205 03 Nov 16 nicklas 433     {
7075 27 Mar 23 nicklas 434       logger.trace("jobsToAbort=" + jobsToAbort);
7075 27 Mar 23 nicklas 435       logger.trace("jobsToUpdate=" + jobsToUpdate);
7075 27 Mar 23 nicklas 436       logger.trace("nonGridJobsToUpdate=" + nonGridJobsToUpdate);
7075 27 Mar 23 nicklas 437       logger.trace("unknownJobs=" + unknownJobs);
7075 27 Mar 23 nicklas 438       logger.trace("timeSinceLastUpdate=" + timeSinceLastUpdate);
7075 27 Mar 23 nicklas 439       logger.trace("doStatusUpdate=" + doStatusUpdate);
4205 03 Nov 16 nicklas 440     }
4203 02 Nov 16 nicklas 441     
4205 03 Nov 16 nicklas 442     // Create at most one session to each cluster
4205 03 Nov 16 nicklas 443     Map<String, OpenGridSession> sessions = new HashMap<>();
4205 03 Nov 16 nicklas 444     
4205 03 Nov 16 nicklas 445     // Make copy of current async requests and clear the queues
4226 10 Nov 16 nicklas 446     List<JobIdentifier> tmpAbort = null;
4226 10 Nov 16 nicklas 447     List<JobIdentifier> tmpUpdate = null;
4284 21 Dec 16 nicklas 448     Map<JobIdentifier, JobStatusUpdater> tmpNonGridUpdate = null;
4222 09 Nov 16 nicklas 449     List<JobIdentifier> tmpUnknown = new ArrayList<>();
4205 03 Nov 16 nicklas 450     synchronized (jobsToAbort)
4205 03 Nov 16 nicklas 451     {
4205 03 Nov 16 nicklas 452       tmpAbort = new ArrayList<>(jobsToAbort);
4205 03 Nov 16 nicklas 453       jobsToAbort.clear();
4205 03 Nov 16 nicklas 454     }
4226 10 Nov 16 nicklas 455     
6830 02 Sep 22 nicklas 456     List<JobIdentifier> updatedNow = new ArrayList<>();
4226 10 Nov 16 nicklas 457     if (doStatusUpdate)
4205 03 Nov 16 nicklas 458     {
4226 10 Nov 16 nicklas 459       synchronized (jobsToUpdate) 
4226 10 Nov 16 nicklas 460       {
4226 10 Nov 16 nicklas 461         jobsToUpdate.addAll(unknownJobs); // Always re-check jobs that was 'not found' the last time
4226 10 Nov 16 nicklas 462         jobsToUpdate.removeAll(tmpAbort); // No update for aborted jobs
4226 10 Nov 16 nicklas 463         tmpUpdate = new ArrayList<>(jobsToUpdate);
4226 10 Nov 16 nicklas 464         jobsToUpdate.clear();
4226 10 Nov 16 nicklas 465       }
4284 21 Dec 16 nicklas 466       
4284 21 Dec 16 nicklas 467       synchronized (nonGridJobsToUpdate) 
4284 21 Dec 16 nicklas 468       {
4284 21 Dec 16 nicklas 469         tmpNonGridUpdate = new HashMap<>(nonGridJobsToUpdate);
4284 21 Dec 16 nicklas 470         nonGridJobsToUpdate.clear();
4284 21 Dec 16 nicklas 471       }
4205 03 Nov 16 nicklas 472     }
4205 03 Nov 16 nicklas 473     
4205 03 Nov 16 nicklas 474     try
4205 03 Nov 16 nicklas 475     {
4205 03 Nov 16 nicklas 476       
7075 27 Mar 23 nicklas 477       logger.debug("Jobs to abort: " + tmpAbort.size());
4205 03 Nov 16 nicklas 478       for (JobIdentifier jobId : tmpAbort)
4205 03 Nov 16 nicklas 479       {
4205 03 Nov 16 nicklas 480         logger.debug("Processing abort: " + jobId);
4205 03 Nov 16 nicklas 481         
4205 03 Nov 16 nicklas 482         // Get new or existing session
4205 03 Nov 16 nicklas 483         OpenGridSession session = getSession(jobId, sessions);
4205 03 Nov 16 nicklas 484         
4205 03 Nov 16 nicklas 485         // If we could not get connection to cluster, skip this and continue with next
4205 03 Nov 16 nicklas 486         if (session == null) continue;
4205 03 Nov 16 nicklas 487
4222 09 Nov 16 nicklas 488         CmdResult<String> qdel = session.qdel(jobId);
4205 03 Nov 16 nicklas 489         if (jobId.getBaseJobId() != 0)
4205 03 Nov 16 nicklas 490         {
4302 16 Jan 17 nicklas 491           if (qdel.getExitStatus() == 0)
4302 16 Jan 17 nicklas 492           {
4302 16 Jan 17 nicklas 493             setProgressOnBaseJob(jobId, 99, qdel.getStdout());
4302 16 Jan 17 nicklas 494           }
4302 16 Jan 17 nicklas 495           else
4302 16 Jan 17 nicklas 496           {
4302 16 Jan 17 nicklas 497             setErrorOnBaseJob(jobId, "Failed to abort job: " + qdel.getStderr());
4302 16 Jan 17 nicklas 498           }
4205 03 Nov 16 nicklas 499         }
4205 03 Nov 16 nicklas 500       }
4205 03 Nov 16 nicklas 501       
7075 27 Mar 23 nicklas 502       if (!doStatusUpdate)
4205 03 Nov 16 nicklas 503       {
7075 27 Mar 23 nicklas 504         logger.debug("No status update since not long enough wait time ["+(timeSinceLastUpdate/1000)+" seconds]");
7075 27 Mar 23 nicklas 505       }
7075 27 Mar 23 nicklas 506       else
7075 27 Mar 23 nicklas 507       {
7075 27 Mar 23 nicklas 508         logger.debug("Jobs for status update: " + tmpUpdate.size());
4245 21 Nov 16 nicklas 509         JobCompletionInvoker jobCompletionInvoker = null;
4245 21 Nov 16 nicklas 510         
4226 10 Nov 16 nicklas 511         for (JobIdentifier jobId : tmpUpdate)
4212 08 Nov 16 nicklas 512         {
4226 10 Nov 16 nicklas 513           // Get new or existing session
4226 10 Nov 16 nicklas 514           OpenGridSession session = getSession(jobId, sessions);
4222 09 Nov 16 nicklas 515           
4226 10 Nov 16 nicklas 516           // If we could not get connection to cluster, skip this and continue with next job
4226 10 Nov 16 nicklas 517           if (session == null) continue;
4226 10 Nov 16 nicklas 518         
4226 10 Nov 16 nicklas 519           CmdResult<JobStatus> qstat = session.qstat(jobId, true);
4226 10 Nov 16 nicklas 520           JobStatus status = qstat.getResult();
4222 09 Nov 16 nicklas 521           
4226 10 Nov 16 nicklas 522           if (status == null && qstat.getExitStatus() == 1)
4212 08 Nov 16 nicklas 523           {
4226 10 Nov 16 nicklas 524             logger.debug("No 'qstat' information for job: " + jobId);
4226 10 Nov 16 nicklas 525             // No information was found by 'qstat'
4226 10 Nov 16 nicklas 526             // Check with 'qacct' if the job has ended
4226 10 Nov 16 nicklas 527             
4226 10 Nov 16 nicklas 528             CmdResult<JobStatus> qacct = session.qacct(jobId, true);
4226 10 Nov 16 nicklas 529             status = qacct.getResult();
4226 10 Nov 16 nicklas 530             
4226 10 Nov 16 nicklas 531             if (status == null && qacct.getExitStatus() == 1)
4212 08 Nov 16 nicklas 532             {
4226 10 Nov 16 nicklas 533               logger.debug("No 'qacct' information for job: " + jobId);
4226 10 Nov 16 nicklas 534               // Allow one 'not found' since there is a delay
4226 10 Nov 16 nicklas 535               // between a job disappearing from 'qstat' and appearing in 'qacct'
4226 10 Nov 16 nicklas 536               if (unknownJobs.contains(jobId))
4226 10 Nov 16 nicklas 537               {
4226 10 Nov 16 nicklas 538                 logger.debug("Reporting not found: " + jobId);
4226 10 Nov 16 nicklas 539                 // This is the second time the job was not found, report it as an error
4226 10 Nov 16 nicklas 540                 setErrorOnBaseJob(jobId, qacct.getStderr());
4226 10 Nov 16 nicklas 541               }
4226 10 Nov 16 nicklas 542               else
4226 10 Nov 16 nicklas 543               {
4226 10 Nov 16 nicklas 544                 logger.debug("Allowing one more status update for job: " + jobId);
4226 10 Nov 16 nicklas 545                 tmpUnknown.add(jobId);
4226 10 Nov 16 nicklas 546               }
4212 08 Nov 16 nicklas 547             }
4212 08 Nov 16 nicklas 548           }
4226 10 Nov 16 nicklas 549           
4226 10 Nov 16 nicklas 550           if (status != null)
4222 09 Nov 16 nicklas 551           {
6830 02 Sep 22 nicklas 552             updatedNow.add(jobId);
4226 10 Nov 16 nicklas 553             // We know something about this job...
4226 10 Nov 16 nicklas 554             // If it is WAITING do nothing
4226 10 Nov 16 nicklas 555             // If it is EXECUTING get the progress file
4229 10 Nov 16 nicklas 556             Job.Status jobStatus = status.getStatus();
4229 10 Nov 16 nicklas 557             if (jobStatus == Job.Status.EXECUTING)
4222 09 Nov 16 nicklas 558             {
4229 10 Nov 16 nicklas 559               session.readProgress(status);
4229 10 Nov 16 nicklas 560             }
4229 10 Nov 16 nicklas 561             else if (jobStatus == Job.Status.ERROR)
4229 10 Nov 16 nicklas 562             {
4229 10 Nov 16 nicklas 563               if (status.getMessage() == null)
4226 10 Nov 16 nicklas 564               {
4229 10 Nov 16 nicklas 565                 session.readStderr(status);
4226 10 Nov 16 nicklas 566               }
4222 09 Nov 16 nicklas 567             }
4226 10 Nov 16 nicklas 568             
4226 10 Nov 16 nicklas 569             if (jobId.getBaseJobId() != 0)
4226 10 Nov 16 nicklas 570             {
4245 21 Nov 16 nicklas 571               if (jobCompletionInvoker == null)
4245 21 Nov 16 nicklas 572               {
4245 21 Nov 16 nicklas 573                 jobCompletionInvoker = new JobCompletionInvoker(rootSc);
4245 21 Nov 16 nicklas 574               }
4245 21 Nov 16 nicklas 575               updateJobStatusInBase(status, session, jobCompletionInvoker);
4226 10 Nov 16 nicklas 576             }
4222 09 Nov 16 nicklas 577           }
4222 09 Nov 16 nicklas 578         }
4222 09 Nov 16 nicklas 579         
4226 10 Nov 16 nicklas 580         unknownJobs.clear();
4226 10 Nov 16 nicklas 581         unknownJobs.addAll(tmpUnknown);
4226 10 Nov 16 nicklas 582         
4284 21 Dec 16 nicklas 583         for (Map.Entry<JobIdentifier, JobStatusUpdater> entry : tmpNonGridUpdate.entrySet())
4284 21 Dec 16 nicklas 584         {
4284 21 Dec 16 nicklas 585           JobIdentifier jobId = entry.getKey();
4284 21 Dec 16 nicklas 586           
4309 17 Jan 17 nicklas 587           // Get new or existing session          
4309 17 Jan 17 nicklas 588           OpenGridSession session = null;
4309 17 Jan 17 nicklas 589           if (isDefined(jobId.getClusterId()))
4309 17 Jan 17 nicklas 590           {
4309 17 Jan 17 nicklas 591             session = getSession(jobId, sessions);
4309 17 Jan 17 nicklas 592             // If we could not get connection to cluster, skip this and continue with next job
4309 17 Jan 17 nicklas 593             if (session == null) continue;
4309 17 Jan 17 nicklas 594           }
4284 21 Dec 16 nicklas 595
4284 21 Dec 16 nicklas 596           JobStatusUpdater updater = entry.getValue();
4284 21 Dec 16 nicklas 597           JobStatus status = updater.getJobStatus(session, jobId);
4284 21 Dec 16 nicklas 598
4284 21 Dec 16 nicklas 599           if (jobId.getBaseJobId() != 0)
4284 21 Dec 16 nicklas 600           {
4284 21 Dec 16 nicklas 601             if (jobCompletionInvoker == null)
4284 21 Dec 16 nicklas 602             {
4284 21 Dec 16 nicklas 603               jobCompletionInvoker = new JobCompletionInvoker(rootSc);
4284 21 Dec 16 nicklas 604             }
4284 21 Dec 16 nicklas 605             updateJobStatusInBase(status, session, jobCompletionInvoker);
6830 02 Sep 22 nicklas 606             updatedNow.add(jobId);
4284 21 Dec 16 nicklas 607           }
4284 21 Dec 16 nicklas 608         }
4284 21 Dec 16 nicklas 609         
6830 02 Sep 22 nicklas 610         // Update requests may have been registered during our processing here
6830 02 Sep 22 nicklas 611         // so we simply remove those requests
6830 02 Sep 22 nicklas 612         synchronized (jobsToUpdate) 
6830 02 Sep 22 nicklas 613         {
6830 02 Sep 22 nicklas 614           jobsToUpdate.removeAll(updatedNow);
6830 02 Sep 22 nicklas 615         }
6830 02 Sep 22 nicklas 616         synchronized (nonGridJobsToUpdate) 
6830 02 Sep 22 nicklas 617         {
6830 02 Sep 22 nicklas 618           nonGridJobsToUpdate.keySet().removeAll(updatedNow);
6830 02 Sep 22 nicklas 619         }
6830 02 Sep 22 nicklas 620         lastJobStatusUpdate = now;
4205 03 Nov 16 nicklas 621       }
4205 03 Nov 16 nicklas 622     }
4205 03 Nov 16 nicklas 623     finally
4205 03 Nov 16 nicklas 624     {
4205 03 Nov 16 nicklas 625       for (OpenGridSession session : sessions.values())
4205 03 Nov 16 nicklas 626       {
6071 20 Nov 20 nicklas 627         if (session != null)
6071 20 Nov 20 nicklas 628         {
6071 20 Nov 20 nicklas 629           logger.debug("Closing connection: "+ session.getHost());
6071 20 Nov 20 nicklas 630           FileUtil.close(session);
6071 20 Nov 20 nicklas 631         }
4205 03 Nov 16 nicklas 632       }
4205 03 Nov 16 nicklas 633     }
4203 02 Nov 16 nicklas 634   }
4203 02 Nov 16 nicklas 635   
7380 18 Oct 23 nicklas 636   
4203 02 Nov 16 nicklas 637   /**
7380 18 Oct 23 nicklas 638     
7380 18 Oct 23 nicklas 639    */
7380 18 Oct 23 nicklas 640   void processCleanupTasks()
7380 18 Oct 23 nicklas 641   {
7380 18 Oct 23 nicklas 642     long now = System.currentTimeMillis();
7380 18 Oct 23 nicklas 643     long timeSinceLastCleanup = now - lastCleanup;
7380 18 Oct 23 nicklas 644     long minWaitInterval = logger.isDebugEnabled() ? MIN_CLEANUP_INTERVAL_DEBUG : MIN_CLEANUP_INTERVAL_NORMAL;
7380 18 Oct 23 nicklas 645     boolean doCleanup = timeSinceLastCleanup > minWaitInterval;
7380 18 Oct 23 nicklas 646   
7380 18 Oct 23 nicklas 647     logger.debug("Processing cleanup");
7380 18 Oct 23 nicklas 648     if (logger.isTraceEnabled())
7380 18 Oct 23 nicklas 649     {
7380 18 Oct 23 nicklas 650       logger.trace("timeSinceLastCleanup=" + timeSinceLastCleanup);
7380 18 Oct 23 nicklas 651       logger.trace("doCleanup=" + doCleanup);
7380 18 Oct 23 nicklas 652     }
7380 18 Oct 23 nicklas 653     
7380 18 Oct 23 nicklas 654     if (!doCleanup)
7380 18 Oct 23 nicklas 655     {
7380 18 Oct 23 nicklas 656       logger.debug("No cleanup since not long enough wait time ["+(timeSinceLastCleanup/1000)+" seconds]");
7380 18 Oct 23 nicklas 657       return;
7380 18 Oct 23 nicklas 658     }
7380 18 Oct 23 nicklas 659
7380 18 Oct 23 nicklas 660     for (OpenGridCluster cluster : clusters.values())
7380 18 Oct 23 nicklas 661     {
7380 18 Oct 23 nicklas 662       // Number of days; 0 or missing=no automatic removal
7380 18 Oct 23 nicklas 663       ClusterConfig cfg = cluster.getConfig();
7380 18 Oct 23 nicklas 664       int autoRemove = Values.getInt(cfg.getCustomOption("auto-remove-job-folders"), 0);
7380 18 Oct 23 nicklas 665       if (autoRemove > 0)
7380 18 Oct 23 nicklas 666       {
7380 18 Oct 23 nicklas 667         OpenGridSession session = null;
7380 18 Oct 23 nicklas 668         try
7380 18 Oct 23 nicklas 669         {
7380 18 Oct 23 nicklas 670           logger.debug("Connecting to cluster: " + cluster);
7380 18 Oct 23 nicklas 671           session = cluster.connect(5);
7380 18 Oct 23 nicklas 672           CmdResult<String> findAndDelete = session.executeCmd("find "+cfg.getJobFolder()+" -mindepth 1 -maxdepth 1 -type d -mtime +"+autoRemove+" -print -exec rm -rf {} \\;", 30);
7380 18 Oct 23 nicklas 673           if (findAndDelete.getExitStatus() == 0)
7380 18 Oct 23 nicklas 674           {
7380 18 Oct 23 nicklas 675             String[] deleted = findAndDelete.getStdout().split("\n");
7380 18 Oct 23 nicklas 676             int numDeleted = 0;
7380 18 Oct 23 nicklas 677             for (String folder : deleted)
7380 18 Oct 23 nicklas 678             {
7380 18 Oct 23 nicklas 679               if (folder.length() > 0) 
7380 18 Oct 23 nicklas 680               {
7380 18 Oct 23 nicklas 681                 numDeleted++;
7380 18 Oct 23 nicklas 682                 logger.trace("Removed folder: "+folder);
7380 18 Oct 23 nicklas 683               }
7380 18 Oct 23 nicklas 684             }
7380 18 Oct 23 nicklas 685             logger.info("Removed "+numDeleted+" job folders on cluster: "+cluster);
7380 18 Oct 23 nicklas 686           }
7380 18 Oct 23 nicklas 687         }
7380 18 Oct 23 nicklas 688         catch (RuntimeException ex)
7380 18 Oct 23 nicklas 689         {
7380 18 Oct 23 nicklas 690           logger.warn("Could not connect to cluster: " + cluster.getId(), ex);
7380 18 Oct 23 nicklas 691         }
7380 18 Oct 23 nicklas 692         finally
7380 18 Oct 23 nicklas 693         {
7380 18 Oct 23 nicklas 694           FileUtil.close(session);
7380 18 Oct 23 nicklas 695         }
7380 18 Oct 23 nicklas 696       }
7380 18 Oct 23 nicklas 697       else
7380 18 Oct 23 nicklas 698       {
7380 18 Oct 23 nicklas 699         logger.debug("No automatic cleanup on cluster: "+cluster);
7380 18 Oct 23 nicklas 700       }
7380 18 Oct 23 nicklas 701     }
7380 18 Oct 23 nicklas 702   }
7380 18 Oct 23 nicklas 703
7380 18 Oct 23 nicklas 704   
7380 18 Oct 23 nicklas 705   /**
4205 03 Nov 16 nicklas 706     Get an existing session from the cache or create a new connection if
4205 03 Nov 16 nicklas 707     it doesn't exists. If no connection could be created null is returned.
4205 03 Nov 16 nicklas 708   */
4205 03 Nov 16 nicklas 709   private OpenGridSession getSession(JobIdentifier jobId, Map<String, OpenGridSession> sessions)
4205 03 Nov 16 nicklas 710   {
4205 03 Nov 16 nicklas 711     OpenGridSession session = null;
4205 03 Nov 16 nicklas 712     if (sessions.containsKey(jobId.getClusterId()))
4205 03 Nov 16 nicklas 713     {
4205 03 Nov 16 nicklas 714       session = sessions.get(jobId.getClusterId());
4205 03 Nov 16 nicklas 715     }
4205 03 Nov 16 nicklas 716     else
4205 03 Nov 16 nicklas 717     {
4205 03 Nov 16 nicklas 718       session = tryConnect(jobId);
4205 03 Nov 16 nicklas 719       sessions.put(jobId.getClusterId(), session);
4205 03 Nov 16 nicklas 720     }
4205 03 Nov 16 nicklas 721     return session;
4205 03 Nov 16 nicklas 722   }
4205 03 Nov 16 nicklas 723   
4205 03 Nov 16 nicklas 724   /**
4205 03 Nov 16 nicklas 725     Try to connect to the cluster referenced by the job identifier.
4205 03 Nov 16 nicklas 726     Exceptions are handled inside the method which only return null
4205 03 Nov 16 nicklas 727     if no connection could be established.
4205 03 Nov 16 nicklas 728   */
4205 03 Nov 16 nicklas 729   private OpenGridSession tryConnect(JobIdentifier jobId)
4205 03 Nov 16 nicklas 730   {
4205 03 Nov 16 nicklas 731     OpenGridSession session = null;
4205 03 Nov 16 nicklas 732     logger.debug("Connecting to cluster: " + jobId.getClusterId());
4205 03 Nov 16 nicklas 733     OpenGridCluster cluster = clusters.get(jobId.getClusterId());
4205 03 Nov 16 nicklas 734     if (cluster == null)
4205 03 Nov 16 nicklas 735     {
4205 03 Nov 16 nicklas 736       logger.warn("Unknown cluster specified by job: " + jobId);
4205 03 Nov 16 nicklas 737     }
4205 03 Nov 16 nicklas 738     else
4205 03 Nov 16 nicklas 739     {
4205 03 Nov 16 nicklas 740       try
4205 03 Nov 16 nicklas 741       {
4205 03 Nov 16 nicklas 742         session = cluster.connect(5);
4205 03 Nov 16 nicklas 743       }
4205 03 Nov 16 nicklas 744       catch (RuntimeException ex)
4205 03 Nov 16 nicklas 745       {
4205 03 Nov 16 nicklas 746         logger.warn("Could not connect to cluster: " + jobId, ex);
4205 03 Nov 16 nicklas 747       }
4205 03 Nov 16 nicklas 748     }
4205 03 Nov 16 nicklas 749     return session;
4205 03 Nov 16 nicklas 750   }
4205 03 Nov 16 nicklas 751   
4205 03 Nov 16 nicklas 752   /**
4205 03 Nov 16 nicklas 753     Set ERROR status on a BASE job. This is done in a 
4205 03 Nov 16 nicklas 754     separate transaction. Exceptions are logged but not
4205 03 Nov 16 nicklas 755     re-thrown out of this method.
4205 03 Nov 16 nicklas 756     @return TRUE if the operation succeeded, FALSE if not
4205 03 Nov 16 nicklas 757   */
4205 03 Nov 16 nicklas 758   private boolean setErrorOnBaseJob(JobIdentifier jobId, String msg)
4205 03 Nov 16 nicklas 759   {
4205 03 Nov 16 nicklas 760     DbControl dc = null;
4205 03 Nov 16 nicklas 761     boolean ok = false;
4205 03 Nov 16 nicklas 762     try
4205 03 Nov 16 nicklas 763     {
4205 03 Nov 16 nicklas 764       dc = rootSc.newDbControl();
4205 03 Nov 16 nicklas 765       Job baseJob = Job.getById(dc, jobId.getBaseJobId());
4205 03 Nov 16 nicklas 766       baseJob.doneError(msg);
4205 03 Nov 16 nicklas 767       dc.commit();
4205 03 Nov 16 nicklas 768       ok = true;
4205 03 Nov 16 nicklas 769     }
4205 03 Nov 16 nicklas 770     catch (RuntimeException ex)
4205 03 Nov 16 nicklas 771     {
4205 03 Nov 16 nicklas 772       logger.warn("Could not set ERROR status on job: " + jobId, ex);
4205 03 Nov 16 nicklas 773     }
4205 03 Nov 16 nicklas 774     finally
4205 03 Nov 16 nicklas 775     {
4205 03 Nov 16 nicklas 776       if (dc != null) dc.close();
4205 03 Nov 16 nicklas 777     }
4205 03 Nov 16 nicklas 778     return ok;
4205 03 Nov 16 nicklas 779   }
4205 03 Nov 16 nicklas 780   
4302 16 Jan 17 nicklas 781   /**
4302 16 Jan 17 nicklas 782     Update the progress status on a BASE job. This is done in a 
4302 16 Jan 17 nicklas 783     separate transaction. Exceptions are logged but not
4302 16 Jan 17 nicklas 784     re-thrown out of this method.
4302 16 Jan 17 nicklas 785     @return TRUE if the operation succeeded, FALSE if not
4302 16 Jan 17 nicklas 786   */
4302 16 Jan 17 nicklas 787   private boolean setProgressOnBaseJob(JobIdentifier jobId, int progress, String msg)
4302 16 Jan 17 nicklas 788   {
4302 16 Jan 17 nicklas 789     DbControl dc = null;
4302 16 Jan 17 nicklas 790     boolean ok = false;
4302 16 Jan 17 nicklas 791     try
4302 16 Jan 17 nicklas 792     {
4302 16 Jan 17 nicklas 793       dc = rootSc.newDbControl();
4302 16 Jan 17 nicklas 794       Job baseJob = Job.getById(dc, jobId.getBaseJobId());
4302 16 Jan 17 nicklas 795       baseJob.setProgress(progress, msg);
4302 16 Jan 17 nicklas 796       dc.commit();
4302 16 Jan 17 nicklas 797       ok = true;
4302 16 Jan 17 nicklas 798     }
4302 16 Jan 17 nicklas 799     catch (RuntimeException ex)
4302 16 Jan 17 nicklas 800     {
4302 16 Jan 17 nicklas 801       logger.warn("Could not set progress on job: " + jobId, ex);
4302 16 Jan 17 nicklas 802     }
4302 16 Jan 17 nicklas 803     finally
4302 16 Jan 17 nicklas 804     {
4302 16 Jan 17 nicklas 805       if (dc != null) dc.close();
4302 16 Jan 17 nicklas 806     }
4302 16 Jan 17 nicklas 807     return ok;
4302 16 Jan 17 nicklas 808   }
4302 16 Jan 17 nicklas 809
4222 09 Nov 16 nicklas 810   
4245 21 Nov 16 nicklas 811   private boolean updateJobStatusInBase(JobStatus jobStatus, OpenGridSession session, JobCompletionInvoker jobCompletionInvoker)
4222 09 Nov 16 nicklas 812   {
4222 09 Nov 16 nicklas 813     JobIdentifier jobId = jobStatus.getJobIdentifier();
4222 09 Nov 16 nicklas 814     DbControl dc = null;
4222 09 Nov 16 nicklas 815     boolean ok = false;
4222 09 Nov 16 nicklas 816     try
4222 09 Nov 16 nicklas 817     {
4222 09 Nov 16 nicklas 818       dc = rootSc.newDbControl();
4222 09 Nov 16 nicklas 819       
4222 09 Nov 16 nicklas 820       Job baseJob = Job.getById(dc, jobId.getBaseJobId());
4222 09 Nov 16 nicklas 821       Job.Status currentStatus = baseJob.getStatus();
4222 09 Nov 16 nicklas 822       Job.Status newStatus = jobStatus.getStatus();
4222 09 Nov 16 nicklas 823     
4222 09 Nov 16 nicklas 824       if (logger.isDebugEnabled())
4222 09 Nov 16 nicklas 825       {
4222 09 Nov 16 nicklas 826         logger.debug("updateJobStatusInBase: " + jobId + "; current=" + currentStatus + "; new="+newStatus);
4222 09 Nov 16 nicklas 827       }
4222 09 Nov 16 nicklas 828       
4302 16 Jan 17 nicklas 829       if (currentStatus == Job.Status.DONE || currentStatus == Job.Status.ERROR)
4222 09 Nov 16 nicklas 830       {
4222 09 Nov 16 nicklas 831         // Skip update since the current status seems to have changed
4222 09 Nov 16 nicklas 832         // since the request for a status update was made
4222 09 Nov 16 nicklas 833         // (eg. due to manual changes)
4222 09 Nov 16 nicklas 834         return false;
4222 09 Nov 16 nicklas 835       }
4222 09 Nov 16 nicklas 836       
4302 16 Jan 17 nicklas 837       // Basically, we now know that the current status is either WAITING, EXECUTING or ABORTING
4222 09 Nov 16 nicklas 838       if (newStatus == Job.Status.WAITING) 
4222 09 Nov 16 nicklas 839       {
4222 09 Nov 16 nicklas 840         // Still waiting so there is nothing to do
4222 09 Nov 16 nicklas 841         return false;
4222 09 Nov 16 nicklas 842       }
4222 09 Nov 16 nicklas 843       
4263 14 Dec 16 nicklas 844       // If we get here we know for sure that the job is executing or has ended
4222 09 Nov 16 nicklas 845       String serverName = jobId.getClusterId();
4263 14 Dec 16 nicklas 846       String nodeName = jobStatus.getNodeName();
4222 09 Nov 16 nicklas 847       
4222 09 Nov 16 nicklas 848       if (currentStatus == Job.Status.WAITING)
4222 09 Nov 16 nicklas 849       {
4263 14 Dec 16 nicklas 850         baseJob.start(null, serverName, nodeName, null, jobStatus.getStartDate());
4222 09 Nov 16 nicklas 851       }
4222 09 Nov 16 nicklas 852       
4222 09 Nov 16 nicklas 853       if (newStatus == Job.Status.EXECUTING)
4222 09 Nov 16 nicklas 854       {
4222 09 Nov 16 nicklas 855         baseJob.setProgress(jobStatus.getProgress(), jobStatus.getMessage());
4222 09 Nov 16 nicklas 856       }
4222 09 Nov 16 nicklas 857       else if (newStatus == Job.Status.DONE)
4222 09 Nov 16 nicklas 858       {
4245 21 Nov 16 nicklas 859         try
4245 21 Nov 16 nicklas 860         {
4245 21 Nov 16 nicklas 861           String msg = jobCompletionInvoker.call(session, baseJob, jobStatus);
4245 21 Nov 16 nicklas 862           if (msg == null) msg = "Job completed";
4245 21 Nov 16 nicklas 863           baseJob.doneOk(msg, jobStatus.getEndDate());
4245 21 Nov 16 nicklas 864         }
4245 21 Nov 16 nicklas 865         catch (Exception ex)
4245 21 Nov 16 nicklas 866         {
4245 21 Nov 16 nicklas 867           baseJob.doneError(StringUtil.trimString(ex.getMessage(), 200), 
4245 21 Nov 16 nicklas 868               Collections.singleton(ex), jobStatus.getEndDate());
4245 21 Nov 16 nicklas 869         }
4222 09 Nov 16 nicklas 870       }
4222 09 Nov 16 nicklas 871       else if (newStatus == Job.Status.ERROR)
4222 09 Nov 16 nicklas 872       {
4229 10 Nov 16 nicklas 873         Exception ex = null;
4245 21 Nov 16 nicklas 874         String msg = null;
4245 21 Nov 16 nicklas 875         try
4229 10 Nov 16 nicklas 876         {
4245 21 Nov 16 nicklas 877           msg = jobCompletionInvoker.call(session, baseJob, jobStatus);
4245 21 Nov 16 nicklas 878           if (msg != null)
4245 21 Nov 16 nicklas 879           {
4245 21 Nov 16 nicklas 880             ex = new RuntimeException(StringUtil.trimString(msg, Job.MAX_STACK_TRACE_LENGTH));
4245 21 Nov 16 nicklas 881           }
4245 21 Nov 16 nicklas 882           else
4245 21 Nov 16 nicklas 883           {
4302 16 Jan 17 nicklas 884             msg = jobStatus.getExitCode() == 137 ? "Aborted by user." : "Job failed for an unknown reason.";
4245 21 Nov 16 nicklas 885           }
4229 10 Nov 16 nicklas 886         }
4245 21 Nov 16 nicklas 887         catch (Exception e)
4245 21 Nov 16 nicklas 888         {
4245 21 Nov 16 nicklas 889           ex = e;
4245 21 Nov 16 nicklas 890           msg = ex.getMessage();
4245 21 Nov 16 nicklas 891         }
4245 21 Nov 16 nicklas 892
4245 21 Nov 16 nicklas 893         baseJob.doneError(StringUtil.trimString("[" + jobStatus.getExitCode() + "] " + msg, 200), 
4245 21 Nov 16 nicklas 894             ex == null ? null : Collections.singleton(ex), jobStatus.getEndDate());
4222 09 Nov 16 nicklas 895       }
4222 09 Nov 16 nicklas 896       
4222 09 Nov 16 nicklas 897       dc.commit();
4222 09 Nov 16 nicklas 898       ok = true;
4222 09 Nov 16 nicklas 899     }
4222 09 Nov 16 nicklas 900     catch (RuntimeException ex)
4222 09 Nov 16 nicklas 901     {
4222 09 Nov 16 nicklas 902       logger.warn("Could not update status on job: " + jobId, ex);
4222 09 Nov 16 nicklas 903     }
4222 09 Nov 16 nicklas 904     finally
4222 09 Nov 16 nicklas 905     {
4222 09 Nov 16 nicklas 906       if (dc != null) dc.close();
4222 09 Nov 16 nicklas 907     }
4222 09 Nov 16 nicklas 908     return ok;
4222 09 Nov 16 nicklas 909   }
4222 09 Nov 16 nicklas 910   
4205 03 Nov 16 nicklas 911   /**
4203 02 Nov 16 nicklas 912     Task for checking current status of jobs that has been
4203 02 Nov 16 nicklas 913     sent to the cluster.
4203 02 Nov 16 nicklas 914   */
4203 02 Nov 16 nicklas 915   static class JobStatusTimerTask
4203 02 Nov 16 nicklas 916     extends TimerTask
4203 02 Nov 16 nicklas 917   {
4203 02 Nov 16 nicklas 918     JobStatusTimerTask() 
4203 02 Nov 16 nicklas 919     {}
4203 02 Nov 16 nicklas 920     
4203 02 Nov 16 nicklas 921     @Override
4203 02 Nov 16 nicklas 922     public void run() 
4203 02 Nov 16 nicklas 923     {
7380 18 Oct 23 nicklas 924       logger.trace("Job status timer running");
4205 03 Nov 16 nicklas 925       getInstance().processAsyncRequests();
4203 02 Nov 16 nicklas 926     }
4203 02 Nov 16 nicklas 927   
4203 02 Nov 16 nicklas 928   }
4245 21 Nov 16 nicklas 929   
7380 18 Oct 23 nicklas 930   /**
7380 18 Oct 23 nicklas 931     Task for checking current status of jobs that has been
7380 18 Oct 23 nicklas 932     sent to the cluster.
7380 18 Oct 23 nicklas 933   */
7380 18 Oct 23 nicklas 934   static class CleanupTimerTask
7380 18 Oct 23 nicklas 935     extends TimerTask
7380 18 Oct 23 nicklas 936   {
7380 18 Oct 23 nicklas 937     CleanupTimerTask() 
7380 18 Oct 23 nicklas 938     {}
7380 18 Oct 23 nicklas 939     
7380 18 Oct 23 nicklas 940     @Override
7380 18 Oct 23 nicklas 941     public void run() 
7380 18 Oct 23 nicklas 942     {
7380 18 Oct 23 nicklas 943       logger.trace("Cleanup timer running");
7380 18 Oct 23 nicklas 944       getInstance().processCleanupTasks();
7380 18 Oct 23 nicklas 945     }  
7380 18 Oct 23 nicklas 946   }
7380 18 Oct 23 nicklas 947   
4245 21 Nov 16 nicklas 948   static class JobCompletionInvoker
4245 21 Nov 16 nicklas 949   {
4245 21 Nov 16 nicklas 950     
4245 21 Nov 16 nicklas 951     private final SessionControl rootSc;
4245 21 Nov 16 nicklas 952     private final ClientContext context;
4245 21 Nov 16 nicklas 953     private final ExtensionsInvoker<JobCompletionHandler> invoker;
4245 21 Nov 16 nicklas 954     
4245 21 Nov 16 nicklas 955     JobCompletionInvoker(SessionControl rootSc) 
4245 21 Nov 16 nicklas 956     {
4245 21 Nov 16 nicklas 957       this.rootSc = rootSc;
4245 21 Nov 16 nicklas 958       this.context = new ClientContext(rootSc);
4245 21 Nov 16 nicklas 959       
4245 21 Nov 16 nicklas 960       Registry registry = Application.getExtensionsManager().getRegistry();
4245 21 Nov 16 nicklas 961       Settings settings = Application.getExtensionsManager().getSettings();
5505 17 Jun 19 nicklas 962       this.invoker = registry.useExtensions(
4245 21 Nov 16 nicklas 963           context, settings, new String[] { "net.sf.basedb.opengrid.job-complete" });
4245 21 Nov 16 nicklas 964     }
4245 21 Nov 16 nicklas 965     
4245 21 Nov 16 nicklas 966     /**
4245 21 Nov 16 nicklas 967       Call the extensions for the given job.
4245 21 Nov 16 nicklas 968       @return The messages from the extensions (separated with newline). If
4245 21 Nov 16 nicklas 969         no extension returns a message, the message from the job status is
4245 21 Nov 16 nicklas 970         used.
4245 21 Nov 16 nicklas 971     */
4245 21 Nov 16 nicklas 972     String call(OpenGridSession session, Job baseJob, JobStatus jobStatus)
4245 21 Nov 16 nicklas 973     {
4245 21 Nov 16 nicklas 974       context.setCurrentItem(baseJob);
4245 21 Nov 16 nicklas 975       context.setAttribute("job-status", jobStatus);
4245 21 Nov 16 nicklas 976       
4245 21 Nov 16 nicklas 977       SessionControl impersonated = null;
4245 21 Nov 16 nicklas 978       SessionControl forJob = null;
4245 21 Nov 16 nicklas 979       List<String> messages = new ArrayList<>();
4245 21 Nov 16 nicklas 980       try
4245 21 Nov 16 nicklas 981       {
4245 21 Nov 16 nicklas 982         // A single try-catch since we want processing to be aborted
4245 21 Nov 16 nicklas 983         // as soon as a handler throws an exception
4245 21 Nov 16 nicklas 984         ActionIterator<JobCompletionHandler> it = invoker.iterate();
4245 21 Nov 16 nicklas 985         while (it.hasNext())
4245 21 Nov 16 nicklas 986         {
4245 21 Nov 16 nicklas 987           JobCompletionHandler handler = it.next();
4245 21 Nov 16 nicklas 988           if (handler == null) continue;
4245 21 Nov 16 nicklas 989           
4245 21 Nov 16 nicklas 990           if (impersonated == null)
4245 21 Nov 16 nicklas 991           {
4245 21 Nov 16 nicklas 992             // Create an impersonated session control
4245 21 Nov 16 nicklas 993             impersonated = rootSc.impersonateLogin(baseJob, "Finalizing job: " + baseJob.getName());
4245 21 Nov 16 nicklas 994             if (baseJob.getActiveProjectId() > 0)
4245 21 Nov 16 nicklas 995             {
4245 21 Nov 16 nicklas 996               impersonated.setActiveProject(Project.getById(baseJob.getDbControl(), baseJob.getActiveProjectId()));
4245 21 Nov 16 nicklas 997             }
4245 21 Nov 16 nicklas 998             // Create a delegate job session control for proper change history logging
4245 21 Nov 16 nicklas 999             forJob = impersonated.getJobSessionControl(baseJob);
4245 21 Nov 16 nicklas 1000           }
4245 21 Nov 16 nicklas 1001           
4245 21 Nov 16 nicklas 1002           messages.add(handler.jobCompleted(forJob, session, baseJob, jobStatus));
4245 21 Nov 16 nicklas 1003         }
4245 21 Nov 16 nicklas 1004       }
4245 21 Nov 16 nicklas 1005       finally
4245 21 Nov 16 nicklas 1006       {
4245 21 Nov 16 nicklas 1007         if (forJob != null) forJob.close();
4245 21 Nov 16 nicklas 1008         if (impersonated != null) impersonated.close();
4245 21 Nov 16 nicklas 1009       }
4245 21 Nov 16 nicklas 1010       
4245 21 Nov 16 nicklas 1011       String msg = Values.getString(messages, "\n", true);
4302 16 Jan 17 nicklas 1012       if (msg.length() == 0) msg = Values.getStringOrNull(jobStatus.getMessage());
4245 21 Nov 16 nicklas 1013       return msg;
4245 21 Nov 16 nicklas 1014     }
4203 02 Nov 16 nicklas 1015
4245 21 Nov 16 nicklas 1016   }
4245 21 Nov 16 nicklas 1017
4203 02 Nov 16 nicklas 1018 }