Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static void CreateSingleChunk(TFChunkDbConfig config, int chunkNum, strin
var dataSize = actualDataSize ?? config.ChunkSize;
var buf = new byte[ChunkHeader.Size + dataSize + ChunkFooter.Size];
Buffer.BlockCopy(chunkBytes, 0, buf, 0, chunkBytes.Length);
var chunkFooter = new ChunkFooter(true, true, dataSize, dataSize, 0);
var chunkFooter = new ChunkFooter(true, dataSize, dataSize, 0);
chunkBytes = chunkFooter.AsByteArray();
Buffer.BlockCopy(chunkBytes, 0, buf, buf.Length - ChunkFooter.Size, chunkBytes.Length);

Expand All @@ -44,7 +44,7 @@ public static void CreateMultiChunk(TFChunkDbConfig config, int chunkStartNum, i
var logicalDataSize = logicalSize ?? (chunkEndNum - chunkStartNum + 1) * config.ChunkSize;
var buf = new byte[ChunkHeader.Size + physicalDataSize + ChunkFooter.Size];
Buffer.BlockCopy(chunkBytes, 0, buf, 0, chunkBytes.Length);
var chunkFooter = new ChunkFooter(true, true, physicalDataSize, logicalDataSize, 0);
var chunkFooter = new ChunkFooter(true, physicalDataSize, logicalDataSize, 0);
chunkBytes = chunkFooter.AsByteArray();
Buffer.BlockCopy(chunkBytes, 0, buf, buf.Length - ChunkFooter.Size, chunkBytes.Length);
File.WriteAllBytes(filename, buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,49 @@ namespace EventStore.Core.XUnit.Tests.TransactionLog.Chunks;
public class ChunkFooterTests
{
[Theory]
[InlineData(false, false)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(true, true)]
public void can_round_trip(bool isCompleted, bool isMap12Bytes)
[InlineData(false)]
[InlineData(true)]
public void can_round_trip(bool isCompleted)
{
Span<byte> hash = stackalloc byte[ChunkFooter.ChecksumSize];
Random.Shared.NextBytes(hash);

var source = new ChunkFooter(
isCompleted: isCompleted,
isMap12Bytes: isMap12Bytes,
physicalDataSize: Random.Shared.Next(500, 600),
logicalDataSize: Random.Shared.Next(600, 700),
mapSize: Random.Shared.Next(500, 600).RoundUpToMultipleOf(24)) { MD5Hash = hash };
mapSize: Random.Shared.Next(500, 600).RoundUpToMultipleOf(12)) { MD5Hash = hash };

var destination = new ChunkFooter(source.AsByteArray());

Assert.Equal(source.IsCompleted, destination.IsCompleted);
Assert.Equal(source.IsMap12Bytes, destination.IsMap12Bytes);
Assert.Equal(source.PhysicalDataSize, destination.PhysicalDataSize);
Assert.Equal(source.LogicalDataSize, destination.LogicalDataSize);
Assert.Equal(source.MapSize, destination.MapSize);
Assert.Equal(source.MD5Hash, destination.MD5Hash);
}

[Fact]
public void rejects_deprecated_position_map_footer()
{
var bytes = new ChunkFooter(
isCompleted: true,
physicalDataSize: 500,
logicalDataSize: 600,
mapSize: 0).AsByteArray();
bytes[0] &= unchecked((byte)~2);

Assert.Throws<Exception>(() => new ChunkFooter(bytes));
}

[Fact]
public void accepts_zeroed_incomplete_footer()
{
var footer = new ChunkFooter(new byte[ChunkFooter.Size]);

Assert.False(footer.IsCompleted);
Assert.Equal(0, footer.PhysicalDataSize);
Assert.Equal(0, footer.LogicalDataSize);
Assert.Equal(0, footer.MapSize);
}
}

This file was deleted.

34 changes: 13 additions & 21 deletions src/EventStore.Core/TransactionLog/Chunks/ChunkFooter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public sealed class ChunkFooter : IBinaryFormattable<ChunkFooter>

// flags within single byte
public readonly bool IsCompleted;
public readonly bool IsMap12Bytes;

public readonly int PhysicalDataSize; // the size of a section of data in chunk

Expand All @@ -41,7 +40,7 @@ public ReadOnlySpan<byte> MD5Hash

public readonly int MapCount; // calculated, not stored

public ChunkFooter(bool isCompleted, bool isMap12Bytes, int physicalDataSize, long logicalDataSize, int mapSize, IncrementalHash hash = null)
public ChunkFooter(bool isCompleted, int physicalDataSize, long logicalDataSize, int mapSize, IncrementalHash hash = null)
{
Ensure.Nonnegative(physicalDataSize, nameof(physicalDataSize));
Ensure.Nonnegative(logicalDataSize, nameof(logicalDataSize));
Expand All @@ -51,7 +50,6 @@ public ChunkFooter(bool isCompleted, bool isMap12Bytes, int physicalDataSize, lo
Ensure.Nonnegative(mapSize, "mapSize");

IsCompleted = isCompleted;
IsMap12Bytes = isMap12Bytes;

PhysicalDataSize = physicalDataSize;
LogicalDataSize = logicalDataSize;
Expand All @@ -61,10 +59,9 @@ public ChunkFooter(bool isCompleted, bool isMap12Bytes, int physicalDataSize, lo
Unsafe.SkipInit(out _checksum); // fix for Qodana false positive about init of readonly field
hash?.TryGetHashAndReset(_checksum, out _);

var posMapSize = isMap12Bytes ? PosMap.FullSize : PosMap.DeprecatedSize;
if (MapSize % posMapSize is not 0)
throw new Exception($"Wrong MapSize {MapSize} -- not divisible by PosMap.Size {posMapSize}.");
MapCount = mapSize / posMapSize;
if (MapSize % PosMap.FullSize is not 0)
throw new Exception($"Wrong MapSize {MapSize} -- not divisible by PosMap.Size {PosMap.FullSize}.");
MapCount = mapSize / PosMap.FullSize;
}

public ChunkFooter(ReadOnlySpan<byte> source)
Expand All @@ -75,23 +72,22 @@ public ChunkFooter(ReadOnlySpan<byte> source)
byte flags = reader.Read();

IsCompleted = (flags & 1) is not 0;
IsMap12Bytes = (flags & 2) is not 0;
if (IsCompleted && (flags & 2) is 0)
throw new Exception("Deprecated 8-byte TFChunk position maps are not supported.");

PhysicalDataSize = reader.ReadLittleEndian<int>();
LogicalDataSize = IsMap12Bytes
? reader.ReadLittleEndian<long>()
: reader.ReadLittleEndian<int>();
LogicalDataSize = reader.ReadLittleEndian<long>();

MapSize = reader.ReadLittleEndian<int>();
reader.ConsumedCount = Size - ChecksumSize;
reader.Read(_checksum);

var posMapSize = IsMap12Bytes ? PosMap.FullSize : PosMap.DeprecatedSize;
if (MapSize % posMapSize is not 0)
if (MapSize % PosMap.FullSize is not 0)
{
throw new Exception($"Wrong MapSize {MapSize} -- not divisible by PosMap.Size {posMapSize}.");
throw new Exception($"Wrong MapSize {MapSize} -- not divisible by PosMap.Size {PosMap.FullSize}.");
}

MapCount = MapSize / posMapSize;
MapCount = MapSize / PosMap.FullSize;
}

static int IBinaryFormattable<ChunkFooter>.Size => Size;
Expand All @@ -101,15 +97,11 @@ public void Format(Span<byte> destination)
Debug.Assert(destination.Length >= Size);

SpanWriter<byte> writer = new(destination.Slice(0, Size));
int flags = Unsafe.BitCast<bool, byte>(IsCompleted)
| Unsafe.BitCast<bool, byte>(IsMap12Bytes) << 1;
var flags = (IsCompleted ? 1 : 0) | 2;

writer.Add((byte)flags);
writer.WriteLittleEndian(PhysicalDataSize);
if (IsMap12Bytes)
writer.WriteLittleEndian(LogicalDataSize);
else
writer.WriteLittleEndian((int)LogicalDataSize);
writer.WriteLittleEndian(LogicalDataSize);

writer.WriteLittleEndian(MapSize);

Expand Down
24 changes: 0 additions & 24 deletions src/EventStore.Core/TransactionLog/Chunks/TFChunk/PosMap.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
using System;
using System.Buffers.Binary;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using DotNext.Buffers;
using DotNext.Buffers.Binary;
using DotNext.IO;

namespace EventStore.Core.TransactionLog.Chunks.TFChunk;

public struct PosMap : IBinaryFormattable<PosMap>
{
public const int FullSize = sizeof(long) + sizeof(int);
public const int DeprecatedSize = sizeof(int) + sizeof(int);

public readonly long LogPos;
public readonly int ActualPos;
Expand Down Expand Up @@ -42,24 +36,6 @@ static PosMap IBinaryFormattable<PosMap>.Parse(ReadOnlySpan<byte> source)
public static PosMap FromNewFormat(ReadOnlySpan<byte> source)
=> new(source);

public static PosMap FromOldFormat(ReadOnlySpan<byte> source)
{
Debug.Assert(source.Length >= DeprecatedSize);

var posmap = BinaryPrimitives.ReadUInt64LittleEndian(source);
var logPos = (int)(posmap >>> 32);
var actualPos = (int)(posmap & 0xFFFFFFFF);
return new(logPos, actualPos);
}

public static async ValueTask<PosMap> FromOldFormat(Stream stream, Memory<byte> buffer, CancellationToken token)
{
var posmap = await stream.ReadLittleEndianAsync<ulong>(buffer, token);
var logPos = (int)(posmap >>> 32);
var actualPos = (int)(posmap & 0xFFFFFFFF);
return new(logPos, actualPos);
}

public readonly void Format(Span<byte> destination)
{
SpanWriter<byte> writer = new(destination);
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,15 +1094,15 @@ await _transform.Write.CompleteData(
ChunkFooter footerWithHash;
try
{
var footerNoHash = new ChunkFooter(true, true, _physicalDataSize, LogicalDataSize, mapSize);
var footerNoHash = new ChunkFooter(true, _physicalDataSize, LogicalDataSize, mapSize);

//MD5
footerNoHash.Format(bufferFromPool);
workItem.MD5.AppendData(bufferFromPool, 0,
ChunkFooter.Size - ChunkFooter.ChecksumSize);

//FILE
footerWithHash = new ChunkFooter(true, true, _physicalDataSize, LogicalDataSize, mapSize, workItem.MD5);
footerWithHash = new ChunkFooter(true, _physicalDataSize, LogicalDataSize, mapSize, workItem.MD5);

footerWithHash.Format(bufferFromPool);
fileSize = await _transform.Write.WriteFooter(new(bufferFromPool, 0, ChunkFooter.Size),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,14 @@ private async ValueTask<Midpoint[]> PopulateMidpoints(int depth, ReaderWorkItem
if (Chunk.ChunkFooter.MapCount is 0) // empty chunk
return [];

var posMapSize = Chunk.ChunkFooter.IsMap12Bytes ? PosMap.FullSize : PosMap.DeprecatedSize;
try
{
using var posMapsBuffer = Memory.AllocateAtLeast<byte>(checked(Chunk.ChunkFooter.MapSize));
int midPointsCnt = 1 << depth;
int segmentSize;
Midpoint[] midpoints;
var mapCount = Chunk.ChunkFooter.MapCount;
var posMaps = posMapsBuffer.Memory[..checked(mapCount * posMapSize)];
var posMaps = posMapsBuffer.Memory[..checked(mapCount * PosMap.FullSize)];
workItem.BaseStream.Seek(ChunkHeader.Size + Chunk.ChunkFooter.PhysicalDataSize, SeekOrigin.Begin);
await workItem.BaseStream.ReadExactlyAsync(posMaps, token);

Expand Down Expand Up @@ -272,32 +271,17 @@ private async ValueTask<Midpoint[]> PopulateMidpoints(int depth, ReaderWorkItem

private PosMap ReadPosMap(ReadOnlySpan<byte> posMaps, int index)
{
var posMapSize = Chunk.ChunkFooter.IsMap12Bytes ? PosMap.FullSize : PosMap.DeprecatedSize;
var start = checked(index * posMapSize);
var buffer = posMaps.Slice(start, posMapSize);
return Chunk.ChunkFooter.IsMap12Bytes
? PosMap.FromNewFormat(buffer)
: PosMap.FromOldFormat(buffer);
var start = checked(index * PosMap.FullSize);
var buffer = posMaps.Slice(start, PosMap.FullSize);
return PosMap.FromNewFormat(buffer);
}

private ValueTask<PosMap> ReadPosMap(ReaderWorkItem workItem, long index, Memory<byte> buffer,
CancellationToken token)
{
ValueTask<PosMap> task;
if (Chunk.ChunkFooter.IsMap12Bytes)
{
var pos = ChunkHeader.Size + Chunk.ChunkFooter.PhysicalDataSize + index * PosMap.FullSize;
workItem.BaseStream.Seek(pos, SeekOrigin.Begin);
task = workItem.BaseStream.ReadAsync<PosMap>(buffer, token);
}
else
{
var pos = ChunkHeader.Size + Chunk.ChunkFooter.PhysicalDataSize + index * PosMap.DeprecatedSize;
workItem.BaseStream.Seek(pos, SeekOrigin.Begin);
task = PosMap.FromOldFormat(workItem.BaseStream, buffer, token);
}

return task;
var pos = ChunkHeader.Size + Chunk.ChunkFooter.PhysicalDataSize + index * PosMap.FullSize;
workItem.BaseStream.Seek(pos, SeekOrigin.Begin);
return workItem.BaseStream.ReadAsync<PosMap>(buffer, token);
}

public async ValueTask<bool> ExistsAt(long logicalPosition, CancellationToken token)
Expand Down
Loading