Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/XProtocol/XProtocol.hh
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ enum XQueryType {
kXR_Qckscan= 6,
kXR_Qconfig= 7,
kXR_Qvisa = 8,
kXR_Qhead = 9,
kXR_Qopaque=16,
kXR_Qopaquf=32,
kXR_Qopaqug=64
Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClFileSystem.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ namespace XrdCl
Space = kXR_Qspace, //!< Query logical space stats
Stats = kXR_QStats, //!< Query server stats
Visa = kXR_Qvisa, //!< Query file visa attributes
Head = kXR_Qhead, //!< Query http header response
XAttr = kXR_Qxattr //!< Query file extended attributes
};
};
Expand Down
15 changes: 15 additions & 0 deletions src/XrdOuc/XrdOucCache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include "XrdOuc/XrdOucCacheStats.hh"
#include "XrdOuc/XrdOucIOVec.hh"
#include "XrdCl/XrdClBuffer.hh"

struct stat;
class XrdOucEnv;
Expand Down Expand Up @@ -147,6 +148,20 @@ long long FSize() = 0;

virtual int Fstat(struct stat &sbuff) {(void)sbuff; return 1;}


//------------------------------------------------------------------------------
//! Perform an fcntl() operation (defaults to passthrough).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this documentation needs to be changed, it is mostly copy of Fstat.
i'm not sure if saying that it defaults to passthrough makes sense ... we need to see if there will be some Fcntl calls that need to be handled by the cache / open-file / IO.

//!
//! @param AMT, for the moment XrdCl::Buffer to pass query code value and
//! XrdCl::Buffer to pass the string response. The XrdCL::Buffers is
//! interpreted as std::string
//!
//! @return <0 - fstat failed, value is -errno.
//! =0 - fstat succeeded, sbuff holds stat information.
//! >0 - fstat could not be done, forward operation to next level.
//------------------------------------------------------------------------------
virtual int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) { return -1; }

//-----------------------------------------------------------------------------
//! Get the file's location (i.e. endpoint hostname and port)
//!
Expand Down
119 changes: 118 additions & 1 deletion src/XrdPfc/XrdPfc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
#include <sys/statvfs.h>

#include "XrdCl/XrdClURL.hh"
#include "XrdCl/XrdClFileSystem.hh"
#include "XrdCl/XrdClFileStateHandler.hh"

#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdOuc/XrdOucPrivateUtils.hh"
#include "XrdOuc/XrdOucJson.hh"

#include "XrdSys/XrdSysTimer.hh"
#include "XrdSys/XrdSysTrace.hh"
Expand Down Expand Up @@ -425,6 +428,7 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f
}

// This is always true, now that IOFileBlock is unsupported.

if (filesize == 0)
{
struct stat st;
Expand All @@ -444,7 +448,7 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f

if (filesize >= 0)
{
file = File::FileOpen(path, off, filesize);
file = File::FileOpen(path, off, filesize, io->GetInput());
}

{
Expand Down Expand Up @@ -904,6 +908,20 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen,
return -ENOENT;
}

//______________________________________________________________________________
// If supported, write Cache-Control as xattr to cinfo file.
// One can use file descriptor or full path interchangeably
//------------------------------------------------------------------------------
void Cache::WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::string& cc)
{
if (m_metaXattr) {
int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0);
if (res != 0) {
TRACE(Error, "WritecacheControlXAttr error setting xattr " << res);
}
}
}

//______________________________________________________________________________
// If supported, write file_size as xattr to cinfo file.
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -958,6 +976,47 @@ long long Cache::DetermineFullFileSize(const std::string &cinfo_fname)
return ret;
}

//______________________________________________________________________________
// Get cache control attributes from the corresponding cinfo-file name.
// Returns -error on failure.
//------------------------------------------------------------------------------
int Cache::GetCacheControlXAttr(const std::string &cinfo_fname, std::string& ival)
{
if (m_metaXattr) {

char pfn[4096];
m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);

char cc[512];
int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, pfn, -1);
if (res > 0)
{
std::string tmp(cc, res);
ival = tmp;
}
return res;
}
return 0;
}

