@@ -22,18 +22,22 @@ limitations under the License.
2222#include < vector>
2323
2424#include " absl/status/status.h"
25+ #include " absl/strings/match.h"
2526#include " absl/strings/str_format.h"
2627#include " absl/strings/string_view.h"
2728#include " cpp/array_record_reader.h"
2829#include " cpp/array_record_writer.h"
2930#include " cpp/thread_pool.h"
31+ #include " third_party/cloud_cpp/google/cloud/storage/client.h"
3032#include " pybind11/gil.h"
3133#include " pybind11/pybind11.h"
3234#include " pybind11/pytypes.h"
3335#include " pybind11/stl.h"
3436#include " riegeli/base/maker.h"
3537#include " riegeli/bytes/fd_reader.h"
3638#include " riegeli/bytes/fd_writer.h"
39+ #include " riegeli/gcs/gcs_object.h"
40+ #include " riegeli/gcs/gcs_reader.h"
3741
3842namespace py = pybind11;
3943
@@ -50,10 +54,13 @@ PYBIND11_MODULE(array_record_module, m) {
5054 throw py::value_error (
5155 std::string (status_or_option.status ().message ()));
5256 }
57+ riegeli::FdWriterBase::Options file_writer_options;
58+ file_writer_options.set_buffer_size (size_t {16 } << 20 );
5359 // Release the GIL because IO is time consuming.
5460 py::gil_scoped_release scoped_release;
5561 return new array_record::ArrayRecordWriter (
56- riegeli::Maker<riegeli::FdWriter>(path),
62+ riegeli::Maker<riegeli::FdWriter>(
63+ path, std::move (file_writer_options)),
5764 status_or_option.value ());
5865 }),
5966 py::arg (" path" ), py::arg (" options" ) = " " )
@@ -84,18 +91,29 @@ PYBIND11_MODULE(array_record_module, m) {
8491 std::string (status_or_option.status ().message ()));
8592 }
8693 riegeli::FdReaderBase::Options file_reader_options;
94+ riegeli::GcsReader::Options gcs_reader_options;
8795 if (kwargs.contains (" file_reader_buffer_size" )) {
8896 auto file_reader_buffer_size =
8997 kwargs[" file_reader_buffer_size" ].cast <int64_t >();
9098 file_reader_options.set_buffer_size (file_reader_buffer_size);
99+ gcs_reader_options.set_buffer_size (file_reader_buffer_size);
91100 }
92101 // Release the GIL because IO is time consuming.
93102 py::gil_scoped_release scoped_release;
94- return new array_record::ArrayRecordReader (
95- riegeli::Maker<riegeli::FdReader>(
96- path, std::move (file_reader_options)),
97- status_or_option.value (),
98- array_record::ArrayRecordGlobalPool ());
103+ if (absl::StartsWith (path, " gs://" )) {
104+ return new array_record::ArrayRecordReader (
105+ riegeli::Maker<riegeli::GcsReader>(
106+ google::cloud::storage::Client (),
107+ riegeli::GcsObject (path), std::move (gcs_reader_options)),
108+ status_or_option.value (),
109+ array_record::ArrayRecordGlobalPool ());
110+ } else {
111+ return new array_record::ArrayRecordReader (
112+ riegeli::Maker<riegeli::FdReader>(
113+ path, std::move (file_reader_options)),
114+ status_or_option.value (),
115+ array_record::ArrayRecordGlobalPool ());
116+ }
99117 }),
100118 py::arg (" path" ), py::arg (" options" ) = " " , R"(
101119 ArrayRecordReader for fast sequential or random access.
0 commit comments