Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions src/XrdCeph/XrdCephOss.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ ssize_t getNumericAttr(const char* const path, const char* attrName, const int m
return retval;

}
char *g_cksLogFileName;

extern FILE *g_cksLogFile;

extern "C"
{
Expand Down Expand Up @@ -165,6 +168,12 @@ XrdCephOss::~XrdCephOss() {
extern unsigned int g_maxCephPoolIdx;
extern unsigned int g_cephAioWaitThresh;

extern bool g_calcStreamedAdler32;
extern bool g_storeStreamedAdler32;
extern bool g_logStreamedAdler32;



int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
int NoGo = 0;
XrdOucEnv myEnv;
Expand Down Expand Up @@ -348,7 +357,7 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
if (!Config.GetRest(parms, sizeof(parms)) || parms[0]) {
Eroute.Emsg("Config", "readvalgname parameters will be ignored");
}
m_configBufferIOmode = var; // allowed values would be aio, io
m_configBufferIOmode = var; // allowed values would be aio, io, write-only-io
} else {
Eroute.Emsg("Config", "Missing value for ceph.bufferiomode in config file", configfn);
return 1;
Expand All @@ -361,9 +370,59 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
m_configPoolnames = var;
} else {
Eroute.Emsg("Config", "Missing value for ceph.reportingpools in config file", configfn);
return 1;
return 1;
}
}
}
if (!strcmp(var, "ceph.streamed-cks-adler32")) { // Streaming Adler32 checksum

var = Config.GetWord();
if (var) {
/*
* Currently, actions are simply additive:
*
* Store implies calculate, log, store
* Log implies calculate, log
* Calc implies calculate
*
* Might want to make e.g. logging optional in the future,
* when storing is more prevalent.
*
* Instead of setting g_* flags in three conditionals,
* can switch to setting values in a single bitfield flag
*
*/
if (strstr(var, "calc")) {
g_calcStreamedAdler32 = true;
g_logStreamedAdler32 = false;
g_storeStreamedAdler32 = false;
}
if (strstr(var, "log")) {
g_calcStreamedAdler32 = true;
g_logStreamedAdler32 = true;
g_storeStreamedAdler32 = false;
}
if (strstr(var, "store")) {
g_calcStreamedAdler32 = true;
g_logStreamedAdler32 = true;
g_storeStreamedAdler32 = true;
}

}
}// "ceph.streamed-cks-adler32"

if (!strcmp(var, "ceph.streamed-cks-logfile") ) {
var = Config.GetWord();
if (var) {
g_cksLogFileName = strdup(var);
} else {
const char *defLogFileName = "/tmp/checksums.log"; // To-DO: Move defLogFileName so it can also be used as fallback
// when attempt to open specified log file below fails
Eroute.Emsg("Config", "Missing value for ceph.streamed-cks-logfile in config file, setting to default = ", defLogFileName);
g_cksLogFileName = strdup(defLogFileName);
return 1;
}
}// "ceph.streamed-cks-logfile"

} // while

// Now check if any errors occurred during file i/o
Expand All @@ -374,6 +433,17 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
configfn);
}
Config.Close();

if (g_logStreamedAdler32) {
if (NULL == (g_cksLogFile = fopen(g_cksLogFileName, "a"))) {
g_logStreamedAdler32 = false;
Eroute.Emsg("Config: ", "cannot open file for logging checksum values and pathname", g_cksLogFileName);
return 1;
} else {
Eroute.Emsg("Config: ", "Opened file for logging checksum values and pathname: ", g_cksLogFileName);
}
}

}
return NoGo;
}
Expand Down
8 changes: 6 additions & 2 deletions src/XrdCeph/XrdCephOssBufferedFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ ssize_t XrdCephOssBufferedFile::Read(off_t offset, size_t blen) {
ssize_t XrdCephOssBufferedFile::Read(void *buff, off_t offset, size_t blen) {
size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());

if (m_bufferIOmode == "write-only-io") {
return m_xrdOssDF->Read(buff, offset, blen);
}

IXrdCephBufferAlg * buffer{nullptr};
// check for, and create if needed, a buffer
{
Expand Down Expand Up @@ -326,11 +330,11 @@ std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> XrdCephOssBufferedFile::create
std::unique_ptr<ICephIOAdapter> cephio;
if (m_bufferIOmode == "aio") {
cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterAIORaw(cephbuffer.get(),m_fd));
} else if (m_bufferIOmode == "io") {
} else if (m_bufferIOmode == "io" || m_bufferIOmode == "write-only-io") {
cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterRaw(cephbuffer.get(),m_fd,
!m_cephoss->m_useDefaultPreadAlg));
} else {
BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " );
BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io|write-only-io " );
m_xrdOssDF->Close();
return bufferAlg; // invalid instance;
}
Expand Down
135 changes: 131 additions & 4 deletions src/XrdCeph/XrdCephPosix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,49 @@
#include "XrdCeph/XrdCephPosix.hh"
#include "XrdCeph/XrdCephBulkAioRead.hh"
#include "XrdSfs/XrdSfsFlags.hh" // for the OFFLINE flag status
#include "XrdCks/XrdCksData.hh"

