@@ -21,19 +21,25 @@ limitations under the License.
2121#include < utility>
2222#include < vector>
2323
24+ #include " file/base/options.pb.h"
2425#include " absl/status/status.h"
26+ #include " absl/strings/match.h"
2527#include " absl/strings/str_format.h"
2628#include " absl/strings/string_view.h"
2729#include " cpp/array_record_reader.h"
2830#include " cpp/array_record_writer.h"
2931#include " cpp/thread_pool.h"
32+ #include " third_party/cloud_cpp/google/cloud/storage/client.h"
3033#include " pybind11/gil.h"
3134#include " pybind11/pybind11.h"
3235#include " pybind11/pytypes.h"
3336#include " pybind11/stl.h"
3437#include " riegeli/base/maker.h"
3538#include " riegeli/bytes/fd_reader.h"
3639#include " riegeli/bytes/fd_writer.h"
40+ #include " riegeli/gcs/gcs_object.h"
41+ #include " riegeli/gcs/gcs_reader.h"
42+
3743
3844namespace py = pybind11;
3945
@@ -50,10 +56,13 @@ PYBIND11_MODULE(array_record_module, m) {
5056 throw py::value_error (
5157 std::string (status_or_option.status ().message ()));
5258 }
59+ riegeli::FdWriterBase::Options file_writer_options;
60+ file_writer_options.set_buffer_size (size_t {16 } << 20 );
5361 // Release the GIL because IO is time consuming.
5462 py::gil_scoped_release scoped_release;
5563 return new array_record::ArrayRecordWriter (
56- riegeli::Maker<riegeli::FdWriter>(path),
64+ riegeli::Maker<riegeli::FdWriter>(
65+ path, std::move (file_writer_options)),
5766 status_or_option.value ());
5867 }),
5968 py::arg (" path" ), py::arg (" options" ) = " " )
@@ -84,18 +93,29 @@ PYBIND11_MODULE(array_record_module, m) {
8493 std::string (status_or_option.status ().message ()));
8594 }
8695 riegeli::FdReaderBase::Options file_reader_options;
96+ riegeli::GcsReader::Options gcs_reader_options;
8797 if (kwargs.contains (" file_reader_buffer_size" )) {
8898 auto file_reader_buffer_size =
8999 kwargs[" file_reader_buffer_size" ].cast <int64_t >();
90100 file_reader_options.set_buffer_size (file_reader_buffer_size);
101+ gcs_reader_options.set_buffer_size (file_reader_buffer_size);
91102 }
92103 // Release the GIL because IO is time consuming.
93104 py::gil_scoped_release scoped_release;
94- return new array_record::ArrayRecordReader (
95- riegeli::Maker<riegeli::FdReader>(
96- path, std::move (file_reader_options)),
97- status_or_option.value (),
98- array_record::ArrayRecordGlobalPool ());
105+ if (absl::StartsWith (path, " gs://" )) {
106+ return new array_record::ArrayRecordReader (
107+ riegeli::Maker<riegeli::GcsReader>(
108+ google::cloud::storage::Client (),
109+ riegeli::GcsObject (path), std::move (gcs_reader_options)),
110+ status_or_option.value (),
111+ array_record::ArrayRecordGlobalPool ());
112+ } else {
113+ return new array_record::ArrayRecordReader (
114+ riegeli::Maker<riegeli::FdReader>(
115+ path, std::move (file_reader_options)),
116+ status_or_option.value (),
117+ array_record::ArrayRecordGlobalPool ());
118+ }
99119 }),
100120 py::arg (" path" ), py::arg (" options" ) = " " , R"(
101121 ArrayRecordReader for fast sequential or random access.
0 commit comments