Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/org/apache/sysds/api/DMLScript.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ public static boolean executeScript( String[] args )
LineageCacheConfig.setCachePolicy(LINEAGE_POLICY);
LineageCacheConfig.setEstimator(LINEAGE_ESTIMATE);

if (dmlOptions.oocLogEvents)
OOCEventLog.setup(100000);
if(dmlOptions.oocLogEvents)
OOCEventLog.setup(1000000);

String dmlScriptStr = readDMLScript(isFile, fileOrScript);
Map<String, String> argVals = dmlOptions.argVals;
Expand Down
24 changes: 14 additions & 10 deletions src/main/java/org/apache/sysds/api/ScriptExecutorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -31,6 +31,7 @@
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool;
import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
Expand All @@ -45,7 +46,7 @@ public class ScriptExecutorUtils {
* Execute the runtime program. This involves execution of the program
* blocks that make up the runtime program and may involve dynamic
* recompilation.
*
*
* @param se
* script executor
* @param statisticsMaxHeavyHitters
Expand All @@ -62,7 +63,7 @@ public static void executeRuntimeProgram(ScriptExecutor se, int statisticsMaxHea
* Execute the runtime program. This involves execution of the program
* blocks that make up the runtime program and may involve dynamic
* recompilation.
*
*
* @param rtprog
* runtime program
* @param ec
Expand All @@ -82,7 +83,7 @@ public static void executeRuntimeProgram(Program rtprog, ExecutionContext ec, DM
List<GPUContext> gCtxs = GPUContextPool.reserveAllGPUContexts();
if (gCtxs == null) {
throw new DMLRuntimeException(
"GPU : Could not create GPUContext, either no GPU or all GPUs currently in use");
"GPU : Could not create GPUContext, either no GPU or all GPUs currently in use");
}
gCtxs.get(0).initializeThread();
ec.setGPUContexts(gCtxs);
Expand Down Expand Up @@ -120,18 +121,21 @@ public static void executeRuntimeProgram(Program rtprog, ExecutionContext ec, DM
}
if( ConfigurationManager.isCodegenEnabled() )
SpoofCompiler.cleanupCodeGenerator();

// display statistics (incl caching stats if enabled)
Statistics.stopRunTimer();
System.out.println(Statistics.display(statisticsMaxHeavyHitters > 0 ?
statisticsMaxHeavyHitters : DMLScript.STATISTICS_COUNT));
statisticsMaxHeavyHitters : DMLScript.STATISTICS_COUNT));

if (DMLScript.LINEAGE_ESTIMATE)
System.out.println(LineageEstimatorStatistics.displayLineageEstimates());