#include <XrdCks/XrdCksAssist.hh>

char *ts_rfc3339() {

std::time_t now = std::time({});
char timeString[std::size("yyyy-mm-dd hh:mm:ss")];
std::strftime(std::data(timeString), std::size(timeString),
"%F %TZ", std::gmtime(&now));
return strdup(timeString);
}

constexpr char hex2ascii(char nibble) { return (0<= nibble && nibble<=9) ? nibble+'0' : nibble-10+'a'; }
constexpr char hiNibble(uint8_t hexbyte) { return (hexbyte & 0xf0) >> 4; }
constexpr char loNibble(uint8_t hexbyte) { return (hexbyte & 0x0f); }

constexpr char *hexbytes2ascii(const char bytes[], const unsigned int length){

char asciiVal[9] {};
for (unsigned int i = 0, j = 0; i < length; i++) {

const uint8_t hexbyte = bytes[i];
asciiVal[j++] = hex2ascii(hiNibble(hexbyte));
asciiVal[j++] = hex2ascii(loNibble(hexbyte));

}
return strdup(asciiVal);
}

using namespace std;

int setXrdCksAttr(const int fd, const char* cstype, const char* ckSumbuf) {

int rc = -1;

std::vector<char> attrData = XrdCksAttrData(cstype, ckSumbuf, time(0));

rc = ceph_posix_fsetxattr(fd, XrdCksAttrName(cstype).c_str(),
attrData.data(), attrData.size(), 0);

return rc;
}

/// small struct for directory listing
struct DirIterator {
Expand Down Expand Up @@ -111,6 +153,17 @@ XrdSysMutex g_init_mutex;
//JW Counter for number of times a given cluster is resolved.
std::map<unsigned int, unsigned long long> g_idxCntr;

//IJJ: Actions for Adler32 checksum

extern bool g_calcStreamedAdler32;
bool g_calcStreamedAdler32 = false;
extern bool g_logStreamedAdler32;
bool g_logStreamedAdler32 = false;
extern bool g_storeStreamedAdler32;
bool g_storeStreamedAdler32 = false;

FILE *g_cksLogFile;

/// Accessor to next ceph pool index
/// Note that this is not thread safe, but we do not care
/// as we only want a rough load balancing
Expand Down Expand Up @@ -529,6 +582,7 @@ int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, con
}
int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx);
if (rc != 0) {
logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, user@pool = %s", userAtPool.c_str());
logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, rc = %d", rc);
cluster->shutdown();
delete cluster;
Expand Down Expand Up @@ -748,7 +802,13 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode
}
}
}

// At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it
if (g_calcStreamedAdler32) {
fr.cksCalcadler32 = new XrdCksCalcadler32();
fr.cksCalcadler32->Init();
}
fr.writingData = true;
int fd = insertFileRef(fr);
logwrapper((char*)"File descriptor %d associated to file %s opened in write mode", fd, pathname);
return fd;
Expand All @@ -757,6 +817,17 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode

}


const char* formatAdler32(unsigned long adler32) {

#ifndef Xrd_Big_Endian
adler32 = htonl(adler32);
#endif
char adler32Cks[8+1];
sprintf(adler32Cks, "%08lx", adler32);
return (const char*)strdup(adler32Cks);
}

