extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/ReleaseExporter.java

Code
Comments
Other
Rev Date Author Line
3932 13 May 16 nicklas 1 package net.sf.basedb.reggie.plugins.release;
3932 13 May 16 nicklas 2
3934 13 May 16 nicklas 3 import java.util.HashMap;
5198 19 Dec 18 nicklas 4 import java.util.HashSet;
3932 13 May 16 nicklas 5 import java.util.List;
3933 13 May 16 nicklas 6 import java.util.Map;
7022 06 Feb 23 nicklas 7 import java.util.Queue;
5198 19 Dec 18 nicklas 8 import java.util.Set;
4466 25 Apr 17 nicklas 9 import java.util.TreeMap;
7022 06 Feb 23 nicklas 10 import java.util.concurrent.ConcurrentLinkedQueue;
7022 06 Feb 23 nicklas 11 import java.util.concurrent.ExecutorService;
7022 06 Feb 23 nicklas 12 import java.util.concurrent.Executors;
7022 06 Feb 23 nicklas 13 import java.util.concurrent.TimeUnit;
3932 13 May 16 nicklas 14
4466 25 Apr 17 nicklas 15 import net.sf.basedb.core.Application;
5089 14 Nov 18 nicklas 16 import net.sf.basedb.core.BioSource;
3932 13 May 16 nicklas 17 import net.sf.basedb.core.DbControl;
5198 19 Dec 18 nicklas 18 import net.sf.basedb.core.Extract;
3932 13 May 16 nicklas 19 import net.sf.basedb.core.InvalidDataException;
3932 13 May 16 nicklas 20 import net.sf.basedb.core.Item;
3932 13 May 16 nicklas 21 import net.sf.basedb.core.ItemList;
3932 13 May 16 nicklas 22 import net.sf.basedb.core.ItemQuery;
3932 13 May 16 nicklas 23 import net.sf.basedb.core.ProgressReporter;
7022 06 Feb 23 nicklas 24 import net.sf.basedb.core.SessionControl;
5200 19 Dec 18 nicklas 25 import net.sf.basedb.core.SimpleProgressReporter;
4367 28 Feb 17 nicklas 26 import net.sf.basedb.core.plugin.ExportOutputStream;
3932 13 May 16 nicklas 27 import net.sf.basedb.core.query.Hql;
3932 13 May 16 nicklas 28 import net.sf.basedb.core.query.Orders;
7022 06 Feb 23 nicklas 29 import net.sf.basedb.core.signal.SignalException;
3935 13 May 16 nicklas 30 import net.sf.basedb.core.snapshot.SnapshotManager;
7071 20 Mar 23 nicklas 31 import net.sf.basedb.reggie.ReggieThreadFactory;
3932 13 May 16 nicklas 32 import net.sf.basedb.reggie.Reggie;
3933 13 May 16 nicklas 33 import net.sf.basedb.util.FileUtil;
3932 13 May 16 nicklas 34
3932 13 May 16 nicklas 35 /**
3932 13 May 16 nicklas 36   Main class for exporting the release files. Before using
4378 03 Mar 17 nicklas 37   the {@link #setOutputLocation(OutputLocation)} and 
3932 13 May 16 nicklas 38   {@link #setItemList(ItemList)} must be called. 
3932 13 May 16 nicklas 39   
3932 13 May 16 nicklas 40   Use the {@link #createAllReleaseFiles(DbControl, ProgressReporter)}
3932 13 May 16 nicklas 41   to create the files. Note that the exporter may create other DbControl:s
3932 13 May 16 nicklas 42   for reading information from the database. This is needed to let garbage
3932 13 May 16 nicklas 43   collection clean up data that has been processed and is no longer
3932 13 May 16 nicklas 44   needed. The supplied DbControl is used for all modifications
3932 13 May 16 nicklas 45   and file creations.
3932 13 May 16 nicklas 46   
3932 13 May 16 nicklas 47   @author nicklas
3932 13 May 16 nicklas 48   @since 4.5
3932 13 May 16 nicklas 49 */
3932 13 May 16 nicklas 50 public class ReleaseExporter 
3932 13 May 16 nicklas 51 {
3933 13 May 16 nicklas 52
4339 07 Feb 17 nicklas 53   private OutputLocation outputLocation;
3933 13 May 16 nicklas 54
7022 06 Feb 23 nicklas 55   private final ReleaseWriterOptions options;
3932 13 May 16 nicklas 56   
3932 13 May 16 nicklas 57   private ItemList list;
5089 14 Nov 18 nicklas 58   private List<BioSource> biosources;
3932 13 May 16 nicklas 59   
4466 25 Apr 17 nicklas 60   private final Map<String, Map<String, String>> batchIndexProxies;
5198 19 Dec 18 nicklas 61   private Set<Integer> libsWithFlaggedAlignment;
4466 25 Apr 17 nicklas 62   
4079 06 Sep 16 nicklas 63   public ReleaseExporter(ReleaseWriterOptions options) 
3932 13 May 16 nicklas 64   {
4079 06 Sep 16 nicklas 65     this.options = options;
4466 25 Apr 17 nicklas 66     this.batchIndexProxies = new HashMap<>();
3932 13 May 16 nicklas 67   }
3932 13 May 16 nicklas 68
3932 13 May 16 nicklas 69   /**
7034 10 Feb 23 nicklas 70     Get the options for the export.
7034 10 Feb 23 nicklas 71     @since 4.44
7034 10 Feb 23 nicklas 72   */
7034 10 Feb 23 nicklas 73   public ReleaseWriterOptions getOptions()
7034 10 Feb 23 nicklas 74   {
7034 10 Feb 23 nicklas 75     return options;
7034 10 Feb 23 nicklas 76   }
7034 10 Feb 23 nicklas 77   
7034 10 Feb 23 nicklas 78   /**
3932 13 May 16 nicklas 79     Set the output directory where the exported files should
3932 13 May 16 nicklas 80     be saved. The exporter will create several subdirectories 
3932 13 May 16 nicklas 81     and files.
3932 13 May 16 nicklas 82   */
4339 07 Feb 17 nicklas 83   public void setOutputLocation(OutputLocation outputLocation)
3932 13 May 16 nicklas 84   {
4339 07 Feb 17 nicklas 85     this.outputLocation = outputLocation;
3932 13 May 16 nicklas 86   }
3932 13 May 16 nicklas 87   
3932 13 May 16 nicklas 88   /**
3932 13 May 16 nicklas 89     Set the item list containing the raw bioassays that should be exported.
3932 13 May 16 nicklas 90     The item list must contain raw bioassays and all raw bioassays must use the
3932 13 May 16 nicklas 91     same array design.
3932 13 May 16 nicklas 92   */
3932 13 May 16 nicklas 93   public void setItemList(ItemList list)
3932 13 May 16 nicklas 94   {
5089 14 Nov 18 nicklas 95     if (list.getMemberType() != Item.BIOSOURCE)
3932 13 May 16 nicklas 96     {
5089 14 Nov 18 nicklas 97       throw new InvalidDataException("Item list '" + list.getName() + "' doesn't contain biosources");
3932 13 May 16 nicklas 98     }
3932 13 May 16 nicklas 99     
3932 13 May 16 nicklas 100     this.list = list;
3932 13 May 16 nicklas 101     
5089 14 Nov 18 nicklas 102     // Load biosources
4626 17 Nov 17 nicklas 103     DbControl dc = list.getDbControl();
6183 26 Mar 21 nicklas 104     ItemQuery<BioSource> query = list.getMembers();
3932 13 May 16 nicklas 105     query.setIncludes(Reggie.INCLUDE_IN_CURRENT_PROJECT);
3932 13 May 16 nicklas 106     query.order(Orders.asc(Hql.property("name")));
3932 13 May 16 nicklas 107     
5089 14 Nov 18 nicklas 108     List<BioSource> tmp = query.list(dc);
4626 17 Nov 17 nicklas 109     if (tmp.size() == 0)
3932 13 May 16 nicklas 110     {
3932 13 May 16 nicklas 111       throw new InvalidDataException("Item list '" + list.getName() + "' is empty");
3932 13 May 16 nicklas 112     }
5089 14 Nov 18 nicklas 113
5089 14 Nov 18 nicklas 114     biosources = tmp;
3932 13 May 16 nicklas 115   }
3932 13 May 16 nicklas 116   
3932 13 May 16 nicklas 117   /**
3932 13 May 16 nicklas 118     Get the item list.
3932 13 May 16 nicklas 119   */
3932 13 May 16 nicklas 120   public ItemList getItemList()
3932 13 May 16 nicklas 121   {
3932 13 May 16 nicklas 122     return list;
3932 13 May 16 nicklas 123   }
3932 13 May 16 nicklas 124   
3932 13 May 16 nicklas 125   /**
5089 14 Nov 18 nicklas 126     Get the biosources loaded from the item list.
3932 13 May 16 nicklas 127   */
5089 14 Nov 18 nicklas 128   public List<BioSource> getBioSources()
3932 13 May 16 nicklas 129   {
5089 14 Nov 18 nicklas 130     return biosources;
3932 13 May 16 nicklas 131   }
3932 13 May 16 nicklas 132   
3932 13 May 16 nicklas 133   /**
3933 13 May 16 nicklas 134     Run the export and create all files.
3933 13 May 16 nicklas 135   */
3932 13 May 16 nicklas 136   public void createAllReleaseFiles(DbControl dc, ProgressReporter progress)
3932 13 May 16 nicklas 137   {
5200 19 Dec 18 nicklas 138     if (progress == null) progress = new SimpleProgressReporter(null);
5200 19 Dec 18 nicklas 139     progress.display(0, "Starting release export for '" + list.getName() + "'");
5130 21 Nov 18 nicklas 140
6962 19 Dec 22 nicklas 141     outputLocation.init(progress);
5198 19 Dec 18 nicklas 142     QueryManager queryManager = new QueryManager(dc);
5198 19 Dec 18 nicklas 143     initGlobalData(dc, queryManager);
5200 19 Dec 18 nicklas 144     exportCohortData(dc, queryManager, getBioSources(), progress);
4085 07 Sep 16 nicklas 145
5089 14 Nov 18 nicklas 146     ExportOutputStream complete = outputLocation.getOutputStream("/exportcomplete", false);
5089 14 Nov 18 nicklas 147     FileUtil.close(complete);
4367 28 Feb 17 nicklas 148     
5200 19 Dec 18 nicklas 149     progress.display(100, "Release export for '" + list.getName() + "' completed successfully");
3932 13 May 16 nicklas 150   }
3932 13 May 16 nicklas 151   
5198 19 Dec 18 nicklas 152   private void exportCohortData(DbControl dc, QueryManager queryManager, List<BioSource> biosources, ProgressReporter progress)
3935 13 May 16 nicklas 153   {
7022 06 Feb 23 nicklas 154     // Use 1/3 of the available processors
7022 06 Feb 23 nicklas 155     int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors()/3);
3935 13 May 16 nicklas 156     // Counters used for progress reporting
5089 14 Nov 18 nicklas 157     int totalCount = biosources.size();
7022 06 Feb 23 nicklas 158     progress.display(15, "Exporting cohort data for " + totalCount + " biosources ("+numThreads+" threads)...");
7022 06 Feb 23 nicklas 159     
7022 06 Feb 23 nicklas 160     ScriptWriter scriptWriter = null;
7022 06 Feb 23 nicklas 161     if (options.exportFileSyncScripts())
7022 06 Feb 23 nicklas 162     {
7022 06 Feb 23 nicklas 163       scriptWriter = new ScriptWriter(dc, outputLocation, options);
7022 06 Feb 23 nicklas 164     }
7022 06 Feb 23 nicklas 165     
7022 06 Feb 23 nicklas 166     JsonWriter jsonWriter = new JsonWriter(dc, outputLocation, options);
7022 06 Feb 23 nicklas 167     jsonWriter.registerItemWriter(new PatientWriter(dc, options));
7022 06 Feb 23 nicklas 168     jsonWriter.registerItemWriter(new RetractWriter(dc, options));
7022 06 Feb 23 nicklas 169     jsonWriter.registerItemWriter(new NotAskedWriter(dc, options));
7022 06 Feb 23 nicklas 170     jsonWriter.registerItemWriter(new NoWriter(dc, options));
7022 06 Feb 23 nicklas 171     jsonWriter.registerItemWriter(new BloodWriter(dc, options));
7022 06 Feb 23 nicklas 172     jsonWriter.registerItemWriter(new BloodDnaWriter(dc, options));
7022 06 Feb 23 nicklas 173     jsonWriter.registerItemWriter(new CaseWriter(dc, options));
7022 06 Feb 23 nicklas 174     jsonWriter.registerItemWriter(new IncaWriter(dc, options));
7022 06 Feb 23 nicklas 175     jsonWriter.registerItemWriter(new SpecimenWriter(dc, options, outputLocation, scriptWriter));
7022 06 Feb 23 nicklas 176     jsonWriter.registerItemWriter(new NoSpecimenWriter(dc, options));
7022 06 Feb 23 nicklas 177     jsonWriter.registerItemWriter(new LysateWriter(dc, options));
7022 06 Feb 23 nicklas 178     jsonWriter.registerItemWriter(new RnaWriter(dc, options));
7022 06 Feb 23 nicklas 179     jsonWriter.registerItemWriter(new DnaWriter(dc, options));
7022 06 Feb 23 nicklas 180     jsonWriter.registerItemWriter(new FlowThroughWriter(dc, options));
7022 06 Feb 23 nicklas 181     jsonWriter.registerItemWriter(new LibraryWriter(dc, options));
7022 06 Feb 23 nicklas 182     jsonWriter.registerItemWriter(new MergedWriter(dc, options, scriptWriter));
7022 06 Feb 23 nicklas 183     jsonWriter.registerItemWriter(new AlignedWriter(dc, options, scriptWriter));
7022 06 Feb 23 nicklas 184     jsonWriter.registerItemWriter(new StringTieWriter(dc, options, scriptWriter));
7022 06 Feb 23 nicklas 185     jsonWriter.registerItemWriter(new CufflinksWriter(dc, options, scriptWriter));
7022 06 Feb 23 nicklas 186     jsonWriter.registerItemWriter(new VariantCallingWriter(dc, options, scriptWriter));
7022 06 Feb 23 nicklas 187     jsonWriter.registerItemWriter(new MethylationWriter(dc, options, scriptWriter));
7022 06 Feb 23 nicklas 188     jsonWriter.registerItemWriter(new OncoarrayWriter(dc, options, scriptWriter));
7022 06 Feb 23 nicklas 189     
7022 06 Feb 23 nicklas 190     jsonWriter.writeIndex(list, biosources);
7022 06 Feb 23 nicklas 191     jsonWriter.writeTypeDefs();
7022 06 Feb 23 nicklas 192     
7022 06 Feb 23 nicklas 193     Queue<BioSource> bioSourceQueue = new ConcurrentLinkedQueue<>(biosources);
7022 06 Feb 23 nicklas 194     
7022 06 Feb 23 nicklas 195     CallableExporter[] exporters = new CallableExporter[numThreads];
7071 20 Mar 23 nicklas 196     ExecutorService executor = Executors.newFixedThreadPool(numThreads, new ReggieThreadFactory("ReleaseExporterThread"));
7022 06 Feb 23 nicklas 197     
3935 13 May 16 nicklas 198     int count = 0;
3935 13 May 16 nicklas 199     try
3935 13 May 16 nicklas 200     {
7022 06 Feb 23 nicklas 201       // Start the exporter threads
7022 06 Feb 23 nicklas 202       for (int threadNo = 0; threadNo < numThreads; threadNo++)
4420 23 Mar 17 nicklas 203       {
7022 06 Feb 23 nicklas 204         CallableExporter exp = new CallableExporter(this, dc, jsonWriter, bioSourceQueue);
7022 06 Feb 23 nicklas 205         exporters[threadNo] = exp;
7022 06 Feb 23 nicklas 206         executor.execute(exp);
4420 23 Mar 17 nicklas 207       }
4420 23 Mar 17 nicklas 208       
7022 06 Feb 23 nicklas 209       // Wait in this loop until all biosources have been processed
7022 06 Feb 23 nicklas 210       while (count < totalCount)
3935 13 May 16 nicklas 211       {
7022 06 Feb 23 nicklas 212         try
3935 13 May 16 nicklas 213         {
7022 06 Feb 23 nicklas 214           Thread.sleep(2000);
7035 10 Feb 23 nicklas 215           // Write files that have been completed while we were waiting
7035 10 Feb 23 nicklas 216           jsonWriter.writeQueuedJsonFiles();
3935 13 May 16 nicklas 217         }
7022 06 Feb 23 nicklas 218         catch (InterruptedException ex)
7022 06 Feb 23 nicklas 219         {
7022 06 Feb 23 nicklas 220           throw new SignalException("Aborted by user");
7022 06 Feb 23 nicklas 221         }
7022 06 Feb 23 nicklas 222         // Important loop for progress reporting AND for detecting if one of
7022 06 Feb 23 nicklas 223         // the exporter threads have failed (getCount() throws an exception)
7022 06 Feb 23 nicklas 224         count = 0;
7022 06 Feb 23 nicklas 225         for (CallableExporter exp : exporters)
7022 06 Feb 23 nicklas 226         {
7022 06 Feb 23 nicklas 227           count += exp.getCount();
7022 06 Feb 23 nicklas 228         }
7022 06 Feb 23 nicklas 229         progress.display(15 + ((80*count)/totalCount), 
7022 06 Feb 23 nicklas 230           "Exporting cohort data ("+numThreads+" threads; " + count + " of " + totalCount + ")...");
3935 13 May 16 nicklas 231       }
3935 13 May 16 nicklas 232     }
3935 13 May 16 nicklas 233     finally
3935 13 May 16 nicklas 234     {
7022 06 Feb 23 nicklas 235       try
7022 06 Feb 23 nicklas 236       {
7022 06 Feb 23 nicklas 237         // This will send an interrupt to all running threads
7022 06 Feb 23 nicklas 238         // There should be none if everything went ok, but if one
7022 06 Feb 23 nicklas 239         // thread fails, getCount() will throw an exception and we
7022 06 Feb 23 nicklas 240         // will ask the other threads to abort. We give them 10 seconds...
7022 06 Feb 23 nicklas 241         executor.shutdownNow();
7022 06 Feb 23 nicklas 242         executor.awaitTermination(10, TimeUnit.SECONDS);
7022 06 Feb 23 nicklas 243       }
7022 06 Feb 23 nicklas 244       catch (InterruptedException ex)
7022 06 Feb 23 nicklas 245       {}
3935 13 May 16 nicklas 246     }
7022 06 Feb 23 nicklas 247     
7035 10 Feb 23 nicklas 248     // Write rest of files
7035 10 Feb 23 nicklas 249     jsonWriter.writeQueuedJsonFiles();
7022 06 Feb 23 nicklas 250     jsonWriter.writeFiles();
7022 06 Feb 23 nicklas 251     jsonWriter.writeBatchIndexLookupFiles(batchIndexProxies);
7022 06 Feb 23 nicklas 252     if (scriptWriter != null) scriptWriter.writeScripts();
7022 06 Feb 23 nicklas 253     
7022 06 Feb 23 nicklas 254     progress.display(98, "Exported cohort data (" + count + " items)");
3935 13 May 16 nicklas 255   }
5096 14 Nov 18 nicklas 256   
5096 14 Nov 18 nicklas 257   /**
5198 19 Dec 18 nicklas 258     Initialize some global data that it is better to have available before starting
5198 19 Dec 18 nicklas 259     the export.
5198 19 Dec 18 nicklas 260     
5198 19 Dec 18 nicklas 261     * Load libraries that have an alignment in the "Flagged alignments" list
5198 19 Dec 18 nicklas 262   */
5198 19 Dec 18 nicklas 263   private void initGlobalData(DbControl dc, QueryManager queryManager)
5198 19 Dec 18 nicklas 264   {
5198 19 Dec 18 nicklas 265     // Load libraries that have at lease one alignment on the "Flagged alignment" list
5198 19 Dec 18 nicklas 266     ItemQuery<Extract> query = queryManager.getLibrariesWithFlaggedAlignments();
5198 19 Dec 18 nicklas 267     libsWithFlaggedAlignment = new HashSet<>(query.idList(dc));
5198 19 Dec 18 nicklas 268   }
5198 19 Dec 18 nicklas 269   
5198 19 Dec 18 nicklas 270   /**
5198 19 Dec 18 nicklas 271     Check if this library has an alignment in the flagged alignment list.
5198 19 Dec 18 nicklas 272   */
5198 19 Dec 18 nicklas 273   public boolean hasFlaggedAlignment(Extract lib)
5198 19 Dec 18 nicklas 274   {
5198 19 Dec 18 nicklas 275     return libsWithFlaggedAlignment.contains(lib.getId());
5198 19 Dec 18 nicklas 276   }
5198 19 Dec 18 nicklas 277   
5198 19 Dec 18 nicklas 278   /**
4466 25 Apr 17 nicklas 279     Get or create a proxy value for the batch key. If called multiple
4466 25 Apr 17 nicklas 280     times with the same batchName/batchKey the same proxy will be 
4466 25 Apr 17 nicklas 281     returned. After all items have been exported the exporter will
4466 25 Apr 17 nicklas 282     generate a lookup file for all proxies.
4466 25 Apr 17 nicklas 283     @param batchName Typically the name of an annotation type
4466 25 Apr 17 nicklas 284     @param batchKey Typically a date identifying the "batch"
4466 25 Apr 17 nicklas 285   */
7022 06 Feb 23 nicklas 286   public synchronized String getBatchIndexProxy(String batchName, String batchKey)
4466 25 Apr 17 nicklas 287   {
4466 25 Apr 17 nicklas 288     Map<String, String> batch = batchIndexProxies.get(batchName);
4466 25 Apr 17 nicklas 289     if (batch == null)
4466 25 Apr 17 nicklas 290     {
4466 25 Apr 17 nicklas 291       // Use TreeMap to keep the mapping sorted in batchKey (=date) order
4466 25 Apr 17 nicklas 292       batch = new TreeMap<>();
4466 25 Apr 17 nicklas 293       batchIndexProxies.put(batchName, batch);
4466 25 Apr 17 nicklas 294     }
4466 25 Apr 17 nicklas 295     
4466 25 Apr 17 nicklas 296     String proxy = batch.get(batchKey);
4466 25 Apr 17 nicklas 297     if (proxy == null)
4466 25 Apr 17 nicklas 298     {
4466 25 Apr 17 nicklas 299       // This should be a unique value
7035 10 Feb 23 nicklas 300       proxy = options.debugMode() ? batchKey : Application.generateRandomId(1) + batch.size();
4466 25 Apr 17 nicklas 301       batch.put(batchKey, proxy);
4466 25 Apr 17 nicklas 302     }
4466 25 Apr 17 nicklas 303     return proxy;
4466 25 Apr 17 nicklas 304   }
4466 25 Apr 17 nicklas 305   
7022 06 Feb 23 nicklas 306   /**
7022 06 Feb 23 nicklas 307     Implementation for multi-threading support. Any number of instances can be created with
7022 06 Feb 23 nicklas 308     the same parameters and started at the same time. Each instance will take biosource items
7022 06 Feb 23 nicklas 309     one by one from the queue and will continue processing as long as there are items left.
7022 06 Feb 23 nicklas 310     
7022 06 Feb 23 nicklas 311     The main code should call getCount() at regular intervals to get information about the
7022 06 Feb 23 nicklas 312     progress. This will also check if there has been an error during the processing and
7022 06 Feb 23 nicklas 313     re-throw the same error.
7022 06 Feb 23 nicklas 314     
7022 06 Feb 23 nicklas 315     @since 4.44
7022 06 Feb 23 nicklas 316   */
7022 06 Feb 23 nicklas 317   public static class CallableExporter
7022 06 Feb 23 nicklas 318     implements Runnable
7022 06 Feb 23 nicklas 319   {
7022 06 Feb 23 nicklas 320     private final ReleaseExporter exporter;
7022 06 Feb 23 nicklas 321     private final DbControl writerDc;
7022 06 Feb 23 nicklas 322     private final JsonWriter jsonWriter;
7022 06 Feb 23 nicklas 323     private final Queue<BioSource> biosources;
7022 06 Feb 23 nicklas 324     
7022 06 Feb 23 nicklas 325     private volatile int count;
7022 06 Feb 23 nicklas 326     private volatile RuntimeException error;
7022 06 Feb 23 nicklas 327     
7022 06 Feb 23 nicklas 328     CallableExporter(ReleaseExporter exporter, DbControl writerDc, JsonWriter jsonWriter, Queue<BioSource> biosources)
7022 06 Feb 23 nicklas 329     {
7022 06 Feb 23 nicklas 330       this.exporter = exporter;
7022 06 Feb 23 nicklas 331       this.writerDc = writerDc;
7022 06 Feb 23 nicklas 332       this.jsonWriter = jsonWriter;
7022 06 Feb 23 nicklas 333       this.biosources = biosources;
7022 06 Feb 23 nicklas 334     }
7022 06 Feb 23 nicklas 335
7022 06 Feb 23 nicklas 336     /**
7022 06 Feb 23 nicklas 337       Get the number of biosources that have been processed so far by this
7022 06 Feb 23 nicklas 338       instance. If there has been an error during the processing, this
7022 06 Feb 23 nicklas 339       method will re-throw that error.
7022 06 Feb 23 nicklas 340     */
7022 06 Feb 23 nicklas 341     public int getCount()
7022 06 Feb 23 nicklas 342     {
7022 06 Feb 23 nicklas 343       if (error != null) throw error;
7022 06 Feb 23 nicklas 344       return count;
7022 06 Feb 23 nicklas 345     }
7022 06 Feb 23 nicklas 346     
7022 06 Feb 23 nicklas 347     @Override
7022 06 Feb 23 nicklas 348     public void run() 
7022 06 Feb 23 nicklas 349     {
7022 06 Feb 23 nicklas 350       DbControl readerDc = null;
7022 06 Feb 23 nicklas 351       SnapshotManager manager = null;
7022 06 Feb 23 nicklas 352       QueryManager queryManager = null;
7022 06 Feb 23 nicklas 353       
7022 06 Feb 23 nicklas 354       try
7022 06 Feb 23 nicklas 355       {
7022 06 Feb 23 nicklas 356         SessionControl sc = writerDc.getSessionControl();
7022 06 Feb 23 nicklas 357         readerDc = sc.newDbControl(writerDc.getName());
7022 06 Feb 23 nicklas 358         manager = new SnapshotManager();
7022 06 Feb 23 nicklas 359         queryManager = new QueryManager(readerDc);
7022 06 Feb 23 nicklas 360         
7022 06 Feb 23 nicklas 361         do
7022 06 Feb 23 nicklas 362         {
7022 06 Feb 23 nicklas 363           BioSource bs = biosources.poll();
7022 06 Feb 23 nicklas 364           // Exit if there are no more biosources or
7022 06 Feb 23 nicklas 365           // if this thread has been interrupted (eg. due to
7022 06 Feb 23 nicklas 366           // aborting or an error in another thread)
7022 06 Feb 23 nicklas 367           if (bs == null || Thread.interrupted()) break;
7022 06 Feb 23 nicklas 368           
7022 06 Feb 23 nicklas 369           if (count % 100 == 0)
7022 06 Feb 23 nicklas 370           {
7022 06 Feb 23 nicklas 371             // Rollback and create new DbControl and snapshot 
7022 06 Feb 23 nicklas 372             // manager to allow GC to reclaim memory
7022 06 Feb 23 nicklas 373             readerDc.close();
7022 06 Feb 23 nicklas 374             readerDc = sc.newDbControl(writerDc.getName());
7022 06 Feb 23 nicklas 375             manager = new SnapshotManager();
7022 06 Feb 23 nicklas 376             queryManager = new QueryManager(readerDc);
7022 06 Feb 23 nicklas 377           }
7022 06 Feb 23 nicklas 378           
7022 06 Feb 23 nicklas 379           CohortItem item = new CohortItem(exporter, readerDc, manager, queryManager, bs.getId());
7022 06 Feb 23 nicklas 380           jsonWriter.writeJsonData(item);
7022 06 Feb 23 nicklas 381           count++;
7022 06 Feb 23 nicklas 382         } while (true);
7022 06 Feb 23 nicklas 383       }
7022 06 Feb 23 nicklas 384       catch (RuntimeException ex)
7022 06 Feb 23 nicklas 385       {
7022 06 Feb 23 nicklas 386         error = ex;
7022 06 Feb 23 nicklas 387       }
7022 06 Feb 23 nicklas 388       finally
7022 06 Feb 23 nicklas 389       {
7022 06 Feb 23 nicklas 390         if (readerDc != null) readerDc.close();
7022 06 Feb 23 nicklas 391       }
7022 06 Feb 23 nicklas 392     }
7022 06 Feb 23 nicklas 393   }
7022 06 Feb 23 nicklas 394
3932 13 May 16 nicklas 395 }