1919
2020#include < cstdlib>
2121#include < mutex>
22- #include < string_view >
22+ #include < stdexcept >
2323
2424#include < arrow/filesystem/filesystem.h>
25- #include < arrow/filesystem/localfs.h>
26- #if __has_include(<arrow/filesystem/s3fs.h>)
27- #include < arrow/filesystem/s3fs.h>
28- #define ICEBERG_ARROW_HAS_S3 1
25+ #ifdef ICEBERG_S3_ENABLED
26+ # include < arrow/filesystem/s3fs.h>
27+ # define ICEBERG_ARROW_HAS_S3 1
2928#else
30- #define ICEBERG_ARROW_HAS_S3 0
29+ # define ICEBERG_ARROW_HAS_S3 0
3130#endif
3231
3332#include " iceberg/arrow/arrow_file_io.h"
@@ -40,23 +39,17 @@ namespace iceberg::arrow {
4039
4140namespace {
4241
43- bool IsS3Uri (std::string_view uri) { return uri.rfind (" s3://" , 0 ) == 0 ; }
44-
4542Status EnsureS3Initialized () {
4643#if ICEBERG_ARROW_HAS_S3
4744 static std::once_flag init_flag;
4845 static ::arrow::Status init_status = ::arrow::Status::OK ();
4946 std::call_once (init_flag, []() {
5047 ::arrow::fs::S3GlobalOptions options;
5148 init_status = ::arrow::fs::InitializeS3 (options);
52- if (init_status.ok ()) {
53- std::atexit ([]() { (void )::arrow::fs::FinalizeS3 (); });
54- }
5549 });
5650 if (!init_status.ok ()) {
57- return std::unexpected<Error>{
58- {.kind = ::iceberg::arrow::ToErrorKind (init_status),
59- .message = init_status.ToString ()}};
51+ return std::unexpected (Error{.kind = ::iceberg::arrow::ToErrorKind (init_status),
52+ .message = init_status.ToString ()});
6053 }
6154 return {};
6255#else
@@ -69,7 +62,7 @@ Status EnsureS3Initialized() {
6962// /
7063// / \param properties The configuration properties map.
7164// / \return Configured S3Options.
72- ::arrow::fs::S3Options ConfigureS3Options (
65+ Result< ::arrow::fs::S3Options> ConfigureS3Options (
7366 const std::unordered_map<std::string, std::string>& properties) {
7467 ::arrow::fs::S3Options options;
7568
@@ -100,13 +93,22 @@ ::arrow::fs::S3Options ConfigureS3Options(
10093 auto endpoint_it = properties.find (S3Properties::kEndpoint );
10194 if (endpoint_it != properties.end ()) {
10295 options.endpoint_override = endpoint_it->second ;
96+ } else {
97+ // Fall back to AWS standard environment variables for endpoint override
98+ const char * s3_endpoint_env = std::getenv (" AWS_ENDPOINT_URL_S3" );
99+ if (s3_endpoint_env != nullptr ) {
100+ options.endpoint_override = s3_endpoint_env;
101+ } else {
102+ const char * endpoint_env = std::getenv (" AWS_ENDPOINT_URL" );
103+ if (endpoint_env != nullptr ) {
104+ options.endpoint_override = endpoint_env;
105+ }
106+ }
103107 }
104108
105- // Configure path-style access (needed for MinIO)
106109 auto path_style_it = properties.find (S3Properties::kPathStyleAccess );
107- if (path_style_it != properties.end ()) {
108- // Arrow's S3 path-style is controlled via endpoint scheme
109- // For path-style access, we need to ensure the endpoint is properly configured
110+ if (path_style_it != properties.end () && path_style_it->second == " true" ) {
111+ options.force_virtual_addressing = false ;
110112 }
111113
112114 // Configure SSL
@@ -118,117 +120,45 @@ ::arrow::fs::S3Options ConfigureS3Options(
118120 // Configure timeouts
119121 auto connect_timeout_it = properties.find (S3Properties::kConnectTimeoutMs );
120122 if (connect_timeout_it != properties.end ()) {
121- options.connect_timeout = std::stod (connect_timeout_it->second ) / 1000.0 ;
123+ try {
124+ options.connect_timeout = std::stod (connect_timeout_it->second ) / 1000.0 ;
125+ } catch (const std::exception& e) {
126+ return InvalidArgument (" Invalid {}: '{}' ({})" , S3Properties::kConnectTimeoutMs ,
127+ connect_timeout_it->second , e.what ());
128+ }
122129 }
123130
124131 auto socket_timeout_it = properties.find (S3Properties::kSocketTimeoutMs );
125132 if (socket_timeout_it != properties.end ()) {
126- options.request_timeout = std::stod (socket_timeout_it->second ) / 1000.0 ;
133+ try {
134+ options.request_timeout = std::stod (socket_timeout_it->second ) / 1000.0 ;
135+ } catch (const std::exception& e) {
136+ return InvalidArgument (" Invalid {}: '{}' ({})" , S3Properties::kSocketTimeoutMs ,
137+ socket_timeout_it->second , e.what ());
138+ }
127139 }
128140
129141 return options;
130142}
131-
132- // / \brief Create an S3 FileSystem with the given options.
133- // /
134- // / \param options The S3Options to use.
135- // / \return A shared_ptr to the S3FileSystem, or an error.
136- Result<std::shared_ptr<::arrow::fs::FileSystem>> MakeS3FileSystem (
137- const ::arrow::fs::S3Options& options) {
138- ICEBERG_RETURN_UNEXPECTED (EnsureS3Initialized ());
139- ICEBERG_ARROW_ASSIGN_OR_RETURN (auto fs, ::arrow::fs::S3FileSystem::Make (options));
140- return fs;
141- }
142143#endif
143144
144- Result<std::shared_ptr<::arrow::fs::FileSystem>> ResolveFileSystemFromUri (
145- const std::string& uri, std::string* out_path) {
146- if (IsS3Uri (uri)) {
147- ICEBERG_RETURN_UNEXPECTED (EnsureS3Initialized ());
148- }
149- ICEBERG_ARROW_ASSIGN_OR_RETURN (auto fs, ::arrow::fs::FileSystemFromUri (uri, out_path));
150- return fs;
151- }
152-
153- // / \brief ArrowUriFileIO resolves FileSystem from URI for each operation.
154- // /
155- // / This implementation is thread-safe as it creates a new FileSystem instance
156- // / for each operation. However, it may be less efficient than caching the
157- // / FileSystem. S3 initialization is done once per process.
158- class ArrowUriFileIO : public FileIO {
159- public:
160- Result<std::string> ReadFile (const std::string& file_location,
161- std::optional<size_t > length) override {
162- std::string path;
163- ICEBERG_ASSIGN_OR_RAISE (auto fs, ResolveFileSystemFromUri (file_location, &path));
164- ::arrow::fs::FileInfo file_info (path);
165- if (length.has_value ()) {
166- file_info.set_size (length.value ());
167- }
168- std::string content;
169- ICEBERG_ARROW_ASSIGN_OR_RETURN (auto file, fs->OpenInputFile (file_info));
170- ICEBERG_ARROW_ASSIGN_OR_RETURN (auto file_size, file->GetSize ());
171-
172- content.resize (file_size);
173- size_t remain = file_size;
174- size_t offset = 0 ;
175- while (remain > 0 ) {
176- size_t read_length = std::min (remain, static_cast <size_t >(1024 * 1024 ));
177- ICEBERG_ARROW_ASSIGN_OR_RETURN (
178- auto read_bytes,
179- file->Read (read_length, reinterpret_cast <uint8_t *>(&content[offset])));
180- remain -= read_bytes;
181- offset += read_bytes;
182- }
183-
184- return content;
185- }
186-
187- Status WriteFile (const std::string& file_location,
188- std::string_view content) override {
189- std::string path;
190- ICEBERG_ASSIGN_OR_RAISE (auto fs, ResolveFileSystemFromUri (file_location, &path));
191- ICEBERG_ARROW_ASSIGN_OR_RETURN (auto file, fs->OpenOutputStream (path));
192- ICEBERG_ARROW_RETURN_NOT_OK (file->Write (content.data (), content.size ()));
193- ICEBERG_ARROW_RETURN_NOT_OK (file->Flush ());
194- ICEBERG_ARROW_RETURN_NOT_OK (file->Close ());
195- return {};
196- }
197-
198- Status DeleteFile (const std::string& file_location) override {
199- std::string path;
200- ICEBERG_ASSIGN_OR_RAISE (auto fs, ResolveFileSystemFromUri (file_location, &path));
201- ICEBERG_ARROW_RETURN_NOT_OK (fs->DeleteFile (path));
202- return {};
203- }
204- };
205-
206145} // namespace
207146
208147Result<std::unique_ptr<FileIO>> MakeS3FileIO (
209148 const std::string& uri,
210149 const std::unordered_map<std::string, std::string>& properties) {
211- if (!IsS3Uri ( uri)) {
150+ if (!uri. starts_with ( " s3:// " )) {
212151 return InvalidArgument (" S3 URI must start with s3://" );
213152 }
214153#if !ICEBERG_ARROW_HAS_S3
215154 return NotImplemented (" Arrow S3 support is not enabled" );
216155#else
217- // If properties are empty, use the simple URI-based resolution
218- if (properties.empty ()) {
219- // Validate that S3 can be initialized and the URI is valid
220- std::string path;
221- ICEBERG_ASSIGN_OR_RAISE (auto fs, ResolveFileSystemFromUri (uri, &path));
222- (void )path;
223- (void )fs;
224- return std::make_unique<ArrowUriFileIO>();
225- }
156+ ICEBERG_RETURN_UNEXPECTED (EnsureS3Initialized ());
226157
227- // Create S3FileSystem with explicit configuration
228- auto options = ConfigureS3Options (properties);
229- ICEBERG_ASSIGN_OR_RAISE (auto fs, MakeS3FileSystem (options));
158+ // Configure S3 options from properties (uses default credentials if empty)
159+ ICEBERG_ASSIGN_OR_RAISE ( auto options, ConfigureS3Options (properties) );
160+ ICEBERG_ARROW_ASSIGN_OR_RETURN (auto fs, :: arrow::fs::S3FileSystem::Make (options));
230161
231- // Return ArrowFileSystemFileIO with the configured S3 filesystem
232162 return std::make_unique<ArrowFileSystemFileIO>(std::move (fs));
233163#endif
234164}
0 commit comments