int ceph_posix_close(int fd) {
CephFileRef* fr = getFileRef(fd);
if (fr) {
Expand All @@ -769,6 +840,9 @@ int ceph_posix_close(int fd) {
lastAsyncAge = 1.0 * (now.tv_sec - fr->lastAsyncSubmission.tv_sec)
+ 0.000001 * (now.tv_usec - fr->lastAsyncSubmission.tv_usec);
}
if (fr->bytesWritten > 0){
ceph_posix_fremovexattr(fd,"XrdCks.adler32");
}
logwrapper((char*)"ceph_close: closed fd %d for file %s, read ops count %d, write ops count %d, "
"async write ops %d/%d, async pending write bytes %ld, "
"async read ops %d/%d, bytes written/max offset %ld/%ld, "
Expand All @@ -777,6 +851,32 @@ int ceph_posix_close(int fd) {
fr->asyncWrCompletionCount, fr->asyncWrStartCount, fr->bytesAsyncWritePending,
fr->asyncRdCompletionCount, fr->asyncRdStartCount, fr->bytesWritten, fr->maxOffsetWritten,
fr->longestAsyncWriteTime, fr->longestCallbackInvocation, (lastAsyncAge));

if (fr->writingData) {
if (g_calcStreamedAdler32) {

unsigned long adlerULong;
memcpy((&adlerULong), fr->cksCalcadler32->Final(), 4);
const char* adler32Cks = formatAdler32(adlerULong);

logwrapper((char*)"ceph_close: fd: %d, Adler32 streamed checksum = %s", fd, adler32Cks);

if (g_logStreamedAdler32) {
const char *path = strdup((fr->pool + ":" + fr->name).c_str());
fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), path, "streamed", "adler32", adler32Cks);
fflush(g_cksLogFile);
}

if (g_storeStreamedAdler32) {
int rc = setXrdCksAttr(fd, "adler32", adler32Cks);
if (rc != 0) {
logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum");
}
}
delete fr->cksCalcadler32;
}
}

deleteFileRef(fd, *fr);
return 0;
} else {
Expand Down Expand Up @@ -838,6 +938,9 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) {
fr->wrcount++;
fr->bytesWritten+=count;
if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten);
if (g_calcStreamedAdler32) {
fr->cksCalcadler32->Update((const char*)buf, count);
}
return count;
} else {
return -EBADF;
Expand All @@ -848,7 +951,8 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
CephFileRef* fr = getFileRef(fd);
if (fr) {
// TODO implement proper logging level for this plugin - this should be only debug
//logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count);
//logwrapper((char*)"ceph_posix_pwrite: for fd %d, count=%d", fd, count);

if ((fr->flags & O_ACCMODE) == O_RDONLY) {
return -EBADF;
}
Expand All @@ -859,11 +963,17 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
ceph::bufferlist bl;
bl.append((const char*)buf, count);
int rc = striper->write(fr->name, bl, count, offset);

if (rc) return rc;
XrdSysMutexHelper lock(fr->statsMutex);
fr->wrcount++;
fr->bytesWritten+=count;
if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten);

if (g_calcStreamedAdler32) {
fr->cksCalcadler32->Update((const char*)buf, count);
}

return count;
} else {
return -EBADF;
Expand Down Expand Up @@ -908,7 +1018,7 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
const char *buf = (const char*)aiop->sfsAio.aio_buf;
size_t offset = aiop->sfsAio.aio_offset;
// TODO implement proper logging level for this plugin - this should be only debug
//logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count);
logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count);
if ((fr->flags & O_ACCMODE) == O_RDONLY) {
return -EBADF;
}
Expand Down Expand Up @@ -938,6 +1048,9 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
fr->asyncWrStartCount++;
::gettimeofday(&fr->lastAsyncSubmission, nullptr);
fr->bytesAsyncWritePending+=count;
if (g_calcStreamedAdler32) {
fr->cksCalcadler32->Update((const char*)buf, count);
}
return rc;
} else {
return -EBADF;
Expand Down Expand Up @@ -1349,8 +1462,23 @@ static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char* na
ssize_t ceph_posix_setxattr(XrdOucEnv* env, const char* path,
const char* name, const void* value,
size_t size, int flags) {
int rc;

auto *cks = (XrdCksData*)value;
logwrapper((char*)"ceph_setxattr: path %s name=%s value=%s", path, name, value);
return ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags);
rc = ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags);

if (0 == rc && !strcmp(name, "XrdCks.adler32") && g_logStreamedAdler32) {
//
// We know that streamed checksums use ceph_posix_fsetxattr below, so this must be a readback checksum
//
auto cksAscii = (const char*)hexbytes2ascii(cks->Value, cks->Length);
logwrapper((char*)"readback checksum = %s", cksAscii);
fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), path, "readback", "adler32", cksAscii);
fflush(g_cksLogFile);

}
return rc;
}

int ceph_posix_fsetxattr(int fd,
Expand Down Expand Up @@ -1543,7 +1671,6 @@ int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) {
logwrapper((char*)"ceph_posix_unlink : %s", pathname);
// start the timer
auto timer_start = std::chrono::steady_clock::now();

// minimal stat : only size and times are filled
CephFile file = getCephFile(pathname, env);
libradosstriper::RadosStriper *striper = getRadosStriper(file);
Expand Down
Loading
Loading