132 changes: 87 additions & 45 deletions pack/pack.go
@@ -21,6 +21,7 @@ import (
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-log/v2"
"github.com/rclone/rclone/fs"
"github.com/rjNemo/underscore"
"gorm.io/gorm"
"gorm.io/gorm/clause"
@@ -113,73 +114,113 @@ func Pack(
var minPieceSizePadding int64
if storageWriter != nil {
var carGenerated bool
reader := io.TeeReader(assembler, calc)
filename = uuid.NewString() + ".car"
obj, err := storageWriter.Write(ctx, filename, reader)
defer func() {
if !carGenerated && obj != nil {
removeCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
err := storageWriter.Remove(removeCtx, obj)
if err != nil {
logger.Errorf("failed to remove temporary CAR file %s: %v", filename, err)
}
cancel()
}
}()
if err != nil {
return nil, errors.WithStack(err)
}
fileSize = obj.Size()

if assembler.carOffset <= 65 {
return nil, errors.WithStack(ErrNoContent)
}
pieceCid, finalPieceSize, err = GetCommp(calc, uint64(pieceSize))
if err != nil {
return nil, errors.WithStack(err)
// Calculate total input size
var totalInputSize int64
for _, fr := range job.FileRanges {
totalInputSize += fr.Length
}

// Check if minPieceSize constraint forced larger piece size
naturalPieceSize := util.NextPowerOfTwo(uint64(fileSize))
if finalPieceSize > naturalPieceSize {
// Need to pad to (127/128) × piece_size due to Fr32 padding overhead
targetCarSize := (int64(finalPieceSize) * 127) / 128
paddingNeeded := targetCarSize - fileSize
// Use temp file if input size suggests padding may be needed (< minPieceSize/2)
useTempFile := pieceSize > 0 && totalInputSize < int64(pieceSize)/2
var obj fs.Object

// Find the output storage by ID
var outputStorage *model.Storage
for i := range job.Attachment.Preparation.OutputStorages {
if job.Attachment.Preparation.OutputStorages[i].ID == *storageID {
outputStorage = &job.Attachment.Preparation.OutputStorages[i]
break
}
if useTempFile {
// Write to temp file first to enable padding
tempFile, err := os.CreateTemp("", "car-*.car")
if err != nil {
return nil, errors.Wrap(err, "failed to create temp file for CAR")
}
tempPath := tempFile.Name()
defer os.Remove(tempPath)

reader := io.TeeReader(assembler, calc)
_, err = io.Copy(tempFile, reader)
if err != nil {
tempFile.Close()
return nil, errors.Wrap(err, "failed to write CAR to temp file")
}
tempFile.Close()

if assembler.carOffset <= 65 {
return nil, errors.WithStack(ErrNoContent)
}

stat, err := os.Stat(tempPath)
if err != nil {
return nil, errors.WithStack(err)
}
fileSize = stat.Size()

pieceCid, finalPieceSize, err = GetCommp(calc, uint64(pieceSize))
if err != nil {
return nil, errors.WithStack(err)
}

// Append zeros to file
if outputStorage != nil && obj != nil {
// Build full path to CAR file
carPath := outputStorage.Path + "/" + filename
// Check if padding needed
naturalPieceSize := util.NextPowerOfTwo(uint64(fileSize))
if finalPieceSize > naturalPieceSize {
targetCarSize := (int64(finalPieceSize) * 127) / 128
paddingNeeded := targetCarSize - fileSize

// Reopen file and append zeros (like dd does)
f, err := os.OpenFile(carPath, os.O_APPEND|os.O_WRONLY, 0644)
f, err := os.OpenFile(tempPath, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, errors.Wrap(err, "failed to open CAR file for padding")
return nil, errors.Wrap(err, "failed to open temp CAR file for padding")
}

zeros := make([]byte, paddingNeeded)
_, err = f.Write(zeros)
f.Close()
if err != nil {
return nil, errors.Wrap(err, "failed to write padding to CAR file")
return nil, errors.Wrap(err, "failed to write padding to temp CAR file")
}

fileSize = targetCarSize
// minPieceSizePadding stays 0 for non-inline (zeros are in file)

logger.Infow("padded CAR file for minPieceSize", "original", fileSize-paddingNeeded, "padded", fileSize, "padding", paddingNeeded, "piece_size", finalPieceSize)
}

// Upload complete file
f, err := os.Open(tempPath)
if err != nil {
return nil, errors.Wrap(err, "failed to open temp file for upload")
}
defer f.Close()

obj, err = storageWriter.Write(ctx, filename, f)
if err != nil {
return nil, errors.WithStack(err)
}
} else {
// Stream directly without temp file (input too large for padding to be likely)
reader := io.TeeReader(assembler, calc)
obj, err = storageWriter.Write(ctx, filename, reader)
if err != nil {
return nil, errors.WithStack(err)
}
fileSize = obj.Size()

if assembler.carOffset <= 65 {
return nil, errors.WithStack(ErrNoContent)
}

pieceCid, finalPieceSize, err = GetCommp(calc, uint64(pieceSize))
if err != nil {
return nil, errors.WithStack(err)
}
}

defer func() {
if !carGenerated && obj != nil {
removeCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
err := storageWriter.Remove(removeCtx, obj)
if err != nil {
logger.Errorf("failed to remove temporary CAR file %s: %v", filename, err)
}
cancel()
}
}()

_, err = storageWriter.Move(ctx, obj, pieceCid.String()+".car")
if err != nil && !errors.Is(err, storagesystem.ErrMoveNotSupported) {
logger.Errorf("failed to move car file from %s to %s: %s", filename, pieceCid.String()+".car", err)
@@ -189,6 +230,7 @@ func Pack(
}
carGenerated = true
} else {
// Inline preparation - no physical CAR file
fileSize, err = io.Copy(calc, assembler)
if err != nil {
return nil, errors.WithStack(err)
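A note on the arithmetic in the hunk above: Fr32 encoding stores 127 bytes of payload in every 128 bytes of a piece, so a piece of `finalPieceSize` bytes can carry at most finalPieceSize × 127/128 bytes of CAR data, and that is the size the CAR is zero-padded up to when the minimum piece size forces a larger piece. A minimal sketch of the calculation follows; the helper name `padForPiece` is illustrative and not part of this PR.

```go
package main

import "fmt"

// padForPiece returns the target on-disk CAR size for a piece of
// finalPieceSize bytes and the number of zero bytes to append to a CAR
// that currently occupies carSize bytes. Illustrative only; the PR does
// this arithmetic inline.
func padForPiece(carSize, finalPieceSize int64) (target, padding int64) {
	// Fr32 encoding keeps 127 bytes of payload per 128 bytes of piece,
	// so the usable payload of a piece is finalPieceSize * 127 / 128.
	target = finalPieceSize * 127 / 128
	padding = target - carSize
	if padding < 0 {
		padding = 0 // the CAR already fills the piece; nothing to append
	}
	return target, padding
}

func main() {
	// Example: a 1 MiB CAR forced into a 2 MiB minimum piece.
	target, padding := padForPiece(1<<20, 2<<20)
	fmt.Println(target, padding) // 2080768 1032192
}
```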
2 changes: 2 additions & 0 deletions pack/pack_test.go
@@ -333,6 +333,7 @@ func TestLastPiecePadding(t *testing.T) {
// Create and execute the packing job
err := db.Create(&job).Error
require.NoError(t, err)

car, err := Pack(ctx, db, job)
require.NoError(t, err)

@@ -457,6 +458,7 @@ func TestMultiplePiecesWithLastPiece(t *testing.T) {
// Create and execute the packing job
err := db.Create(&job).Error
require.NoError(t, err)

car, err := Pack(ctx, db, job)
require.NoError(t, err)

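Setting the whitespace-only test edits aside, the gate that routes Pack through the temp-file path may be worth restating: buffering only happens when a minimum piece size is configured and the raw input is small enough that padding is plausible. A sketch of that condition, assuming `fileRangeLengths` mirrors the lengths in `job.FileRanges` and `pieceSize` is the configured minimum piece size (zero when unset); the function and parameter names are not part of the PR.

```go
package main

import "fmt"

// useTempFile reports whether the CAR should be buffered to a local temp
// file before upload so that zero padding can be appended once the final
// size is known. Illustrative sketch of the gate in pack/pack.go.
func useTempFile(fileRangeLengths []int64, pieceSize int64) bool {
	var totalInputSize int64
	for _, length := range fileRangeLengths {
		totalInputSize += length
	}
	// Padding is only plausible when the input is well under the minimum
	// piece size; otherwise the CAR is streamed straight to storage.
	return pieceSize > 0 && totalInputSize < pieceSize/2
}

func main() {
	// Two small files against a 2 MiB minimum piece size -> buffer to a temp file.
	fmt.Println(useTempFile([]int64{300_000, 400_000}, 2<<20)) // true
}
```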
127 changes: 83 additions & 44 deletions service/datasetworker/daggen.go
@@ -207,73 +207,112 @@ func (w *Thread) ExportDag(ctx context.Context, job model.Job) error {
var fileSize int64
var minPieceSizePadding int64
if storageWriter != nil {
reader := io.TeeReader(dagGenerator, calc)
filename = uuid.NewString() + ".car"
obj, err := storageWriter.Write(ctx, filename, reader)
if err != nil {
return errors.WithStack(err)
}
fileSize = obj.Size()

if dagGenerator.offset <= 59 {
logger.Info("Nothing to export to dag. Skipping.")
return nil
}
// DAG size is unknown until generated, so use a temp file when minPieceSize is set
if pieceSize > 0 {
// Write to temp file to enable padding
tempFile, err := os.CreateTemp("", "dagcar-*.car")
if err != nil {
return errors.Wrap(err, "failed to create temp file for DAG CAR")
}
tempPath := tempFile.Name()
defer os.Remove(tempPath)

pieceCid, finalPieceSize, err = pack.GetCommp(calc, uint64(pieceSize))
if err != nil {
return errors.WithStack(err)
}
reader := io.TeeReader(dagGenerator, calc)
_, err = io.Copy(tempFile, reader)
if err != nil {
tempFile.Close()
return errors.Wrap(err, "failed to write DAG CAR to temp file")
}
tempFile.Close()

// Check if minPieceSize constraint forced larger piece size
naturalPieceSize := util.NextPowerOfTwo(uint64(fileSize))
if finalPieceSize > naturalPieceSize {
// Need to pad to (127/128) × piece_size due to Fr32 padding overhead
targetCarSize := (int64(finalPieceSize) * 127) / 128
paddingNeeded := targetCarSize - fileSize
if dagGenerator.offset <= 59 {
logger.Info("Nothing to export to dag. Skipping.")
return nil
}

// Find the output storage by ID
var outputStorage *model.Storage
for i := range job.Attachment.Preparation.OutputStorages {
if job.Attachment.Preparation.OutputStorages[i].ID == *storageID {
outputStorage = &job.Attachment.Preparation.OutputStorages[i]
break
}
stat, err := os.Stat(tempPath)
if err != nil {
return errors.WithStack(err)
}
fileSize = stat.Size()

pieceCid, finalPieceSize, err = pack.GetCommp(calc, uint64(pieceSize))
if err != nil {
return errors.WithStack(err)
}

// Append zeros to file
if outputStorage != nil && obj != nil {
// Build full path to CAR file
carPath := outputStorage.Path + "/" + filename
// Check if padding needed
naturalPieceSize := util.NextPowerOfTwo(uint64(fileSize))
if finalPieceSize > naturalPieceSize {
targetCarSize := (int64(finalPieceSize) * 127) / 128
paddingNeeded := targetCarSize - fileSize

// Reopen file and append zeros
f, err := os.OpenFile(carPath, os.O_APPEND|os.O_WRONLY, 0644)
f, err := os.OpenFile(tempPath, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return errors.Wrap(err, "failed to open DAG CAR file for padding")
return errors.Wrap(err, "failed to open temp DAG CAR file for padding")
}

zeros := make([]byte, paddingNeeded)
_, err = f.Write(zeros)
f.Close()
if err != nil {
return errors.Wrap(err, "failed to write padding to DAG CAR file")
return errors.Wrap(err, "failed to write padding to temp DAG CAR file")
}

fileSize = targetCarSize
// minPieceSizePadding stays 0 for non-inline (zeros are in file)

logger.Infow("padded DAG CAR file for minPieceSize", "original", fileSize-paddingNeeded, "padded", fileSize, "padding", paddingNeeded, "piece_size", finalPieceSize)
}
}

_, err = storageWriter.Move(ctx, obj, pieceCid.String()+".car")
if err != nil && !errors.Is(err, storagesystem.ErrMoveNotSupported) {
logger.Errorf("failed to move car file from %s to %s: %s", filename, pieceCid.String()+".car", err)
}
if err == nil {
filename = pieceCid.String() + ".car"
// Upload complete file
f, err := os.Open(tempPath)
if err != nil {
return errors.Wrap(err, "failed to open temp file for upload")
}
defer f.Close()

obj, err := storageWriter.Write(ctx, filename, f)
if err != nil {
return errors.WithStack(err)
}

_, err = storageWriter.Move(ctx, obj, pieceCid.String()+".car")
if err != nil && !errors.Is(err, storagesystem.ErrMoveNotSupported) {
logger.Errorf("failed to move car file from %s to %s: %s", filename, pieceCid.String()+".car", err)
}
if err == nil {
filename = pieceCid.String() + ".car"
}
} else {
// No minPieceSize constraint, stream directly
reader := io.TeeReader(dagGenerator, calc)
obj, err := storageWriter.Write(ctx, filename, reader)
if err != nil {
return errors.WithStack(err)
}

if dagGenerator.offset <= 59 {
logger.Info("Nothing to export to dag. Skipping.")
return nil
}

fileSize = obj.Size()
pieceCid, finalPieceSize, err = pack.GetCommp(calc, uint64(pieceSize))
if err != nil {
return errors.WithStack(err)
}

_, err = storageWriter.Move(ctx, obj, pieceCid.String()+".car")
if err != nil && !errors.Is(err, storagesystem.ErrMoveNotSupported) {
logger.Errorf("failed to move car file from %s to %s: %s", filename, pieceCid.String()+".car", err)
}
if err == nil {
filename = pieceCid.String() + ".car"
}
}
} else {
// Inline DAG - no physical CAR file
fileSize, err = io.Copy(calc, dagGenerator)
if err != nil {
return errors.WithStack(err)
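Both pack.go and daggen.go only append zeros when the commP calculation returned a piece larger than the CAR's natural piece size, i.e. when the minPieceSize constraint actually kicked in. A sketch of that check, with `nextPowerOfTwo` standing in for `util.NextPowerOfTwo` (assumed here to round up to the nearest power of two):

```go
package main

import "fmt"

// nextPowerOfTwo rounds n up to the nearest power of two; stand-in for
// util.NextPowerOfTwo, whose exact behaviour is assumed here.
func nextPowerOfTwo(n uint64) uint64 {
	p := uint64(1)
	for p < n {
		p <<= 1
	}
	return p
}

// paddingRequired reports whether the minimum piece size constraint forced
// a piece larger than the CAR would naturally need, which is the condition
// both pack.go and daggen.go check before appending zeros.
func paddingRequired(fileSize int64, finalPieceSize uint64) bool {
	return finalPieceSize > nextPowerOfTwo(uint64(fileSize))
}

func main() {
	// A 1 MiB CAR naturally fits a 1 MiB piece; a 2 MiB minimum forces padding.
	fmt.Println(paddingRequired(1<<20, 2<<20)) // true
	fmt.Println(paddingRequired(1<<20, 1<<20)) // false
}
```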