Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/value_parsing.h"

namespace arrow::fs {

Expand Down Expand Up @@ -3579,9 +3578,10 @@ S3GlobalOptions S3GlobalOptions::Defaults() {
log_level = S3LogLevel::Off;
}

value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1");
if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(), &u)) {
num_event_loop_threads = u;
auto maybe_num_threads =
arrow::internal::GetEnvVarInteger("ARROW_S3_THREADS", /*min_value=*/1);
if (maybe_num_threads.ok()) {
num_event_loop_threads = static_cast<int>(*maybe_num_threads);
}
return S3GlobalOptions{log_level, num_event_loop_threads};
}
Expand Down
24 changes: 8 additions & 16 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,23 +390,15 @@ namespace {
constexpr int kDefaultNumIoThreads = 8;

std::shared_ptr<ThreadPool> MakeIOThreadPool() {
int threads = 0;
auto maybe_env_var = ::arrow::internal::GetEnvVar("ARROW_IO_THREADS");
if (maybe_env_var.ok()) {
auto str = *std::move(maybe_env_var);
if (!str.empty()) {
try {
threads = std::stoi(str);
} catch (...) {
}
if (threads <= 0) {
ARROW_LOG(WARNING)
<< "ARROW_IO_THREADS does not contain a valid number of threads "
"(should be an integer > 0)";
}
}
int threads = kDefaultNumIoThreads;
auto maybe_num_threads = ::arrow::internal::GetEnvVarInteger(
"ARROW_IO_THREADS", /*min_value=*/1, /*max_value=*/std::numeric_limits<int>::max());
if (maybe_num_threads.ok()) {
threads = static_cast<int>(*maybe_num_threads);
} else if (!maybe_num_threads.status().IsKeyError()) {
maybe_num_threads.status().Warn();
}
auto maybe_pool = ThreadPool::MakeEternal(threads > 0 ? threads : kDefaultNumIoThreads);
auto maybe_pool = ThreadPool::MakeEternal(threads);
if (!maybe_pool.ok()) {
maybe_pool.status().Abort("Failed to create global IO thread pool");
}
Expand Down
19 changes: 11 additions & 8 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -660,21 +660,24 @@ LocaleGuard::LocaleGuard(const char* new_locale) : impl_(new Impl(new_locale)) {

LocaleGuard::~LocaleGuard() {}

EnvVarGuard::EnvVarGuard(const std::string& name, const std::string& value)
: name_(name) {
auto maybe_value = arrow::internal::GetEnvVar(name);
EnvVarGuard::EnvVarGuard(std::string name, std::optional<std::string> value)
: name_(std::move(name)) {
auto maybe_value = arrow::internal::GetEnvVar(name_);
if (maybe_value.ok()) {
was_set_ = true;
old_value_ = *std::move(maybe_value);
} else {
was_set_ = false;
old_value_ = std::nullopt;
}
if (value.has_value()) {
ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *value));
} else {
ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_));
}
ARROW_CHECK_OK(arrow::internal::SetEnvVar(name, value));
}

EnvVarGuard::~EnvVarGuard() {
if (was_set_) {
ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, old_value_));
if (old_value_.has_value()) {
ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *old_value_));
} else {
ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_));
}
Expand Down
21 changes: 14 additions & 7 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,11 @@ ARROW_TESTING_EXPORT
void AssertChildExit(int child_pid, int expected_exit_status = 0);
#endif

