From 26818d2f6e28dd4b6321283da20ca96a8b73b507 Mon Sep 17 00:00:00 2001 From: Yige Zhu Date: Fri, 19 Aug 2022 10:16:28 -0700 Subject: [PATCH] Add storage service under onedocker Summary: Two motivations behind this diff: 1. in the long term we want to remove onedocker's dependency on fbpcp, and split fbpcp into two decoupled repos, onedocker and pce 2. With the new onedocker storage service, we are able to add more customized API for onedocker repository services, such as handling the storage of metadata. Differential Revision: D38866302 fbshipit-source-id: 7223635bfacade7a8e82f20480aaee05b4fa6476 --- onedocker/service/storage.py | 77 +++++++ onedocker/service/storage_s3.py | 230 +++++++++++++++++++++ onedocker/tests/service/test_storage_s3.py | 220 ++++++++++++++++++++ 3 files changed, 527 insertions(+) create mode 100644 onedocker/service/storage.py create mode 100644 onedocker/service/storage_s3.py create mode 100644 onedocker/tests/service/test_storage_s3.py diff --git a/onedocker/service/storage.py b/onedocker/service/storage.py new file mode 100644 index 00000000..7d91d50d --- /dev/null +++ b/onedocker/service/storage.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +import abc +import re +from enum import Enum +from typing import List + +from fbpcp.entity.file_information import FileInfo +from fbpcp.entity.policy_statement import PolicyStatement, PublicAccessBlockConfig + + +class PathType(Enum): + Local = 1 + S3 = 2 + GCS = 3 + + +class StorageService(abc.ABC): + @abc.abstractmethod + def read(self, filename: str) -> str: + pass + + @abc.abstractmethod + def write(self, filename: str, data: str) -> None: + pass + + @abc.abstractmethod + def copy(self, source: str, destination: str) -> None: + pass + + @abc.abstractmethod + def file_exists(self, filename: str) -> bool: + pass + + @staticmethod + def path_type(filename: str) -> PathType: + s3_match = re.search( + "^https?:/([^.]+).s3.([^.]+).amazonaws.com/(.*)$", filename + ) + if s3_match: + return PathType.S3 + + gcs_match = re.search("^https?://storage.cloud.google.com/(.*)$", filename) + if gcs_match: + return PathType.GCS + + return PathType.Local + + @abc.abstractmethod + def get_file_size(self, filename: str) -> int: + pass + + @abc.abstractmethod + def get_file_info(self, filename: str) -> FileInfo: + pass + + @abc.abstractmethod + def list_folders(self, filename: str) -> List[str]: + pass + + @abc.abstractmethod + def get_bucket_policy_statements(self, bucket: str) -> List[PolicyStatement]: + pass + + @abc.abstractmethod + def get_bucket_public_access_block(self, bucket: str) -> PublicAccessBlockConfig: + pass + + @abc.abstractmethod + def list_files(self, dirPath: str) -> List[str]: + pass diff --git a/onedocker/service/storage_s3.py b/onedocker/service/storage_s3.py new file mode 100644 index 00000000..b3f36cb0 --- /dev/null +++ b/onedocker/service/storage_s3.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +import os +from os import path +from os.path import join, normpath, relpath +from typing import Any, Dict, List, Optional + +from fbpcp.entity.file_information import FileInfo +from fbpcp.entity.policy_statement import PolicyStatement, PublicAccessBlockConfig +from fbpcp.gateway.s3 import S3Gateway +from fbpcp.util.s3path import S3Path +from onedocker.service.storage import PathType, StorageService + + +class S3StorageService(StorageService): + def __init__( + self, + region: str = "us-west-1", + access_key_id: Optional[str] = None, + access_key_data: Optional[str] = None, + config: Optional[Dict[str, Any]] = None, + session_token: Optional[str] = None, + ) -> None: + self.s3_gateway = S3Gateway( + region, access_key_id, access_key_data, config, session_token + ) + + def read(self, filename: str) -> str: + """Read a file data + Keyword arguments: + filename -- "https://bucket-name.s3.Region.amazonaws.com/key-name" + """ + s3_path = S3Path(filename) + return self.s3_gateway.get_object(s3_path.bucket, s3_path.key) + + def write(self, filename: str, data: str) -> None: + """Write data into a file + Keyword arguments: + filename -- "https://bucket-name.s3.Region.amazonaws.com/key-name"` + """ + s3_path = S3Path(filename) + self.s3_gateway.put_object(s3_path.bucket, s3_path.key, data) + + def copy(self, source: str, destination: str, recursive: bool = False) -> None: + """Move a file or folder between local storage and S3, as well as, S3 and S3 + Keyword arguments: + source -- source file + destination -- destination file + recursive -- whether to recursively copy a folder + """ + if StorageService.path_type(source) == PathType.Local: + # from local to S3 + if StorageService.path_type(destination) == PathType.Local: + raise ValueError("Both source and destination are local files") + s3_path = S3Path(destination) + if path.isdir(source): + if not recursive: + raise ValueError(f"Source {source} is a folder. Use --recursive") + self.upload_dir(source, s3_path.bucket, s3_path.key) + else: + self.s3_gateway.upload_file(source, s3_path.bucket, s3_path.key) + else: + source_s3_path = S3Path(source) + if StorageService.path_type(destination) == PathType.S3: + # from S3 to S3 + dest_s3_path = S3Path(destination) + if source_s3_path == dest_s3_path: + raise ValueError( + f"Source {source} and destination {destination} are the same" + ) + + if source.endswith("/"): + if not recursive: + raise ValueError( + f"Source {source} is a folder. Use --recursive" + ) + + self.copy_dir( + source_s3_path.bucket, + source_s3_path.key + "/", + dest_s3_path.bucket, + dest_s3_path.key, + ) + else: + self.s3_gateway.copy( + source_s3_path.bucket, + source_s3_path.key, + dest_s3_path.bucket, + dest_s3_path.key, + ) + else: + # from S3 to local + if source.endswith("/"): + if not recursive: + raise ValueError( + f"Source {source} is a folder. Use --recursive" + ) + self.download_dir( + source_s3_path.bucket, + source_s3_path.key + "/", + destination, + ) + else: + self.s3_gateway.download_file( + source_s3_path.bucket, source_s3_path.key, destination + ) + + def upload_dir(self, source: str, s3_path_bucket: str, s3_path_key: str) -> None: + for root, dirs, files in os.walk(source): + for file in files: + local_path = join(root, file) + destination_path = s3_path_key + "/" + relpath(local_path, source) + + self.s3_gateway.upload_file( + local_path, + s3_path_bucket, + destination_path, + ) + for dir in dirs: + local_path = join(root, dir) + destination_path = s3_path_key + "/" + relpath(local_path, source) + + self.s3_gateway.put_object( + s3_path_bucket, + destination_path + "/", + "", + ) + + def download_dir( + self, s3_path_bucket: str, s3_path_key: str, destination: str + ) -> None: + if not self.s3_gateway.object_exists(s3_path_bucket, s3_path_key): + raise ValueError( + f"Key {s3_path_key} does not exist in bucket {s3_path_bucket}" + ) + keys = self.s3_gateway.list_object2(s3_path_bucket, s3_path_key) + for key in keys: + local_path = normpath(destination + "/" + key[len(s3_path_key) :]) + if key.endswith("/"): + if not path.exists(local_path): + os.makedirs(local_path) + else: + self.s3_gateway.download_file(s3_path_bucket, key, local_path) + + def copy_dir( + self, + source_bucket: str, + source_key: str, + destination_bucket: str, + destination_key: str, + ) -> None: + if not self.s3_gateway.object_exists(source_bucket, source_key): + raise ValueError( + f"Key {source_key} does not exist in bucket {source_bucket}" + ) + keys = self.s3_gateway.list_object2(source_bucket, source_key) + for key in keys: + destination_path = destination_key + "/" + key[len(source_key) :] + if key.endswith("/"): + self.s3_gateway.put_object( + source_bucket, + destination_path, + "", + ) + else: + self.s3_gateway.copy( + source_bucket, + key, + destination_bucket, + destination_path, + ) + + def delete(self, filename: str) -> None: + """Delete an s3 file + Keyword arguments: + filename -- the s3 file to be deleted + """ + if StorageService.path_type(filename) == PathType.S3: + s3_path = S3Path(filename) + self.s3_gateway.delete_object(s3_path.bucket, s3_path.key) + else: + raise ValueError("The file is not an s3 file") + + def file_exists(self, filename: str) -> bool: + if StorageService.path_type(filename) == PathType.S3: + s3_path = S3Path(filename) + return self.s3_gateway.object_exists(s3_path.bucket, s3_path.key) + else: + raise ValueError(f"File {filename} is not an S3 filepath") + + def get_file_info(self, filename: str) -> FileInfo: + """Show file information (last modified time, type and size) + Keyword arguments: + filename -- the s3 file to be shown + """ + s3_path = S3Path(filename) + file_info_dict = self.s3_gateway.get_object_info(s3_path.bucket, s3_path.key) + return FileInfo( + file_name=filename, + last_modified=file_info_dict.get("LastModified").ctime(), + file_size=file_info_dict.get("ContentLength"), + ) + + def get_file_size(self, filename: str) -> int: + s3_path = S3Path(filename) + return self.s3_gateway.get_object_size(s3_path.bucket, s3_path.key) + + def list_folders(self, filename: str) -> List[str]: + s3_path = S3Path(filename) + return self.s3_gateway.list_folders(s3_path.bucket, s3_path.key) + + def get_bucket_policy_statements(self, bucket: str) -> List[PolicyStatement]: + return self.s3_gateway.get_policy_statements(bucket) + + def get_bucket_public_access_block(self, bucket: str) -> PublicAccessBlockConfig: + return self.s3_gateway.get_public_access_block(bucket) + + def list_files(self, dirPath: str) -> List[str]: + """Returns all paths of files in folders and sub folders recursively + Keyword arguments: + dirPath -- s3 dir path + """ + s3_path = S3Path(dirPath) + return self.s3_gateway.list_object2(s3_path.bucket, s3_path.key) diff --git a/onedocker/tests/service/test_storage_s3.py b/onedocker/tests/service/test_storage_s3.py new file mode 100644 index 00000000..13120612 --- /dev/null +++ b/onedocker/tests/service/test_storage_s3.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +import os +import unittest +from unittest.mock import call, MagicMock, patch + +from onedocker.service.storage_s3 import S3StorageService + + +class TestS3StorageService(unittest.TestCase): + LOCAL_FILE = "/usr/test_file" + LOCAL_FOLDER = "/foo" + S3_FILE = "https://bucket.s3.Region.amazonaws.com/test_file" + S3_FILE_COPY = "https://bucket.s3.Region.amazonaws.com/test_file_copy" + S3_FOLDER = "https://bucket.s3.Region.amazonaws.com/test_folder/" + S3_FOLDER_COPY = "https://bucket.s3.Region.amazonaws.com/test_folder_copy/" + S3_FILE_WITH_SUBFOLDER = ( + "https://bucket.s3.Region.amazonaws.com/test_folder/test_file" + ) + """ + The layout of LOCAL_DIR: + /foo/ + ├── bar/ + └── baz/ + ├── a + └── b + """ + LOCAL_DIR = [ + ("/foo", ("bar",), ("baz",)), + ("/foo/baz", (), ("a", "b")), + ] + + S3_DIR = [ + "test_folder/bar/", + "test_folder/baz/", + "test_folder/baz/a", + "test_folder/baz/b", + ] + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_local_to_s3(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.s3_gateway.upload_file = MagicMock(return_value=None) + service.copy(self.LOCAL_FILE, self.S3_FILE) + service.s3_gateway.upload_file.assert_called_with( + str(self.LOCAL_FILE), "bucket", "test_file" + ) + + def test_copy_local_dir_to_s3_recursive_false(self): + service = S3StorageService("us-west-1") + with patch("os.path.isdir", return_value=True): + self.assertRaises( + ValueError, service.copy, self.LOCAL_FOLDER, self.S3_FOLDER, False + ) + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_local_dir_to_s3_recursive_true(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.s3_gateway.put_object = MagicMock(return_value=None) + service.s3_gateway.upload_file = MagicMock(return_value=None) + + with patch("os.path.isdir", return_value=True): + with patch("os.walk", return_value=self.LOCAL_DIR): + service.copy(self.LOCAL_FOLDER, self.S3_FOLDER, True) + + service.s3_gateway.put_object.assert_called_with( + "bucket", "test_folder/bar/", "" + ) + + service.s3_gateway.upload_file.assert_has_calls( + [ + call("/foo/baz/a", "bucket", "test_folder/baz/a"), + call("/foo/baz/b", "bucket", "test_folder/baz/b"), + ], + any_order=True, + ) + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_s3_to_local(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.s3_gateway.download_file = MagicMock(return_value=None) + service.copy(self.S3_FILE, self.LOCAL_FILE) + service.s3_gateway.download_file.assert_called_with( + "bucket", "test_file", str(self.LOCAL_FILE) + ) + + def test_copy_s3_dir_to_local_recursive_false(self): + service = S3StorageService("us-west-1") + self.assertRaises( + ValueError, service.copy, self.S3_FOLDER, self.LOCAL_FOLDER, False + ) + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_s3_dir_to_local_source_does_not_exist(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.s3_gateway.object_exists = MagicMock(return_value=False) + self.assertRaises( + ValueError, service.copy, self.S3_FOLDER, self.LOCAL_FOLDER, False + ) + + @patch("os.makedirs") + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_s3_dir_to_local_ok(self, MockS3Gateway, os_makedirs): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.s3_gateway.object_exists = MagicMock(return_value=True) + service.s3_gateway.list_object2 = MagicMock(return_value=self.S3_DIR) + service.s3_gateway.download_file = MagicMock(return_value=None) + + service.copy(self.S3_FOLDER, self.LOCAL_FOLDER, True) + + os.makedirs.assert_has_calls( + [ + call("/foo/bar"), + call("/foo/baz"), + ], + any_order=True, + ) + + service.s3_gateway.download_file.assert_has_calls( + [ + call("bucket", "test_folder/baz/a", "/foo/baz/a"), + call("bucket", "test_folder/baz/b", "/foo/baz/b"), + ], + any_order=True, + ) + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_local_to_local(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + self.assertRaises(ValueError, service.copy, self.LOCAL_FILE, self.LOCAL_FILE) + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_s3_to_s3(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.copy(self.S3_FILE, self.S3_FILE_COPY) + service.s3_gateway.copy.assert_called_with( + "bucket", "test_file", "bucket", "test_file_copy" + ) + + def test_copy_s3_dir_to_s3_recursive_false(self): + service = S3StorageService("us-west-1") + self.assertRaises( + ValueError, service.copy, self.S3_FOLDER, self.S3_FOLDER_COPY, False + ) + + def test_copy_s3_dir_to_s3_source_and_dest_are_the_same(self): + service = S3StorageService("us-west-1") + self.assertRaises( + ValueError, service.copy, self.S3_FOLDER, self.S3_FOLDER, True + ) + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_s3_dir_to_s3_source_does_not_exist(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.s3_gateway.object_exists = MagicMock(return_value=False) + self.assertRaises( + ValueError, service.copy, self.S3_FOLDER, self.S3_FOLDER_COPY, False + ) + + @patch("os.makedirs") + @patch("fbpcp.gateway.s3.S3Gateway") + def test_copy_s3_dir_to_s3_ok(self, MockS3Gateway, os_makedirs): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.s3_gateway.object_exists = MagicMock(return_value=True) + service.s3_gateway.list_object2 = MagicMock(return_value=self.S3_DIR) + service.s3_gateway.put_object = MagicMock(return_value=None) + service.s3_gateway.copy = MagicMock(return_value=None) + + service.copy(self.S3_FOLDER, self.S3_FOLDER_COPY, True) + + service.s3_gateway.put_object.assert_has_calls( + [ + call("bucket", "test_folder_copy/bar/", ""), + call("bucket", "test_folder_copy/baz/", ""), + ], + any_order=True, + ) + + service.s3_gateway.copy.assert_has_calls( + [ + call("bucket", "test_folder/baz/a", "bucket", "test_folder_copy/baz/a"), + call("bucket", "test_folder/baz/b", "bucket", "test_folder_copy/baz/b"), + ], + any_order=True, + ) + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_delete_s3(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.delete(self.S3_FILE) + service.s3_gateway.delete_object.assert_called_with("bucket", "test_file") + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_file_exists(self, MockS3Gateway): + service = S3StorageService("us-west-1") + + service.s3_gateway = MockS3Gateway() + service.file_exists(self.S3_FILE) + service.s3_gateway.object_exists.assert_called_with("bucket", "test_file") + + @patch("fbpcp.gateway.s3.S3Gateway") + def test_list_files(self, MockS3Gateway): + service = S3StorageService("us-west-1") + service.s3_gateway = MockS3Gateway() + service.list_files(self.S3_FOLDER) + service.s3_gateway.list_object2.assert_called_with("bucket", "test_folder")