Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/Backups/ArchiveBackup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#include <Backups/ArchiveBackup.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/IArchiveWriter.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/createArchiveWriter.h>


namespace DB
{
ArchiveBackup::ArchiveBackup(
const String & backup_name_,
const DiskPtr & disk_,
const String & path_,
const ContextPtr & context_,
const std::optional<BackupInfo> & base_backup_info_)
: BackupImpl(backup_name_, context_, base_backup_info_), disk(disk_), path(path_)
{
}

ArchiveBackup::~ArchiveBackup()
{
close();
}

bool ArchiveBackup::backupExists() const
{
return disk ? disk->exists(path) : fs::exists(path);
}

void ArchiveBackup::openImpl(OpenMode open_mode_)
{
/// mutex is already locked
if (open_mode_ == OpenMode::WRITE)
{
if (disk)
writer = createArchiveWriter(path, disk->writeFile(path));
else
writer = createArchiveWriter(path);

writer->setCompression(compression_method, compression_level);
writer->setPassword(password);
}
else if (open_mode_ == OpenMode::READ)
{
if (disk)
{
auto archive_read_function = [d = disk, p = path]() -> std::unique_ptr<SeekableReadBuffer> { return d->readFile(p); };
size_t archive_size = disk->getFileSize(path);
reader = createArchiveReader(path, archive_read_function, archive_size);
}
else
reader = createArchiveReader(path);

reader->setPassword(password);
}
}

void ArchiveBackup::closeImpl(bool writing_finalized_)
{
/// mutex is already locked
if (writer && writer->isWritingFile())
throw Exception("There is some writing unfinished on close", ErrorCodes::LOGICAL_ERROR);

writer.reset();
reader.reset();

if ((getOpenModeNoLock() == OpenMode::WRITE) && !writing_finalized_)
fs::remove(path);
}

std::unique_ptr<ReadBuffer> ArchiveBackup::readFileImpl(const String & file_name) const
{
/// mutex is already locked
return reader->readFile(file_name);
}

std::unique_ptr<WriteBuffer> ArchiveBackup::addFileImpl(const String & file_name)
{
/// mutex is already locked
return writer->writeFile(file_name);
}

void ArchiveBackup::setCompression(const String & compression_method_, int compression_level_)
{
std::lock_guard lock{mutex};
compression_method = compression_method_;
compression_level = compression_level_;
if (writer)
writer->setCompression(compression_method, compression_level);
}

void ArchiveBackup::setPassword(const String & password_)
{
std::lock_guard lock{mutex};
password = password_;
if (writer)
writer->setPassword(password);
if (reader)
reader->setPassword(password);
}

}
52 changes: 52 additions & 0 deletions src/Backups/ArchiveBackup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <Backups/BackupImpl.h>


namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class IArchiveReader;
class IArchiveWriter;

/// Stores a backup as a single .zip file.
class ArchiveBackup : public BackupImpl
{
public:
/// `disk`_ is allowed to be nullptr and that means the `path_` is a path in the local filesystem.
ArchiveBackup(
const String & backup_name_,
const DiskPtr & disk_,
const String & path_,
const ContextPtr & context_,
const std::optional<BackupInfo> & base_backup_info_ = {});

~ArchiveBackup() override;

static constexpr const int kDefaultCompressionLevel = -1;

/// Sets compression method and level.
void setCompression(const String & compression_method_, int compression_level_ = kDefaultCompressionLevel);

/// Sets password.
void setPassword(const String & password_);

private:
bool backupExists() const override;
void openImpl(OpenMode open_mode_) override;
void closeImpl(bool writing_finalized_) override;
bool supportsWritingInMultipleThreads() const override { return false; }
std::unique_ptr<ReadBuffer> readFileImpl(const String & file_name) const override;
std::unique_ptr<WriteBuffer> addFileImpl(const String & file_name) override;

const DiskPtr disk;
const String path;
std::shared_ptr<IArchiveReader> reader;
std::shared_ptr<IArchiveWriter> writer;
String compression_method;
int compression_level = kDefaultCompressionLevel;
String password;
};

}
11 changes: 9 additions & 2 deletions src/Backups/BackupFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ BackupMutablePtr BackupFactory::createBackup(const CreateParams & params) const
auto it = creators.find(engine_name);
if (it == creators.end())
throw Exception(ErrorCodes::BACKUP_ENGINE_NOT_FOUND, "Not found backup engine {}", engine_name);
return (it->second)(params);
BackupMutablePtr backup = (it->second)(params);
backup->open(params.open_mode);
return backup;
}

void BackupFactory::registerBackupEngine(const String & engine_name, const CreatorFn & creator_fn)
Expand All @@ -31,7 +33,12 @@ void BackupFactory::registerBackupEngine(const String & engine_name, const Creat
creators[engine_name] = creator_fn;
}

void registerBackupEngines(BackupFactory & factory);
void registerBackupEnginesFileAndDisk(BackupFactory &);

void registerBackupEngines(BackupFactory & factory)
{
registerBackupEnginesFileAndDisk(factory);
}