//______________________________________________________________________________
// Get cache control attributes from the corresponding cinfo-file name.
// Returns -error on failure.
//------------------------------------------------------------------------------
int Cache::GetCacheControlXAttr(int fd, std::string& ival)
{
if (m_metaXattr) {
char cc[512];
int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, nullptr, fd);
if (res > 0)
{
ival = std::string(cc, res);
return res;
}
}
return 0;
}

//______________________________________________________________________________
// Calculate if the file is to be considered cached for the purposes of
// only-if-cached and setting of atime of the Stat() calls.
Expand Down Expand Up @@ -1097,6 +1156,64 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode)
if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
{
TRACE(Dump, "Prepare defer open " << f_name);

std::string icc;
if (GetCacheControlXAttr(i_name, icc) > 0) {
using namespace nlohmann;
json cc_json = json::parse(icc);

bool mustRevalidate = cc_json.contains("revalidate") && (cc_json["revalidate"] == true);
bool hasExpired = false;
if (cc_json.contains("expire"))
{
time_t current_time;
current_time = time(NULL);
if (current_time > cc_json["expire"])
hasExpired = true;
}

bool ccIsValid = true;

if (cc_json.contains("ETag") && (mustRevalidate || hasExpired)) {
// Compare cinfo xattr etag and the etag from file system query response
// Note: qeury returns only etag value, not a json string
XrdCl::FileSystem fs(url);
XrdCl::Buffer queryArgs(1024); // pass file path throug args: reserve bytes to store path
queryArgs.FromString(curl);
XrdCl::Buffer *response = nullptr;
XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Head, queryArgs, response);

if (st.IsOK())
{
std::string etag = response->ToString();
ccIsValid = (etag == cc_json["ETag"]);
TRACE(Info, "Prepare " << f_name << ", ETag valid res: " << ccIsValid);

// update expiration time if Etag is valid
if (cc_json.contains("max-age"))
{
time_t ma = cc_json["max-age"];
cc_json["expire"] = ma + time(NULL);
char pfn[4096];
m_oss->Lfn2Pfn(i_name.c_str(), pfn, 4096);
WriteCacheControlXAttr(-1, pfn, cc_json.dump());
}
}
else
{
// Message has a status beacuse we are in the block condition for cache-contol xattr
TRACE(Error, "Prepare() XrdCl::FileSystem::Query failed " << f_name.c_str());
ccIsValid = false;
}
}

if (!ccIsValid)
{
// invalidate cinfo on ETag mismatch
UnlinkFile(f_name, false);
}
} // end chekcing cache control xattr in cinfo file

return 1;
}
else
Expand Down
4 changes: 4 additions & 0 deletions src/XrdPfc/XrdPfc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,12 @@ public:
virtual int ConsiderCached(const char *url);

bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk);
void WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::string& cc);
void WriteFileSizeXAttr(int cinfo_fd, long long file_size);
long long DetermineFullFileSize(const std::string &cinfo_fname);
int GetCacheControlXAttr(const std::string &cinfo_fname, std::string& res);
int GetCacheControlXAttr(int fd, std::string& res);


//--------------------------------------------------------------------
//! \brief Makes decision if the original XrdOucCacheIO should be cached.
Expand Down
37 changes: 34 additions & 3 deletions src/XrdPfc/XrdPfcFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
#include "XrdSys/XrdSysTimer.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucJson.hh"
#include "XrdSfs/XrdSfsInterface.hh"

#include "XrdCl/XrdClFileStateHandler.hh"

#include <cstdio>
#include <sstream>
#include <fcntl.h>
Expand Down Expand Up @@ -135,10 +138,10 @@ void File::Close()

//------------------------------------------------------------------------------

