Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/XrdCeph/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

XrdCeph
============

The XrdCeph plugin allows XRootD to access a Ceph object store directly (through the RADOS Ceph interface) and provides a POSIX-like frontend for such storage systems, so that they work without mounting a CephFS frontend on the server.
Writing a file is done with the libradosstriper library for higher performance over large files, while reading can be optionally configured to use RADOS directly, improving performance.

This provides significant performance and stability benefits, especially for large distributed storage systems (>30PB), as it avoids the metadata overhead of maintaining folder structures on the storage side.

Configuration
-------------

The following configuration options are available:

Required for enabling the plugin:
```
ofs.osslib +cksio /usr/lib64/libXrdCeph.so user@,nbStripes,stripeUnit,objectSize
ofs.xattrlib /usr/lib64/libXrdCephXattr.so
```

These parameters set the default values for the Ceph operations:

- `user` - username for the file operations, usually `xrootd`
- `nbStripes` - number of stripes per file, recommended value 1
- `stripeUnit` - size of stripes
- `objectSize` - size of objects

Config Options:
```
ceph.buffermaxpersimul N # max number of simultaneous buffers, default 10
ceph.nbconnections N # max number of handlers to the cluster (max 100)
ceph.usedefaultpreadalg [0/1] # flag to enable default readv algorithm
ceph.aiowaitthresh N # aio wait timeout
ceph.usebuffer [0/1] # flag whether to use IO buffers when interacting with storage
ceph.buffersize N # size of the buffer in bytes (max 1000000000)
ceph.buffermaxpersimul # size of the buffer in bytes for multibuffers (max 1000000000)
ceph.usereadv [0/1] # use optimized readv code with direct IO
ceph.readvalgname <ALGNAME> # select readv algorithm, recommended passthrough
ceph.bufferiomode [aio/io] # select buffer io mode, recommended io
ceph.reportingpools <LIST OF POOLS> # select pools where data metrics are enabled (for spaceinfo queries)
ceph.namelib NAMELIB file:NAMELIB_CONF?protocol=PROT_LIST # XrdCeph namelib option to enable name2name mapping (XrdCmstfc)
ceph.streamed-cks-adler32 [calc/log/store] # enable streamed checksums and either calculate them (calc), log them as well (log) or log and store them into metadata (store)
ceph.streamed-cks-logfile LOG_FILE_PATH # set up logfile location for streamed checksum log and store modes
```



Additional configs needed
---------------------------
A ceph client keyring and config file must be present in the server's /etc/ceph/ location to allow connection to the cluster.
The xrootd client keyring is usually named ceph.client.xrootd.keyring and sets the access to the pools for the xrootd user.

76 changes: 73 additions & 3 deletions src/XrdCeph/XrdCephOss.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ ssize_t getNumericAttr(const char* const path, const char* attrName, const int m
return retval;

}
char *g_cksLogFileName;

extern FILE *g_cksLogFile;

extern "C"
{
Expand Down Expand Up @@ -165,6 +168,12 @@ XrdCephOss::~XrdCephOss() {
extern unsigned int g_maxCephPoolIdx;
extern unsigned int g_cephAioWaitThresh;

extern bool g_calcStreamedAdler32;
extern bool g_storeStreamedAdler32;
extern bool g_logStreamedAdler32;



int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
int NoGo = 0;
XrdOucEnv myEnv;
Expand Down Expand Up @@ -348,7 +357,7 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
if (!Config.GetRest(parms, sizeof(parms)) || parms[0]) {
Eroute.Emsg("Config", "readvalgname parameters will be ignored");
}
m_configBufferIOmode = var; // allowed values would be aio, io
m_configBufferIOmode = var; // allowed values would be aio, io, write-only-io
} else {
Eroute.Emsg("Config", "Missing value for ceph.bufferiomode in config file", configfn);
return 1;
Expand All @@ -361,9 +370,59 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
m_configPoolnames = var;
} else {
Eroute.Emsg("Config", "Missing value for ceph.reportingpools in config file", configfn);
return 1;
return 1;
}
}
}
if (!strcmp(var, "ceph.streamed-cks-adler32")) { // Streaming Adler32 checksum

var = Config.GetWord();
if (var) {
/*
* Currently, actions are simply additive:
*
* Store implies calculate, log, store
* Log implies calculate, log
* Calc implies calculate
*
* Might want to make e.g. logging optional in the future,
* when storing is more prevalent.
*
* Instead of setting g_* flags in three conditionals,
* can switch to setting values in a single bitfield flag
*
*/
if (strstr(var, "calc")) {
g_calcStreamedAdler32 = true;
g_logStreamedAdler32 = false;
g_storeStreamedAdler32 = false;
}
if (strstr(var, "log")) {
g_calcStreamedAdler32 = true;
g_logStreamedAdler32 = true;
g_storeStreamedAdler32 = false;
}
if (strstr(var, "store")) {
g_calcStreamedAdler32 = true;
g_logStreamedAdler32 = true;
g_storeStreamedAdler32 = true;
}

}
}// "ceph.streamed-cks-adler32"

if (!strcmp(var, "ceph.streamed-cks-logfile") ) {
var = Config.GetWord();
if (var) {
g_cksLogFileName = strdup(var);
} else {
const char *defLogFileName = "/tmp/checksums.log"; // To-DO: Move defLogFileName so it can also be used as fallback
// when attempt to open specified log file below fails
Eroute.Emsg("Config", "Missing value for ceph.streamed-cks-logfile in config file, setting to default = ", defLogFileName);
g_cksLogFileName = strdup(defLogFileName);
return 1;
}
}// "ceph.streamed-cks-logfile"

} // while

// Now check if any errors occurred during file i/o
Expand All @@ -374,6 +433,17 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
configfn);
}
Config.Close();

if (g_logStreamedAdler32) {
if (NULL == (g_cksLogFile = fopen(g_cksLogFileName, "a"))) {
g_logStreamedAdler32 = false;
Eroute.Emsg("Config: ", "cannot open file for logging checksum values and pathname", g_cksLogFileName);
return 1;
} else {
Eroute.Emsg("Config: ", "Opened file for logging checksum values and pathname: ", g_cksLogFileName);
}
}

}
return NoGo;
}
Expand Down
8 changes: 6 additions & 2 deletions src/XrdCeph/XrdCephOssBufferedFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ ssize_t XrdCephOssBufferedFile::Read(off_t offset, size_t blen) {
ssize_t XrdCephOssBufferedFile::Read(void *buff, off_t offset, size_t blen) {
size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());

if (m_bufferIOmode == "write-only-io") {
return m_xrdOssDF->Read(buff, offset, blen);
}

IXrdCephBufferAlg * buffer{nullptr};
// check for, and create if needed, a buffer
{
Expand Down Expand Up @@ -326,11 +330,11 @@ std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> XrdCephOssBufferedFile::create
std::unique_ptr<ICephIOAdapter> cephio;
if (m_bufferIOmode == "aio") {
cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterAIORaw(cephbuffer.get(),m_fd));
} else if (m_bufferIOmode == "io") {
} else if (m_bufferIOmode == "io" || m_bufferIOmode == "write-only-io") {
cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterRaw(cephbuffer.get(),m_fd,
!m_cephoss->m_useDefaultPreadAlg));
} else {
BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " );
BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io|write-only-io " );
m_xrdOssDF->Close();
return bufferAlg; // invalid instance;
}
Expand Down
Loading
Loading