BackupFactory::BackupFactory()
{
Expand Down
3 changes: 3 additions & 0 deletions src/Backups/BackupFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class BackupFactory : boost::noncopyable
OpenMode open_mode = OpenMode::WRITE;
BackupInfo backup_info;
std::optional<BackupInfo> base_backup_info;
String compression_method;
int compression_level = -1;
String password;
ContextPtr context;
};

Expand Down
80 changes: 58 additions & 22 deletions src/Backups/BackupImpl.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <Backups/BackupImpl.h>
#include <Backups/BackupImpl.h>
#include <Backups/BackupFactory.h>
#include <Backups/BackupEntryConcat.h>
#include <Backups/BackupEntryFromCallback.h>
Expand Down Expand Up @@ -47,34 +47,44 @@ namespace
}
}

BackupImpl::BackupImpl(const String & backup_name_, OpenMode open_mode_, const ContextPtr & context_, const std::optional<BackupInfo> & base_backup_info_)
: backup_name(backup_name_), open_mode(open_mode_), context(context_), base_backup_info(base_backup_info_)
BackupImpl::BackupImpl(const String & backup_name_, const ContextPtr & context_, const std::optional<BackupInfo> & base_backup_info_)
: backup_name(backup_name_), context(context_), base_backup_info_param(base_backup_info_)
{
}

BackupImpl::~BackupImpl() = default;

void BackupImpl::open()
void BackupImpl::open(OpenMode open_mode_)
{
if (open_mode == OpenMode::WRITE)
std::lock_guard lock{mutex};
if (open_mode == open_mode_)
return;

if (open_mode != OpenMode::NONE)
throw Exception("Backup is already opened", ErrorCodes::LOGICAL_ERROR);

if (open_mode_ == OpenMode::WRITE)
{
if (backupExists())
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", getName());

timestamp = std::time(nullptr);
uuid = UUIDHelpers::generateV4();

startWriting();
writing_started = true;
writing_finalized = false;
}

if (open_mode == OpenMode::READ)
if (open_mode_ == OpenMode::READ)
{
if (!backupExists())
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", getName());
readBackupMetadata();
}

openImpl(open_mode_);

base_backup_info = base_backup_info_param;
if (open_mode_ == OpenMode::READ)
readBackupMetadata();

if (base_backup_info)
{
BackupFactory::CreateParams params;
Expand All @@ -83,25 +93,43 @@ void BackupImpl::open()
params.context = context;
base_backup = BackupFactory::instance().createBackup(params);

if (open_mode == OpenMode::WRITE)
if (open_mode_ == OpenMode::WRITE)
base_backup_uuid = base_backup->getUUID();
else if (base_backup_uuid != base_backup->getUUID())
throw Exception(ErrorCodes::WRONG_BASE_BACKUP, "Backup {}: The base backup {} has different UUID ({} != {})",
getName(), base_backup->getName(), toString(base_backup->getUUID()), (base_backup_uuid ? toString(*base_backup_uuid) : ""));
}

open_mode = open_mode_;
}

void BackupImpl::close()
{
if (open_mode == OpenMode::WRITE)
{
if (writing_started && !writing_finalized)
{
/// Creating of the backup wasn't finished correctly,
/// so the backup cannot be used and it's better to remove its files.
removeAllFilesAfterFailure();
}
}
std::lock_guard lock{mutex};
if (open_mode == OpenMode::NONE)
return;

closeImpl(writing_finalized);

uuid = UUIDHelpers::Nil;
timestamp = 0;
base_backup_info.reset();
base_backup.reset();
base_backup_uuid.reset();
file_infos.clear();
open_mode = OpenMode::NONE;
}

IBackup::OpenMode BackupImpl::getOpenMode() const
{
std::lock_guard lock{mutex};
return open_mode;
}

time_t BackupImpl::getTimestamp() const
{
std::lock_guard lock{mutex};
return timestamp;
}

void BackupImpl::writeBackupMetadata()
Expand Down Expand Up @@ -244,6 +272,9 @@ UInt128 BackupImpl::getFileChecksum(const String & file_name) const
BackupEntryPtr BackupImpl::readFile(const String & file_name) const
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::READ)
throw Exception("Backup is not opened for reading", ErrorCodes::LOGICAL_ERROR);

auto it = file_infos.find(file_name);
if (it == file_infos.end())
throw Exception(
Expand Down Expand Up @@ -329,7 +360,7 @@ void BackupImpl::addFile(const String & file_name, BackupEntryPtr entry)
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading");
throw Exception("Backup is not opened for writing", ErrorCodes::LOGICAL_ERROR);

if (file_infos.contains(file_name))
throw Exception(
Expand Down Expand Up @@ -467,8 +498,13 @@ void BackupImpl::addFile(const String & file_name, BackupEntryPtr entry)

void BackupImpl::finalizeWriting()
{
std::lock_guard lock{mutex};
if (writing_finalized)
return;

if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading");
throw Exception("Backup is not opened for writing", ErrorCodes::LOGICAL_ERROR);

writeBackupMetadata();
writing_finalized = true;
}
Expand Down
Loading