Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ The reading is divided into two stages:

## Development

### Clone the Repository

If you don't have `git-lfs` installed, please install it first.

```
$ git clone https://github.com/alibaba/paimon-cpp.git
$ cd paimon-cpp
$ git lfs pull
```

### CMake

```
Expand Down
1 change: 1 addition & 0 deletions docs/source/building.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ repository:

$ git clone https://github.com/alibaba/paimon-cpp.git
$ cd paimon-cpp
$ git lfs pull

Manual configuration
--------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

#include "paimon/core/casting/string_to_timestamp_cast_executor.h"

#include <arrow/compute/cast.h>

#include <cassert>
#include <string>
#include <utility>

#include "arrow/compute/cast.h"
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
#include "paimon/core/casting/casting_utils.h"
Expand Down
3 changes: 1 addition & 2 deletions src/paimon/core/catalog/renaming_snapshot_commit.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#pragma once

#include <paimon/fs/file_system.h>

#include <memory>
#include <string>
#include <utility>
Expand All @@ -26,6 +24,7 @@
#include "paimon/core/catalog/snapshot_commit.h"
#include "paimon/core/snapshot.h"
#include "paimon/core/utils/snapshot_manager.h"
#include "paimon/fs/file_system.h"
#include "paimon/result.h"
#include "paimon/status.h"

Expand Down
70 changes: 30 additions & 40 deletions src/paimon/core/operation/file_store_commit_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "paimon/common/data/binary_row.h"
#include "paimon/common/data/binary_row_writer.h"
#include "paimon/common/factories/io_hook.h"
#include "paimon/common/fs/resolving_file_system.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/core/catalog/commit_table_request.h"
Expand All @@ -62,7 +61,6 @@
#include "paimon/fs/local/local_file_system_factory.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/metrics.h"
#include "paimon/string_builder.h"
#include "paimon/testing/utils/binary_row_generator.h"
#include "paimon/testing/utils/io_exception_helper.h"
#include "paimon/testing/utils/testharness.h"
Expand All @@ -88,26 +86,27 @@ class GmockFileSystemFactory : public LocalFileSystemFactory {

Result<std::unique_ptr<FileSystem>> Create(
const std::string& path, const std::map<std::string, std::string>& options) const override {
auto fs = std::make_unique<GmockFileSystem>();
auto fs = std::make_unique<testing::NiceMock<GmockFileSystem>>();
auto fs_ptr = fs.get();
using ::testing::A;
using ::testing::Invoke;

ON_CALL(*fs, ListDir(A<const std::string&>(),
A<std::vector<std::unique_ptr<BasicFileStatus>>*>()))
.WillByDefault(
Invoke([&](const std::string& directory,
std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) {
return fs->LocalFileSystem::ListDir(directory, file_status_list);
Invoke([fs_ptr](const std::string& directory,
std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) {
return fs_ptr->LocalFileSystem::ListDir(directory, file_status_list);
}));

ON_CALL(*fs, ReadFile(A<const std::string&>(), A<std::string*>()))
.WillByDefault(Invoke([&](const std::string& path, std::string* content) {
return fs->FileSystem::ReadFile(path, content);
.WillByDefault(Invoke([fs_ptr](const std::string& path, std::string* content) {
return fs_ptr->FileSystem::ReadFile(path, content);
}));

ON_CALL(*fs, AtomicStore(A<const std::string&>(), A<const std::string&>()))
.WillByDefault(Invoke([&](const std::string& path, const std::string& content) {
return fs->FileSystem::AtomicStore(path, content);
.WillByDefault(Invoke([fs_ptr](const std::string& path, const std::string& content) {
return fs_ptr->FileSystem::AtomicStore(path, content);
}));

return fs;
Expand Down Expand Up @@ -364,33 +363,31 @@ TEST_F(FileStoreCommitImplTest, TestRESTCatalogCommit) {
ASSERT_FALSE(exist);
}

// TODO(jinli.zjw): fix disabled case
TEST_F(FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryTenTimes) {
TEST_F(FileStoreCommitImplTest, TestCommitWithConflictSnapshotAndRetryTenTimes) {
std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/";
auto dir = UniqueTestDirectory::Create();
std::string table_path = dir->Str();
ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
FileSystemFactory::Get("gmock_fs", table_path, {}));
CommitContextBuilder context_builder(table_path, "commit_user_1");
ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context,
context_builder.AddOption(Options::MANIFEST_FORMAT, "orc")
.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb")
.AddOption(Options::COMMIT_MAX_RETRIES, "10")
.AddOption(Options::FILE_SYSTEM, "gmock_fs")
.WithFileSystem(fs)
.Finish());

ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context)));
auto commit_impl = dynamic_cast<FileStoreCommitImpl*>(commit.get());
std::string latest_hint = PathUtil::JoinPath(table_path, "snapshot/LATEST");

auto* resolving_fs =
dynamic_cast<ResolvingFileSystem*>(commit_impl->snapshot_manager_->fs_.get());
ASSERT_OK_AND_ASSIGN(auto real_fs, resolving_fs->GetRealFileSystem(table_path));
auto* mock_fs = dynamic_cast<GmockFileSystem*>(real_fs.get());
auto* mock_fs = dynamic_cast<GmockFileSystem*>(fs.get());
EXPECT_CALL(*mock_fs, ReadFile(testing::StrEq(latest_hint), testing::_))
.WillRepeatedly(testing::Invoke([](const std::string& path, std::string* content) {
*content = "-1";
return Status::OK();
}));
EXPECT_CALL(*mock_fs, ListDir(testing::_, testing::_)).Times(testing::AnyNumber());
EXPECT_CALL(*mock_fs,
ListDir(testing::StrEq(PathUtil::JoinPath(table_path, "snapshot")), testing::_))
.WillRepeatedly(
Expand All @@ -407,25 +404,23 @@ TEST_F(FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryT
ASSERT_NOK(commit->Commit(msgs));
}

TEST_F(FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryOnce) {
TEST_F(FileStoreCommitImplTest, TestCommitWithConflictSnapshotAndRetryOnce) {
std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/";
auto dir = UniqueTestDirectory::Create();
std::string table_path = dir->Str();
ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
FileSystemFactory::Get("gmock_fs", table_path, {}));
CommitContextBuilder context_builder(table_path, "commit_user_1");
ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context,
context_builder.AddOption(Options::MANIFEST_FORMAT, "orc")
.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb")
.AddOption(Options::FILE_SYSTEM, "gmock_fs")
.WithFileSystem(fs)
.Finish());

ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context)));
auto commit_impl = dynamic_cast<FileStoreCommitImpl*>(commit.get());
std::string latest_hint = PathUtil::JoinPath(table_path, "snapshot/LATEST");
auto* resolving_fs =
dynamic_cast<ResolvingFileSystem*>(commit_impl->snapshot_manager_->fs_.get());
ASSERT_OK_AND_ASSIGN(auto real_fs, resolving_fs->GetRealFileSystem(table_path));
auto* mock_fs = dynamic_cast<GmockFileSystem*>(real_fs.get());
auto* mock_fs = dynamic_cast<GmockFileSystem*>(fs.get());
EXPECT_CALL(*mock_fs, ReadFile(testing::StrEq(latest_hint), testing::_))
.WillRepeatedly(testing::Invoke([](const std::string& path, std::string* content) {
*content = "-1";
Expand All @@ -439,6 +434,7 @@ TEST_F(FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryO
return mock_fs->FileSystem::ReadFile(path, content);
}));

EXPECT_CALL(*mock_fs, ListDir(testing::_, testing::_)).Times(testing::AnyNumber());
EXPECT_CALL(*mock_fs,
ListDir(testing::StrEq(PathUtil::JoinPath(table_path, "snapshot")), testing::_))
.WillOnce(
Expand Down Expand Up @@ -468,25 +464,23 @@ TEST_F(FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryO
ASSERT_TRUE(exist);
}

TEST_F(FileStoreCommitImplTest,
DISABLED_TestCommitWithAtomicWriteSnapshotTimeoutAndActuallySucceed) {
TEST_F(FileStoreCommitImplTest, TestCommitWithAtomicWriteSnapshotTimeoutAndActuallySucceed) {
std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/";
auto dir = UniqueTestDirectory::Create();
std::string table_path = dir->Str();
ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
FileSystemFactory::Get("gmock_fs", table_path, {}));
CommitContextBuilder context_builder(table_path, "commit_user_1");
ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context,
context_builder.AddOption(Options::MANIFEST_FORMAT, "orc")
.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb")
.AddOption(Options::FILE_SYSTEM, "gmock_fs")
.WithFileSystem(fs)
.Finish());

ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context)));
std::string new_snapshot_6 = PathUtil::JoinPath(table_path, "snapshot/snapshot-6");
auto commit_impl = dynamic_cast<FileStoreCommitImpl*>(commit.get());
auto* resolving_fs = dynamic_cast<ResolvingFileSystem*>(commit_impl->fs_.get());
ASSERT_OK_AND_ASSIGN(auto real_fs, resolving_fs->GetRealFileSystem(table_path));
auto* mock_fs = dynamic_cast<GmockFileSystem*>(real_fs.get());
auto* mock_fs = dynamic_cast<GmockFileSystem*>(fs.get());
EXPECT_CALL(*mock_fs, AtomicStore(testing::StrEq(new_snapshot_6), testing::_))
.WillOnce(testing::Invoke([&](const std::string& path, const std::string& content) {
// to mock atomic store timeout actually succeed
Expand All @@ -507,22 +501,18 @@ TEST_F(FileStoreCommitImplTest,

CommitContextBuilder context_builder_2(table_path, "commit_user_1");
ASSERT_OK_AND_ASSIGN(std::unique_ptr<CommitContext> commit_context_2,
context_builder.AddOption(Options::MANIFEST_FORMAT, "orc")
context_builder_2.AddOption(Options::MANIFEST_FORMAT, "orc")
.AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb")
.AddOption(Options::FILE_SYSTEM, "gmock_fs")
.WithFileSystem(fs)
.Finish());

ASSERT_OK_AND_ASSIGN(auto commit_2, FileStoreCommit::Create(std::move(commit_context_2)));
ASSERT_OK_AND_ASSIGN(int32_t num_committed, commit_2->FilterAndCommit({{1, msgs}}));
ASSERT_EQ(0, num_committed);
auto commit_impl_2 = dynamic_cast<FileStoreCommitImpl*>(commit_2.get());
auto* resolving_fs_2 = dynamic_cast<ResolvingFileSystem*>(commit_impl_2->fs_.get());
ASSERT_OK_AND_ASSIGN(auto real_fs_2, resolving_fs_2->GetRealFileSystem(table_path));
auto* mock_fs_2 = dynamic_cast<GmockFileSystem*>(real_fs_2.get());
std::string new_snapshot_7 = PathUtil::JoinPath(table_path, "snapshot/snapshot-7");
EXPECT_CALL(*mock_fs_2, AtomicStore(testing::StrEq(new_snapshot_7), testing::_))
EXPECT_CALL(*mock_fs, AtomicStore(testing::StrEq(new_snapshot_7), testing::_))
.WillOnce(testing::Invoke([&](const std::string& path, const std::string& content) {
return mock_fs_2->FileSystem::AtomicStore(path, content);
return mock_fs->FileSystem::AtomicStore(path, content);
}));
std::vector<std::shared_ptr<CommitMessage>> msgs_2 =
GetCommitMessages(paimon::test::GetDataDir() +
Expand Down
3 changes: 1 addition & 2 deletions src/paimon/core/utils/tag_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include "paimon/core/utils/tag_manager.h"

#include <paimon/fs/file_system.h>

#include <memory>
#include <stdexcept>
#include <string>
Expand All @@ -26,6 +24,7 @@
#include "paimon/common/utils/path_util.h"
#include "paimon/core/tag/tag.h"
#include "paimon/core/utils/branch_manager.h"
#include "paimon/fs/file_system.h"

namespace paimon {

Expand Down
Loading