diff --git a/core/Cargo.lock b/core/Cargo.lock index 837d3f170d75..31fb82032728 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -2799,6 +2799,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "edge_test_opfs_wasm32" +version = "0.55.0" +dependencies = [ + "futures", + "opendal", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test", + "web-sys", +] + [[package]] name = "edge_test_s3_read_on_wasm" version = "0.55.0" @@ -6881,6 +6893,7 @@ version = "0.55.0" dependencies = [ "js-sys", "opendal-core", + "send_wrapper", "serde", "wasm-bindgen", "wasm-bindgen-futures", diff --git a/core/edge/opfs_wasm32/Cargo.toml b/core/edge/opfs_wasm32/Cargo.toml new file mode 100644 index 000000000000..cb6adcb04d21 --- /dev/null +++ b/core/edge/opfs_wasm32/Cargo.toml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "edge_test_opfs_wasm32" + +edition.workspace = true +license.workspace = true +publish = false +rust-version.workspace = true +version.workspace = true + +[lib] +crate-type = ["cdylib"] + +[dependencies] +futures = { workspace = true, features = ["std", "async-await"] } +opendal = { path = "../..", default-features = false, features = [ + "services-opfs", +] } +wasm-bindgen = "0.2.89" +wasm-bindgen-futures = "0.4.39" +web-sys = { version = "0.3.77", features = ["console"] } + +[dev-dependencies] +wasm-bindgen-test = "0.3.41" diff --git a/core/edge/opfs_wasm32/README.md b/core/edge/opfs_wasm32/README.md new file mode 100644 index 000000000000..409c2f25de25 --- /dev/null +++ b/core/edge/opfs_wasm32/README.md @@ -0,0 +1,27 @@ +# OPFS on WASM + +This test verifies the OpenDAL OPFS service works in a browser environment. + +## Install + +```shell +cargo install wasm-pack +``` + +## Build + +```shell +wasm-pack build +``` + +## Test + +NOTE: + +- You need to have Chrome installed. +- OPFS requires a browser context (no Node.js support). +- Headless Chrome may not work for OPFS tests. + +```shell +wasm-pack test --chrome +``` diff --git a/core/edge/opfs_wasm32/src/lib.rs b/core/edge/opfs_wasm32/src/lib.rs new file mode 100644 index 000000000000..b56c3f6d7ce6 --- /dev/null +++ b/core/edge/opfs_wasm32/src/lib.rs @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#![cfg(target_arch = "wasm32")] +#[cfg(test)] +mod tests { + use futures::TryStreamExt; + use opendal::EntryMode; + use opendal::ErrorKind; + use opendal::Operator; + use opendal::services::Opfs; + use opendal::services::OpfsConfig; + use wasm_bindgen_test::wasm_bindgen_test; + use wasm_bindgen_test::wasm_bindgen_test_configure; // Required for Next() + + macro_rules! console_log { + ($($arg:tt)*) => { + web_sys::console::log_1(&format!($($arg)*).into()) + }; + } + + wasm_bindgen_test_configure!(run_in_browser); + + fn new_operator() -> Operator { + Operator::from_config(OpfsConfig::default()) + .expect("failed to create opfs operator") + .finish() + } + + #[wasm_bindgen_test] + async fn test_create_directory_handle() { + let op = new_operator(); + op.create_dir("/dir/").await.expect("directory"); + op.create_dir("/dir///").await.expect("directory"); + op.create_dir("/dir:/").await.expect("directory"); + op.create_dir("/dir<>/").await.expect("directory"); + assert_eq!( + op.create_dir("/a/b/../x/y/z/").await.unwrap_err().kind(), + ErrorKind::Unexpected + ); + // this works on Chrome, but fails on macOS + // assert_eq!(op.create_dir("/dir\0/").await.unwrap_err().kind(), ErrorKind::Unexpected); + } + + #[wasm_bindgen_test] + async fn test_create_directory_handle_with_root_and_list() { + { + // create files and dirs + let op_rooted = Operator::new(Opfs::default().root("/myapp/subdir1/subdir2/")) + .expect("config") + .finish(); + op_rooted + .write("subdir3/somefile", "content") + .await + .expect("write under root"); + + let stat_rooted = op_rooted.stat("subdir3/somefile").await.expect("stat"); + + let op = new_operator(); + let stat = op + .stat("/myapp/subdir1/subdir2/subdir3/somefile") + .await + .expect("stat"); + assert_eq!(stat_rooted, stat_rooted); + let stat = op + .stat("myapp/subdir1/subdir2/subdir3/somefile") + .await + .expect("stat"); + assert_eq!(stat_rooted, stat_rooted); + } + + { + // simple list + let op = new_operator(); + let mut entries = op.lister("").await.expect("list"); + while let Some(entry) = entries.try_next().await.expect("next") { + console_log!("entry: {} {:?}", entry.path(), entry.metadata().mode()); + } + } + + // list test added here so we are sure there are dirs and files to list + { + // simple list + let op = new_operator(); + let mut entries = op.lister("myapp/").await.expect("list"); + while let Some(entry) = entries.try_next().await.expect("next") { + console_log!("entry: {} {:?}", entry.path(), entry.metadata().mode()); + } + } + + { + // recursive list + let op = new_operator(); + let mut entries = op + .lister_with("") + .recursive(true) + .await + .expect("recursive list"); + while let Some(entry) = entries.try_next().await.expect("next") { + console_log!("rec entry: {} {:?}", entry.path(), entry.metadata().mode()); + } + } + } + + #[wasm_bindgen_test] + async fn test_write() { + let op = new_operator(); + + // this does not even go to OPFS backend, short-circuited + assert_eq!( + op.write("/", "should_not_work").await.unwrap_err().kind(), + ErrorKind::IsADirectory + ); + } + + #[wasm_bindgen_test] + async fn test_write_read_simple() { + let path = "/test_file"; + let content = "Content of the file to write"; + { + let op = new_operator(); + let meta = op.write(path, content).await.expect("write"); + console_log!("{:?}", meta); + assert_eq!(meta.content_length(), content.len() as u64); + + // This is None - we have to use stat + assert!(meta.last_modified().is_none()); + + let stat = op.stat(path).await.expect("stat"); + console_log!("stat = {:?}", stat); + assert_eq!(stat.mode(), EntryMode::FILE); + assert_eq!(stat.content_length(), content.len() as u64); + assert!(stat.last_modified().is_some()); + } + + { + // read back and compare + let op = new_operator(); + let buffer = op.read(path).await.expect("read"); + console_log!("read = {:?}", buffer); + assert_eq!(buffer.to_bytes(), content.as_bytes()); + op.delete(path).await.expect("delete"); + } + } + + #[wasm_bindgen_test] + async fn test_write_write_twice_same_file() { + let op = new_operator(); + let content = "Content of the file to write"; + let path = "/test_file"; + let meta = op.write(path, content).await.expect("write"); + let meta = op.write(path, content).await.expect("write"); + assert_eq!(meta.content_length(), content.len() as u64); + assert!(meta.last_modified().is_none()); + op.delete(path).await.expect("delete"); + } + + #[wasm_bindgen_test] + async fn test_write_like_append_three_times() { + let op = new_operator(); + let content = "Content of the file to write"; + let path = "/test_file_write_multiple"; + let mut w = op.writer(path).await.expect("writer"); + w.write(content).await.expect("write"); + w.write(content).await.expect("write"); + w.write(content).await.expect("write"); + let meta = w.close().await.expect("close"); + let expected_file_size = (content.len() as u64) * 3; + assert_eq!(meta.content_length(), expected_file_size); + assert!(meta.last_modified().is_none()); + let stat = op.stat(path).await.expect("stat"); + assert_eq!(stat.content_length(), expected_file_size); + op.delete(path).await.expect("delete"); + } + + #[wasm_bindgen_test] + async fn test_write_large_file_quota() { + // you can simulate a lower disk space in Chrome + let op = new_operator(); + let path = "big_file"; + let mut w = op.writer(path).await.expect("writer"); + let chunk = vec![0u8; 1024 * 1024]; // 1MB + for _ in 0..1024 { + let res = w.write(chunk.clone()).await; + match res { + Ok(()) => (), + Err(e) => { + // OPFS filled up (you can simulate this in Chrome by setting a lower limit) + // parse_js_error: JsValue(TypeError: Cannot close a ERRORED writable stream + // TypeError: Cannot close a ERRORED writable stream + console_log!("got {e:?}"); + console_log!("message = {}", e.message()); + assert_eq!(e.kind(), ErrorKind::Unexpected); + return; + } + } + } + + let expected_file_size = 1024 * 1024 * 1024; // 1GB + let meta = w.close().await.expect("close"); + assert_eq!(meta.content_length(), expected_file_size); + let stat = op.stat(path).await.expect("stat"); + assert_eq!(stat.content_length(), expected_file_size); + + { + // read and compare + let op = new_operator(); + let buffer = op.read(path).await.expect("read"); + assert_eq!(buffer.to_bytes().len(), expected_file_size as usize); + } + op.delete(path).await.expect("delete"); + } + + #[wasm_bindgen_test] + async fn test_write_and_read_with_range() { + let op = new_operator(); + let path = "numbers.txt"; + let content = "0123456789"; + let meta = op.write(path, content).await.expect("write"); + let buffer = op.read_with(path).range(3..5).await.expect("read"); + assert_eq!(buffer.to_bytes(), "34".as_bytes()); + op.delete(path).await.expect("delete"); + } +} diff --git a/core/services/opfs/Cargo.toml b/core/services/opfs/Cargo.toml index b82b11271a27..ac33c17e9386 100644 --- a/core/services/opfs/Cargo.toml +++ b/core/services/opfs/Cargo.toml @@ -33,17 +33,24 @@ all-features = true [target.'cfg(target_arch = "wasm32")'.dependencies] js-sys = "0.3.77" opendal-core = { path = "../../core", version = "0.55.0", default-features = false } +send_wrapper = "0.6" serde = { workspace = true, features = ["derive"] } wasm-bindgen = "0.2.100" wasm-bindgen-futures = "0.4.50" web-sys = { version = "0.3.77", features = [ - "Window", + "Blob", + "console", + "DomException", "File", "FileSystemDirectoryHandle", "FileSystemFileHandle", "FileSystemGetDirectoryOptions", "FileSystemGetFileOptions", + "FileSystemRemoveOptions", "FileSystemWritableFileStream", "Navigator", + "WriteCommandType", + "WriteParams", "StorageManager", + "Window", ] } diff --git a/core/services/opfs/src/backend.rs b/core/services/opfs/src/backend.rs index 9e28868a507c..ab50212a5735 100644 --- a/core/services/opfs/src/backend.rs +++ b/core/services/opfs/src/backend.rs @@ -15,14 +15,21 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; use std::sync::Arc; -use web_sys::FileSystemGetDirectoryOptions; +use wasm_bindgen::JsCast; +use wasm_bindgen_futures::JsFuture; +use web_sys::File; +use web_sys::FileSystemWritableFileStream; -use super::OPFS_SCHEME; use super::config::OpfsConfig; +use super::core::OpfsCore; +use super::deleter::OpfsDeleter; +use super::error::*; +use super::lister::OpfsLister; +use super::reader::OpfsReader; use super::utils::*; +use super::writer::OpfsWriter; use opendal_core::raw::*; use opendal_core::*; @@ -32,44 +39,108 @@ pub struct OpfsBuilder { pub(super) config: OpfsConfig, } +impl OpfsBuilder { + /// Set root directory for this backend. + pub fn root(mut self, root: &str) -> Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + self + } +} + impl Builder for OpfsBuilder { type Config = OpfsConfig; fn build(self) -> Result { - Ok(OpfsBackend {}) + let root = normalize_root(&self.config.root.unwrap_or_default()); + let core = Arc::new(OpfsCore::new(root)); + Ok(OpfsBackend { core }) } } /// OPFS Service backend -#[derive(Default, Debug, Clone)] -pub struct OpfsBackend {} +#[derive(Debug, Clone)] +pub struct OpfsBackend { + core: Arc, +} impl Access for OpfsBackend { - type Reader = (); + type Reader = OpfsReader; - type Writer = (); + type Writer = OpfsWriter; - type Lister = (); + type Lister = OpfsLister; - type Deleter = (); + type Deleter = oio::OneShotDeleter; fn info(&self) -> Arc { - let info = AccessorInfo::default(); - info.set_scheme(OPFS_SCHEME); - info.set_name("opfs"); - info.set_root("/"); - info.set_native_capability(Capability { - create_dir: true, - ..Default::default() - }); - Arc::new(info) + self.core.info.clone() + } + + async fn stat(&self, path: &str, _args: OpStat) -> Result { + let p = build_abs_path(&self.core.root, path); + + if p.ends_with('/') { + get_directory_handle(&p, false).await?; + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + } + + // File: get metadata via getFile(). + let handle = get_file_handle(&p, false).await?; + let file: File = JsFuture::from(handle.get_file()) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + + let mut meta = Metadata::new(EntryMode::FILE); + meta.set_content_length(file.size() as u64); + if let Ok(t) = Timestamp::from_millisecond(file.last_modified() as i64) { + meta.set_last_modified(t); + } + + Ok(RpStat::new(meta)) + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.core.root, path); + let handle = get_file_handle(&p, false).await?; + + Ok((RpRead::new(), OpfsReader::new(handle, args.range()))) + } + + async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { + let p = build_abs_path(&self.core.root, path); + let dir = get_directory_handle(&p, false).await?; + + Ok((RpList::default(), OpfsLister::new(dir, path.to_string()))) } async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { - let opt = FileSystemGetDirectoryOptions::new(); - opt.set_create(true); - get_directory_handle(path, &opt).await?; + debug_assert!(path != "/", "root path should be handled upstream"); + let p = build_abs_path(&self.core.root, path); + get_directory_handle(&p, true).await?; Ok(RpCreateDir::default()) } + + async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.core.root, path); + let handle = get_file_handle(&p, true).await?; + let stream: FileSystemWritableFileStream = JsFuture::from(handle.create_writable()) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + + Ok((RpWrite::default(), OpfsWriter::new(stream))) + } + + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(OpfsDeleter::new(self.core.clone())), + )) + } } diff --git a/core/services/opfs/src/config.rs b/core/services/opfs/src/config.rs index 38bd72c21be7..1201dcb00a5d 100644 --- a/core/services/opfs/src/config.rs +++ b/core/services/opfs/src/config.rs @@ -26,11 +26,16 @@ use super::backend::OpfsBuilder; #[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] #[non_exhaustive] -pub struct OpfsConfig {} +pub struct OpfsConfig { + /// Root directory for this backend. + pub root: Option, +} impl Debug for OpfsConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("OpfsConfig").finish_non_exhaustive() + f.debug_struct("OpfsConfig") + .field("root", &self.root) + .finish_non_exhaustive() } } diff --git a/core/services/opfs/src/core.rs b/core/services/opfs/src/core.rs index 10cd1da91996..943accb34238 100644 --- a/core/services/opfs/src/core.rs +++ b/core/services/opfs/src/core.rs @@ -15,59 +15,45 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; +use std::sync::Arc; -use wasm_bindgen::JsCast; -use wasm_bindgen_futures::JsFuture; -use web_sys::File; -use web_sys::FileSystemWritableFileStream; +use super::OPFS_SCHEME; +use opendal_core::raw::*; +use opendal_core::*; -use opendal_core::Error; -use opendal_core::Result; - -use super::error::*; -use super::utils::*; - -#[derive(Default, Debug)] -pub struct OpfsCore {} +#[derive(Debug, Clone)] +pub struct OpfsCore { + pub root: String, + pub info: Arc, +} impl OpfsCore { - #[allow(unused)] - async fn store_file(&self, file_name: &str, content: &[u8]) -> Result<(), Error> { - let handle = get_handle_by_filename(file_name).await?; + pub fn new(root: String) -> Self { + let info = AccessorInfo::default(); + info.set_scheme(OPFS_SCHEME); + info.set_name("opfs"); + info.set_root(&root); + info.set_native_capability(Capability { + stat: true, - let writable: FileSystemWritableFileStream = JsFuture::from(handle.create_writable()) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error)?; + read: true, - JsFuture::from( - writable - .write_with_u8_array(content) - .map_err(parse_js_error)?, - ) - .await - .map_err(parse_js_error)?; + list: true, - JsFuture::from(writable.close()) - .await - .map_err(parse_js_error)?; + create_dir: true, - Ok(()) - } + write: true, + write_can_empty: true, + write_can_multi: true, - #[allow(unused)] - async fn read_file(&self, file_name: &str) -> Result, Error> { - let handle = get_handle_by_filename(file_name).await?; + delete: true, - let file: File = JsFuture::from(handle.get_file()) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error)?; - let array_buffer = JsFuture::from(file.array_buffer()) - .await - .map_err(parse_js_error)?; + ..Default::default() + }); - Ok(js_sys::Uint8Array::new(&array_buffer).to_vec()) + Self { + root, + info: Arc::new(info), + } } } diff --git a/core/services/opfs/src/deleter.rs b/core/services/opfs/src/deleter.rs new file mode 100644 index 000000000000..2d2399743495 --- /dev/null +++ b/core/services/opfs/src/deleter.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use wasm_bindgen_futures::JsFuture; +use web_sys::FileSystemRemoveOptions; + +use super::core::OpfsCore; +use super::error::*; +use super::utils::*; +use opendal_core::raw::*; +use opendal_core::*; + +pub struct OpfsDeleter { + core: Arc, +} + +impl OpfsDeleter { + pub fn new(core: Arc) -> Self { + Self { core } + } +} + +impl oio::OneShotDelete for OpfsDeleter { + async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> { + let p = build_abs_path(&self.core.root, &path); + let (dir, name) = get_parent_dir_and_name(&p, false).await?; + + let opt = FileSystemRemoveOptions::new(); + opt.set_recursive(args.recursive()); + + match JsFuture::from(dir.remove_entry_with_options(name, &opt)).await { + Ok(_) => Ok(()), + Err(e) => { + let err = parse_js_error(e); + // Deleting a non-existent entry is not an error. + if err.kind() == ErrorKind::NotFound { + Ok(()) + } else { + Err(err) + } + } + } + } +} diff --git a/core/services/opfs/src/docs.md b/core/services/opfs/src/docs.md index a529c2ff3b5e..fef5f71f343c 100644 --- a/core/services/opfs/src/docs.md +++ b/core/services/opfs/src/docs.md @@ -2,12 +2,12 @@ This service can be used to: -- [ ] create_dir -- [ ] stat -- [ ] read -- [ ] write -- [ ] delete -- [ ] list +- [x] create_dir +- [x] stat +- [x] read +- [x] write +- [x] delete +- [x] list - [ ] copy - [ ] rename - [ ] presign diff --git a/core/services/opfs/src/error.rs b/core/services/opfs/src/error.rs index 5f2a1a0fe343..b50a223c3a0f 100644 --- a/core/services/opfs/src/error.rs +++ b/core/services/opfs/src/error.rs @@ -15,14 +15,39 @@ // specific language governing permissions and limitations // under the License. +use wasm_bindgen::JsCast; use wasm_bindgen::JsValue; +use web_sys::DomException; use opendal_core::Error; use opendal_core::ErrorKind; -pub(crate) fn parse_js_error(msg: JsValue) -> Error { +pub(crate) fn parse_js_error(value: JsValue) -> Error { + if let Some(exc) = value.dyn_ref::() { + let kind = match exc.name().as_str() { + "NotFoundError" => ErrorKind::NotFound, + "TypeMismatchError" => ErrorKind::NotFound, + "NotAllowedError" => ErrorKind::PermissionDenied, + "QuotaExceededError" => ErrorKind::Unexpected, + e => { + console_debug!("Got unhandled DOM exception {e:?}"); + ErrorKind::Unexpected + } + }; + return Error::new(kind, exc.message()); + } + + // FIXME: how to map `TypeError`` from `getDirectoryHandle`: + // "name specified is not a valid string or contains characters that + // would interfere with the native file system" + if let Some(err) = value.dyn_ref::() { + return Error::new(ErrorKind::Unexpected, String::from(err.message())); + } + Error::new( ErrorKind::Unexpected, - msg.as_string().unwrap_or_else(String::new), + value + .as_string() + .unwrap_or_else(|| "unknown JS error".to_string()), ) } diff --git a/core/services/opfs/src/lib.rs b/core/services/opfs/src/lib.rs index 06887f57ffcf..42ba6a7ab206 100644 --- a/core/services/opfs/src/lib.rs +++ b/core/services/opfs/src/lib.rs @@ -25,11 +25,22 @@ pub fn register_opfs_service(registry: &opendal_core::OperatorRegistry) { registry.register::(OPFS_SCHEME); } +macro_rules! console_debug { + ($($arg:tt)*) => { + #[cfg(debug_assertions)] + web_sys::console::log_1(&format!($($arg)*).into()) + }; +} + mod backend; mod config; mod core; +mod deleter; mod error; +mod lister; +mod reader; mod utils; +mod writer; pub use backend::OpfsBuilder as Opfs; pub use config::OpfsConfig; diff --git a/core/services/opfs/src/lister.rs b/core/services/opfs/src/lister.rs new file mode 100644 index 000000000000..e188bf6f826f --- /dev/null +++ b/core/services/opfs/src/lister.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use send_wrapper::SendWrapper; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::JsFuture; +use web_sys::FileSystemDirectoryHandle; + +use super::error::*; +use opendal_core::raw::*; +use opendal_core::*; + +pub struct OpfsLister { + iter: SendWrapper, + path: String, +} + +impl OpfsLister { + pub fn new(dir: FileSystemDirectoryHandle, path: String) -> Self { + // Entry paths must not start with '/'. + // For root listing, path is "/" — normalize to "". + let path = if path == "/" { String::new() } else { path }; + Self { + iter: SendWrapper::new(dir.entries()), + path, + } + } +} + +impl oio::List for OpfsLister { + async fn next(&mut self) -> Result> { + let result = JsFuture::from(self.iter.next().map_err(parse_js_error)?) + .await + .map_err(parse_js_error)?; + + let done = js_sys::Reflect::get(&result, &"done".into()) + .unwrap_or(JsValue::TRUE) + .as_bool() + .unwrap_or(true); + if done { + return Ok(None); + } + + let value = js_sys::Reflect::get(&result, &"value".into()).map_err(parse_js_error)?; + let pair: js_sys::Array = value.unchecked_into(); + let name = pair.get(0).as_string().unwrap_or_default(); + let handle = pair.get(1); + + let kind = js_sys::Reflect::get(&handle, &"kind".into()) + .ok() + .and_then(|v| v.as_string()) + .unwrap_or_default(); + + let (entry_path, mode) = if kind == "directory" { + (format!("{}{}/", self.path, name), EntryMode::DIR) + } else { + (format!("{}{}", self.path, name), EntryMode::FILE) + }; + + Ok(Some(oio::Entry::new(&entry_path, Metadata::new(mode)))) + } +} diff --git a/core/services/opfs/src/reader.rs b/core/services/opfs/src/reader.rs new file mode 100644 index 000000000000..418a281afee6 --- /dev/null +++ b/core/services/opfs/src/reader.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use send_wrapper::SendWrapper; +use wasm_bindgen::JsCast; +use wasm_bindgen_futures::JsFuture; +use web_sys::FileSystemFileHandle; + +use super::error::*; +use opendal_core::raw::*; +use opendal_core::*; + +pub struct OpfsReader { + handle: SendWrapper, + range: BytesRange, + done: bool, +} + +impl OpfsReader { + pub fn new(handle: FileSystemFileHandle, range: BytesRange) -> Self { + Self { + handle: SendWrapper::new(handle), + range, + done: false, + } + } +} + +impl oio::Read for OpfsReader { + async fn read(&mut self) -> Result { + if self.done { + return Ok(Buffer::new()); + } + self.done = true; + + let file: web_sys::File = JsFuture::from(self.handle.get_file()) + .await + .and_then(JsCast::dyn_into) + .map_err(parse_js_error)?; + + let blob: &web_sys::Blob = file.as_ref(); + let blob = if self.range.is_full() { + blob.clone() + } else { + let offset = self.range.offset() as f64; + let end = match self.range.size() { + Some(size) => offset + size as f64, + None => blob.size(), + }; + blob.slice_with_f64_and_f64(offset, end) + .map_err(parse_js_error)? + }; + + let array_buffer = JsFuture::from(blob.array_buffer()) + .await + .map_err(parse_js_error)?; + let uint8_array = js_sys::Uint8Array::new(&array_buffer); + + Ok(Buffer::from(uint8_array.to_vec())) + } +} diff --git a/core/services/opfs/src/utils.rs b/core/services/opfs/src/utils.rs index 4c2997a73015..ec49caefd426 100644 --- a/core/services/opfs/src/utils.rs +++ b/core/services/opfs/src/utils.rs @@ -26,24 +26,34 @@ use web_sys::window; use super::error::*; +/// Get the OPFS root directory handle. pub(crate) async fn get_root_directory_handle() -> Result { let navigator = window().unwrap().navigator(); let storage_manager = navigator.storage(); + // This may fail if not secure (not: HTTPS or localhost) JsFuture::from(storage_manager.get_directory()) .await .and_then(JsCast::dyn_into) .map_err(parse_js_error) } +/// Navigate to a directory handle by path. +/// +/// When `create` is true, intermediate directories are created as needed. pub(crate) async fn get_directory_handle( - dir: &str, - dir_opt: &FileSystemGetDirectoryOptions, + path: &str, + create: bool, ) -> Result { - let dirs: Vec<&str> = dir.trim_matches('/').split('/').collect(); + let opt = FileSystemGetDirectoryOptions::new(); + opt.set_create(create); + let trimmed = path.trim_matches('/'); let mut handle = get_root_directory_handle().await?; - for dir in dirs { - handle = JsFuture::from(handle.get_directory_handle_with_options(dir, dir_opt)) + if trimmed.is_empty() { + return Ok(handle); + } + for dir in trimmed.split('/') { + handle = JsFuture::from(handle.get_directory_handle_with_options(dir, &opt)) .await .and_then(JsCast::dyn_into) .map_err(parse_js_error)?; @@ -52,18 +62,36 @@ pub(crate) async fn get_directory_handle( Ok(handle) } -pub(crate) async fn get_handle_by_filename(filename: &str) -> Result { - let navigator = window().unwrap().navigator(); - let storage_manager = navigator.storage(); - let root: FileSystemDirectoryHandle = JsFuture::from(storage_manager.get_directory()) - .await - .and_then(JsCast::dyn_into) - .map_err(parse_js_error)?; +/// Split a file path into its parent directory handle and filename. +/// +/// When `create` is true, intermediate directories are created as needed. +pub(crate) async fn get_parent_dir_and_name<'a>( + path: &'a str, + create: bool, +) -> Result<(FileSystemDirectoryHandle, &'a str)> { + let trimmed = path.trim_matches('/'); + match trimmed.rsplit_once('/') { + Some((parent, name)) => { + let dir = get_directory_handle(parent, create).await?; + Ok((dir, name)) + } + None => { + let root = get_root_directory_handle().await?; + Ok((root, trimmed)) + } + } +} + +/// Get a file handle by its full path. +/// +/// When `create` is true, intermediate directories and the file itself are created as needed. +pub(crate) async fn get_file_handle(path: &str, create: bool) -> Result { + let (dir, name) = get_parent_dir_and_name(path, create).await?; let opt = FileSystemGetFileOptions::new(); - opt.set_create(true); + opt.set_create(create); - JsFuture::from(root.get_file_handle_with_options(filename, &opt)) + JsFuture::from(dir.get_file_handle_with_options(name, &opt)) .await .and_then(JsCast::dyn_into) .map_err(parse_js_error) diff --git a/core/services/opfs/src/writer.rs b/core/services/opfs/src/writer.rs new file mode 100644 index 000000000000..551131ae345f --- /dev/null +++ b/core/services/opfs/src/writer.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use send_wrapper::SendWrapper; +use wasm_bindgen::JsValue; +use wasm_bindgen_futures::JsFuture; +use web_sys::FileSystemWritableFileStream; +use web_sys::WriteCommandType; +use web_sys::WriteParams; + +use opendal_core::raw::*; +use opendal_core::*; + +use super::error::*; + +pub struct OpfsWriter { + stream: SendWrapper, + bytes_written: u64, +} + +impl OpfsWriter { + pub fn new(stream: FileSystemWritableFileStream) -> Self { + Self { + stream: SendWrapper::new(stream), + bytes_written: 0, + } + } +} + +impl oio::Write for OpfsWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + let bytes = bs.to_bytes(); + let params = WriteParams::new(WriteCommandType::Write); + params.set_size(Some(bytes.len() as f64)); + let data: JsValue = js_sys::Uint8Array::from(bytes.as_ref()).into(); + params.set_data(&data); + JsFuture::from( + self.stream + .write_with_write_params(¶ms.into()) + .map_err(parse_js_error)?, + ) + .await + .map_err(parse_js_error)?; + + self.bytes_written += bytes.len() as u64; + Ok(()) + } + + async fn close(&mut self) -> Result { + JsFuture::from(self.stream.close()) + .await + .map_err(parse_js_error)?; + + // We cannot set LastModified here - stream does not have such metadata + let mut meta = Metadata::new(EntryMode::FILE); + meta.set_content_length(self.bytes_written); + Ok(meta) + } + + async fn abort(&mut self) -> Result<()> { + JsFuture::from(self.stream.abort()) + .await + .map_err(parse_js_error)?; + Ok(()) + } +}