File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO* inputIO)
{
File *file = new File(path, offset, fileSize);
if ( ! file->Open())
if ( ! file->Open(inputIO))
{
delete file;
file = 0;
Expand Down Expand Up @@ -420,7 +423,7 @@ void File::RemoveIO(IO *io)

//------------------------------------------------------------------------------

bool File::Open()
bool File::Open(XrdOucCacheIO* inputIO)
{
// Sets errno accordingly.

Expand Down Expand Up @@ -531,6 +534,34 @@ bool File::Open()
m_cfi.Write(m_info_file, ifn.c_str());
m_info_file->Fsync();
cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);

// access and write cache-control attributes
XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::Head;
XrdCl::Buffer queryArgs(5);
std::string qs = std::to_string(queryCode);
queryArgs.FromString(qs);
XrdCl::Buffer *responseFctl = nullptr;
int resFctl = inputIO->Fcntl(queryArgs, responseFctl);
if (resFctl == 0)
{
std::string cc_str = responseFctl->ToString();
nlohmann::json cc_json = nlohmann::json::parse(cc_str);
if (cc_json.contains("max-age"))
{
time_t ma = cc_json["max-age"];
ma += time(NULL);
cc_json["expire"] = ma;
cc_str = cc_json.dump();
}
TRACE(Error, "GetFile() XrdCl::File::Fcntl value " << cc_str);
cache()->WriteCacheControlXAttr(m_info_file->getFD(), nullptr, cc_str);
}
else if (resFctl != kXR_Unsupported)
{
// Query XrdCl::QueryCode::Head is optional, print error only if informatin is supported
TRACE(Error, "GetFile() XrdCl::File::Fcntl query XrdCl::QueryCode::Head failed " << inputIO->Path());
}

TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
}
else
Expand Down
4 changes: 2 additions & 2 deletions src/XrdPfc/XrdPfcFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public:
// Constructor, destructor, Open() and Close() are private.

//! Static constructor that also does Open. Returns null ptr if Open fails.
static File* FileOpen(const std::string &path, long long offset, long long fileSize);
static File* FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO*);

//! Handle removal of a block from Cache's write queue.
void BlockRemovedFromWriteQ(Block*);
Expand Down Expand Up @@ -301,7 +301,7 @@ private:
void Close();

//! Open file handle for data file and info file on local disk.
bool Open();
bool Open(XrdOucCacheIO* inputOrigin);

static const char *m_traceID;

Expand Down
7 changes: 7 additions & 0 deletions src/XrdPosix/XrdPosixFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,13 @@ int XrdPosixFile::Fstat(struct stat &buf)
buf.st_mode = myMode;
return 0;
}

int XrdPosixFile::Fcntl(const XrdCl::Buffer &arg, XrdCl::Buffer *&response)
{
// AMT: temporary solution to handle unsuported operations in XrdPfc::File::Open()
XrdCl::XRootDStatus status = clFile.Fcntl(arg, response);
return status.IsOK() ? 0 : status.errNo;
}

/******************************************************************************/
/* H a n d l e R e s p o n s e */
Expand Down
2 changes: 2 additions & 0 deletions src/XrdPosix/XrdPosixFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ static void DelayedDestroy(XrdPosixFile *fp);

int Fstat(struct stat &buf) override;

int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) override;

const char *Location(bool refresh=false) override;

void HandleResponse(XrdCl::XRootDStatus *status,
Expand Down
2 changes: 2 additions & 0 deletions src/XrdPosix/XrdPosixPrepIO.hh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ long long FSize() {return (Init() ? fileP->FSize() : openRC);}
int Fstat(struct stat &buf)
{return (Init() ? fileP->Fstat(buf) : openRC);}

int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) { return (Init() ? fileP->Fcntl(args, res) : openRC); }

int Open() {Init(); return openRC;}

const char *Path() {return fileP->Path();}
Expand Down
Loading