4203 |
02 Nov 16 |
nicklas |
1 |
package net.sf.basedb.opengrid.service; |
4203 |
02 Nov 16 |
nicklas |
2 |
|
4205 |
03 Nov 16 |
nicklas |
3 |
import java.util.ArrayList; |
4254 |
25 Nov 16 |
nicklas |
4 |
import java.util.Collection; |
4229 |
10 Nov 16 |
nicklas |
5 |
import java.util.Collections; |
4203 |
02 Nov 16 |
nicklas |
6 |
import java.util.HashMap; |
4255 |
28 Nov 16 |
nicklas |
7 |
import java.util.HashSet; |
4284 |
21 Dec 16 |
nicklas |
8 |
import java.util.LinkedHashMap; |
4205 |
03 Nov 16 |
nicklas |
9 |
import java.util.LinkedHashSet; |
4205 |
03 Nov 16 |
nicklas |
10 |
import java.util.List; |
4203 |
02 Nov 16 |
nicklas |
11 |
import java.util.Map; |
4203 |
02 Nov 16 |
nicklas |
12 |
import java.util.Set; |
4203 |
02 Nov 16 |
nicklas |
13 |
import java.util.TimerTask; |
4203 |
02 Nov 16 |
nicklas |
14 |
|
4203 |
02 Nov 16 |
nicklas |
15 |
import org.slf4j.LoggerFactory; |
4203 |
02 Nov 16 |
nicklas |
16 |
|
4203 |
02 Nov 16 |
nicklas |
17 |
import net.sf.basedb.clients.web.extensions.service.ServiceControllerAction; |
4203 |
02 Nov 16 |
nicklas |
18 |
import net.sf.basedb.core.Application; |
4205 |
03 Nov 16 |
nicklas |
19 |
import net.sf.basedb.core.DbControl; |
4255 |
28 Nov 16 |
nicklas |
20 |
import net.sf.basedb.core.Include; |
4301 |
13 Jan 17 |
nicklas |
21 |
import net.sf.basedb.core.ItemNotFoundException; |
4255 |
28 Nov 16 |
nicklas |
22 |
import net.sf.basedb.core.ItemQuery; |
4205 |
03 Nov 16 |
nicklas |
23 |
import net.sf.basedb.core.Job; |
4255 |
28 Nov 16 |
nicklas |
24 |
import net.sf.basedb.core.JobAgent; |
4255 |
28 Nov 16 |
nicklas |
25 |
import net.sf.basedb.core.Permission; |
4255 |
28 Nov 16 |
nicklas |
26 |
import net.sf.basedb.core.PermissionDeniedException; |
4245 |
21 Nov 16 |
nicklas |
27 |
import net.sf.basedb.core.Project; |
4203 |
02 Nov 16 |
nicklas |
28 |
import net.sf.basedb.core.SessionControl; |
4229 |
10 Nov 16 |
nicklas |
29 |
import net.sf.basedb.core.StringUtil; |
4205 |
03 Nov 16 |
nicklas |
30 |
import net.sf.basedb.core.SystemItems; |
4205 |
03 Nov 16 |
nicklas |
31 |
import net.sf.basedb.core.User; |
4205 |
03 Nov 16 |
nicklas |
32 |
import net.sf.basedb.opengrid.CmdResult; |
4203 |
02 Nov 16 |
nicklas |
33 |
import net.sf.basedb.opengrid.JobIdentifier; |
4212 |
08 Nov 16 |
nicklas |
34 |
import net.sf.basedb.opengrid.JobStatus; |
4203 |
02 Nov 16 |
nicklas |
35 |
import net.sf.basedb.opengrid.OpenGridCluster; |
4205 |
03 Nov 16 |
nicklas |
36 |
import net.sf.basedb.opengrid.OpenGridSession; |
7380 |
18 Oct 23 |
nicklas |
37 |
import net.sf.basedb.opengrid.config.ClusterConfig; |
4254 |
25 Nov 16 |
nicklas |
38 |
import net.sf.basedb.opengrid.config.XmlConfig; |
4205 |
03 Nov 16 |
nicklas |
39 |
import net.sf.basedb.util.FileUtil; |
4245 |
21 Nov 16 |
nicklas |
40 |
import net.sf.basedb.util.Values; |
4245 |
21 Nov 16 |
nicklas |
41 |
import net.sf.basedb.util.extensions.ActionIterator; |
4245 |
21 Nov 16 |
nicklas |
42 |
import net.sf.basedb.util.extensions.ClientContext; |
4203 |
02 Nov 16 |
nicklas |
43 |
import net.sf.basedb.util.extensions.Extension; |
4245 |
21 Nov 16 |
nicklas |
44 |
import net.sf.basedb.util.extensions.ExtensionsInvoker; |
4245 |
21 Nov 16 |
nicklas |
45 |
import net.sf.basedb.util.extensions.Registry; |
7075 |
27 Mar 23 |
nicklas |
46 |
import net.sf.basedb.util.extensions.logging.ExtensionsLog; |
7075 |
27 Mar 23 |
nicklas |
47 |
import net.sf.basedb.util.extensions.logging.ExtensionsLogger; |
4245 |
21 Nov 16 |
nicklas |
48 |
import net.sf.basedb.util.extensions.manager.Settings; |
5995 |
21 Aug 20 |
nicklas |
49 |
import net.sf.basedb.util.filter.Filter; |
5995 |
21 Aug 20 |
nicklas |
50 |
import net.sf.basedb.util.filter.StaticFilter; |
4203 |
02 Nov 16 |
nicklas |
51 |
|
4203 |
02 Nov 16 |
nicklas |
52 |
/** |
4203 |
02 Nov 16 |
nicklas |
Service class for asynchronous communication with Open Grid |
4203 |
02 Nov 16 |
nicklas |
Scheduler clusters. When the service is running it will at regular |
4203 |
02 Nov 16 |
nicklas |
intervals query the registered clusters for waiting and running |
4296 |
12 Jan 17 |
nicklas |
jobs. The service will also maintain a list of available Open |
4296 |
12 Jan 17 |
nicklas |
Grid Clusters that are configured in the opengrid-config.xml file |
4296 |
12 Jan 17 |
nicklas |
(should be located in the WEB-INF/classes directory). |
4203 |
02 Nov 16 |
nicklas |
59 |
|
4203 |
02 Nov 16 |
nicklas |
60 |
|
4203 |
02 Nov 16 |
nicklas |
@author nicklas |
4203 |
02 Nov 16 |
nicklas |
@since 1.0 |
4203 |
02 Nov 16 |
nicklas |
63 |
*/ |
4203 |
02 Nov 16 |
nicklas |
64 |
public class OpenGridService |
4203 |
02 Nov 16 |
nicklas |
65 |
{ |
7075 |
27 Mar 23 |
nicklas |
66 |
/** |
7075 |
27 Mar 23 |
nicklas |
The ID of the service extension. |
7075 |
27 Mar 23 |
nicklas |
@since 1.10 |
7075 |
27 Mar 23 |
nicklas |
69 |
*/ |
7075 |
27 Mar 23 |
nicklas |
70 |
public static final String ID = "net.sf.basedb.opengrid.service"; |
7075 |
27 Mar 23 |
nicklas |
71 |
|
7075 |
27 Mar 23 |
nicklas |
72 |
private static final ExtensionsLogger logger = |
7075 |
27 Mar 23 |
nicklas |
73 |
ExtensionsLog.getLogger(ID, true).wrap(LoggerFactory.getLogger(OpenGridService.class)); |
7075 |
27 Mar 23 |
nicklas |
74 |
|
4203 |
02 Nov 16 |
nicklas |
// The singleton |
4203 |
02 Nov 16 |
nicklas |
76 |
private static OpenGridService instance = null; |
4203 |
02 Nov 16 |
nicklas |
77 |
|
4203 |
02 Nov 16 |
nicklas |
78 |
/** |
4203 |
02 Nov 16 |
nicklas |
Get the singleton instance of the service. If the service has |
4203 |
02 Nov 16 |
nicklas |
not been created yet it is created at this time. |
4203 |
02 Nov 16 |
nicklas |
81 |
*/ |
4203 |
02 Nov 16 |
nicklas |
82 |
public static final OpenGridService getInstance() |
4203 |
02 Nov 16 |
nicklas |
83 |
{ |
4203 |
02 Nov 16 |
nicklas |
84 |
if (instance == null) |
4203 |
02 Nov 16 |
nicklas |
85 |
{ |
4203 |
02 Nov 16 |
nicklas |
86 |
synchronized (OpenGridService.class) |
4203 |
02 Nov 16 |
nicklas |
87 |
{ |
4203 |
02 Nov 16 |
nicklas |
88 |
if (instance == null) |
4203 |
02 Nov 16 |
nicklas |
89 |
{ |
4203 |
02 Nov 16 |
nicklas |
90 |
OpenGridService tmp = new OpenGridService(); |
4203 |
02 Nov 16 |
nicklas |
91 |
instance = tmp; |
4203 |
02 Nov 16 |
nicklas |
92 |
} |
4203 |
02 Nov 16 |
nicklas |
93 |
} |
4203 |
02 Nov 16 |
nicklas |
94 |
} |
4205 |
03 Nov 16 |
nicklas |
// Keep the session control alive |
4205 |
03 Nov 16 |
nicklas |
96 |
if (instance.rootSc != null) instance.rootSc.updateLastAccess(); |
4203 |
02 Nov 16 |
nicklas |
97 |
return instance; |
4203 |
02 Nov 16 |
nicklas |
98 |
} |
4203 |
02 Nov 16 |
nicklas |
99 |
|
4226 |
10 Nov 16 |
nicklas |
// At least 1 minute between status updates (30 seconds when debugging) |
7380 |
18 Oct 23 |
nicklas |
101 |
private static final long MIN_WAIT_INTERVAL_NORMAL = 60_000; |
7380 |
18 Oct 23 |
nicklas |
102 |
private static final long MIN_WAIT_INTERVAL_DEBUG = 30_000; |
4226 |
10 Nov 16 |
nicklas |
103 |
|
7380 |
18 Oct 23 |
nicklas |
// At least 1 hour between cleanups (30 seconds when debugging) |
7380 |
18 Oct 23 |
nicklas |
105 |
private static final long MIN_CLEANUP_INTERVAL_NORMAL = 3_600_000; |
7380 |
18 Oct 23 |
nicklas |
106 |
private static final long MIN_CLEANUP_INTERVAL_DEBUG = 30_000; |
7380 |
18 Oct 23 |
nicklas |
107 |
|
4203 |
02 Nov 16 |
nicklas |
// Map Cluster ID -> Cluster |
4203 |
02 Nov 16 |
nicklas |
109 |
private final Map<String, OpenGridCluster> clusters; |
4203 |
02 Nov 16 |
nicklas |
110 |
|
4203 |
02 Nov 16 |
nicklas |
111 |
private volatile boolean isRunning; |
4203 |
02 Nov 16 |
nicklas |
112 |
private TimerTask jobStatusTimer; |
7380 |
18 Oct 23 |
nicklas |
113 |
private TimerTask cleanupTimer; |
4226 |
10 Nov 16 |
nicklas |
114 |
private long lastJobStatusUpdate; |
7380 |
18 Oct 23 |
nicklas |
115 |
private long lastCleanup; |
4226 |
10 Nov 16 |
nicklas |
116 |
|
4203 |
02 Nov 16 |
nicklas |
117 |
private SessionControl systemSc; |
4205 |
03 Nov 16 |
nicklas |
118 |
private SessionControl rootSc; |
4203 |
02 Nov 16 |
nicklas |
119 |
private Extension<ServiceControllerAction> ext; |
7075 |
27 Mar 23 |
nicklas |
// private ServiceLogger logger; |
4203 |
02 Nov 16 |
nicklas |
121 |
|
4254 |
25 Nov 16 |
nicklas |
122 |
private final Set<JobIdentifier> jobsToAbort; |
4254 |
25 Nov 16 |
nicklas |
123 |
private final Set<JobIdentifier> jobsToUpdate; |
4284 |
21 Dec 16 |
nicklas |
124 |
private final Map<JobIdentifier, JobStatusUpdater> nonGridJobsToUpdate; |
4254 |
25 Nov 16 |
nicklas |
125 |
private final Set<JobIdentifier> unknownJobs; |
4205 |
03 Nov 16 |
nicklas |
126 |
|
4203 |
02 Nov 16 |
nicklas |
127 |
private OpenGridService()
{
	// Private: instances are only created by getInstance().
	// All containers are the 'Linked' variants which keep insertion order.
	this.clusters = new LinkedHashMap<String, OpenGridCluster>();

	// Queues for asynchronous requests
	this.jobsToAbort = new LinkedHashSet<JobIdentifier>();
	this.jobsToUpdate = new LinkedHashSet<JobIdentifier>();
	this.nonGridJobsToUpdate = new LinkedHashMap<JobIdentifier, JobStatusUpdater>();
	this.unknownJobs = new LinkedHashSet<JobIdentifier>();
}
4203 |
02 Nov 16 |
nicklas |
135 |
|
4265 |
15 Dec 16 |
nicklas |
136 |
/** |
4265 |
15 Dec 16 |
nicklas |
Check if a cluster with the given id has been defined and |
4265 |
15 Dec 16 |
nicklas |
registered with this service. |
4265 |
15 Dec 16 |
nicklas |
139 |
*/ |
4265 |
15 Dec 16 |
nicklas |
140 |
public boolean isDefined(String clusterId) |
4265 |
15 Dec 16 |
nicklas |
141 |
{ |
4265 |
15 Dec 16 |
nicklas |
142 |
return clusters.containsKey(clusterId); |
4265 |
15 Dec 16 |
nicklas |
143 |
} |
4254 |
25 Nov 16 |
nicklas |
144 |
|
4254 |
25 Nov 16 |
nicklas |
145 |
/** |
4255 |
28 Nov 16 |
nicklas |
Get information about a cluster with known id. If the cluster is |
4255 |
28 Nov 16 |
nicklas |
linked to a job agent, a permission check is made to see if the |
4255 |
28 Nov 16 |
nicklas |
logged in user has USE permission on the job agent. |
4255 |
28 Nov 16 |
nicklas |
149 |
|
4255 |
28 Nov 16 |
nicklas |
@param dc An open DbControl for permission check against job agents |
4255 |
28 Nov 16 |
nicklas |
(null is allowed if the cluster is not linked to a job agent) |
4255 |
28 Nov 16 |
nicklas |
@param clusterId The ID of the cluster |
4255 |
28 Nov 16 |
nicklas |
153 |
|
4254 |
25 Nov 16 |
nicklas |
@return A cluster instance or null if not found |
4255 |
28 Nov 16 |
nicklas |
155 |
|
4255 |
28 Nov 16 |
nicklas |
@throws ItemNotFoundException If a cluster is found but the job agent it is |
4255 |
28 Nov 16 |
nicklas |
linked to is not found |
4255 |
28 Nov 16 |
nicklas |
@throws PermissionDeniedException If the cluster is linked to a job agent |
4255 |
28 Nov 16 |
nicklas |
that the user doesn't have permission to use |
4254 |
25 Nov 16 |
nicklas |
160 |
*/ |
4255 |
28 Nov 16 |
nicklas |
161 |
public OpenGridCluster getClusterById(DbControl dc, String clusterId) |
4205 |
03 Nov 16 |
nicklas |
162 |
{ |
4255 |
28 Nov 16 |
nicklas |
163 |
OpenGridCluster cluster = clusters.get(clusterId); |
4255 |
28 Nov 16 |
nicklas |
164 |
if (cluster == null) return null; |
4255 |
28 Nov 16 |
nicklas |
165 |
|
4255 |
28 Nov 16 |
nicklas |
166 |
String jobAgentId = cluster.getConfig().getJobAgentExternalId(); |
4255 |
28 Nov 16 |
nicklas |
167 |
if (jobAgentId != null) |
4255 |
28 Nov 16 |
nicklas |
168 |
{ |
4255 |
28 Nov 16 |
nicklas |
169 |
if (dc == null) throw new PermissionDeniedException(Permission.USE, jobAgentId); |
4255 |
28 Nov 16 |
nicklas |
170 |
JobAgent agent = JobAgent.getByExternalId(dc, jobAgentId); |
4255 |
28 Nov 16 |
nicklas |
171 |
agent.checkPermission(Permission.USE); |
4255 |
28 Nov 16 |
nicklas |
172 |
} |
4255 |
28 Nov 16 |
nicklas |
173 |
return cluster; |
4205 |
03 Nov 16 |
nicklas |
174 |
} |
4205 |
03 Nov 16 |
nicklas |
175 |
|
4203 |
02 Nov 16 |
nicklas |
176 |
/** |
5995 |
21 Aug 20 |
nicklas |
@see #getClusters(DbControl, Collection, Filter) |
5995 |
21 Aug 20 |
nicklas |
178 |
*/ |
5995 |
21 Aug 20 |
nicklas |
179 |
public Collection<OpenGridCluster> getClusters(DbControl dc, Collection<Include> include) |
5995 |
21 Aug 20 |
nicklas |
180 |
{ |
5995 |
21 Aug 20 |
nicklas |
181 |
return getClusters(dc, include, null); |
5995 |
21 Aug 20 |
nicklas |
182 |
} |
5995 |
21 Aug 20 |
nicklas |
183 |
|
5995 |
21 Aug 20 |
nicklas |
184 |
/** |
4255 |
28 Nov 16 |
nicklas |
Get all clusters that the logged in user is allowed to use. |
4255 |
28 Nov 16 |
nicklas |
This method uses a query for job agents with the given include |
4255 |
28 Nov 16 |
nicklas |
settings (for example, {@link Include#IN_PROJECT}). Clusters that |
4255 |
28 Nov 16 |
nicklas |
are open to all are always included. |
4255 |
28 Nov 16 |
nicklas |
189 |
|
4255 |
28 Nov 16 |
nicklas |
@param dc An open DbControl |
4255 |
28 Nov 16 |
nicklas |
@param include A set of include options for the job agent query |
4255 |
28 Nov 16 |
nicklas |
(if null the default query options are used) |
5995 |
21 Aug 20 |
nicklas |
@param filter Optional filter, if not specified all clusters are returned |
5995 |
21 Aug 20 |
nicklas |
@since 1.4 |
4254 |
25 Nov 16 |
nicklas |
195 |
*/ |
5995 |
21 Aug 20 |
nicklas |
196 |
public Collection<OpenGridCluster> getClusters(DbControl dc, Collection<Include> include, Filter<OpenGridCluster> filter) |
4254 |
25 Nov 16 |
nicklas |
197 |
{ |
4255 |
28 Nov 16 |
nicklas |
198 |
ItemQuery<JobAgent> query = JobAgent.getQuery(); |
4255 |
28 Nov 16 |
nicklas |
199 |
if (include != null) query.setIncludes(include); |
5995 |
21 Aug 20 |
nicklas |
200 |
return getClusters(query.list(dc), filter); |
4254 |
25 Nov 16 |
nicklas |
201 |
} |
4254 |
25 Nov 16 |
nicklas |
202 |
|
4254 |
25 Nov 16 |
nicklas |
203 |
/** |
5995 |
21 Aug 20 |
nicklas |
@see #getClusters(Collection, Filter) |
5995 |
21 Aug 20 |
nicklas |
205 |
*/ |
5995 |
21 Aug 20 |
nicklas |
206 |
public Collection<OpenGridCluster> getClusters(Collection<JobAgent> jobAgents) |
5995 |
21 Aug 20 |
nicklas |
207 |
{ |
5995 |
21 Aug 20 |
nicklas |
208 |
return getClusters(jobAgents, null); |
5995 |
21 Aug 20 |
nicklas |
209 |
} |
5995 |
21 Aug 20 |
nicklas |
210 |
|
5995 |
21 Aug 20 |
nicklas |
211 |
/** |
4255 |
28 Nov 16 |
nicklas |
Get a collection with all registered clusters that are |
4255 |
28 Nov 16 |
nicklas |
either open to all or referencing one of the listed job agents |
4255 |
28 Nov 16 |
nicklas |
where the logged in user has at least USE permission. |
4255 |
28 Nov 16 |
nicklas |
215 |
|
4255 |
28 Nov 16 |
nicklas |
@param jobAgents A list of job agents (must exist in the database) |
4255 |
28 Nov 16 |
nicklas |
If null or empty, only clusters that are open to all are |
4255 |
28 Nov 16 |
nicklas |
returned |
5995 |
21 Aug 20 |
nicklas |
@param filter Optional filter, if not specified all clusters are returned |
5995 |
21 Aug 20 |
nicklas |
@since 1.4 |
4255 |
28 Nov 16 |
nicklas |
221 |
*/ |
5995 |
21 Aug 20 |
nicklas |
222 |
public Collection<OpenGridCluster> getClusters(Collection<JobAgent> jobAgents, Filter<OpenGridCluster> filter) |
4255 |
28 Nov 16 |
nicklas |
223 |
{ |
4255 |
28 Nov 16 |
nicklas |
// Collect the external id of the given job agents |
4255 |
28 Nov 16 |
nicklas |
225 |
Set<String> usableJobAgents = new HashSet<>(); |
4255 |
28 Nov 16 |
nicklas |
226 |
if (jobAgents != null) |
4255 |
28 Nov 16 |
nicklas |
227 |
{ |
4255 |
28 Nov 16 |
nicklas |
228 |
for (JobAgent agent : jobAgents) |
4255 |
28 Nov 16 |
nicklas |
229 |
{ |
4255 |
28 Nov 16 |
nicklas |
230 |
if (agent.isInDatabase() && agent.hasPermission(Permission.USE)) |
4255 |
28 Nov 16 |
nicklas |
231 |
{ |
4255 |
28 Nov 16 |
nicklas |
232 |
usableJobAgents.add(agent.getExternalId()); |
4255 |
28 Nov 16 |
nicklas |
233 |
} |
4255 |
28 Nov 16 |
nicklas |
234 |
} |
4255 |
28 Nov 16 |
nicklas |
235 |
} |
4255 |
28 Nov 16 |
nicklas |
236 |
|
4255 |
28 Nov 16 |
nicklas |
237 |
List<OpenGridCluster> result = new ArrayList<>(); |
5995 |
21 Aug 20 |
nicklas |
238 |
if (filter == null) filter = new StaticFilter<OpenGridCluster>(true); |
4255 |
28 Nov 16 |
nicklas |
239 |
for (OpenGridCluster cluster : clusters.values()) |
4255 |
28 Nov 16 |
nicklas |
240 |
{ |
5995 |
21 Aug 20 |
nicklas |
241 |
if (filter.evaluate(cluster)) |
4255 |
28 Nov 16 |
nicklas |
242 |
{ |
5995 |
21 Aug 20 |
nicklas |
243 |
String jobAgentId = cluster.getConfig().getJobAgentExternalId(); |
5995 |
21 Aug 20 |
nicklas |
244 |
if (jobAgentId == null || usableJobAgents.contains(jobAgentId)) |
5995 |
21 Aug 20 |
nicklas |
245 |
{ |
5995 |
21 Aug 20 |
nicklas |
246 |
result.add(cluster); |
5995 |
21 Aug 20 |
nicklas |
247 |
} |
4255 |
28 Nov 16 |
nicklas |
248 |
} |
4255 |
28 Nov 16 |
nicklas |
249 |
} |
4255 |
28 Nov 16 |
nicklas |
250 |
return result; |
4255 |
28 Nov 16 |
nicklas |
251 |
} |
4255 |
28 Nov 16 |
nicklas |
252 |
|
4255 |
28 Nov 16 |
nicklas |
253 |
/** |
4203 |
02 Nov 16 |
nicklas |
Is the Open Grid service running or not? |
4203 |
02 Nov 16 |
nicklas |
255 |
*/ |
4203 |
02 Nov 16 |
nicklas |
256 |
public boolean isRunning() |
4203 |
02 Nov 16 |
nicklas |
257 |
{ |
4203 |
02 Nov 16 |
nicklas |
258 |
return isRunning; |
4203 |
02 Nov 16 |
nicklas |
259 |
} |
4203 |
02 Nov 16 |
nicklas |
260 |
|
4203 |
02 Nov 16 |
nicklas |
261 |
/** |
4203 |
02 Nov 16 |
nicklas |
Start the service if it is not running. |
4203 |
02 Nov 16 |
nicklas |
263 |
*/ |
4254 |
25 Nov 16 |
nicklas |
264 |
synchronized void start(SessionControl systemSc, Extension<ServiceControllerAction> ext) |
4203 |
02 Nov 16 |
nicklas |
265 |
{ |
4222 |
09 Nov 16 |
nicklas |
266 |
if (isRunning) return; |
4222 |
09 Nov 16 |
nicklas |
267 |
|
4222 |
09 Nov 16 |
nicklas |
268 |
logger.debug("Starting Open Grid cluster service"); |
4254 |
25 Nov 16 |
nicklas |
269 |
try |
4254 |
25 Nov 16 |
nicklas |
270 |
{ |
4254 |
25 Nov 16 |
nicklas |
271 |
this.systemSc = systemSc; |
4254 |
25 Nov 16 |
nicklas |
272 |
this.rootSc = systemSc.impersonateLogin(SystemItems.getId(User.ROOT), "Open Grid Scheduler Services"); |
4254 |
25 Nov 16 |
nicklas |
273 |
this.ext = ext; |
4254 |
25 Nov 16 |
nicklas |
274 |
|
4254 |
25 Nov 16 |
nicklas |
275 |
XmlConfig cfg = new XmlConfig(); |
4254 |
25 Nov 16 |
nicklas |
276 |
cfg.readFromResource("/opengrid-config.xml"); |
4254 |
25 Nov 16 |
nicklas |
277 |
for (OpenGridCluster cluster : cfg.getClusters()) |
4254 |
25 Nov 16 |
nicklas |
278 |
{ |
4255 |
28 Nov 16 |
nicklas |
279 |
if (clusters.containsKey(cluster.getId())) |
4255 |
28 Nov 16 |
nicklas |
280 |
{ |
4255 |
28 Nov 16 |
nicklas |
281 |
logger.warn("Ignoring duplicate cluster definition: " + cluster); |
4255 |
28 Nov 16 |
nicklas |
282 |
} |
4255 |
28 Nov 16 |
nicklas |
283 |
else |
4255 |
28 Nov 16 |
nicklas |
284 |
{ |
4255 |
28 Nov 16 |
nicklas |
285 |
clusters.put(cluster.getId(), cluster); |
4255 |
28 Nov 16 |
nicklas |
286 |
} |
4254 |
25 Nov 16 |
nicklas |
287 |
} |
4254 |
25 Nov 16 |
nicklas |
288 |
|
4254 |
25 Nov 16 |
nicklas |
// Start timer for processing async requests |
4254 |
25 Nov 16 |
nicklas |
290 |
jobStatusTimer = Application.getScheduler().scheduleAtFixedRate( |
4254 |
25 Nov 16 |
nicklas |
291 |
new JobStatusTimerTask(), 5000, 5000, false); |
7380 |
18 Oct 23 |
nicklas |
292 |
|
7380 |
18 Oct 23 |
nicklas |
// Start timer for cleaning up on the clusters |
7380 |
18 Oct 23 |
nicklas |
294 |
cleanupTimer = Application.getScheduler().scheduleAtFixedRate( |
7380 |
18 Oct 23 |
nicklas |
295 |
new CleanupTimerTask(), 60000, 60000, false); |
4254 |
25 Nov 16 |
nicklas |
296 |
} |
4254 |
25 Nov 16 |
nicklas |
297 |
catch (RuntimeException ex) |
4254 |
25 Nov 16 |
nicklas |
298 |
{ |
4254 |
25 Nov 16 |
nicklas |
299 |
logger.debug("Could not start Open Grid cluster service", ex); |
4254 |
25 Nov 16 |
nicklas |
300 |
stopInternal(); |
4254 |
25 Nov 16 |
nicklas |
301 |
throw ex; |
4254 |
25 Nov 16 |
nicklas |
302 |
} |
4222 |
09 Nov 16 |
nicklas |
303 |
|
4222 |
09 Nov 16 |
nicklas |
304 |
isRunning = true; |
4222 |
09 Nov 16 |
nicklas |
305 |
logger.debug("Open Grid cluster service is now running"); |
4203 |
02 Nov 16 |
nicklas |
306 |
} |
4203 |
02 Nov 16 |
nicklas |
307 |
|
4203 |
02 Nov 16 |
nicklas |
308 |
/** |
4203 |
02 Nov 16 |
nicklas |
Stop the service if it is running. |
4203 |
02 Nov 16 |
nicklas |
310 |
*/ |
4254 |
25 Nov 16 |
nicklas |
311 |
synchronized void stop() |
4203 |
02 Nov 16 |
nicklas |
312 |
{ |
4222 |
09 Nov 16 |
nicklas |
313 |
if (!isRunning) return; |
4254 |
25 Nov 16 |
nicklas |
314 |
logger.debug("Stopping Open Grid cluster service"); |
4254 |
25 Nov 16 |
nicklas |
315 |
stopInternal(); |
4254 |
25 Nov 16 |
nicklas |
316 |
logger.debug("Open Grid cluster service has stopped"); |
4254 |
25 Nov 16 |
nicklas |
317 |
} |
4222 |
09 Nov 16 |
nicklas |
318 |
|
4254 |
25 Nov 16 |
nicklas |
319 |
private void stopInternal() |
4254 |
25 Nov 16 |
nicklas |
320 |
{ |
4222 |
09 Nov 16 |
nicklas |
321 |
if (jobStatusTimer != null) |
4203 |
02 Nov 16 |
nicklas |
322 |
{ |
4222 |
09 Nov 16 |
nicklas |
323 |
jobStatusTimer.cancel(); |
4222 |
09 Nov 16 |
nicklas |
324 |
jobStatusTimer = null; |
4203 |
02 Nov 16 |
nicklas |
325 |
} |
7380 |
18 Oct 23 |
nicklas |
326 |
if (cleanupTimer != null) |
7380 |
18 Oct 23 |
nicklas |
327 |
{ |
7380 |
18 Oct 23 |
nicklas |
328 |
cleanupTimer.cancel(); |
7380 |
18 Oct 23 |
nicklas |
329 |
cleanupTimer = null; |
7380 |
18 Oct 23 |
nicklas |
330 |
} |
4254 |
25 Nov 16 |
nicklas |
331 |
|
4254 |
25 Nov 16 |
nicklas |
332 |
jobsToAbort.clear(); |
4254 |
25 Nov 16 |
nicklas |
333 |
jobsToUpdate.clear(); |
4284 |
21 Dec 16 |
nicklas |
334 |
nonGridJobsToUpdate.clear(); |
4254 |
25 Nov 16 |
nicklas |
335 |
unknownJobs.clear(); |
4254 |
25 Nov 16 |
nicklas |
336 |
clusters.clear(); |
4254 |
25 Nov 16 |
nicklas |
337 |
|
4222 |
09 Nov 16 |
nicklas |
338 |
if (rootSc != null) rootSc.logout(); |
4222 |
09 Nov 16 |
nicklas |
339 |
rootSc = null; |
4222 |
09 Nov 16 |
nicklas |
340 |
systemSc = null; |
4222 |
09 Nov 16 |
nicklas |
341 |
ext = null; |
4222 |
09 Nov 16 |
nicklas |
342 |
isRunning = false; |
4203 |
02 Nov 16 |
nicklas |
343 |
} |
4254 |
25 Nov 16 |
nicklas |
344 |
|
4205 |
03 Nov 16 |
nicklas |
345 |
/** |
4205 |
03 Nov 16 |
nicklas |
Register a job that should be aborted. The job |
4205 |
03 Nov 16 |
nicklas |
identifier may or may not include a BASE job id. |
4205 |
03 Nov 16 |
nicklas |
If a BASE job id is specified the information in |
4205 |
03 Nov 16 |
nicklas |
the database is updated once the job has been |
4205 |
03 Nov 16 |
nicklas |
aborted on the cluster. |
4205 |
03 Nov 16 |
nicklas |
351 |
|
4205 |
03 Nov 16 |
nicklas |
@param jobId The job identifier |
4205 |
03 Nov 16 |
nicklas |
353 |
*/ |
4203 |
02 Nov 16 |
nicklas |
354 |
public void asyncJobAbort(JobIdentifier jobId) |
4203 |
02 Nov 16 |
nicklas |
355 |
{ |
4205 |
03 Nov 16 |
nicklas |
356 |
if (jobId == null) |
4205 |
03 Nov 16 |
nicklas |
357 |
{ |
4205 |
03 Nov 16 |
nicklas |
358 |
throw new NullPointerException("jobId"); |
4205 |
03 Nov 16 |
nicklas |
359 |
} |
4203 |
02 Nov 16 |
nicklas |
360 |
if (!isRunning) return; |
7075 |
27 Mar 23 |
nicklas |
361 |
if (logger.isDebugEnabled()) logger.debug("Request abort job: " + jobId); |
4205 |
03 Nov 16 |
nicklas |
362 |
synchronized (jobsToAbort) |
4205 |
03 Nov 16 |
nicklas |
363 |
{ |
4205 |
03 Nov 16 |
nicklas |
364 |
jobsToAbort.add(jobId); |
4205 |
03 Nov 16 |
nicklas |
365 |
} |
4203 |
02 Nov 16 |
nicklas |
366 |
} |
4203 |
02 Nov 16 |
nicklas |
367 |
|
4205 |
03 Nov 16 |
nicklas |
368 |
/** |
4205 |
03 Nov 16 |
nicklas |
Register a status update request for a job. The job |
4205 |
03 Nov 16 |
nicklas |
identifier may or may not include a BASE job id. |
4205 |
03 Nov 16 |
nicklas |
If a BASE job id is specified the information in |
4205 |
03 Nov 16 |
nicklas |
the database is updated once the job information has |
4205 |
03 Nov 16 |
nicklas |
been retrieved from the cluster. |
4205 |
03 Nov 16 |
nicklas |
374 |
|
4205 |
03 Nov 16 |
nicklas |
@param jobId The job identifier |
4205 |
03 Nov 16 |
nicklas |
376 |
*/ |
4203 |
02 Nov 16 |
nicklas |
377 |
public void asyncJobStatusUpdate(JobIdentifier jobId) |
4203 |
02 Nov 16 |
nicklas |
378 |
{ |
4205 |
03 Nov 16 |
nicklas |
379 |
if (jobId == null) |
4205 |
03 Nov 16 |
nicklas |
380 |
{ |
4205 |
03 Nov 16 |
nicklas |
381 |
throw new NullPointerException("jobId"); |
4205 |
03 Nov 16 |
nicklas |
382 |
} |
4203 |
02 Nov 16 |
nicklas |
383 |
if (!isRunning) return; |
4205 |
03 Nov 16 |
nicklas |
384 |
if (logger.isDebugEnabled()) logger.debug("Request status update: " + jobId); |
4205 |
03 Nov 16 |
nicklas |
385 |
synchronized (jobsToUpdate) |
4205 |
03 Nov 16 |
nicklas |
386 |
{ |
4205 |
03 Nov 16 |
nicklas |
387 |
jobsToUpdate.add(jobId); |
4205 |
03 Nov 16 |
nicklas |
388 |
} |
4205 |
03 Nov 16 |
nicklas |
389 |
} |
4205 |
03 Nov 16 |
nicklas |
390 |
|
4226 |
10 Nov 16 |
nicklas |
391 |
/** |
4284 |
21 Dec 16 |
nicklas |
Register a status update request for a job that is |
4284 |
21 Dec 16 |
nicklas |
not an Open Grid job. This exists as a service to other |
4284 |
21 Dec 16 |
nicklas |
extensions that need to connect to a remote host and |
4284 |
21 Dec 16 |
nicklas |
make some checks via SSH. |
4284 |
21 Dec 16 |
nicklas |
396 |
|
4309 |
17 Jan 17 |
nicklas |
If a cluster that matches the {@link JobIdentifier#getClusterId()} |
4309 |
17 Jan 17 |
nicklas |
is found the {@link JobStatusUpdater#getJobStatus(OpenGridSession, JobIdentifier)} |
4309 |
17 Jan 17 |
nicklas |
is provided with a connected session, otherwise that parameter is null. |
4309 |
17 Jan 17 |
nicklas |
400 |
|
4309 |
17 Jan 17 |
nicklas |
The {@link JobIdentifier#getClusterJobId()} can always be substituted |
4309 |
17 Jan 17 |
nicklas |
with some other ID. |
4309 |
17 Jan 17 |
nicklas |
403 |
|
4309 |
17 Jan 17 |
nicklas |
@param jobId A job identifier with exceptions as noted above |
4301 |
13 Jan 17 |
nicklas |
@param updater The actual instance that performs the update check |
4284 |
21 Dec 16 |
nicklas |
406 |
*/ |
4284 |
21 Dec 16 |
nicklas |
407 |
public void asyncJobStatusUpdate(JobIdentifier jobId, JobStatusUpdater updater) |
4284 |
21 Dec 16 |
nicklas |
408 |
{ |
4284 |
21 Dec 16 |
nicklas |
409 |
if (jobId == null) throw new NullPointerException("jobId"); |
4284 |
21 Dec 16 |
nicklas |
410 |
if (updater == null) throw new NullPointerException("updater["+jobId + "]"); |
4284 |
21 Dec 16 |
nicklas |
411 |
if (!isRunning) return; |
4284 |
21 Dec 16 |
nicklas |
412 |
if (logger.isDebugEnabled()) logger.debug("Request status update: " + jobId); |
4284 |
21 Dec 16 |
nicklas |
413 |
synchronized (nonGridJobsToUpdate) |
4284 |
21 Dec 16 |
nicklas |
414 |
{ |
4284 |
21 Dec 16 |
nicklas |
415 |
nonGridJobsToUpdate.put(jobId, updater); |
4284 |
21 Dec 16 |
nicklas |
416 |
} |
4284 |
21 Dec 16 |
nicklas |
417 |
} |
4284 |
21 Dec 16 |
nicklas |
418 |
|
4284 |
21 Dec 16 |
nicklas |
419 |
/** |
4226 |
10 Nov 16 |
nicklas |
Process async requests since the last time. ABORT requests are |
4226 |
10 Nov 16 |
nicklas |
always processed. STATUS requests are only processed if it was |
4226 |
10 Nov 16 |
nicklas |
more than MIN_WAIT_INTERVAL milliseconds since the last update. |
4226 |
10 Nov 16 |
nicklas |
423 |
*/ |
4205 |
03 Nov 16 |
nicklas |
424 |
void processAsyncRequests() |
4205 |
03 Nov 16 |
nicklas |
425 |
{ |
4226 |
10 Nov 16 |
nicklas |
426 |
long now = System.currentTimeMillis(); |
4226 |
10 Nov 16 |
nicklas |
427 |
long timeSinceLastUpdate = now - lastJobStatusUpdate; |
7075 |
27 Mar 23 |
nicklas |
428 |
long minWaitInterval = logger.isDebugEnabled() ? MIN_WAIT_INTERVAL_DEBUG : MIN_WAIT_INTERVAL_NORMAL; |
7075 |
27 Mar 23 |
nicklas |
429 |
boolean doStatusUpdate = timeSinceLastUpdate > minWaitInterval; |
4284 |
21 Dec 16 |
nicklas |
430 |
|
7075 |
27 Mar 23 |
nicklas |
431 |
logger.debug("Processing async requests"); |
7075 |
27 Mar 23 |
nicklas |
432 |
if (logger.isTraceEnabled()) |
4205 |
03 Nov 16 |
nicklas |
433 |
{ |
7075 |
27 Mar 23 |
nicklas |
434 |
logger.trace("jobsToAbort=" + jobsToAbort); |
7075 |
27 Mar 23 |
nicklas |
435 |
logger.trace("jobsToUpdate=" + jobsToUpdate); |
7075 |
27 Mar 23 |
nicklas |
436 |
logger.trace("nonGridJobsToUpdate=" + nonGridJobsToUpdate); |
7075 |
27 Mar 23 |
nicklas |
437 |
logger.trace("unknownJobs=" + unknownJobs); |
7075 |
27 Mar 23 |
nicklas |
438 |
logger.trace("timeSinceLastUpdate=" + timeSinceLastUpdate); |
7075 |
27 Mar 23 |
nicklas |
439 |
logger.trace("doStatusUpdate=" + doStatusUpdate); |
4205 |
03 Nov 16 |
nicklas |
440 |
} |
4203 |
02 Nov 16 |
nicklas |
441 |
|
4205 |
03 Nov 16 |
nicklas |
// Create at most one session to each cluster |
4205 |
03 Nov 16 |
nicklas |
443 |
Map<String, OpenGridSession> sessions = new HashMap<>(); |
4205 |
03 Nov 16 |
nicklas |
444 |
|
4205 |
03 Nov 16 |
nicklas |
// Make copy of current async requests and clear the queues |
4226 |
10 Nov 16 |
nicklas |
446 |
List<JobIdentifier> tmpAbort = null; |
4226 |
10 Nov 16 |
nicklas |
447 |
List<JobIdentifier> tmpUpdate = null; |
4284 |
21 Dec 16 |
nicklas |
448 |
Map<JobIdentifier, JobStatusUpdater> tmpNonGridUpdate = null; |
4222 |
09 Nov 16 |
nicklas |
449 |
List<JobIdentifier> tmpUnknown = new ArrayList<>(); |
4205 |
03 Nov 16 |
nicklas |
450 |
synchronized (jobsToAbort) |
4205 |
03 Nov 16 |
nicklas |
451 |
{ |
4205 |
03 Nov 16 |
nicklas |
452 |
tmpAbort = new ArrayList<>(jobsToAbort); |
4205 |
03 Nov 16 |
nicklas |
453 |
jobsToAbort.clear(); |
4205 |
03 Nov 16 |
nicklas |
454 |
} |
4226 |
10 Nov 16 |
nicklas |
455 |
|
6830 |
02 Sep 22 |
nicklas |
456 |
List<JobIdentifier> updatedNow = new ArrayList<>(); |
4226 |
10 Nov 16 |
nicklas |
457 |
if (doStatusUpdate) |
4205 |
03 Nov 16 |
nicklas |
458 |
{ |
4226 |
10 Nov 16 |
nicklas |
459 |
synchronized (jobsToUpdate) |
4226 |
10 Nov 16 |
nicklas |
460 |
{ |
4226 |
10 Nov 16 |
nicklas |
461 |
jobsToUpdate.addAll(unknownJobs); // Always re-check jobs that was 'not found' the last time |
4226 |
10 Nov 16 |
nicklas |
462 |
jobsToUpdate.removeAll(tmpAbort); // No update for aborted jobs |
4226 |
10 Nov 16 |
nicklas |
463 |
tmpUpdate = new ArrayList<>(jobsToUpdate); |
4226 |
10 Nov 16 |
nicklas |
464 |
jobsToUpdate.clear(); |
4226 |
10 Nov 16 |
nicklas |
465 |
} |
4284 |
21 Dec 16 |
nicklas |
466 |
|
4284 |
21 Dec 16 |
nicklas |
467 |
synchronized (nonGridJobsToUpdate) |
4284 |
21 Dec 16 |
nicklas |
468 |
{ |
4284 |
21 Dec 16 |
nicklas |
469 |
tmpNonGridUpdate = new HashMap<>(nonGridJobsToUpdate); |
4284 |
21 Dec 16 |
nicklas |
470 |
nonGridJobsToUpdate.clear(); |
4284 |
21 Dec 16 |
nicklas |
471 |
} |
4205 |
03 Nov 16 |
nicklas |
472 |
} |
4205 |
03 Nov 16 |
nicklas |
473 |
|
4205 |
03 Nov 16 |
nicklas |
474 |
try |
4205 |
03 Nov 16 |
nicklas |
475 |
{ |
4205 |
03 Nov 16 |
nicklas |
476 |
|
7075 |
27 Mar 23 |
nicklas |
477 |
logger.debug("Jobs to abort: " + tmpAbort.size()); |
4205 |
03 Nov 16 |
nicklas |
478 |
for (JobIdentifier jobId : tmpAbort) |
4205 |
03 Nov 16 |
nicklas |
479 |
{ |
4205 |
03 Nov 16 |
nicklas |
480 |
logger.debug("Processing abort: " + jobId); |
4205 |
03 Nov 16 |
nicklas |
481 |
|
4205 |
03 Nov 16 |
nicklas |
// Get new or existing session |
4205 |
03 Nov 16 |
nicklas |
483 |
OpenGridSession session = getSession(jobId, sessions); |
4205 |
03 Nov 16 |
nicklas |
484 |
|
4205 |
03 Nov 16 |
nicklas |
// If we could not get connection to cluster, skip this and continue with next |
4205 |
03 Nov 16 |
nicklas |
486 |
if (session == null) continue; |
4205 |
03 Nov 16 |
nicklas |
487 |
|
4222 |
09 Nov 16 |
nicklas |
488 |
CmdResult<String> qdel = session.qdel(jobId); |
4205 |
03 Nov 16 |
nicklas |
489 |
if (jobId.getBaseJobId() != 0) |
4205 |
03 Nov 16 |
nicklas |
490 |
{ |
4302 |
16 Jan 17 |
nicklas |
491 |
if (qdel.getExitStatus() == 0) |
4302 |
16 Jan 17 |
nicklas |
492 |
{ |
4302 |
16 Jan 17 |
nicklas |
493 |
setProgressOnBaseJob(jobId, 99, qdel.getStdout()); |
4302 |
16 Jan 17 |
nicklas |
494 |
} |
4302 |
16 Jan 17 |
nicklas |
495 |
else |
4302 |
16 Jan 17 |
nicklas |
496 |
{ |
4302 |
16 Jan 17 |
nicklas |
497 |
setErrorOnBaseJob(jobId, "Failed to abort job: " + qdel.getStderr()); |
4302 |
16 Jan 17 |
nicklas |
498 |
} |
4205 |
03 Nov 16 |
nicklas |
499 |
} |
4205 |
03 Nov 16 |
nicklas |
500 |
} |
4205 |
03 Nov 16 |
nicklas |
501 |
|
7075 |
27 Mar 23 |
nicklas |
502 |
if (!doStatusUpdate) |
4205 |
03 Nov 16 |
nicklas |
503 |
{ |
7075 |
27 Mar 23 |
nicklas |
504 |
logger.debug("No status update since not long enough wait time ["+(timeSinceLastUpdate/1000)+" seconds]"); |
7075 |
27 Mar 23 |
nicklas |
505 |
} |
7075 |
27 Mar 23 |
nicklas |
506 |
else |
7075 |
27 Mar 23 |
nicklas |
507 |
{ |
7075 |
27 Mar 23 |
nicklas |
508 |
logger.debug("Jobs for status update: " + tmpUpdate.size()); |
4245 |
21 Nov 16 |
nicklas |
509 |
JobCompletionInvoker jobCompletionInvoker = null; |
4245 |
21 Nov 16 |
nicklas |
510 |
|
4226 |
10 Nov 16 |
nicklas |
511 |
for (JobIdentifier jobId : tmpUpdate) |
4212 |
08 Nov 16 |
nicklas |
512 |
{ |
4226 |
10 Nov 16 |
nicklas |
// Get new or existing session |
4226 |
10 Nov 16 |
nicklas |
514 |
OpenGridSession session = getSession(jobId, sessions); |
4222 |
09 Nov 16 |
nicklas |
515 |
|
4226 |
10 Nov 16 |
nicklas |
// If we could not get connection to cluster, skip this and continue with next job |
4226 |
10 Nov 16 |
nicklas |
517 |
if (session == null) continue; |
4226 |
10 Nov 16 |
nicklas |
518 |
|
4226 |
10 Nov 16 |
nicklas |
519 |
CmdResult<JobStatus> qstat = session.qstat(jobId, true); |
4226 |
10 Nov 16 |
nicklas |
520 |
JobStatus status = qstat.getResult(); |
4222 |
09 Nov 16 |
nicklas |
521 |
|
4226 |
10 Nov 16 |
nicklas |
522 |
if (status == null && qstat.getExitStatus() == 1) |
4212 |
08 Nov 16 |
nicklas |
523 |
{ |
4226 |
10 Nov 16 |
nicklas |
524 |
logger.debug("No 'qstat' information for job: " + jobId); |
4226 |
10 Nov 16 |
nicklas |
// No information was found by 'qstat' |
4226 |
10 Nov 16 |
nicklas |
// Check with 'qacct' if the job has ended |
4226 |
10 Nov 16 |
nicklas |
527 |
|
4226 |
10 Nov 16 |
nicklas |
528 |
CmdResult<JobStatus> qacct = session.qacct(jobId, true); |
4226 |
10 Nov 16 |
nicklas |
529 |
status = qacct.getResult(); |
4226 |
10 Nov 16 |
nicklas |
530 |
|
4226 |
10 Nov 16 |
nicklas |
531 |
if (status == null && qacct.getExitStatus() == 1) |
4212 |
08 Nov 16 |
nicklas |
532 |
{ |
4226 |
10 Nov 16 |
nicklas |
533 |
logger.debug("No 'qacct' information for job: " + jobId); |
4226 |
10 Nov 16 |
nicklas |
// Allow one 'not found' since there is a delay |
4226 |
10 Nov 16 |
nicklas |
// between a job disappearing from 'qstat' and appearing in 'qacct' |
4226 |
10 Nov 16 |
nicklas |
536 |
if (unknownJobs.contains(jobId)) |
4226 |
10 Nov 16 |
nicklas |
537 |
{ |
4226 |
10 Nov 16 |
nicklas |
538 |
logger.debug("Reporting not found: " + jobId); |
4226 |
10 Nov 16 |
nicklas |
// This is the second time the job was not found, report it as an error |
4226 |
10 Nov 16 |
nicklas |
540 |
setErrorOnBaseJob(jobId, qacct.getStderr()); |
4226 |
10 Nov 16 |
nicklas |
541 |
} |
4226 |
10 Nov 16 |
nicklas |
542 |
else |
4226 |
10 Nov 16 |
nicklas |
543 |
{ |
4226 |
10 Nov 16 |
nicklas |
544 |
logger.debug("Allowing one more status update for job: " + jobId); |
4226 |
10 Nov 16 |
nicklas |
545 |
tmpUnknown.add(jobId); |
4226 |
10 Nov 16 |
nicklas |
546 |
} |
4212 |
08 Nov 16 |
nicklas |
547 |
} |
4212 |
08 Nov 16 |
nicklas |
548 |
} |
4226 |
10 Nov 16 |
nicklas |
549 |
|
4226 |
10 Nov 16 |
nicklas |
550 |
if (status != null) |
4222 |
09 Nov 16 |
nicklas |
551 |
{ |
6830 |
02 Sep 22 |
nicklas |
552 |
updatedNow.add(jobId); |
4226 |
10 Nov 16 |
nicklas |
// We know something about this job... |
4226 |
10 Nov 16 |
nicklas |
// If it is WAITING do nothing |
4226 |
10 Nov 16 |
nicklas |
// If it is EXECUTING get the progress file |
4229 |
10 Nov 16 |
nicklas |
556 |
Job.Status jobStatus = status.getStatus(); |
4229 |
10 Nov 16 |
nicklas |
557 |
if (jobStatus == Job.Status.EXECUTING) |
4222 |
09 Nov 16 |
nicklas |
558 |
{ |
4229 |
10 Nov 16 |
nicklas |
559 |
session.readProgress(status); |
4229 |
10 Nov 16 |
nicklas |
560 |
} |
4229 |
10 Nov 16 |
nicklas |
561 |
else if (jobStatus == Job.Status.ERROR) |
4229 |
10 Nov 16 |
nicklas |
562 |
{ |
4229 |
10 Nov 16 |
nicklas |
563 |
if (status.getMessage() == null) |
4226 |
10 Nov 16 |
nicklas |
564 |
{ |
4229 |
10 Nov 16 |
nicklas |
565 |
session.readStderr(status); |
4226 |
10 Nov 16 |
nicklas |
566 |
} |
4222 |
09 Nov 16 |
nicklas |
567 |
} |
4226 |
10 Nov 16 |
nicklas |
568 |
|
4226 |
10 Nov 16 |
nicklas |
569 |
if (jobId.getBaseJobId() != 0) |
4226 |
10 Nov 16 |
nicklas |
570 |
{ |
4245 |
21 Nov 16 |
nicklas |
571 |
if (jobCompletionInvoker == null) |
4245 |
21 Nov 16 |
nicklas |
572 |
{ |
4245 |
21 Nov 16 |
nicklas |
573 |
jobCompletionInvoker = new JobCompletionInvoker(rootSc); |
4245 |
21 Nov 16 |
nicklas |
574 |
} |
4245 |
21 Nov 16 |
nicklas |
575 |
updateJobStatusInBase(status, session, jobCompletionInvoker); |
4226 |
10 Nov 16 |
nicklas |
576 |
} |
4222 |
09 Nov 16 |
nicklas |
577 |
} |
4222 |
09 Nov 16 |
nicklas |
578 |
} |
4222 |
09 Nov 16 |
nicklas |
579 |
|
4226 |
10 Nov 16 |
nicklas |
580 |
unknownJobs.clear(); |
4226 |
10 Nov 16 |
nicklas |
581 |
unknownJobs.addAll(tmpUnknown); |
4226 |
10 Nov 16 |
nicklas |
582 |
|
4284 |
21 Dec 16 |
nicklas |
583 |
for (Map.Entry<JobIdentifier, JobStatusUpdater> entry : tmpNonGridUpdate.entrySet()) |
4284 |
21 Dec 16 |
nicklas |
584 |
{ |
4284 |
21 Dec 16 |
nicklas |
585 |
JobIdentifier jobId = entry.getKey(); |
4284 |
21 Dec 16 |
nicklas |
586 |
|
4309 |
17 Jan 17 |
nicklas |
// Get new or existing session |
4309 |
17 Jan 17 |
nicklas |
588 |
OpenGridSession session = null; |
4309 |
17 Jan 17 |
nicklas |
589 |
if (isDefined(jobId.getClusterId())) |
4309 |
17 Jan 17 |
nicklas |
590 |
{ |
4309 |
17 Jan 17 |
nicklas |
591 |
session = getSession(jobId, sessions); |
4309 |
17 Jan 17 |
nicklas |
// If we could not get connection to cluster, skip this and continue with next job |
4309 |
17 Jan 17 |
nicklas |
593 |
if (session == null) continue; |
4309 |
17 Jan 17 |
nicklas |
594 |
} |
4284 |
21 Dec 16 |
nicklas |
595 |
|
4284 |
21 Dec 16 |
nicklas |
596 |
JobStatusUpdater updater = entry.getValue(); |
4284 |
21 Dec 16 |
nicklas |
597 |
JobStatus status = updater.getJobStatus(session, jobId); |
4284 |
21 Dec 16 |
nicklas |
598 |
|
4284 |
21 Dec 16 |
nicklas |
599 |
if (jobId.getBaseJobId() != 0) |
4284 |
21 Dec 16 |
nicklas |
600 |
{ |
4284 |
21 Dec 16 |
nicklas |
601 |
if (jobCompletionInvoker == null) |
4284 |
21 Dec 16 |
nicklas |
602 |
{ |
4284 |
21 Dec 16 |
nicklas |
603 |
jobCompletionInvoker = new JobCompletionInvoker(rootSc); |
4284 |
21 Dec 16 |
nicklas |
604 |
} |
4284 |
21 Dec 16 |
nicklas |
605 |
updateJobStatusInBase(status, session, jobCompletionInvoker); |
6830 |
02 Sep 22 |
nicklas |
606 |
updatedNow.add(jobId); |
4284 |
21 Dec 16 |
nicklas |
607 |
} |
4284 |
21 Dec 16 |
nicklas |
608 |
} |
4284 |
21 Dec 16 |
nicklas |
609 |
|
6830 |
02 Sep 22 |
nicklas |
// Update requests may have been registered during our processing here |
6830 |
02 Sep 22 |
nicklas |
// so we simply remove those requests |
6830 |
02 Sep 22 |
nicklas |
612 |
synchronized (jobsToUpdate) |
6830 |
02 Sep 22 |
nicklas |
613 |
{ |
6830 |
02 Sep 22 |
nicklas |
614 |
jobsToUpdate.removeAll(updatedNow); |
6830 |
02 Sep 22 |
nicklas |
615 |
} |
6830 |
02 Sep 22 |
nicklas |
616 |
synchronized (nonGridJobsToUpdate) |
6830 |
02 Sep 22 |
nicklas |
617 |
{ |
6830 |
02 Sep 22 |
nicklas |
618 |
nonGridJobsToUpdate.keySet().removeAll(updatedNow); |
6830 |
02 Sep 22 |
nicklas |
619 |
} |
6830 |
02 Sep 22 |
nicklas |
620 |
lastJobStatusUpdate = now; |
4205 |
03 Nov 16 |
nicklas |
621 |
} |
4205 |
03 Nov 16 |
nicklas |
622 |
} |
4205 |
03 Nov 16 |
nicklas |
623 |
finally |
4205 |
03 Nov 16 |
nicklas |
624 |
{ |
4205 |
03 Nov 16 |
nicklas |
625 |
for (OpenGridSession session : sessions.values()) |
4205 |
03 Nov 16 |
nicklas |
626 |
{ |
6071 |
20 Nov 20 |
nicklas |
627 |
if (session != null) |
6071 |
20 Nov 20 |
nicklas |
628 |
{ |
6071 |
20 Nov 20 |
nicklas |
629 |
logger.debug("Closing connection: "+ session.getHost()); |
6071 |
20 Nov 20 |
nicklas |
630 |
FileUtil.close(session); |
6071 |
20 Nov 20 |
nicklas |
631 |
} |
4205 |
03 Nov 16 |
nicklas |
632 |
} |
4205 |
03 Nov 16 |
nicklas |
633 |
} |
4203 |
02 Nov 16 |
nicklas |
634 |
} |
4203 |
02 Nov 16 |
nicklas |
635 |
|
7380 |
18 Oct 23 |
nicklas |
636 |
|
4203 |
02 Nov 16 |
nicklas |
637 |
/** |
7380 |
18 Oct 23 |
nicklas |
638 |
|
7380 |
18 Oct 23 |
nicklas |
639 |
*/ |
7380 |
18 Oct 23 |
nicklas |
640 |
void processCleanupTasks() |
7380 |
18 Oct 23 |
nicklas |
641 |
{ |
7380 |
18 Oct 23 |
nicklas |
642 |
long now = System.currentTimeMillis(); |
7380 |
18 Oct 23 |
nicklas |
643 |
long timeSinceLastCleanup = now - lastCleanup; |
7380 |
18 Oct 23 |
nicklas |
644 |
long minWaitInterval = logger.isDebugEnabled() ? MIN_CLEANUP_INTERVAL_DEBUG : MIN_CLEANUP_INTERVAL_NORMAL; |
7380 |
18 Oct 23 |
nicklas |
645 |
boolean doCleanup = timeSinceLastCleanup > minWaitInterval; |
7380 |
18 Oct 23 |
nicklas |
646 |
|
7380 |
18 Oct 23 |
nicklas |
647 |
logger.debug("Processing cleanup"); |
7380 |
18 Oct 23 |
nicklas |
648 |
if (logger.isTraceEnabled()) |
7380 |
18 Oct 23 |
nicklas |
649 |
{ |
7380 |
18 Oct 23 |
nicklas |
650 |
logger.trace("timeSinceLastCleanup=" + timeSinceLastCleanup); |
7380 |
18 Oct 23 |
nicklas |
651 |
logger.trace("doCleanup=" + doCleanup); |
7380 |
18 Oct 23 |
nicklas |
652 |
} |
7380 |
18 Oct 23 |
nicklas |
653 |
|
7380 |
18 Oct 23 |
nicklas |
654 |
if (!doCleanup) |
7380 |
18 Oct 23 |
nicklas |
655 |
{ |
7380 |
18 Oct 23 |
nicklas |
656 |
logger.debug("No cleanup since not long enough wait time ["+(timeSinceLastCleanup/1000)+" seconds]"); |
7380 |
18 Oct 23 |
nicklas |
657 |
return; |
7380 |
18 Oct 23 |
nicklas |
658 |
} |
7380 |
18 Oct 23 |
nicklas |
659 |
|
7380 |
18 Oct 23 |
nicklas |
660 |
for (OpenGridCluster cluster : clusters.values()) |
7380 |
18 Oct 23 |
nicklas |
661 |
{ |
7380 |
18 Oct 23 |
nicklas |
// Number of days; 0 or missing=no automatic removal |
7380 |
18 Oct 23 |
nicklas |
663 |
ClusterConfig cfg = cluster.getConfig(); |
7380 |
18 Oct 23 |
nicklas |
664 |
int autoRemove = Values.getInt(cfg.getCustomOption("auto-remove-job-folders"), 0); |
7380 |
18 Oct 23 |
nicklas |
665 |
if (autoRemove > 0) |
7380 |
18 Oct 23 |
nicklas |
666 |
{ |
7380 |
18 Oct 23 |
nicklas |
667 |
OpenGridSession session = null; |
7380 |
18 Oct 23 |
nicklas |
668 |
try |
7380 |
18 Oct 23 |
nicklas |
669 |
{ |
7380 |
18 Oct 23 |
nicklas |
670 |
logger.debug("Connecting to cluster: " + cluster); |
7380 |
18 Oct 23 |
nicklas |
671 |
session = cluster.connect(5); |
7380 |
18 Oct 23 |
nicklas |
672 |
CmdResult<String> findAndDelete = session.executeCmd("find "+cfg.getJobFolder()+" -mindepth 1 -maxdepth 1 -type d -mtime +"+autoRemove+" -print -exec rm -rf {} \\;", 30); |
7380 |
18 Oct 23 |
nicklas |
673 |
if (findAndDelete.getExitStatus() == 0) |
7380 |
18 Oct 23 |
nicklas |
674 |
{ |
7380 |
18 Oct 23 |
nicklas |
675 |
String[] deleted = findAndDelete.getStdout().split("\n"); |
7380 |
18 Oct 23 |
nicklas |
676 |
int numDeleted = 0; |
7380 |
18 Oct 23 |
nicklas |
677 |
for (String folder : deleted) |
7380 |
18 Oct 23 |
nicklas |
678 |
{ |
7380 |
18 Oct 23 |
nicklas |
679 |
if (folder.length() > 0) |
7380 |
18 Oct 23 |
nicklas |
680 |
{ |
7380 |
18 Oct 23 |
nicklas |
681 |
numDeleted++; |
7380 |
18 Oct 23 |
nicklas |
682 |
logger.trace("Removed folder: "+folder); |
7380 |
18 Oct 23 |
nicklas |
683 |
} |
7380 |
18 Oct 23 |
nicklas |
684 |
} |
7380 |
18 Oct 23 |
nicklas |
685 |
logger.info("Removed "+numDeleted+" job folders on cluster: "+cluster); |
7380 |
18 Oct 23 |
nicklas |
686 |
} |
7380 |
18 Oct 23 |
nicklas |
687 |
} |
7380 |
18 Oct 23 |
nicklas |
688 |
catch (RuntimeException ex) |
7380 |
18 Oct 23 |
nicklas |
689 |
{ |
7380 |
18 Oct 23 |
nicklas |
690 |
logger.warn("Could not connect to cluster: " + cluster.getId(), ex); |
7380 |
18 Oct 23 |
nicklas |
691 |
} |
7380 |
18 Oct 23 |
nicklas |
692 |
finally |
7380 |
18 Oct 23 |
nicklas |
693 |
{ |
7380 |
18 Oct 23 |
nicklas |
694 |
FileUtil.close(session); |
7380 |
18 Oct 23 |
nicklas |
695 |
} |
7380 |
18 Oct 23 |
nicklas |
696 |
} |
7380 |
18 Oct 23 |
nicklas |
697 |
else |
7380 |
18 Oct 23 |
nicklas |
698 |
{ |
7380 |
18 Oct 23 |
nicklas |
699 |
logger.debug("No automatic cleanup on cluster: "+cluster); |
7380 |
18 Oct 23 |
nicklas |
700 |
} |
7380 |
18 Oct 23 |
nicklas |
701 |
} |
7380 |
18 Oct 23 |
nicklas |
702 |
} |
7380 |
18 Oct 23 |
nicklas |
703 |
|
7380 |
18 Oct 23 |
nicklas |
704 |
|
7380 |
18 Oct 23 |
nicklas |
705 |
/** |
4205 |
03 Nov 16 |
nicklas |
Get an existing session from the cache or create a new connection if |
4205 |
03 Nov 16 |
nicklas |
it doesn't exists. If no connection could be created null is returned. |
4205 |
03 Nov 16 |
nicklas |
708 |
*/ |
4205 |
03 Nov 16 |
nicklas |
709 |
private OpenGridSession getSession(JobIdentifier jobId, Map<String, OpenGridSession> sessions) |
4205 |
03 Nov 16 |
nicklas |
710 |
{ |
4205 |
03 Nov 16 |
nicklas |
711 |
OpenGridSession session = null; |
4205 |
03 Nov 16 |
nicklas |
712 |
if (sessions.containsKey(jobId.getClusterId())) |
4205 |
03 Nov 16 |
nicklas |
713 |
{ |
4205 |
03 Nov 16 |
nicklas |
714 |
session = sessions.get(jobId.getClusterId()); |
4205 |
03 Nov 16 |
nicklas |
715 |
} |
4205 |
03 Nov 16 |
nicklas |
716 |
else |
4205 |
03 Nov 16 |
nicklas |
717 |
{ |
4205 |
03 Nov 16 |
nicklas |
718 |
session = tryConnect(jobId); |
4205 |
03 Nov 16 |
nicklas |
719 |
sessions.put(jobId.getClusterId(), session); |
4205 |
03 Nov 16 |
nicklas |
720 |
} |
4205 |
03 Nov 16 |
nicklas |
721 |
return session; |
4205 |
03 Nov 16 |
nicklas |
722 |
} |
4205 |
03 Nov 16 |
nicklas |
723 |
|
4205 |
03 Nov 16 |
nicklas |
724 |
/** |
4205 |
03 Nov 16 |
nicklas |
Try to connect to the cluster referenced by the job identifier. |
4205 |
03 Nov 16 |
nicklas |
Exceptions are handled inside the method which only return null |
4205 |
03 Nov 16 |
nicklas |
if no connection could be established. |
4205 |
03 Nov 16 |
nicklas |
728 |
*/ |
4205 |
03 Nov 16 |
nicklas |
729 |
private OpenGridSession tryConnect(JobIdentifier jobId) |
4205 |
03 Nov 16 |
nicklas |
730 |
{ |
4205 |
03 Nov 16 |
nicklas |
731 |
OpenGridSession session = null; |
4205 |
03 Nov 16 |
nicklas |
732 |
logger.debug("Connecting to cluster: " + jobId.getClusterId()); |
4205 |
03 Nov 16 |
nicklas |
733 |
OpenGridCluster cluster = clusters.get(jobId.getClusterId()); |
4205 |
03 Nov 16 |
nicklas |
734 |
if (cluster == null) |
4205 |
03 Nov 16 |
nicklas |
735 |
{ |
4205 |
03 Nov 16 |
nicklas |
736 |
logger.warn("Unknown cluster specified by job: " + jobId); |
4205 |
03 Nov 16 |
nicklas |
737 |
} |
4205 |
03 Nov 16 |
nicklas |
738 |
else |
4205 |
03 Nov 16 |
nicklas |
739 |
{ |
4205 |
03 Nov 16 |
nicklas |
740 |
try |
4205 |
03 Nov 16 |
nicklas |
741 |
{ |
4205 |
03 Nov 16 |
nicklas |
742 |
session = cluster.connect(5); |
4205 |
03 Nov 16 |
nicklas |
743 |
} |
4205 |
03 Nov 16 |
nicklas |
744 |
catch (RuntimeException ex) |
4205 |
03 Nov 16 |
nicklas |
745 |
{ |
4205 |
03 Nov 16 |
nicklas |
746 |
logger.warn("Could not connect to cluster: " + jobId, ex); |
4205 |
03 Nov 16 |
nicklas |
747 |
} |
4205 |
03 Nov 16 |
nicklas |
748 |
} |
4205 |
03 Nov 16 |
nicklas |
749 |
return session; |
4205 |
03 Nov 16 |
nicklas |
750 |
} |
4205 |
03 Nov 16 |
nicklas |
751 |
|
4205 |
03 Nov 16 |
nicklas |
752 |
/** |
4205 |
03 Nov 16 |
nicklas |
Set ERROR status on a BASE job. This is done in a |
4205 |
03 Nov 16 |
nicklas |
separate transaction. Exceptions are logged but not |
4205 |
03 Nov 16 |
nicklas |
re-thrown out of this method. |
4205 |
03 Nov 16 |
nicklas |
@return TRUE if the operation succeeded, FALSE if not |
4205 |
03 Nov 16 |
nicklas |
757 |
*/ |
4205 |
03 Nov 16 |
nicklas |
758 |
private boolean setErrorOnBaseJob(JobIdentifier jobId, String msg) |
4205 |
03 Nov 16 |
nicklas |
759 |
{ |
4205 |
03 Nov 16 |
nicklas |
760 |
DbControl dc = null; |
4205 |
03 Nov 16 |
nicklas |
761 |
boolean ok = false; |
4205 |
03 Nov 16 |
nicklas |
762 |
try |
4205 |
03 Nov 16 |
nicklas |
763 |
{ |
4205 |
03 Nov 16 |
nicklas |
764 |
dc = rootSc.newDbControl(); |
4205 |
03 Nov 16 |
nicklas |
765 |
Job baseJob = Job.getById(dc, jobId.getBaseJobId()); |
4205 |
03 Nov 16 |
nicklas |
766 |
baseJob.doneError(msg); |
4205 |
03 Nov 16 |
nicklas |
767 |
dc.commit(); |
4205 |
03 Nov 16 |
nicklas |
768 |
ok = true; |
4205 |
03 Nov 16 |
nicklas |
769 |
} |
4205 |
03 Nov 16 |
nicklas |
770 |
catch (RuntimeException ex) |
4205 |
03 Nov 16 |
nicklas |
771 |
{ |
4205 |
03 Nov 16 |
nicklas |
772 |
logger.warn("Could not set ERROR status on job: " + jobId, ex); |
4205 |
03 Nov 16 |
nicklas |
773 |
} |
4205 |
03 Nov 16 |
nicklas |
774 |
finally |
4205 |
03 Nov 16 |
nicklas |
775 |
{ |
4205 |
03 Nov 16 |
nicklas |
776 |
if (dc != null) dc.close(); |
4205 |
03 Nov 16 |
nicklas |
777 |
} |
4205 |
03 Nov 16 |
nicklas |
778 |
return ok; |
4205 |
03 Nov 16 |
nicklas |
779 |
} |
4205 |
03 Nov 16 |
nicklas |
780 |
|
4302 |
16 Jan 17 |
nicklas |
781 |
/** |
4302 |
16 Jan 17 |
nicklas |
Update the progress status on a BASE job. This is done in a |
4302 |
16 Jan 17 |
nicklas |
separate transaction. Exceptions are logged but not |
4302 |
16 Jan 17 |
nicklas |
re-thrown out of this method. |
4302 |
16 Jan 17 |
nicklas |
@return TRUE if the operation succeeded, FALSE if not |
4302 |
16 Jan 17 |
nicklas |
786 |
*/ |
4302 |
16 Jan 17 |
nicklas |
787 |
private boolean setProgressOnBaseJob(JobIdentifier jobId, int progress, String msg) |
4302 |
16 Jan 17 |
nicklas |
788 |
{ |
4302 |
16 Jan 17 |
nicklas |
789 |
DbControl dc = null; |
4302 |
16 Jan 17 |
nicklas |
790 |
boolean ok = false; |
4302 |
16 Jan 17 |
nicklas |
791 |
try |
4302 |
16 Jan 17 |
nicklas |
792 |
{ |
4302 |
16 Jan 17 |
nicklas |
793 |
dc = rootSc.newDbControl(); |
4302 |
16 Jan 17 |
nicklas |
794 |
Job baseJob = Job.getById(dc, jobId.getBaseJobId()); |
4302 |
16 Jan 17 |
nicklas |
795 |
baseJob.setProgress(progress, msg); |
4302 |
16 Jan 17 |
nicklas |
796 |
dc.commit(); |
4302 |
16 Jan 17 |
nicklas |
797 |
ok = true; |
4302 |
16 Jan 17 |
nicklas |
798 |
} |
4302 |
16 Jan 17 |
nicklas |
799 |
catch (RuntimeException ex) |
4302 |
16 Jan 17 |
nicklas |
800 |
{ |
4302 |
16 Jan 17 |
nicklas |
801 |
logger.warn("Could not set progress on job: " + jobId, ex); |
4302 |
16 Jan 17 |
nicklas |
802 |
} |
4302 |
16 Jan 17 |
nicklas |
803 |
finally |
4302 |
16 Jan 17 |
nicklas |
804 |
{ |
4302 |
16 Jan 17 |
nicklas |
805 |
if (dc != null) dc.close(); |
4302 |
16 Jan 17 |
nicklas |
806 |
} |
4302 |
16 Jan 17 |
nicklas |
807 |
return ok; |
4302 |
16 Jan 17 |
nicklas |
808 |
} |
4302 |
16 Jan 17 |
nicklas |
809 |
|
4222 |
09 Nov 16 |
nicklas |
810 |
|
4245 |
21 Nov 16 |
nicklas |
811 |
private boolean updateJobStatusInBase(JobStatus jobStatus, OpenGridSession session, JobCompletionInvoker jobCompletionInvoker) |
4222 |
09 Nov 16 |
nicklas |
812 |
{ |
4222 |
09 Nov 16 |
nicklas |
813 |
JobIdentifier jobId = jobStatus.getJobIdentifier(); |
4222 |
09 Nov 16 |
nicklas |
814 |
DbControl dc = null; |
4222 |
09 Nov 16 |
nicklas |
815 |
boolean ok = false; |
4222 |
09 Nov 16 |
nicklas |
816 |
try |
4222 |
09 Nov 16 |
nicklas |
817 |
{ |
4222 |
09 Nov 16 |
nicklas |
818 |
dc = rootSc.newDbControl(); |
4222 |
09 Nov 16 |
nicklas |
819 |
|
4222 |
09 Nov 16 |
nicklas |
820 |
Job baseJob = Job.getById(dc, jobId.getBaseJobId()); |
4222 |
09 Nov 16 |
nicklas |
821 |
Job.Status currentStatus = baseJob.getStatus(); |
4222 |
09 Nov 16 |
nicklas |
822 |
Job.Status newStatus = jobStatus.getStatus(); |
4222 |
09 Nov 16 |
nicklas |
823 |
|
4222 |
09 Nov 16 |
nicklas |
824 |
if (logger.isDebugEnabled()) |
4222 |
09 Nov 16 |
nicklas |
825 |
{ |
4222 |
09 Nov 16 |
nicklas |
826 |
logger.debug("updateJobStatusInBase: " + jobId + "; current=" + currentStatus + "; new="+newStatus); |
4222 |
09 Nov 16 |
nicklas |
827 |
} |
4222 |
09 Nov 16 |
nicklas |
828 |
|
4302 |
16 Jan 17 |
nicklas |
829 |
if (currentStatus == Job.Status.DONE || currentStatus == Job.Status.ERROR) |
4222 |
09 Nov 16 |
nicklas |
830 |
{ |
4222 |
09 Nov 16 |
nicklas |
// Skip update since the current status seems to have changed |
4222 |
09 Nov 16 |
nicklas |
// since the request for a status update was made |
4222 |
09 Nov 16 |
nicklas |
// (eg. due to manual changes) |
4222 |
09 Nov 16 |
nicklas |
834 |
return false; |
4222 |
09 Nov 16 |
nicklas |
835 |
} |
4222 |
09 Nov 16 |
nicklas |
836 |
|
4302 |
16 Jan 17 |
nicklas |
// Basically, we now know that the current status is either WAITING, EXECUTING or ABORTING |
4222 |
09 Nov 16 |
nicklas |
838 |
if (newStatus == Job.Status.WAITING) |
4222 |
09 Nov 16 |
nicklas |
839 |
{ |
4222 |
09 Nov 16 |
nicklas |
// Still waiting so there is nothing to do |
4222 |
09 Nov 16 |
nicklas |
841 |
return false; |
4222 |
09 Nov 16 |
nicklas |
842 |
} |
4222 |
09 Nov 16 |
nicklas |
843 |
|
4263 |
14 Dec 16 |
nicklas |
// If we get here we know for sure that the job is executing or has ended |
4222 |
09 Nov 16 |
nicklas |
845 |
String serverName = jobId.getClusterId(); |
4263 |
14 Dec 16 |
nicklas |
846 |
String nodeName = jobStatus.getNodeName(); |
4222 |
09 Nov 16 |
nicklas |
847 |
|
4222 |
09 Nov 16 |
nicklas |
848 |
if (currentStatus == Job.Status.WAITING) |
4222 |
09 Nov 16 |
nicklas |
849 |
{ |
4263 |
14 Dec 16 |
nicklas |
850 |
baseJob.start(null, serverName, nodeName, null, jobStatus.getStartDate()); |
4222 |
09 Nov 16 |
nicklas |
851 |
} |
4222 |
09 Nov 16 |
nicklas |
852 |
|
4222 |
09 Nov 16 |
nicklas |
853 |
if (newStatus == Job.Status.EXECUTING) |
4222 |
09 Nov 16 |
nicklas |
854 |
{ |
4222 |
09 Nov 16 |
nicklas |
855 |
baseJob.setProgress(jobStatus.getProgress(), jobStatus.getMessage()); |
4222 |
09 Nov 16 |
nicklas |
856 |
} |
4222 |
09 Nov 16 |
nicklas |
857 |
else if (newStatus == Job.Status.DONE) |
4222 |
09 Nov 16 |
nicklas |
858 |
{ |
4245 |
21 Nov 16 |
nicklas |
859 |
try |
4245 |
21 Nov 16 |
nicklas |
860 |
{ |
4245 |
21 Nov 16 |
nicklas |
861 |
String msg = jobCompletionInvoker.call(session, baseJob, jobStatus); |
4245 |
21 Nov 16 |
nicklas |
862 |
if (msg == null) msg = "Job completed"; |
4245 |
21 Nov 16 |
nicklas |
863 |
baseJob.doneOk(msg, jobStatus.getEndDate()); |
4245 |
21 Nov 16 |
nicklas |
864 |
} |
4245 |
21 Nov 16 |
nicklas |
865 |
catch (Exception ex) |
4245 |
21 Nov 16 |
nicklas |
866 |
{ |
4245 |
21 Nov 16 |
nicklas |
867 |
baseJob.doneError(StringUtil.trimString(ex.getMessage(), 200), |
4245 |
21 Nov 16 |
nicklas |
868 |
Collections.singleton(ex), jobStatus.getEndDate()); |
4245 |
21 Nov 16 |
nicklas |
869 |
} |
4222 |
09 Nov 16 |
nicklas |
870 |
} |
4222 |
09 Nov 16 |
nicklas |
871 |
else if (newStatus == Job.Status.ERROR) |
4222 |
09 Nov 16 |
nicklas |
872 |
{ |
4229 |
10 Nov 16 |
nicklas |
873 |
Exception ex = null; |
4245 |
21 Nov 16 |
nicklas |
874 |
String msg = null; |
4245 |
21 Nov 16 |
nicklas |
875 |
try |
4229 |
10 Nov 16 |
nicklas |
876 |
{ |
4245 |
21 Nov 16 |
nicklas |
877 |
msg = jobCompletionInvoker.call(session, baseJob, jobStatus); |
4245 |
21 Nov 16 |
nicklas |
878 |
if (msg != null) |
4245 |
21 Nov 16 |
nicklas |
879 |
{ |
4245 |
21 Nov 16 |
nicklas |
880 |
ex = new RuntimeException(StringUtil.trimString(msg, Job.MAX_STACK_TRACE_LENGTH)); |
4245 |
21 Nov 16 |
nicklas |
881 |
} |
4245 |
21 Nov 16 |
nicklas |
882 |
else |
4245 |
21 Nov 16 |
nicklas |
883 |
{ |
4302 |
16 Jan 17 |
nicklas |
884 |
msg = jobStatus.getExitCode() == 137 ? "Aborted by user." : "Job failed for an unknown reason."; |
4245 |
21 Nov 16 |
nicklas |
885 |
} |
4229 |
10 Nov 16 |
nicklas |
886 |
} |
4245 |
21 Nov 16 |
nicklas |
887 |
catch (Exception e) |
4245 |
21 Nov 16 |
nicklas |
888 |
{ |
4245 |
21 Nov 16 |
nicklas |
889 |
ex = e; |
4245 |
21 Nov 16 |
nicklas |
890 |
msg = ex.getMessage(); |
4245 |
21 Nov 16 |
nicklas |
891 |
} |
4245 |
21 Nov 16 |
nicklas |
892 |
|
4245 |
21 Nov 16 |
nicklas |
893 |
baseJob.doneError(StringUtil.trimString("[" + jobStatus.getExitCode() + "] " + msg, 200), |
4245 |
21 Nov 16 |
nicklas |
894 |
ex == null ? null : Collections.singleton(ex), jobStatus.getEndDate()); |
4222 |
09 Nov 16 |
nicklas |
895 |
} |
4222 |
09 Nov 16 |
nicklas |
896 |
|
4222 |
09 Nov 16 |
nicklas |
897 |
dc.commit(); |
4222 |
09 Nov 16 |
nicklas |
898 |
ok = true; |
4222 |
09 Nov 16 |
nicklas |
899 |
} |
4222 |
09 Nov 16 |
nicklas |
900 |
catch (RuntimeException ex) |
4222 |
09 Nov 16 |
nicklas |
901 |
{ |
4222 |
09 Nov 16 |
nicklas |
902 |
logger.warn("Could not update status on job: " + jobId, ex); |
4222 |
09 Nov 16 |
nicklas |
903 |
} |
4222 |
09 Nov 16 |
nicklas |
904 |
finally |
4222 |
09 Nov 16 |
nicklas |
905 |
{ |
4222 |
09 Nov 16 |
nicklas |
906 |
if (dc != null) dc.close(); |
4222 |
09 Nov 16 |
nicklas |
907 |
} |
4222 |
09 Nov 16 |
nicklas |
908 |
return ok; |
4222 |
09 Nov 16 |
nicklas |
909 |
} |
4222 |
09 Nov 16 |
nicklas |
910 |
|
4205 |
03 Nov 16 |
nicklas |
911 |
/** |
4203 |
02 Nov 16 |
nicklas |
Task for checking current status of jobs that has been |
4203 |
02 Nov 16 |
nicklas |
sent to the cluster. |
4203 |
02 Nov 16 |
nicklas |
914 |
*/ |
4203 |
02 Nov 16 |
nicklas |
915 |
static class JobStatusTimerTask |
4203 |
02 Nov 16 |
nicklas |
916 |
extends TimerTask |
4203 |
02 Nov 16 |
nicklas |
917 |
{ |
4203 |
02 Nov 16 |
nicklas |
918 |
JobStatusTimerTask() |
4203 |
02 Nov 16 |
nicklas |
919 |
{} |
4203 |
02 Nov 16 |
nicklas |
920 |
|
4203 |
02 Nov 16 |
nicklas |
921 |
@Override |
4203 |
02 Nov 16 |
nicklas |
922 |
public void run() |
4203 |
02 Nov 16 |
nicklas |
923 |
{ |
7380 |
18 Oct 23 |
nicklas |
924 |
logger.trace("Job status timer running"); |
4205 |
03 Nov 16 |
nicklas |
925 |
getInstance().processAsyncRequests(); |
4203 |
02 Nov 16 |
nicklas |
926 |
} |
4203 |
02 Nov 16 |
nicklas |
927 |
|
4203 |
02 Nov 16 |
nicklas |
928 |
} |
4245 |
21 Nov 16 |
nicklas |
929 |
|
7380 |
18 Oct 23 |
nicklas |
930 |
/** |
7380 |
18 Oct 23 |
nicklas |
Task for checking current status of jobs that has been |
7380 |
18 Oct 23 |
nicklas |
sent to the cluster. |
7380 |
18 Oct 23 |
nicklas |
933 |
*/ |
7380 |
18 Oct 23 |
nicklas |
934 |
static class CleanupTimerTask |
7380 |
18 Oct 23 |
nicklas |
935 |
extends TimerTask |
7380 |
18 Oct 23 |
nicklas |
936 |
{ |
7380 |
18 Oct 23 |
nicklas |
937 |
CleanupTimerTask() |
7380 |
18 Oct 23 |
nicklas |
938 |
{} |
7380 |
18 Oct 23 |
nicklas |
939 |
|
7380 |
18 Oct 23 |
nicklas |
940 |
@Override |
7380 |
18 Oct 23 |
nicklas |
941 |
public void run() |
7380 |
18 Oct 23 |
nicklas |
942 |
{ |
7380 |
18 Oct 23 |
nicklas |
943 |
logger.trace("Cleanup timer running"); |
7380 |
18 Oct 23 |
nicklas |
944 |
getInstance().processCleanupTasks(); |
7380 |
18 Oct 23 |
nicklas |
945 |
} |
7380 |
18 Oct 23 |
nicklas |
946 |
} |
7380 |
18 Oct 23 |
nicklas |
947 |
|
4245 |
21 Nov 16 |
nicklas |
948 |
static class JobCompletionInvoker |
4245 |
21 Nov 16 |
nicklas |
949 |
{ |
4245 |
21 Nov 16 |
nicklas |
950 |
|
4245 |
21 Nov 16 |
nicklas |
951 |
private final SessionControl rootSc; |
4245 |
21 Nov 16 |
nicklas |
952 |
private final ClientContext context; |
4245 |
21 Nov 16 |
nicklas |
953 |
private final ExtensionsInvoker<JobCompletionHandler> invoker; |
4245 |
21 Nov 16 |
nicklas |
954 |
|
4245 |
21 Nov 16 |
nicklas |
955 |
JobCompletionInvoker(SessionControl rootSc) |
4245 |
21 Nov 16 |
nicklas |
956 |
{ |
4245 |
21 Nov 16 |
nicklas |
957 |
this.rootSc = rootSc; |
4245 |
21 Nov 16 |
nicklas |
958 |
this.context = new ClientContext(rootSc); |
4245 |
21 Nov 16 |
nicklas |
959 |
|
4245 |
21 Nov 16 |
nicklas |
960 |
Registry registry = Application.getExtensionsManager().getRegistry(); |
4245 |
21 Nov 16 |
nicklas |
961 |
Settings settings = Application.getExtensionsManager().getSettings(); |
5505 |
17 Jun 19 |
nicklas |
962 |
this.invoker = registry.useExtensions( |
4245 |
21 Nov 16 |
nicklas |
963 |
context, settings, new String[] { "net.sf.basedb.opengrid.job-complete" }); |
4245 |
21 Nov 16 |
nicklas |
964 |
} |
4245 |
21 Nov 16 |
nicklas |
965 |
|
4245 |
21 Nov 16 |
nicklas |
966 |
/** |
4245 |
21 Nov 16 |
nicklas |
Call the extensions for the given job. |
4245 |
21 Nov 16 |
nicklas |
@return The messages from the extensions (separated with newline). If |
4245 |
21 Nov 16 |
nicklas |
no extension returns a message, the message from the job status is |
4245 |
21 Nov 16 |
nicklas |
used. |
4245 |
21 Nov 16 |
nicklas |
971 |
*/ |
4245 |
21 Nov 16 |
nicklas |
972 |
String call(OpenGridSession session, Job baseJob, JobStatus jobStatus) |
4245 |
21 Nov 16 |
nicklas |
973 |
{ |
4245 |
21 Nov 16 |
nicklas |
974 |
context.setCurrentItem(baseJob); |
4245 |
21 Nov 16 |
nicklas |
975 |
context.setAttribute("job-status", jobStatus); |
4245 |
21 Nov 16 |
nicklas |
976 |
|
4245 |
21 Nov 16 |
nicklas |
977 |
SessionControl impersonated = null; |
4245 |
21 Nov 16 |
nicklas |
978 |
SessionControl forJob = null; |
4245 |
21 Nov 16 |
nicklas |
979 |
List<String> messages = new ArrayList<>(); |
4245 |
21 Nov 16 |
nicklas |
980 |
try |
4245 |
21 Nov 16 |
nicklas |
981 |
{ |
4245 |
21 Nov 16 |
nicklas |
// A single try-catch since we want processing to be aborted |
4245 |
21 Nov 16 |
nicklas |
// as soon as a handler throws an exception |
4245 |
21 Nov 16 |
nicklas |
984 |
ActionIterator<JobCompletionHandler> it = invoker.iterate(); |
4245 |
21 Nov 16 |
nicklas |
985 |
while (it.hasNext()) |
4245 |
21 Nov 16 |
nicklas |
986 |
{ |
4245 |
21 Nov 16 |
nicklas |
987 |
JobCompletionHandler handler = it.next(); |
4245 |
21 Nov 16 |
nicklas |
988 |
if (handler == null) continue; |
4245 |
21 Nov 16 |
nicklas |
989 |
|
4245 |
21 Nov 16 |
nicklas |
990 |
if (impersonated == null) |
4245 |
21 Nov 16 |
nicklas |
991 |
{ |
4245 |
21 Nov 16 |
nicklas |
// Create an impersonated session control |
4245 |
21 Nov 16 |
nicklas |
993 |
impersonated = rootSc.impersonateLogin(baseJob, "Finalizing job: " + baseJob.getName()); |
4245 |
21 Nov 16 |
nicklas |
994 |
if (baseJob.getActiveProjectId() > 0) |
4245 |
21 Nov 16 |
nicklas |
995 |
{ |
4245 |
21 Nov 16 |
nicklas |
996 |
impersonated.setActiveProject(Project.getById(baseJob.getDbControl(), baseJob.getActiveProjectId())); |
4245 |
21 Nov 16 |
nicklas |
997 |
} |
4245 |
21 Nov 16 |
nicklas |
// Create a delegate job session control for proper change history logging |
4245 |
21 Nov 16 |
nicklas |
999 |
forJob = impersonated.getJobSessionControl(baseJob); |
4245 |
21 Nov 16 |
nicklas |
1000 |
} |
4245 |
21 Nov 16 |
nicklas |
1001 |
|
4245 |
21 Nov 16 |
nicklas |
1002 |
messages.add(handler.jobCompleted(forJob, session, baseJob, jobStatus)); |
4245 |
21 Nov 16 |
nicklas |
1003 |
} |
4245 |
21 Nov 16 |
nicklas |
1004 |
} |
4245 |
21 Nov 16 |
nicklas |
1005 |
finally |
4245 |
21 Nov 16 |
nicklas |
1006 |
{ |
4245 |
21 Nov 16 |
nicklas |
1007 |
if (forJob != null) forJob.close(); |
4245 |
21 Nov 16 |
nicklas |
1008 |
if (impersonated != null) impersonated.close(); |
4245 |
21 Nov 16 |
nicklas |
1009 |
} |
4245 |
21 Nov 16 |
nicklas |
1010 |
|
4245 |
21 Nov 16 |
nicklas |
1011 |
String msg = Values.getString(messages, "\n", true); |
4302 |
16 Jan 17 |
nicklas |
1012 |
if (msg.length() == 0) msg = Values.getStringOrNull(jobStatus.getMessage()); |
4245 |
21 Nov 16 |
nicklas |
1013 |
return msg; |
4245 |
21 Nov 16 |
nicklas |
1014 |
} |
4203 |
02 Nov 16 |
nicklas |
1015 |
|
4245 |
21 Nov 16 |
nicklas |
1016 |
} |
4245 |
21 Nov 16 |
nicklas |
1017 |
|
4203 |
02 Nov 16 |
nicklas |
1018 |
} |