 import org.labkey.api.exp.property.Domain;
 import org.labkey.api.pipeline.AbstractTaskFactory;
 import org.labkey.api.pipeline.AbstractTaskFactorySettings;
+import org.labkey.api.pipeline.CancelledException;
 import org.labkey.api.pipeline.PipelineJob;
 import org.labkey.api.pipeline.PipelineJobException;
 import org.labkey.api.pipeline.RecordedAction;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.LineNumberReader;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -151,7 +152,7 @@ public RecordedActionSet run() throws PipelineJobException
         FileAnalysisJobSupport support = (FileAnalysisJobSupport) job;
 
         processInbreeding(job.getContainer(), job.getUser(), support.getAnalysisDirectoryPath().toFile(), job.getLogger());
-        processKinship(job.getContainer(), job.getUser(), support.getAnalysisDirectoryPath().toFile(), job.getLogger());
+        processKinship(job.getContainer(), job.getUser(), support.getAnalysisDirectoryPath().toFile(), job.getLogger(), job);
 
         if (GeneticCalculationsJob.isKinshipValidation())
         {
@@ -165,10 +166,10 @@ public RecordedActionSet run() throws PipelineJobException
     public static void standaloneProcessKinshipAndInbreeding(Container c, User u, File pipelineDir, Logger log) throws PipelineJobException
     {
         processInbreeding(c, u, pipelineDir, log);
-        processKinship(c, u, pipelineDir, log);
+        processKinship(c, u, pipelineDir, log, null);
     }
 
-    private static void processKinship(Container c, User u, File pipelineDir, Logger log) throws PipelineJobException
+    private static void processKinship(Container c, User u, File pipelineDir, Logger log, @Nullable PipelineJob job) throws PipelineJobException
     {
         File output = new File(pipelineDir, KINSHIP_FILE);
         if (!output.exists())
@@ -240,13 +241,22 @@ else if (kinshipTable.getSqlDialect().isPostgreSQL())
             }
 
             try (DbScope.Transaction transaction = ExperimentService.get().ensureTransaction();
-                 BufferedReader reader = Readers.getReader(output))
+                 BufferedReader reader = Readers.getReader(output);
+                 PreparedStatement stmt = transaction.getConnection().prepareStatement(
+                         "INSERT INTO " + EHRSchema.EHR_SCHEMANAME + ".kinship\n" +
+                         "\t(Id, Id2, coefficient, container, created, createdby, modified, modifiedby)\n" +
+                         "\tVALUES (?, ?, ?, ?, ?, ?, ?, ?)"))
             {
                 log.info("Inserting rows");
                 String line = null;
                 int lineNum = 0;
                 while ((line = reader.readLine()) != null)
                 {
+                    if (job != null && job.isCancelled())
+                    {
+                        throw new CancelledException();
+                    }
+
                     String[] fields = line.split("\t");
                     if (fields.length < 3)
                         continue;
@@ -260,34 +270,45 @@ else if (kinshipTable.getSqlDialect().isPostgreSQL())
                     assert fields[0].length() < 80 : "Field Id value too long: [" + fields[0] + ']';
                     assert fields[1].length() < 80 : "Field Id2 value too long: [" + fields[1] + "]";
 
-                    row.put("Id", fields[0]);
-                    row.put("Id2", fields[1]);
+                    stmt.setString(1, fields[0]); // Id
+                    stmt.setString(2, fields[1]); // Id2
                     try
                     {
-                        row.put("coefficient", Double.parseDouble(fields[2]));
+                        stmt.setDouble(3, Double.parseDouble(fields[2])); // coefficient
                     }
                     catch (NumberFormatException e)
                     {
                         throw new PipelineJobException("Invalid kinship coefficient on line " + (lineNum + 1) + " for IDs " + fields[0] + " and " + fields[1] + ": " + fields[2], e);
                     }
 
-                    row.put("container", c.getId());
-                    row.put("created", new Date());
-                    row.put("createdby", u.getUserId());
-                    Table.insert(u, kinshipTable, row);
+                    stmt.setString(4, c.getId()); // container
+                    java.sql.Date now = new java.sql.Date(new Date().getTime());
+                    stmt.setDate(5, now); // created
+                    stmt.setInt(6, u.getUserId()); // createdby
+                    stmt.setDate(7, now); // modified
+                    stmt.setInt(8, u.getUserId()); // modifiedby
+
+                    stmt.addBatch();
+
                     lineNum++;
 
                     if (lineNum % 100000 == 0)
                     {
-                        log.info("processed " + lineNum + " rows");
+                        stmt.executeBatch();
+                    }
+
+                    if (lineNum % 250000 == 0)
+                    {
+                        log.info("imported " + lineNum + " rows");
                     }
                 }
 
-                log.info("Inserted " + lineNum + " rows into ehr.kinship");
+                stmt.executeBatch();
                 transaction.commit();
+                log.info("Inserted " + lineNum + " rows into ehr.kinship");
             }
         }
-        catch (RuntimeSQLException | IOException e)
+        catch (RuntimeSQLException | SQLException | IOException e)
         {
             throw new PipelineJobException(e);
         }
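
The change above swaps LabKey's per-row Table.insert calls for a single JDBC PreparedStatement that accumulates rows with addBatch() and flushes every 100,000 lines, and it checks PipelineJob.isCancelled() inside the loop so a stopped job aborts via CancelledException instead of running to completion. The sketch below shows the same batched-insert-with-cancellation pattern in isolation, using plain JDBC; the kinship_demo table, the Connection source, and the BooleanSupplier cancellation flag are illustrative assumptions rather than the LabKey APIs in the diff, and the caller is assumed to manage the transaction and commit, as the real task does through DbScope.Transaction.

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.function.BooleanSupplier;

public class BatchedKinshipInsertSketch
{
    // Roughly every BATCH_SIZE rows, the accumulated batch is sent to the
    // database in one round trip instead of issuing an INSERT per row.
    private static final int BATCH_SIZE = 100_000;

    public static long importTsv(Connection conn, Path tsv, BooleanSupplier isCancelled)
            throws IOException, SQLException
    {
        long rows = 0;
        try (BufferedReader reader = Files.newBufferedReader(tsv, StandardCharsets.UTF_8);
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO kinship_demo (id, id2, coefficient) VALUES (?, ?, ?)"))
        {
            String line;
            while ((line = reader.readLine()) != null)
            {
                // Cooperative cancellation: bail out between rows rather than mid-statement.
                if (isCancelled.getAsBoolean())
                    throw new IllegalStateException("import cancelled");

                String[] fields = line.split("\t");
                if (fields.length < 3)
                    continue;

                stmt.setString(1, fields[0]);
                stmt.setString(2, fields[1]);
                stmt.setDouble(3, Double.parseDouble(fields[2]));
                stmt.addBatch();

                rows++;
                if (rows % BATCH_SIZE == 0)
                    stmt.executeBatch();   // flush periodically to bound driver memory
            }

            stmt.executeBatch();           // flush the final partial batch
        }
        return rows;                        // caller commits (or rolls back) the transaction
    }
}

Flushing the batch periodically keeps driver-side memory bounded on multi-million-row kinship files, while the final executeBatch() sends any remaining partial batch before the caller commits.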