// A RAII-style object that switches to a new locale, and switches back
// to the old locale when going out of scope. Doesn't do anything if the
// new locale doesn't exist on the local machine.
// A RAII-style object that temporarily switches to a new locale
//
// The guard switches back to the old locale when going out of scope.
// It doesn't do anything if the new locale doesn't exist on the local machine.
//
// ATTENTION: may crash with an assertion failure on Windows debug builds.
// See ARROW-6108, also https://gerrit.libreoffice.org/#/c/54110/
class ARROW_TESTING_EXPORT LocaleGuard {
Expand All @@ -433,15 +435,20 @@ class ARROW_TESTING_EXPORT LocaleGuard {
std::unique_ptr<Impl> impl_;
};

// A RAII-style object that temporarily sets an environment variable
//
// The guard restores the variable's previous value when going out of scope,
// or deletes the variable if it was not initially set.
// The environment variable can also be temporarily deleted if std::nullopt
// is passed instead of a string value.
class ARROW_TESTING_EXPORT EnvVarGuard {
public:
EnvVarGuard(const std::string& name, const std::string& value);
EnvVarGuard(std::string name, std::optional<std::string> value);
~EnvVarGuard();

protected:
const std::string name_;
std::string old_value_;
bool was_set_;
std::string name_;
std::optional<std::string> old_value_;
};

namespace internal {
Expand Down
22 changes: 21 additions & 1 deletion cpp/src/arrow/util/atfork_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ namespace internal {

namespace {

bool IsAtForkEnabled() {
static bool is_enabled = [] {
auto maybe_value =
GetEnvVarInteger("ARROW_REGISTER_ATFORK", /*min_value=*/0, /*max_value=*/1);
if (maybe_value.ok()) {
return *maybe_value != 0;
}
if (!maybe_value.status().IsKeyError()) {
maybe_value.status().Warn();
}
// Enabled by default
return true;
}();
return is_enabled;
}

// Singleton state for at-fork management.
// We do not use global variables because of initialization order issues (ARROW-18383).
// Instead, a function-local static ensures the state is initialized
Expand Down Expand Up @@ -147,7 +163,11 @@ AtForkState* GetAtForkState() {
}; // namespace

void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
GetAtForkState()->RegisterAtFork(std::move(weak_handler));
// Only fetch the atfork state (and thus lazily call pthread_atfork) if enabled at all,
// to minimize potential nastiness with fork and threads.
if (IsAtForkEnabled()) {
GetAtForkState()->RegisterAtFork(std::move(weak_handler));
}
}

} // namespace internal
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/util/atfork_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ TEST_F(TestAtFork, SingleThread) {
ASSERT_THAT(child_after_, ElementsAre());
}

// XXX we would like to test the ARROW_REGISTER_ATFORK environment variable,
// but that would require spawning a test subprocess

# if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
defined(THREAD_SANITIZER))

Expand Down
17 changes: 8 additions & 9 deletions cpp/src/arrow/util/fuzz_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@ MemoryPool* fuzzing_memory_pool() {

void LogFuzzStatus(const Status& st, const uint8_t* data, int64_t size) {
static const int kVerbosity = []() {
auto maybe_env_value = GetEnvVar("ARROW_FUZZING_VERBOSITY");
if (maybe_env_value.status().IsKeyError()) {
return 0;
auto maybe_env_value =
GetEnvVarInteger("ARROW_FUZZING_VERBOSITY", /*min_value=*/0, /*max_value=*/1);
if (maybe_env_value.ok()) {
return static_cast<int>(*maybe_env_value);
}
auto env_value = std::move(maybe_env_value).ValueOrDie();
int32_t value;
if (!ParseValue<Int32Type>(env_value.data(), env_value.length(), &value)) {
Status::Invalid("Invalid value for ARROW_FUZZING_VERBOSITY: '", env_value, "'")
.Abort();
if (!maybe_env_value.status().IsKeyError()) {
maybe_env_value.status().Abort();
}
return value;
// Quiet by default
return 0;
}();

if (!st.ok() && kVerbosity >= 1) {
Expand Down
69 changes: 49 additions & 20 deletions cpp/src/arrow/util/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
#include "arrow/util/io_util.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/mutex.h"
#include "arrow/util/value_parsing.h"

// For filename conversion
#if defined(_WIN32)
Expand Down Expand Up @@ -1762,38 +1763,54 @@ Result<std::string> GetEnvVar(std::string_view name) {
#ifdef _WIN32
// On Windows, getenv() reads an early copy of the process' environment
// which doesn't get updated when SetEnvironmentVariable() is called.
constexpr int32_t bufsize = 2000;
char c_str[bufsize];
auto res = GetEnvironmentVariableA(name.data(), c_str, bufsize);
if (res >= bufsize) {
return Status::CapacityError("environment variable value too long");
} else if (res == 0) {
return Status::KeyError("environment variable '", name, "'undefined");
}
return std::string(c_str);
std::string value(100, '\0');

uint32_t res = GetEnvironmentVariableA(name.data(), value.data(),
static_cast<uint32_t>(value.size()));
if (res >= value.size()) {
// Value buffer too small, need to upsize
// (`res` includes the null-terminating character in this case)
value.resize(res);
res = GetEnvironmentVariableA(name.data(), value.data(),
static_cast<uint32_t>(value.size()));
}
if (res == 0) {
return Status::KeyError("environment variable '", name, "' undefined");
}
// On success, `res` does not include the null-terminating character
DCHECK_EQ(value.data()[res], 0);
value.resize(res);
return value;
#else
char* c_str = getenv(name.data());
if (c_str == nullptr) {
return Status::KeyError("environment variable '", name, "'undefined");
return Status::KeyError("environment variable '", name, "' undefined");
}
return std::string(c_str);
#endif
}

#ifdef _WIN32
Result<NativePathString> GetEnvVarNative(std::string_view name) {
NativePathString w_name;
constexpr int32_t bufsize = 2000;
wchar_t w_str[bufsize];
ARROW_ASSIGN_OR_RAISE(std::wstring w_name, StringToNative(name));
std::wstring value(100, '\0');

ARROW_ASSIGN_OR_RAISE(w_name, StringToNative(name));
auto res = GetEnvironmentVariableW(w_name.c_str(), w_str, bufsize);
if (res >= bufsize) {
return Status::CapacityError("environment variable value too long");
} else if (res == 0) {
return Status::KeyError("environment variable '", name, "'undefined");
uint32_t res = GetEnvironmentVariableW(w_name.data(), value.data(),
static_cast<uint32_t>(value.size()));
if (res >= value.size()) {
// Value buffer too small, need to upsize
// (`res` includes the null-terminating character in this case)
value.resize(res);
res = GetEnvironmentVariableW(w_name.data(), value.data(),
static_cast<uint32_t>(value.size()));
}
if (res == 0) {
return Status::KeyError("environment variable '", name, "' undefined");
}
return NativePathString(w_str);
// On success, `res` does not include the null-terminating character
DCHECK_EQ(value.data()[res], 0);
value.resize(res);
return value;
}

#else
Expand All @@ -1804,6 +1821,18 @@ Result<NativePathString> GetEnvVarNative(std::string_view name) {

#endif

Result<int64_t> GetEnvVarInteger(std::string_view name, std::optional<int64_t> min_value,
std::optional<int64_t> max_value) {
ARROW_ASSIGN_OR_RAISE(auto env_string, GetEnvVar(name));
int64_t value;
if (!ParseValue<Int64Type>(env_string.data(), env_string.length(), &value) ||
(min_value.has_value() && value < *min_value) ||
(max_value.has_value() && value > *max_value)) {
return Status::Invalid("Invalid value for ", name, ": '", env_string, "'");
}
return value;
}

Status SetEnvVar(std::string_view name, std::string_view value) {
#ifdef _WIN32
if (SetEnvironmentVariableA(name.data(), value.data())) {
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/util/io_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ ARROW_EXPORT
Result<std::string> GetEnvVar(std::string_view name);
ARROW_EXPORT
Result<NativePathString> GetEnvVarNative(std::string_view name);
// Returns KeyError if the environment variable doesn't exist,
// Invalid if it's not a valid integer in the given range.
ARROW_EXPORT
Result<int64_t> GetEnvVarInteger(std::string_view name,
std::optional<int64_t> min_value = {},
std::optional<int64_t> max_value = {});

ARROW_EXPORT
Status SetEnvVar(std::string_view name, std::string_view value);
Expand Down
39 changes: 39 additions & 0 deletions cpp/src/arrow/util/io_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1134,5 +1134,44 @@ TEST(CpuAffinity, NumberOfCores) {
#endif
}

TEST(Environment, GetEnvVar) {
// An environment variable that should exist on roughly all platforms
ASSERT_OK_AND_ASSIGN(auto v, GetEnvVar("PATH"));
ASSERT_FALSE(v.empty());
ASSERT_OK_AND_ASSIGN(auto w, GetEnvVarNative("PATH"));
ASSERT_FALSE(w.empty());
// An environment variable that most probably does not exist
ASSERT_RAISES(KeyError, GetEnvVar("BZZT_NONEXISTENT_VAR"));
ASSERT_RAISES(KeyError, GetEnvVarNative("BZZT_NONEXISTENT_VAR"));
// (we try not to rely on EnvVarGuard here as that would be circular)
}

TEST(Environment, GetEnvVarInteger) {
{
EnvVarGuard guard("FOOBAR", "5");
ASSERT_OK_AND_EQ(5, GetEnvVarInteger("FOOBAR"));
ASSERT_OK_AND_EQ(5, GetEnvVarInteger("FOOBAR", /*min_value=*/5, /*max_value=*/7));
ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR", /*min_value=*/6, /*max_value=*/7));
ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR", /*min_value=*/3, /*max_value=*/4));
}
{
EnvVarGuard guard("FOOBAR", "BAZ");
ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR"));
}
{
EnvVarGuard guard("FOOBAR", std::nullopt);
ASSERT_RAISES(KeyError, GetEnvVarInteger("FOOBAR"));
}
}

TEST(Environment, SetEnvVar) {
EnvVarGuard guard("FOOBAR", "one");
ASSERT_OK_AND_EQ("one", GetEnvVar("FOOBAR"));
ASSERT_OK(SetEnvVar("FOOBAR", "two"));
ASSERT_OK_AND_EQ("two", GetEnvVar("FOOBAR"));
ASSERT_OK(DelEnvVar("FOOBAR"));
ASSERT_RAISES(KeyError, GetEnvVar("FOOBAR"));
}

} // namespace internal
} // namespace arrow
12 changes: 12 additions & 0 deletions docs/source/cpp/env_vars.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ that changing their value later will have an effect.
``libhdfs.dylib`` on macOS, ``libhdfs.so`` on other platforms).
Alternatively, one can set :envvar:`HADOOP_HOME`.

.. envvar:: ARROW_REGISTER_ATFORK

**Experimental**. An integer value to enable or disable the registration
of at-fork handlers. These are enabled by default or explicitly using the
value "1"; use "0" to disable.

If enabled, at-fork handlers make Arrow C++ compatible with the use of the
``fork()`` system call, such as by Python's :python:mod:`multiprocessing`,
but at the expense of executing
`potentially unsafe code <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_atfork.html>`__
in a forked child process if the parent process is multi-threaded.

.. envvar:: ARROW_S3_LOG_LEVEL

Controls the verbosity of logging produced by S3 calls. Defaults to ``FATAL``
Expand Down
3 changes: 1 addition & 2 deletions python/pyarrow/tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ def run_with_env_var(env_var):
for v in ('-1', 'z'):
out, err = run_with_env_var(v)
assert out.strip() == '8' # default value
assert ("ARROW_IO_THREADS does not contain a valid number of threads"
in err.strip())
assert "Invalid value for ARROW_IO_THREADS" in err.strip()


def test_build_info():
Expand Down
Loading