|
9 | 9 | import org.labkey.api.data.CompareType; |
10 | 10 | import org.labkey.api.data.Container; |
11 | 11 | import org.labkey.api.data.ContainerManager; |
| 12 | +import org.labkey.api.data.DbScope; |
12 | 13 | import org.labkey.api.data.SimpleFilter; |
13 | 14 | import org.labkey.api.data.TableInfo; |
14 | 15 | import org.labkey.api.data.TableSelector; |
@@ -100,7 +101,7 @@ public boolean isRequired() |
100 | 101 | } |
101 | 102 | } |
102 | 103 |
|
103 | | - final int BATCH_SIZE = 100; |
| 104 | + final int BATCH_SIZE = 250; |
104 | 105 |
|
105 | 106 | private MODE getMode() |
106 | 107 | { |
@@ -136,127 +137,136 @@ private void checkCancelled(PipelineJob job) |
136 | 137 | private void processBatch(List<String> subjects, Logger log, PipelineJob job) |
137 | 138 | { |
138 | 139 | log.info("processing batch with " + subjects.size() + " subjects"); |
139 | | - TableInfo destinationTable = getDataDestinationTable(); |
| 140 | + try (DbScope.Transaction t = DbScope.getLabKeyScope().ensureTransaction()) |
| 141 | + { |
| 142 | + TableInfo destinationTable = getDataDestinationTable(); |
140 | 143 |
|
141 | | - QueryUpdateService qus = destinationTable.getUpdateService(); |
142 | | - qus.setBulkLoad(true); |
| 144 | + QueryUpdateService qus = destinationTable.getUpdateService(); |
| 145 | + qus.setBulkLoad(true); |
143 | 146 |
|
144 | | - try |
145 | | - { |
146 | | - if (getMode() == MODE.TRUNCATE) |
| 147 | + try |
147 | 148 | { |
148 | | - // Find / Delete existing values: |
149 | | - Set<ColumnInfo> keyFields = destinationTable.getColumns().stream().filter(ColumnInfo::isKeyField).collect(Collectors.toSet()); |
150 | | - final SimpleFilter subjectFilter = new SimpleFilter(FieldKey.fromString(_settings.get(Settings.targetSubjectColumn.name())), subjects, CompareType.IN); |
151 | | - if (_settings.get(Settings.targetAdditionalFilters.name()) != null) |
| 149 | + if (getMode() == MODE.TRUNCATE) |
152 | 150 | { |
153 | | - List<CompareType.AbstractCompareClause> additionalFilters = parseAdditionalFilters(_settings.get(Settings.targetAdditionalFilters.name())); |
154 | | - additionalFilters.forEach(subjectFilter::addCondition); |
155 | | - } |
| 151 | + // Find / Delete existing values: |
| 152 | + Set<ColumnInfo> keyFields = destinationTable.getColumns().stream().filter(ColumnInfo::isKeyField).collect(Collectors.toSet()); |
| 153 | + final SimpleFilter subjectFilter = new SimpleFilter(FieldKey.fromString(_settings.get(Settings.targetSubjectColumn.name())), subjects, CompareType.IN); |
| 154 | + if (_settings.get(Settings.targetAdditionalFilters.name()) != null) |
| 155 | + { |
| 156 | + List<CompareType.AbstractCompareClause> additionalFilters = parseAdditionalFilters(_settings.get(Settings.targetAdditionalFilters.name())); |
| 157 | + additionalFilters.forEach(subjectFilter::addCondition); |
| 158 | + } |
156 | 159 |
|
157 | | - if (destinationTable.getColumn(FieldKey.fromString(_settings.get(Settings.targetSubjectColumn.name()))) == null) |
158 | | - { |
159 | | - throw new IllegalStateException("Unknown column on table " + destinationTable.getName() + ": " + _settings.get(Settings.targetSubjectColumn.name())); |
160 | | - } |
| 160 | + if (destinationTable.getColumn(FieldKey.fromString(_settings.get(Settings.targetSubjectColumn.name()))) == null) |
| 161 | + { |
| 162 | + throw new IllegalStateException("Unknown column on table " + destinationTable.getName() + ": " + _settings.get(Settings.targetSubjectColumn.name())); |
| 163 | + } |
161 | 164 |
|
162 | | - List<Map<String, Object>> existingRows = new ArrayList<>(new TableSelector(destinationTable, keyFields, subjectFilter, null).getMapCollection()); |
163 | | - if (!existingRows.isEmpty()) |
164 | | - { |
165 | | - List<List<Map<String, Object>>> batches = Lists.partition(existingRows, 5000); |
166 | | - log.info("deleting " + existingRows.size() + " rows in " + batches.size() + " batches"); |
167 | | - int i = 0; |
168 | | - for (List<Map<String, Object>> batch : batches) |
| 165 | + List<Map<String, Object>> existingRows = new ArrayList<>(new TableSelector(destinationTable, keyFields, subjectFilter, null).getMapCollection()); |
| 166 | + if (!existingRows.isEmpty()) |
169 | 167 | { |
170 | | - i++; |
171 | | - log.info("batch " + i); |
172 | | - checkCancelled(job); |
| 168 | + List<List<Map<String, Object>>> batches = Lists.partition(existingRows, 5000); |
| 169 | + log.info("deleting " + existingRows.size() + " rows in " + batches.size() + " batches"); |
| 170 | + int i = 0; |
| 171 | + for (List<Map<String, Object>> batch : batches) |
| 172 | + { |
| 173 | + i++; |
| 174 | + log.info("batch " + i); |
| 175 | + checkCancelled(job); |
173 | 176 |
|
174 | | - qus.deleteRows(_containerUser.getUser(), _containerUser.getContainer(), batch, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null); |
| 177 | + qus.deleteRows(_containerUser.getUser(), _containerUser.getContainer(), batch, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null); |
| 178 | + t.commitAndKeepConnection(); |
| 179 | + } |
| 180 | + } |
| 181 | + else |
| 182 | + { |
| 183 | + log.info("No rows to delete for this subject batch"); |
175 | 184 | } |
176 | 185 | } |
177 | 186 | else |
178 | 187 | { |
179 | | - log.info("No rows to delete for this subject batch"); |
| 188 | + log.info("Using " + getMode().name() + " mode, source records will not be deleted"); |
180 | 189 | } |
181 | | - } |
182 | | - else |
183 | | - { |
184 | | - log.info("Using " + getMode().name() + " mode, source records will not be deleted"); |
185 | | - } |
186 | 190 |
|
187 | | - // Query data and import |
188 | | - List<Map<String, Object>> toImportOrUpdate = getRowsToImport(subjects, log); |
189 | | - if (!toImportOrUpdate.isEmpty()) |
190 | | - { |
191 | | - if (getMode() == MODE.TRUNCATE) |
| 191 | + // Query data and import |
| 192 | + List<Map<String, Object>> toImportOrUpdate = getRowsToImport(subjects, log); |
| 193 | + if (!toImportOrUpdate.isEmpty()) |
192 | 194 | { |
193 | | - List<List<Map<String, Object>>> batches = Lists.partition(toImportOrUpdate, 5000); |
194 | | - log.info("inserting " + toImportOrUpdate.size() + " rows in " + batches.size() + " batches"); |
195 | | - |
196 | | - int i = 0; |
197 | | - for (List<Map<String, Object>> batch : batches) |
| 195 | + if (getMode() == MODE.TRUNCATE) |
198 | 196 | { |
199 | | - i++; |
200 | | - log.info("batch " + i); |
201 | | - checkCancelled(job); |
| 197 | + List<List<Map<String, Object>>> batches = Lists.partition(toImportOrUpdate, 5000); |
| 198 | + log.info("inserting " + toImportOrUpdate.size() + " rows in " + batches.size() + " batches"); |
202 | 199 |
|
203 | | - BatchValidationException bve = new BatchValidationException(); |
204 | | - qus.insertRows(_containerUser.getUser(), _containerUser.getContainer(), batch, bve, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null); |
205 | | - if (bve.hasErrors()) |
| 200 | + int i = 0; |
| 201 | + for (List<Map<String, Object>> batch : batches) |
206 | 202 | { |
207 | | - throw bve; |
| 203 | + i++; |
| 204 | + log.info("batch " + i); |
| 205 | + checkCancelled(job); |
| 206 | + |
| 207 | + BatchValidationException bve = new BatchValidationException(); |
| 208 | + qus.insertRows(_containerUser.getUser(), _containerUser.getContainer(), batch, bve, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null); |
| 209 | + if (bve.hasErrors()) |
| 210 | + { |
| 211 | + throw bve; |
| 212 | + } |
| 213 | + t.commitAndKeepConnection(); |
208 | 214 | } |
209 | 215 | } |
210 | | - } |
211 | | - else if (getMode() == MODE.UPDATE_ONLY) |
212 | | - { |
213 | | - List<List<Map<String, Object>>> batches = Lists.partition(toImportOrUpdate, 5000); |
214 | | - log.info("updating " + toImportOrUpdate.size() + " rows in " + batches.size() + " batches"); |
215 | | - |
216 | | - int i = 0; |
217 | | - for (List<Map<String, Object>> batch : batches) |
| 216 | + else if (getMode() == MODE.UPDATE_ONLY) |
218 | 217 | { |
| 218 | + List<List<Map<String, Object>>> batches = Lists.partition(toImportOrUpdate, 5000); |
| 219 | + log.info("updating " + toImportOrUpdate.size() + " rows in " + batches.size() + " batches"); |
219 | 220 |
|
220 | | - i++; |
221 | | - log.info("batch " + i); |
222 | | - checkCancelled(job); |
| 221 | + int i = 0; |
| 222 | + for (List<Map<String, Object>> batch : batches) |
| 223 | + { |
223 | 224 |
|
224 | | - BatchValidationException bve = new BatchValidationException(); |
| 225 | + i++; |
| 226 | + log.info("batch " + i); |
| 227 | + checkCancelled(job); |
225 | 228 |
|
226 | | - Collection<String> keyFields = destinationTable.getPkColumnNames(); |
227 | | - List<Map<String, Object>> keys = batch.stream().map(x -> { |
228 | | - Map<String, Object> map = new HashMap<>(); |
229 | | - for (String keyField : keyFields) |
230 | | - { |
231 | | - if (x.get(keyField) != null) |
| 229 | + BatchValidationException bve = new BatchValidationException(); |
| 230 | + |
| 231 | + Collection<String> keyFields = destinationTable.getPkColumnNames(); |
| 232 | + List<Map<String, Object>> keys = batch.stream().map(x -> { |
| 233 | + Map<String, Object> map = new HashMap<>(); |
| 234 | + for (String keyField : keyFields) |
232 | 235 | { |
233 | | - map.put(keyField, x.get(keyField)); |
| 236 | + if (x.get(keyField) != null) |
| 237 | + { |
| 238 | + map.put(keyField, x.get(keyField)); |
| 239 | + } |
234 | 240 | } |
235 | | - } |
236 | 241 |
|
237 | | - return map; |
238 | | - }).toList(); |
| 242 | + return map; |
| 243 | + }).toList(); |
239 | 244 |
|
240 | | - qus.updateRows(_containerUser.getUser(), _containerUser.getContainer(), batch, keys, bve, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null); |
241 | | - if (bve.hasErrors()) |
242 | | - { |
243 | | - throw bve; |
| 245 | + qus.updateRows(_containerUser.getUser(), _containerUser.getContainer(), batch, keys, bve, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null); |
| 246 | + if (bve.hasErrors()) |
| 247 | + { |
| 248 | + throw bve; |
| 249 | + } |
| 250 | + t.commitAndKeepConnection(); |
244 | 251 | } |
245 | 252 | } |
| 253 | + else |
| 254 | + { |
| 255 | + throw new IllegalStateException("Unknown mode: " + getMode()); |
| 256 | + } |
246 | 257 | } |
247 | 258 | else |
248 | 259 | { |
249 | | - throw new IllegalStateException("Unknown mode: " + getMode()); |
| 260 | + log.info("No rows to import/update for this subject batch"); |
250 | 261 | } |
251 | 262 | } |
252 | | - else |
| 263 | + catch (SQLException | InvalidKeyException | BatchValidationException | QueryUpdateServiceException | |
| 264 | + DuplicateKeyException e) |
253 | 265 | { |
254 | | - log.info("No rows to import/update for this subject batch"); |
| 266 | + throw new IllegalStateException("Error Importing/Updating Rows", e); |
255 | 267 | } |
256 | | - } |
257 | | - catch (SQLException | InvalidKeyException | BatchValidationException | QueryUpdateServiceException | DuplicateKeyException e) |
258 | | - { |
259 | | - throw new IllegalStateException("Error Importing/Updating Rows", e); |
| 268 | + |
| 269 | + t.commit(); |
260 | 270 | } |
261 | 271 | } |
262 | 272 |
|
|
0 commit comments