if (DMLScript.USE_OOC)
if(DMLScript.USE_OOC) {
// Clean symbol-table entries so OOC streams count as de-referenced
if((outputVariables == null || outputVariables.isEmpty()) && ec != null)
ec.getVarList().forEach(var -> VariableCPInstruction.processRmvarInstruction(ec, var));
OOCCacheManager.reset();
}
}
}

}
2 changes: 1 addition & 1 deletion src/main/java/org/apache/sysds/hops/UnaryOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ else if(_op == OpOp1.MEDIAN)
else { // general case MATRIX
ExecType et = optFindExecType();
// special handling cumsum/cumprod/cummin/cumsum
if(isCumulativeUnaryOperation() && !(et == ExecType.CP || et == ExecType.GPU)) {
if(isCumulativeUnaryOperation() && !(et == ExecType.CP || et == ExecType.GPU || et == ExecType.OOC)) {
// TODO additional physical operation if offsets fit in memory
ret = constructLopsSparkCumulativeUnary();
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/apache/sysds/lops/Lop.java
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,8 @@ public String prepScalarOperand(ExecType et, String label) {
boolean isLiteral = (isData && ((Data)this).isLiteral());

StringBuilder sb = new StringBuilder("");
if ( et == ExecType.CP || et == ExecType.SPARK || et == ExecType.GPU || (isData && isLiteral)) {
if ( et == ExecType.CP || et == ExecType.SPARK || et == ExecType.GPU || et == ExecType.OOC
|| (isData && isLiteral)) {
sb.append(label);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -729,12 +730,19 @@ private void executeLocalParFor( ExecutionContext ec, IntObject from, IntObject
final LocalTaskQueue<Task> queue = new LocalTaskQueue<>();
final Thread[] threads = new Thread[_numThreads];
final LocalParWorker[] workers = new LocalParWorker[_numThreads];
@SuppressWarnings("unchecked")
final HashMap<String, Data>[] workerBaselines = DMLScript.USE_OOC ? new HashMap[_numThreads] : null;
try
{
// Step 1) create task queue and init workers in parallel
// (including preparation of update-in-place variables)
IntStream.range(0, _numThreads).forEach(i -> {
workers[i] = createParallelWorker( _pwIDs[i], queue, ec, i);
if(DMLScript.USE_OOC) {
workerBaselines[i] = new HashMap<>();
for(Map.Entry<String, Data> e : workers[i].getVariables().entrySet())
workerBaselines[i].put(e.getKey(), e.getValue());
}
threads[i] = new Thread( workers[i] , "PARFOR");
threads[i].setPriority(Thread.MAX_PRIORITY);
});
Expand Down Expand Up @@ -777,11 +785,21 @@ private void executeLocalParFor( ExecutionContext ec, IntObject from, IntObject

// Step 4) collecting results from each parallel worker
//obtain results and cleanup other intermediates before result merge
LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads];
Set<String> resultVarNames = _resultVars.stream()
.map(v -> v._name).collect(Collectors.toSet());
LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads];
for( int i=0; i<_numThreads; i++ ) {
localVariables[i] = workers[i].getVariables();
localVariables[i].removeAllNotIn(_resultVars.stream()
.map(v -> v._name).collect(Collectors.toSet()));
if(DMLScript.USE_OOC) {
for(String var : localVariables[i].keySet()) {
if(!resultVarNames.contains(var)) {
Data current = localVariables[i].get(var);
if(current != null && current != workerBaselines[i].get(var))
VariableCPInstruction.processRmvarInstruction(workers[i].getExecutionContext(), var);
}
}
}
localVariables[i].removeAllNotIn(resultVarNames);
numExecutedTasks += workers[i].getExecutedTasks();
numExecutedIterations += workers[i].getExecutedIterations();
}
Expand All @@ -797,6 +815,20 @@ private void executeLocalParFor( ExecutionContext ec, IntObject from, IntObject
//consolidate results into global symbol table
consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
numExecutedIterations, numExecutedTasks, localVariables );

if(DMLScript.USE_OOC) {
// Cleanup remaining variables
for(int i = 0; i < workers.length; i++) {
if(workers[i].getExecutedTasks() <= 0)
continue;
for(String var : localVariables[i].keySet()) {
Data current = localVariables[i].get(var);
if(current != null && current != workerBaselines[i].get(var))
VariableCPInstruction.processRmvarInstruction(workers[i].getExecutionContext(), var);
}
}

}

// Step 5) cleanup local parworkers (e.g., remove created functions)
for( int i=0; i<_numThreads; i++ ) {
Expand Down Expand Up @@ -1392,11 +1424,19 @@ private void consolidateAndCheckResults(ExecutionContext ec, final long expIters
CacheableData<?> outNew = USE_PARALLEL_RESULT_MERGE ?
rm.executeParallelMerge(_numThreads) :
rm.executeSerialMerge();

//cleanup existing var
Data exdata = ec.removeVariable(var._name);
if( exdata != null && exdata != outNew )
ec.cleanupDataObject(exdata);

if(DMLScript.USE_OOC) {
//cleanup existing var with rmvar semantics to keep OOC ref counters consistent
Data exdata = ec.getVariable(var._name);
if( exdata != null && exdata != outNew )
VariableCPInstruction.processRmvarInstruction(ec, var._name);
}
else {
//cleanup existing var
Data exdata = ec.removeVariable(var._name);
if( exdata != null && exdata != outNew )
ec.cleanupDataObject(exdata);
}

//cleanup of intermediate result variables
cleanWorkerResultVariables( ec, out, in, true );
Expand Down Expand Up @@ -1607,6 +1647,11 @@ public void run()
outNew = rm.executeSerialMerge();

synchronized( _ec.getVariables() ){
if(DMLScript.USE_OOC) {
Data exdata = _ec.getVariable(var._name);
if(exdata != null && exdata != outNew)
VariableCPInstruction.processRmvarInstruction(_ec, var._name);
}
_ec.getVariables().put( var._name, outNew);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.lineage.LineageCache;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
Expand Down Expand Up @@ -293,8 +294,13 @@ protected UpdateType[] prepareUpdateInPlaceVariables(ExecutionContext ec, long t
moNew.setFileName(mo.getFileName() + Lop.UPDATE_INPLACE_PREFIX + tid);
mo.release();
// cleanup old variable (e.g., remove from buffer pool)
if(ec.removeVariable(varname) != null)
if(DMLScript.USE_OOC) {
if(ec.containsVariable(varname))
VariableCPInstruction.processRmvarInstruction(ec, varname);
}
else if(ec.removeVariable(varname) != null) {
ec.cleanupCacheableData(mo);
}
moNew.release(); // after old removal to avoid unnecessary evictions
moNew.setUpdateType(UpdateType.INPLACE);
ec.setVariable(varname, moNew);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public double getLeafNodeEstimate(TestMeasure measure, OptNode node)
value = DEFAULT_MEM_REMOTE + h.getSpBroadcastSize();
}
//check for invalid cp memory estimate
else if ( h.getExecType()==ExecType.CP && value >= OptimizerUtils.getLocalMemBudget() ) {
else if ( (h.getExecType()==ExecType.CP || h.getExecType()==ExecType.OOC)
&& value >= OptimizerUtils.getLocalMemBudget() ) {
if( !forcedExec && !HopRewriteUtils.hasListInputs(h) )
LOG.warn("Memory estimate larger than budget but CP exec type (op="+h.getOpString()+", name="+h.getName()+", memest="+h.getMemEstimate()+").");
value = DEFAULT_MEM_REMOTE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public static List<OptNode> rCreateAbstractOptNodes(Hop hop, LocalVariableMap va
Types.ExecType et = (hop.getExecType()!=null) ?
hop.getExecType() : Types.ExecType.CP;
switch( et ) {
case CP:case GPU:
case CP:case GPU:case OOC:
node.setExecType(ExecType.CP); break;
case SPARK:
node.setExecType(ExecType.SPARK); break;
Expand All @@ -329,7 +329,7 @@ public static List<OptNode> rCreateAbstractOptNodes(Hop hop, LocalVariableMap va
}

//handle degree of parallelism
if( et == Types.ExecType.CP && hop instanceof MultiThreadedHop ){
if( (et == Types.ExecType.CP || et == Types.ExecType.OOC) && hop instanceof MultiThreadedHop ){
MultiThreadedHop mtop = (MultiThreadedHop) hop;
node.setK( OptimizerUtils.getConstrainedNumThreads(mtop.getMaxNumThreads()) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ else if(parts.length == 4)
return TSMMOOCInstruction.parseInstruction(str);
case Reorg:
return ReorgOOCInstruction.parseInstruction(str);
case Reshape:
return ReorgOOCInstruction.parseInstruction(str);
case Tee:
return TeeOOCInstruction.parseInstruction(str);
case CentralMoment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,29 @@ public void processInstruction(ExecutionContext ec) {
throw new DMLRuntimeException("error executing function " + fname, e);
}
long t1 = !ReuseCacheType.isNone()||DMLScript.LINEAGE_ESTIMATE ? System.nanoTime() : 0;
// cleanup all returned variables w/o binding

// cleanup all returned variables w/o binding
HashSet<String> expectRetVars = new HashSet<>();
for(DataIdentifier di : fpb.getOutputParams())
expectRetVars.add(di.getName());

LocalVariableMap retVars = fn_ec.getVariables();
for( String varName : new ArrayList<>(retVars.keySet()) ) {
if( expectRetVars.contains(varName) )
continue;
//cleanup unexpected return values to avoid leaks
//(including OOC reference tracking for matrix streams)
VariableCPInstruction.processRmvarInstruction(fn_ec, varName);
if(DMLScript.USE_OOC) {
for( String varName : new ArrayList<>(retVars.keySet()) ) {
if( expectRetVars.contains(varName) )
continue;
// cleanup unexpected return values to avoid leaks
// (including OOC reference tracking for matrix streams)
VariableCPInstruction.processRmvarInstruction(fn_ec, varName);
}
}
else {
for( String varName : new ArrayList<>(retVars.keySet()) ) {
if( expectRetVars.contains(varName) )
continue;
// cleanup unexpected return values to avoid leaks
fn_ec.cleanupDataObject(fn_ec.removeVariable(varName));
}
}

// Unpin the pinned variables
Expand All @@ -245,7 +255,8 @@ public void processInstruction(ExecutionContext ec) {
for (int i=0; i< numOutputs; i++) {
String boundVarName = _boundOutputNames.get(i);
String retVarName = fpb.getOutputParams().get(i).getName();
Data boundValue = retVars.get(retVarName);
Data boundValue = DMLScript.USE_OOC ?
fn_ec.removeVariable(retVarName) : retVars.get(retVarName);
if (boundValue == null)
throw new DMLRuntimeException("fcall "+_functionName+": "
+boundVarName + " was not assigned a return value");
Expand Down Expand Up @@ -288,11 +299,17 @@ public void processInstruction(ExecutionContext ec) {
//FIXME: send _boundOutputNames instead of fpb.getOutputParams as
//those are already replaced by boundoutput names in the lineage map.
}

if(DMLScript.USE_OOC) {
// Cleanup any remaining unbound outputs in function scope.
for( String varName : new ArrayList<>(fn_ec.getVariables().keySet()) )
VariableCPInstruction.processRmvarInstruction(fn_ec, varName);

// cleanup declared outputs that are not bound at callsite
for (int i = numOutputs; i < fpb.getOutputParams().size(); i++) {
String retVarName = fpb.getOutputParams().get(i).getName();
VariableCPInstruction.processRmvarInstruction(fn_ec, retVarName);
// cleanup declared outputs that are not bound at callsite
for (int i = numOutputs; i < fpb.getOutputParams().size(); i++) {
String retVarName = fpb.getOutputParams().get(i).getName();
VariableCPInstruction.processRmvarInstruction(fn_ec, retVarName);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,8 @@ private void processCopyInstruction(ExecutionContext ec) {

// remove existing variable bound to target name
Data input2_data = ec.removeVariable(getInput2().getName());
if (DMLScript.USE_OOC && input2_data instanceof MatrixObject)
TeeOOCInstruction.incrRef(((MatrixObject) input2_data).getStreamable(), -1);

//cleanup matrix data on fs/hdfs (if necessary)
if( input2_data != null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void processInstruction( ExecutionContext ec ) {

MatrixBlock ltmp;
int extra = _aop.correction.getNumRemovedRowsColumns();
MatrixBlock ret = new MatrixBlock(1,1+extra,false);
MatrixBlock ret = new MatrixBlock(1, 1 + extra, _aop.initialValue);
MatrixBlock corr = new MatrixBlock(1,1+extra,false);
while((ltmp = qLocal.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) {
OperationsOnMatrixValues.incrementalAggregation(
Expand Down
Loading
Loading