Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/lifecycle/LogFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ Map<SSTable, LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables)
private LogRecord makeAddRecord(SSTable table)
{
maybeCreateReplica(table);
return LogRecord.make(Type.ADD, table);
return LogRecord.makeAdd(table);
}

/**
Expand Down Expand Up @@ -568,4 +568,4 @@ public void takeOwnership(LogFile txnFile)
addRecord(record);
}
}
}
}
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ public static LogRecord make(Type type, SSTable table)
return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath);
}

static LogRecord makeAdd(SSTable table)
{
String absoluteTablePath = absolutePath(table.descriptor.baseFile());
List<String> allPossibleFilePaths = table.getAllFilePaths();
List<File> filesOnDisk = new ArrayList<>();
for (String p : allPossibleFilePaths)
{
filesOnDisk.add(new File(p));
}
return make(Type.ADD, filesOnDisk, allPossibleFilePaths.size(), absoluteTablePath);
}

public static Map<SSTable, LogRecord> make(Type type, Iterable<SSTableReader> tables)
{
// contains a mapping from sstable absolute path (everything up until the 'Data'/'Index'/etc part of the filename) to the sstable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.test.microbench.sstable;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.util.File;


@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@Threads(1)
@State(Scope.Benchmark)
public class MemtableFlushBench extends SSTableAbstractBench
{

@Param("10")
private int preFillSStableCount;

@Param("10")
private int flushingSStableCount;

@Param("1")
private int rowCount;

@Param("200000")
private int extraFileCount;

@Param("false")
private boolean skipCleanup;

protected void setupData()
{
super.setupData();
// Input some mostly empty flushes
for (int j = 0; j < preFillSStableCount; j++)
{
for (long i = 0; i < rowCount; i++)
insertForIndex(writeStatement, i);
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
}

// Inflate directory entry count with dummy files to simulate
// a directory with many more SSTables than we actually flushed
File dataDir = cfs.getDirectories().getDirectoryForNewSSTables();
for (int i = 0; i < extraFileCount; i++)
new File(dataDir, "dummy_" + i + ".db").createFileIfNotExists();
}

@Benchmark
public void doFlushing()
{
for (int j = 0; j < flushingSStableCount; j++)
{
for (long i = 0; i < rowCount; i++)
insertForIndex(writeStatement, i);
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
}

}

@TearDown(Level.Trial)
public void teardown() throws IOException, ExecutionException, InterruptedException
{
if (!skipCleanup)
super.teardown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class SSTableAbstractBench extends CQLTester
@Param("50000")
int rowCount = 50000;
private String table;
protected String writeStatement;

// TODO: elaborate data setup with multiple schemas
@Setup(Level.Trial)
Expand Down Expand Up @@ -84,7 +85,7 @@ protected void setupTable()

protected void setupData()
{
String writeStatement = "INSERT INTO " + table + "(userid,picid1,picid2,commentid)VALUES(?,?,?,?)";
writeStatement = "INSERT INTO " + table + "(userid,picid1,picid2,commentid)VALUES(?,?,?,?)";
for (long i = 0; i < rowCount; i++)
insertForIndex(writeStatement, i);

Expand Down
100 changes: 99 additions & 1 deletion test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -1634,4 +1635,101 @@ public void useAfterCompletedTest()
txnFile.abort(); // this should complete the txn
txnFile.trackNew(dummySSTable()); // expect an IllegalStateException here
}
}}
}

@Test
public void testMakeAddEquivalentToMake() throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
File dataFolder = new Directories(cfs.metadata()).getDirectoryForNewSSTables();
SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);

LogRecord viaOldPath = LogRecord.make(LogRecord.Type.ADD, sstable);
LogRecord viaMakeAdd = LogRecord.makeAdd(sstable);

assertEquals(viaOldPath, viaMakeAdd);
assertEquals(viaOldPath.raw, viaMakeAdd.raw);

sstable.selfRef().release();
}

@Test
public void testMakeAddUnaffectedByExtraFilesInDirectory() throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
File dataFolder = new Directories(cfs.metadata()).getDirectoryForNewSSTables();
SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);

LogRecord before = LogRecord.makeAdd(sstable);

final List<File> fakeDataFiles = generateDisjointComponents(sstable);

LogRecord after = LogRecord.makeAdd(sstable);
assertEquals(before, after);
assertEquals(before.raw, after.raw);

for (File file : fakeDataFiles)
file.tryDelete();

sstable.selfRef().release();
}

@Test
public void testDisjointComponentsAppearMidTransaction() throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
File dataFolder = new Directories(cfs.metadata()).getDirectoryForNewSSTables();
SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);

try (LogTransaction log = new LogTransaction(OperationType.COMPACTION))
{
log.trackNew(sstable);
generateDisjointComponents(sstable);
log.untrackNew(sstable);
log.finish();
}

sstable.selfRef().release();
LogTransaction.waitForDeletions();

assertFiles(dataFolder.path(), Collections.emptySet());

}

@Test
public void testPreExistingDisjointComponents() throws IOException
{
ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
File dataFolder = new Directories(cfs.metadata()).getDirectoryForNewSSTables();
SSTableReader sstable = sstable(dataFolder, cfs, 0, 128);
generateDisjointComponents(sstable);

try (LogTransaction log = new LogTransaction(OperationType.COMPACTION))
{
log.trackNew(sstable);
log.untrackNew(sstable);
log.finish();
}

sstable.selfRef().release();
LogTransaction.waitForDeletions();

assertFiles(dataFolder.path(), Collections.emptySet());

}

private static List<File> generateDisjointComponents(SSTableReader sstable)
{
int numComponents = 100;
final List<File> fakeComponents = new ArrayList<>(numComponents);
for (int i = 0; i < numComponents; i++)
{
File file = new File(sstable.descriptor.fileFor(Components.DATA).absolutePath()
.replace(".db", i + "fake.db"));
file.createFileIfNotExists();
fakeComponents.add(file);

}
return fakeComponents;
}
}