INFO: Batch Job Implementation

bmckinney edited this page Jan 3, 2017 · 23 revisions

Overview

JSR 352: Batch Applications for the Java Platform

JSR 352 is part of the Java EE 7 platform and defines the programming model for batch applications plus a runtime to run and manage batch jobs.

GlassFish 4.0 is the reference implementation (RI) for the Java EE 7 specification and contains the RI for JSR 352. The source code for this RI (JBatch) can be found on GitHub.

General Constraints

  1. Must be run by a user who can issue the UpdateDatasetCommand.
  2. Dataset must be in DatasetVersion.VersionState.DRAFT.
  3. The dataset must have only one version.
  4. If run more than once, only new files will be added. Nothing is updated or deleted.

Big Data Constraints

  1. FITS/JHOVE isn't run against imported files (too large, not likely to be recognized) so no media mime types are assigned.
  2. No automatic thumbnails are produced (not efficient for large files or large numbers of files).
  3. Ideally, experimental data files should be imported inside a "/data" folder so that Dataverse artifacts (*.cache, etc.) are not added alongside them, since such artifacts conflict with compute needs.

Import Mode

The import mode defines how imported content is applied to existing content in the dataset. It is controlled by setting the "mode" job property and defaults to MERGE. Only MERGE is currently implemented.

  1. MERGE: Default behavior. Existing data files are not deleted or modified. Only new data files are added.

  2. UPDATE (not yet implemented): Existing data files are updated. New data files are added. Nothing is deleted.

  3. REPLACE (not yet implemented): Existing data files are replaced completely.

See: ImportMode
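
A minimal sketch of how the "mode" job property could be resolved, with MERGE as the fallback. The class and method names here (ImportModeSketch, parse, requireSupported) are illustrative and are not taken from the actual ImportMode class. Rejecting the unimplemented modes with an exception is an assumption about reasonable behavior, not something the source confirms.

```java
import java.util.Locale;

// Illustrative only: the real ImportMode class in Dataverse may look different.
class ImportModeSketch {

    enum Mode { MERGE, UPDATE, REPLACE }

    // Resolve the "mode" job property; a missing or blank value falls back to MERGE.
    static Mode parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Mode.MERGE;
        }
        return Mode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // ASSUMPTION: modes other than MERGE are rejected because only MERGE is implemented.
    static Mode requireSupported(Mode mode) {
        if (mode != Mode.MERGE) {
            throw new UnsupportedOperationException("Import mode not implemented: " + mode);
        }
        return mode;
    }

    public static void main(String[] args) {
        System.out.println(requireSupported(parse(null)));
        System.out.println(requireSupported(parse("merge")));
    }
}
```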

Job specification XML

The file system import batch job is described in an XML file located at: src/main/resources/META-INF/batch-jobs/FileSystemImportJob.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="FileSystemImportJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">

    <properties>
        <property name="job-checksumManifest" value="files.sha"/>
        <property name="job-checksumType" value="SHA1"/>
    </properties>

    <listeners>
        <listener ref="fileRecordJobListener">
            <properties>
                <property name="checksumManifest" value="#{jobProperties['job-checksumManifest']}"/>
                <property name="checksumType" value="#{jobProperties['job-checksumType']}"/>
            </properties>
        </listener>
    </listeners>

    <step id="import-files">
        <listeners>
            <listener ref="fileRecordJobListener"/>
        </listeners>
        <chunk checkpoint-policy="item" item-count="50" time-limit="0" skip-limit="0" retry-limit="1">
            <reader ref="fileRecordReader">
                <properties>
                    <property name="excludes" value="*.DS_Store, *.sha, *.cached"/>
                </properties>
            </reader>
            <processor ref="fileRecordProcessor"/>
            <writer ref="fileRecordWriter">
                <properties>
                    <property name="checksumManifest" value="#{jobProperties['job-checksumManifest']}"/>
                    <property name="checksumType" value="#{jobProperties['job-checksumType']}"/>
                </properties>
            </writer>
            <skippable-exception-classes>
                <!-- To skip all the exceptions -->
                <include class="javax.transaction.RollbackException"/>
                <include class="java.lang.NullPointerException"/>
                <include class="java.lang.Exception"/>
                <include class="java.lang.Throwable"/>
            </skippable-exception-classes>
            <no-rollback-exception-classes>
                <!-- Do not roll back the transaction for any of these exceptions -->
                <include class="javax.transaction.RollbackException"/>
                <include class="java.lang.NullPointerException"/>
                <include class="java.lang.Exception"/>
                <include class="java.lang.Throwable"/>
            </no-rollback-exception-classes>
        </chunk>
    </step>

</job>
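
The "excludes" reader property above is a comma-separated list of glob patterns. The sketch below shows one way such a list could be applied using the standard java.nio glob matcher. Whether fileRecordReader actually matches this way is an assumption, and ExcludeFilterSketch is a hypothetical name.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the Dataverse code base.
class ExcludeFilterSketch {

    private final List<PathMatcher> matchers = new ArrayList<>();

    // Split the comma-separated "excludes" value into one glob matcher per pattern.
    ExcludeFilterSketch(String excludes) {
        for (String pattern : excludes.split(",")) {
            String trimmed = pattern.trim();
            if (!trimmed.isEmpty()) {
                matchers.add(FileSystems.getDefault().getPathMatcher("glob:" + trimmed));
            }
        }
    }

    // Match against the file name only, so files in subdirectories are covered too.
    boolean isExcluded(Path file) {
        Path name = file.getFileName();
        if (name == null) {
            return false;
        }
        for (PathMatcher matcher : matchers) {
            if (matcher.matches(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ExcludeFilterSketch filter = new ExcludeFilterSketch("*.DS_Store, *.sha, *.cached");
        System.out.println(filter.isExcluded(Paths.get("files.sha")));
        System.out.println(filter.isExcluded(Paths.get("testfile1.txt")));
    }
}
```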

Job source code

Code implementing the file system import jobs is located in the edu.harvard.iq.dataverse.batch.jobs.importer package.

Code used to create JSON representations of the batch execution log is located in edu.harvard.iq.dataverse.batch.entities.

Code used to create an API to list and execute import jobs is located in edu.harvard.iq.dataverse.batch.api.

Integration tests are located at: edu.harvard.iq.dataverse.filesystem.FileRecordJobIT

Additions to core Dataverse classes:

Adds a method to locate DataFiles by storageId and DatasetVersion:

    public DataFile findByStorageIdandDatasetVersion(String storageId, DatasetVersion dv) {
        try {
            // Bind values as positional parameters instead of concatenating them into the SQL.
            Query query = em.createNativeQuery("select o.id from datafile o, filemetadata m " +
                    "where o.filesystemname = ?1 and o.id = m.datafile_id and m.datasetversion_id = ?2");
            query.setParameter(1, storageId);
            query.setParameter(2, dv.getId());
            query.setMaxResults(1);
            // Run the query once and reuse the result list.
            List<?> results = query.getResultList();
            if (results.isEmpty()) {
                return null;
            }
            return findCheapAndEasy((Long) results.get(0));
        } catch (Exception e) {
            System.out.println("Error finding datafile by storageID and DataSetVersion: " + e.getMessage());
            return null;
        }
    }

This is required by FileRecordProcessor to check whether a datafile already exists in the dataset version, in which case the file is skipped.

Adds new notification types: FILESYSTEMIMPORT, CHECKSUMIMPORT, CHECKSUMFAIL

    public enum Type {
        ASSIGNROLE, 
        REVOKEROLE, 
        CREATEDV, 
        CREATEDS, 
        CREATEACC, 
        MAPLAYERUPDATED, 
        SUBMITTEDDS, 
        RETURNEDDS, 
        PUBLISHEDDS, 
        REQUESTFILEACCESS, 
        GRANTFILEACCESS, 
        REJECTFILEACCESS, 
        FILESYSTEMIMPORT, 
        CHECKSUMIMPORT, 
        CHECKSUMFAIL
    };

Adds required boilerplate code to support: CHECKSUMFAIL, FILESYSTEMIMPORT and CHECKSUMIMPORT.

Starting a job from code

// set the required properties
Properties props = new Properties();
props.setProperty("datasetId", dataset.getGlobalId());
// create a job operator
JobOperator jo = BatchRuntime.getJobOperator();
// run a job (using an id in a job xml file) and return the job execution ID
long jid = jo.start("FileSystemImportJob", props);

Starting a job from the API

curl -X GET --header 'Accept: application/json' --header 'X-Dataverse-key: my-key' 'http://localhost:8080/api/import/datasets/files/10.5072/FK2/MVKMO8?mode=MERGE'
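
For callers that prefer Java over curl, the same request can be built with the JDK HTTP client. This is a sketch that assumes Java 11 or later (java.net.http), which is newer than the Java EE 7 platform described here. StartImportRequestSketch and its parameters are hypothetical names. The request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side helper; assumes Java 11+ for java.net.http.
class StartImportRequestSketch {

    // Build the same GET request as the curl command above; nothing is sent here.
    static HttpRequest build(String baseUrl, String datasetPath, String mode, String apiKey) {
        URI uri = URI.create(baseUrl + "/api/import/datasets/files/" + datasetPath + "?mode=" + mode);
        return HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .header("X-Dataverse-key", apiKey)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080", "10.5072/FK2/MVKMO8", "MERGE", "my-key");
        // To send it, pass the request to java.net.http.HttpClient#send with a body handler.
        System.out.println(request.method() + " " + request.uri());
    }
}
```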

Overriding default checksum properties

1. Checksum manifest file name

The default checksum manifest file name is set to files.sha in the job XML. To override it, add a JVM option: -DchecksumManifest=manifest-md5.txt

2. Checksum type

The default checksum type is set to SHA1 in the job XML. To override it, add a JVM option: -DchecksumType=MD5
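
The override rule for both settings amounts to "use the JVM system property if present, otherwise the job XML default". A minimal sketch of that lookup follows. How the job performs it internally is not shown in this page, so the helper name and the handling of blank values are assumptions.

```java
// Hypothetical helper illustrating the override order; not the actual job code.
class ChecksumSettingsSketch {

    // Return the JVM system property if it is set and non-blank, else the job XML default.
    static String resolve(String systemPropertyName, String jobXmlDefault) {
        String override = System.getProperty(systemPropertyName);
        if (override == null || override.trim().isEmpty()) {
            return jobXmlDefault;
        }
        return override.trim();
    }

    public static void main(String[] args) {
        // Run with -DchecksumType=MD5 to see the override take effect.
        System.out.println("Checksum type = " + resolve("checksumType", "SHA1"));
        System.out.println("Checksum manifest = " + resolve("checksumManifest", "files.sha"));
    }
}
```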

Monitoring jobs

1. User notifications

A notification is sent to the user who started the batch job after the job has completed. The Data Capture Modules (DCM) implementation will determine who that user is. In addition, the Dataverse admin receives the same notifications.

2. Action logs

Dataverse action log entries are created containing a pointer to the full log file for the job. The id column references the job execution id and the info column contains the location of the full job log:

899	OK	FileSystemImportJob	Command	2016-12-21 09:21:21.373000	/usr/local/glassfish4/glassfish/domains/domain1/logs/batch-jobs/job-899.log	2016-12-21 09:21:21.214000	@bb3db07c

The full log looks something like this:

INFO: Job ID = 912
INFO: Job Name = FileSystemImportJob
INFO: Job Status = STARTED
INFO: Dataset Identifier (datasetId=doi:10.5072/FK2/PN3IYE): PN3IYE
INFO: User Identifier (userId=e30e4ded): @e30e4ded
INFO: Import mode =  MERGE
INFO: Locking dataset
INFO: Checksum type = SHA1 ('checksumType' System property)
INFO: Checksum manifest = files.sha ('checksumManifest' System property)
INFO: Reading checksum manifest: /usr/local/glassfish4/glassfish/domains/domain1/files/10.5072/FK2/PN3IYE/files.sha
INFO: Checksums found = 1
INFO: Reading dataset directory: /usr/local/glassfish4/glassfish/domains/domain1/files/10.5072/FK2/PN3IYE (excluding: *.DS_Store, *.sha, *.cached)
INFO: Files found = 2
SEVERE: Checksum mismatch: 1 checksums found in the manifest and 2 files found in the dataset directory.
INFO: Creating DataFile for: /usr/local/glassfish4/glassfish/domains/domain1/files/10.5072/FK2/PN3IYE/testfile1.txt
SEVERE: Unable to find checksum in manifest for: /usr/local/glassfish4/glassfish/domains/domain1/files/10.5072/FK2/PN3IYE/testfile2.txt
INFO: Creating DataFile for: /usr/local/glassfish4/glassfish/domains/domain1/files/10.5072/FK2/PN3IYE/testfile2.txt
INFO: Files read  = 2
SEVERE: Creating job json: null
INFO: Removing dataset lock.
INFO: Job start = Thu Dec 22 15:52:07 EST 2016
INFO: Job end   = Thu Dec 22 15:52:07 EST 2016
INFO: Job exit status = COMPLETED

Note: Log entries for individual datafile creation are suppressed for datasets containing more than 20000 files.
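
The SEVERE checksum entries in the log above come from comparing the manifest with the files found in the dataset directory. The sketch below illustrates that kind of comparison. It ASSUMES a manifest layout of one "<hex digest> <relative path>" pair per line (the layout produced by tools such as shasum). The actual files.sha format is not documented here, and ChecksumManifestSketch is a hypothetical name.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper; the manifest layout is an assumption.
class ChecksumManifestSketch {

    // ASSUMPTION: each non-empty line is "<hex digest><whitespace><relative path>".
    static Map<String, String> parseManifest(List<String> lines) {
        Map<String, String> checksumsByPath = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+", 2);
            if (parts.length == 2) {
                checksumsByPath.put(parts[1], parts[0].toLowerCase(Locale.ROOT));
            }
        }
        return checksumsByPath;
    }

    // Compute a lowercase hex digest; algorithm is a JCA name such as "SHA-1" or "MD5".
    static String hexDigest(String algorithm, byte[] content) {
        try {
            byte[] digest = MessageDigest.getInstance(algorithm).digest(content);
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Unknown checksum type: " + algorithm, e);
        }
    }

    // True only when the manifest has an entry for the path and the digests agree.
    static boolean matches(Map<String, String> manifest, String path, String algorithm, byte[] content) {
        String expected = manifest.get(path);
        return expected != null && expected.equals(hexDigest(algorithm, content));
    }
}
```

For large files, the digest would normally be computed by streaming the file through MessageDigest.update rather than loading the whole content into memory.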

3. JSON log files

JSON log files are also generated and saved to: /usr/local/glassfish4/glassfish/domains/domain1/logs/batch-jobs/

Example:

{  
   "id":119,
   "name":"FileSystemImportJob",
   "status":"COMPLETED",
   "exitStatus":"COMPLETED",
   "createTime":1472145902194,
   "endTime":1472145902785,
   "lastUpdateTime":1472145902212,
   "startTime":1472145902212,
   "properties":{  
      "userId":"dataverseAdmin",
      "datasetId":"doi:10.5072/FK2/EYVJSM"
   },
   "steps":[  
      {  
         "id":233,
         "name":"import-files",
         "status":"COMPLETED",
         "exitStatus":"COMPLETED",
         "endTime":1472145902339,
         "startTime":1472145902218,
         "metrics":{  
            "write_skip_count":0,
            "commit_count":1,
            "process_skip_count":0,
            "read_skip_count":0,
            "write_count":9,
            "rollback_count":0,
            "filter_count":0,
            "read_count":9
         },
         "persistentUserData":null
      },
      {  
         "id":234,
         "name":"import-checksums",
         "status":"COMPLETED",
         "exitStatus":"FAILED",
         "endTime":1472145902776,
         "startTime":1472145902343,
         "metrics":{  
            "write_skip_count":0,
            "commit_count":2,
            "process_skip_count":0,
            "read_skip_count":0,
            "write_count":9,
            "rollback_count":0,
            "filter_count":2,
            "read_count":11
         },
         "persistentUserData":"FAILED: missing data files [.DS_Store, G08_trj/.DS_Store] "
      }
   ]
}
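
The time fields in the JSON log appear to be milliseconds since the Unix epoch (an assumption based on their magnitude, which corresponds to August 2016). Under that assumption, they can be converted to readable timestamps and durations as sketched below. JobLogTimesSketch is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper; assumes the log's time fields are epoch milliseconds.
class JobLogTimesSketch {

    // Convert an epoch-millisecond value from the log to a UTC instant.
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Elapsed time between two epoch-millisecond values from the log.
    static Duration elapsed(long startMillis, long endMillis) {
        return Duration.ofMillis(endMillis - startMillis);
    }

    public static void main(String[] args) {
        // Values copied from the example log above.
        System.out.println(toInstant(1472145902194L));                  // prints 2016-08-25T17:25:02.194Z
        System.out.println(elapsed(1472145902212L, 1472145902785L));    // prints PT0.573S
    }
}
```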

Job status API

After a job is started, its status can be polled using this API (requires admin): https://dv.sbgrid.org/api/admin/batch/job/{job-execution-id}

It returns a JSON response identical to the corresponding file in the batch-jobs folder.
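
A polling client typically repeats the status call until the job reaches a state in which it is no longer running. The sketch below shows only that loop. The status source is passed in as a Supplier so the HTTP call and JSON parsing are left to the caller. The status names are the JSR 352 BatchStatus values. JobStatusPollerSketch and its parameters are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical polling helper; fetching and parsing the status is left to the caller.
class JobStatusPollerSketch {

    // JSR 352 batch statuses in which a job execution is no longer running.
    private static final Set<String> TERMINAL =
            new HashSet<>(Arrays.asList("COMPLETED", "FAILED", "STOPPED", "ABANDONED"));

    static boolean isTerminal(String status) {
        return TERMINAL.contains(status);
    }

    // Ask the status source up to maxAttempts times, pausing delayMillis between attempts.
    // Returns the last status seen, which may be non-terminal if attempts run out.
    static String pollUntilDone(Supplier<String> statusSource, int maxAttempts, long delayMillis) {
        String status = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            status = statusSource.get();
            if (isTerminal(status)) {
                return status;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop polling early.
                Thread.currentThread().interrupt();
                return status;
            }
        }
        return status;
    }
}
```

In practice, the supplier would call the status URL above and extract the top-level "status" field from the JSON response.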

Glassfish admin server console

Jobs, and their individual steps, can be monitored via the Glassfish Admin Server console.

Glassfish command line

To view the list of batch jobs, open a terminal window and run the following command:

asadmin list-batch-jobs -l

To view batch job executions run the following command:

asadmin list-batch-job-executions -l

To view the job steps of a particular job, run the following command:

asadmin list-batch-job-steps -l {job-execution-id}