From 0a373bc260afd1c9a8a3153b60ff888088927439 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Tue, 27 Aug 2024 11:40:00 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E8=BF=87tonic=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E9=97=B4=E9=80=9A=E4=BF=A1=EF=BC=88=E5=88=9D?= =?UTF-8?q?=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 629 +++++++++++++++++- Cargo.toml | 45 +- common/protos/Cargo.toml | 17 + common/protos/build.rs | 258 +++++++ .../protos/src/flatbuffers_generated/mod.rs | 1 + .../src/flatbuffers_generated/models.rs | 124 ++++ common/protos/src/lib.rs | 6 + common/protos/src/models.fbs | 5 + common/protos/src/node.proto | 19 + common/protos/src/proto_gen/mod.rs | 1 + common/protos/src/proto_gen/node_service.rs | 250 +++++++ e2e_test/Cargo.toml | 15 + e2e_test/README.md | 0 e2e_test/src/lib.rs | 1 + e2e_test/src/reliant/README.md | 1 + e2e_test/src/reliant/mod.rs | 1 + e2e_test/src/reliant/node_interact_test.rs | 46 ++ rustfs/Cargo.toml | 43 +- rustfs/src/grpc.rs | 47 ++ rustfs/src/main.rs | 9 +- rustfs/src/service.rs | 150 +++++ 21 files changed, 1632 insertions(+), 36 deletions(-) create mode 100644 common/protos/Cargo.toml create mode 100644 common/protos/build.rs create mode 100644 common/protos/src/flatbuffers_generated/mod.rs create mode 100644 common/protos/src/flatbuffers_generated/models.rs create mode 100644 common/protos/src/lib.rs create mode 100644 common/protos/src/models.fbs create mode 100644 common/protos/src/node.proto create mode 100644 common/protos/src/proto_gen/mod.rs create mode 100644 common/protos/src/proto_gen/node_service.rs create mode 100644 e2e_test/Cargo.toml create mode 100644 e2e_test/README.md create mode 100644 e2e_test/src/lib.rs create mode 100644 e2e_test/src/reliant/README.md create mode 100644 e2e_test/src/reliant/mod.rs create mode 100644 e2e_test/src/reliant/node_interact_test.rs create mode 100644 rustfs/src/grpc.rs create mode 100644 rustfs/src/service.rs diff --git a/Cargo.lock b/Cargo.lock index dd42ff16..ce909a40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "ahash" version = "0.7.8" @@ -86,12 +92,40 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "anyhow" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" + [[package]] name = "arrayvec" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -124,6 +158,53 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -134,11 +215,17 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64-simd" version = "0.8.0" @@ -267,6 +354,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -307,6 +403,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "e2e_test" +version = "0.0.1" +dependencies = [ + "flatbuffers", + "protos", + "tokio", + "tonic", +] + [[package]] name = "ecstore" version = "0.1.0" @@ -346,12 +452,60 @@ dependencies = [ "xxhash-rust", ] +[[package]] +name = "either" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" + [[package]] name = "equivalent" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "fastrand" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" + +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "flatbuffers" +version = "24.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +dependencies = [ + "bitflags 1.3.2", + "rustc_version", +] + +[[package]] +name = "flate2" +version = "1.0.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666" +dependencies = [ + "crc32fast", + "miniz_oxide 0.8.0", +] + [[package]] name = "fnv" version = "1.0.7" @@ -510,7 +664,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap", + "indexmap 2.2.6", "slab", "tokio", "tokio-util", @@ -627,6 +781,20 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", ] [[package]] @@ -636,12 +804,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" dependencies = [ "bytes", + "futures-channel", "futures-util", "http", "http-body", "hyper", "pin-project-lite", + "socket2", "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -654,6 +827,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.2.6" @@ -679,6 +862,15 @@ version = "1.70.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -703,6 +895,12 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + [[package]] name = "lock_api" version = "0.4.12" @@ -737,6 +935,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.4" @@ -764,6 +968,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + [[package]] name = "mio" version = "0.8.11" @@ -775,6 +988,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "netif" version = "0.1.6" @@ -839,6 +1058,12 @@ dependencies = [ "libc", ] +[[package]] +name = "numeric_cast" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf70ee2d9b1737d1836c20d9f8f96ec3901b2bf92128439db13237ddce9173a5" + [[package]] name = "object" version = "0.36.0" @@ -965,6 +1190,36 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.2.6", +] + +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -995,6 +1250,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.86" @@ -1005,10 +1270,97 @@ dependencies = [ ] [[package]] -name = "quick-xml" -version = "0.31.0" +name = "prost" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" +dependencies = [ + "bytes", + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +dependencies = [ + "prost", +] + +[[package]] +name = "protobuf" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bcc343da15609eaecd65f8aa76df8dc4209d325131d8219358c0aaaebab0bf6" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror", +] + +[[package]] +name = "protobuf-support" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0766e3675a627c327e4b3964582594b0e8741305d628a98a5de75a1d15f99b9" +dependencies = [ + "thiserror", +] + +[[package]] +name = "protos" +version = "0.0.1" +dependencies = [ + "flatbuffers", + "prost", + "prost-build", + "protobuf", + "tokio", + "tonic", + "tonic-build", + "tower", +] + +[[package]] +name = "quick-xml" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" dependencies = [ "memchr", "serde", @@ -1119,6 +1471,21 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rmp" version = "0.8.14" @@ -1147,6 +1514,15 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustfs" version = "0.1.0" @@ -1155,21 +1531,95 @@ dependencies = [ "bytes", "clap", "ecstore", + "flatbuffers", "futures", "futures-util", "http", + "http-body", + "hyper", "hyper-util", "mime", "netif", + "pin-project-lite", + "prost", + "prost-build", + "prost-types", + "protobuf", + "protos", "s3s", "time", "tokio", + "tonic", + "tonic-build", + "tonic-reflection", + "tower", "tracing", "tracing-error", "tracing-subscriber", "transform-stream", ] +[[package]] +name = "rustix" +version = "0.38.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +dependencies = [ + "bitflags 2.6.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + +[[package]] +name = "rustls" +version = "0.23.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pemfile" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" +dependencies = [ + "base64", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" + +[[package]] +name = "rustls-webpki" +version = "0.102.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + +[[package]] +name = "rustversion" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" + [[package]] name = "ryu" version = "1.0.18" @@ -1178,9 +1628,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "s3s" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67e6cdc8002708b435946eec39afa13c43e4288d1de6316a12816e4cfaaa6c2c" +checksum = "fa54e3b4b4791c8c62291516997866b4f265c3fcbfdbcdd0b8da62896fba8bfa" dependencies = [ "arrayvec", "async-trait", @@ -1189,7 +1639,9 @@ dependencies = [ "bytes", "bytestring", "chrono", + "crc32c", "crc32fast", + "digest", "futures", "hex-simd", "hmac", @@ -1202,6 +1654,7 @@ dependencies = [ "mime", "nom", "nugine-rust-utils", + "numeric_cast", "pin-project-lite", "quick-xml", "serde", @@ -1209,8 +1662,11 @@ dependencies = [ "sha1", "sha2", "smallvec", + "sync_wrapper 1.0.1", "thiserror", "time", + "tokio", + "tower", "tracing", "transform-stream", "urlencoding", @@ -1223,6 +1679,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.203" @@ -1363,15 +1825,40 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.68" +version = "2.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + +[[package]] +name = "tempfile" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fcd239983515c23a32fb82099f97d0b11b8c72f654ed659363a95c3dad7a53" +dependencies = [ + "cfg-if", + "fastrand", + "once_cell", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "thiserror" version = "1.0.61" @@ -1477,6 +1964,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -1501,12 +1999,104 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "flate2", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "rustls-pemfile", + "socket2", + "tokio", + "tokio-rustls", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + +[[package]] +name = "tonic-reflection" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b742c83ad673e9ab5b4ce0981f7b9e8932be9d60e8682cbf9120494764dbc173" +dependencies = [ + "prost", + "prost-types", + "tokio", + "tokio-stream", + "tonic", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -1582,6 +2172,12 @@ dependencies = [ "futures-core", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.17.0" @@ -1609,6 +2205,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.2" @@ -1667,6 +2269,15 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 9420e9b3..0ec2e78d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,21 +1,42 @@ [workspace] resolver = "2" -members = ["rustfs", "ecstore"] +members = ["rustfs", "ecstore", "e2e_test", "common/protos"] [workspace.package] edition = "2021" license = "MIT OR Apache-2.0" repository = "https://github.com/rustfs/rustfs" rust-version = "1.75" +version = "0.0.1" [workspace.dependencies] +async-trait = "0.1.80" +bytes = "1.6.0" +clap = { version = "4.5.7", features = ["derive"] } +ecstore = { path = "./ecstore" } +flatbuffers = "24.3.25" +futures = "0.3.30" +futures-util = "0.3.30" +hyper = "1.3.1" +hyper-util = { version = "0.1.5", features = [ + "tokio", + "server-auto", + "server-graceful", +] } +http = "1.1.0" +http-body = "1.0.0" +mime = "0.3.17" +netif = "0.1.6" +pin-project-lite = "0.2" +# pin-utils = "0.1.0" +prost = "0.13.1" +prost-build = "0.13.1" +prost-types = "0.13.1" +protobuf = "3.2" +protos = { path = "./common/protos" } +s3s = { version = "0.10.1", default-features = true, features = ["tower"] } serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" -tracing = "0.1.40" -tracing-error = "0.2.0" -futures = "0.3.30" -bytes = "1.6.0" -http = "1.1.0" thiserror = "1.0.61" time = { version = "0.3.36", features = [ "std", @@ -24,6 +45,12 @@ time = { version = "0.3.36", features = [ "macros", "serde", ] } -async-trait = "0.1.80" -tokio = { version = "1.38.0", features = ["fs"] } -futures-util = "0.3.30" +tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread"] } +tonic = { version = "0.12.1", features = ["gzip"] } +tonic-build = "0.12.1" +tonic-reflection = "0.12" +tower = "0.4.13" +tracing = "0.1.40" +tracing-error = "0.2.0" +tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] } +transform-stream = "0.3.0" \ No newline at end of file diff --git a/common/protos/Cargo.toml b/common/protos/Cargo.toml new file mode 100644 index 00000000..a6ab9df6 --- /dev/null +++ b/common/protos/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "protos" +version.workspace = true +edition.workspace = true + +[dependencies] +#async-backtrace = { workspace = true, optional = true } +flatbuffers = { workspace = true } +prost = { workspace = true } +protobuf = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true, features = ["transport", "tls"] } +tower = { workspace = true } + +[build-dependencies] +prost-build = { workspace = true } +tonic-build = { workspace = true } diff --git a/common/protos/build.rs b/common/protos/build.rs new file mode 100644 index 00000000..35efacb7 --- /dev/null +++ b/common/protos/build.rs @@ -0,0 +1,258 @@ +use std::{ + cmp, env, fs, + io::Write, + path::{Path, PathBuf}, + process::Command, +}; + +type AnyError = Box; + +const ENV_OUT_DIR: &str = "OUT_DIR"; +const VERSION_PROTOBUF: Version = Version(27, 0, 0); // 27.0 +const VERSION_FLATBUFFERS: Version = Version(24, 3, 25); // 24.3.25 +/// Build protos if the major version of `flatc` or `protoc` is greater +/// or lesser than the expected version. +const ENV_BUILD_PROTOS: &str = "BUILD_PROTOS"; +/// Path of `flatc` binary. +const ENV_FLATC_PATH: &str = "FLATC_PATH"; + +fn main() -> Result<(), AnyError> { + let version = protobuf_compiler_version()?; + let need_compile = match version.compare_ext(&VERSION_PROTOBUF) { + Ok(cmp::Ordering::Equal) => true, + Ok(_) => { + let version_err = Version::build_error_message(&version, &VERSION_PROTOBUF).unwrap(); + println!("cargo:warning=Tool `protoc` {version_err}, skip compiling."); + false + } + Err(version_err) => { + // return Err(format!("Tool `protoc` {version_err}, please update it.").into()); + println!("cargo:warning=Tool `protoc` {version_err}, please update it."); + false + } + }; + + if !need_compile { + return Ok(()); + } + + // path of proto file + let project_root_dir = env::current_dir()?; + let proto_dir = project_root_dir.join("src"); + let proto_files = &["node.proto"]; + let proto_out_dir = project_root_dir.join("src").join("proto_gen"); + let flatbuffer_out_dir = project_root_dir.join("src").join("flatbuffers_generated"); + let descriptor_set_path = PathBuf::from(env::var(ENV_OUT_DIR).unwrap()).join("proto-descriptor.bin"); + + tonic_build::configure() + .out_dir(proto_out_dir) + .file_descriptor_set_path(descriptor_set_path) + .protoc_arg("--experimental_allow_proto3_optional") + .compile_well_known_types(true) + .emit_rerun_if_changed(false) + .compile(proto_files, &[proto_dir.clone()]) + .map_err(|e| format!("Failed to generate protobuf file: {e}."))?; + + // protos/gen/mod.rs + let generated_mod_rs_path = project_root_dir.join("src").join("proto_gen").join("mod.rs"); + + let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?; + writeln!(&mut generated_mod_rs, "pub mod node_service;")?; + generated_mod_rs.flush()?; + + let generated_mod_rs_path = project_root_dir.join("src").join("lib.rs"); + + let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?; + writeln!(&mut generated_mod_rs, "#![allow(unused_imports)]")?; + writeln!(&mut generated_mod_rs, "#![allow(clippy::all)]")?; + writeln!(&mut generated_mod_rs, "pub mod proto_gen;")?; + generated_mod_rs.flush()?; + + let flatc_path = match env::var(ENV_FLATC_PATH) { + Ok(path) => { + println!("cargo:warning=Specified flatc path by environment {ENV_FLATC_PATH}={path}"); + path + } + Err(_) => "flatc".to_string(), + }; + + // build src/protos/*.fbs files to src/protos/gen/ + compile_flatbuffers_models( + &mut generated_mod_rs, + &flatc_path, + proto_dir.clone(), + flatbuffer_out_dir.clone(), + vec!["models"], + )?; + Ok(()) +} + +/// Compile proto/**.fbs files. +fn compile_flatbuffers_models, S: AsRef>( + generated_mod_rs: &mut fs::File, + flatc_path: &str, + in_fbs_dir: P, + out_rust_dir: P, + mod_names: Vec, +) -> Result<(), AnyError> { + let version = flatbuffers_compiler_version(flatc_path)?; + let need_compile = match version.compare_ext(&VERSION_FLATBUFFERS) { + Ok(cmp::Ordering::Equal) => true, + Ok(_) => { + let version_err = Version::build_error_message(&version, &VERSION_FLATBUFFERS).unwrap(); + println!("cargo:warning=Tool `{flatc_path}` {version_err}, skip compiling."); + false + } + Err(version_err) => { + return Err(format!("Tool `{flatc_path}` {version_err}, please update it.").into()); + } + }; + + let fbs_dir = in_fbs_dir.as_ref(); + let rust_dir = out_rust_dir.as_ref(); + fs::create_dir_all(rust_dir)?; + + // $rust_dir/mod.rs + let mut sub_mod_rs = fs::File::create(rust_dir.join("mod.rs"))?; + writeln!(generated_mod_rs)?; + writeln!(generated_mod_rs, "mod flatbuffers_generated;")?; + for mod_name in mod_names.iter() { + let mod_name = mod_name.as_ref(); + writeln!(generated_mod_rs, "pub use flatbuffers_generated::{mod_name}::*;")?; + writeln!(&mut sub_mod_rs, "pub mod {mod_name};")?; + + if need_compile { + let fbs_file_path = fbs_dir.join(format!("{mod_name}.fbs")); + let output = Command::new(flatc_path) + .arg("-o") + .arg(rust_dir) + .arg("--rust") + .arg("--gen-mutable") + .arg("--gen-onefile") + .arg("--gen-name-strings") + .arg("--filename-suffix") + .arg("") + .arg(&fbs_file_path) + .output() + .map_err(|e| format!("Failed to execute process of flatc: {e}"))?; + if !output.status.success() { + return Err(format!( + "Failed to generate file '{}' by flatc(path: '{flatc_path}'): {}.", + fbs_file_path.display(), + String::from_utf8_lossy(&output.stderr), + ) + .into()); + } + } + } + generated_mod_rs.flush()?; + sub_mod_rs.flush()?; + + Ok(()) +} + +/// Run command `flatc --version` to get the version of flatc. +/// +/// ```ignore +/// $ flatc --version +/// flatc version 24.3.25 +/// ``` +fn flatbuffers_compiler_version(flatc_path: impl AsRef) -> Result { + let flatc_path = flatc_path.as_ref(); + Version::try_get(format!("{}", flatc_path.display()), |output| { + const PREFIX_OF_VERSION: &str = "flatc version "; + let output = output.trim(); + if let Some(version) = output.strip_prefix(PREFIX_OF_VERSION) { + Ok(version.to_string()) + } else { + Err(format!("Failed to get flatc version: {output}")) + } + }) +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +struct Version(u32, u32, u32); + +impl Version { + fn try_get Result>(exe: String, output_to_version_string: F) -> Result { + let cmd = format!("{exe} --version"); + let output = std::process::Command::new(exe) + .arg("--version") + .output() + .map_err(|e| format!("Failed to execute `{cmd}`: {e}",))?; + let output_utf8 = String::from_utf8(output.stdout).map_err(|e| { + let output_lossy = String::from_utf8_lossy(e.as_bytes()); + format!("Command `{cmd}` returned invalid UTF-8('{output_lossy}'): {e}") + })?; + if output.status.success() { + let version_string = output_to_version_string(&output_utf8)?; + Ok(version_string.parse::()?) + } else { + Err(format!("Failed to get version by command `{cmd}`: {output_utf8}")) + } + } + + fn build_error_message(version: &Self, expected: &Self) -> Option { + match version.compare_major_version(expected) { + cmp::Ordering::Equal => None, + cmp::Ordering::Greater => Some(format!("version({version}) is greater than version({expected})")), + cmp::Ordering::Less => Some(format!("version({version}) is lesser than version({expected})")), + } + } + + fn compare_ext(&self, expected_version: &Self) -> Result { + match env::var(ENV_BUILD_PROTOS) { + Ok(build_protos) => { + if build_protos.is_empty() || build_protos == "0" { + Ok(self.compare_major_version(expected_version)) + } else { + match self.compare_major_version(expected_version) { + cmp::Ordering::Equal => Ok(cmp::Ordering::Equal), + _ => Err(Self::build_error_message(self, expected_version).unwrap()), + } + } + } + Err(_) => Ok(self.compare_major_version(expected_version)), + } + } + + fn compare_major_version(&self, other: &Self) -> cmp::Ordering { + self.0.cmp(&other.0) + } +} + +impl std::str::FromStr for Version { + type Err = String; + + fn from_str(s: &str) -> Result { + let mut version = [0_u32; 3]; + for (i, v) in s.split('.').take(3).enumerate() { + version[i] = v.parse().map_err(|e| format!("Failed to parse version string '{s}': {e}"))?; + } + Ok(Version(version[0], version[1], version[2])) + } +} + +impl std::fmt::Display for Version { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}.{}", self.0, self.1, self.2) + } +} + +/// Run command `protoc --version` to get the version of flatc. +/// +/// ```ignore +/// $ protoc --version +/// libprotoc 27.0 +/// ``` +fn protobuf_compiler_version() -> Result { + Version::try_get("protoc".to_string(), |output| { + const PREFIX_OF_VERSION: &str = "libprotoc "; + let output = output.trim(); + if let Some(version) = output.strip_prefix(PREFIX_OF_VERSION) { + Ok(version.to_string()) + } else { + Err(format!("Failed to get protoc version: {output}")) + } + }) +} diff --git a/common/protos/src/flatbuffers_generated/mod.rs b/common/protos/src/flatbuffers_generated/mod.rs new file mode 100644 index 00000000..c446ac88 --- /dev/null +++ b/common/protos/src/flatbuffers_generated/mod.rs @@ -0,0 +1 @@ +pub mod models; diff --git a/common/protos/src/flatbuffers_generated/models.rs b/common/protos/src/flatbuffers_generated/models.rs new file mode 100644 index 00000000..e4949fdc --- /dev/null +++ b/common/protos/src/flatbuffers_generated/models.rs @@ -0,0 +1,124 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +// @generated + +use core::cmp::Ordering; +use core::mem; + +extern crate flatbuffers; +use self::flatbuffers::{EndianScalar, Follow}; + +#[allow(unused_imports, dead_code)] +pub mod models { + + use core::cmp::Ordering; + use core::mem; + + extern crate flatbuffers; + use self::flatbuffers::{EndianScalar, Follow}; + + pub enum PingBodyOffset {} + #[derive(Copy, Clone, PartialEq)] + + pub struct PingBody<'a> { + pub _tab: flatbuffers::Table<'a>, + } + + impl<'a> flatbuffers::Follow<'a> for PingBody<'a> { + type Inner = PingBody<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table::new(buf, loc), + } + } + } + + impl<'a> PingBody<'a> { + pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4; + + pub const fn get_fully_qualified_name() -> &'static str { + "models.PingBody" + } + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + PingBody { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, + args: &'args PingBodyArgs<'args>, + ) -> flatbuffers::WIPOffset> { + let mut builder = PingBodyBuilder::new(_fbb); + if let Some(x) = args.payload { + builder.add_payload(x); + } + builder.finish() + } + + #[inline] + pub fn payload(&self) -> Option> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { + self._tab + .get::>>(PingBody::VT_PAYLOAD, None) + } + } + } + + impl flatbuffers::Verifiable for PingBody<'_> { + #[inline] + fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::>>("payload", Self::VT_PAYLOAD, false)? + .finish(); + Ok(()) + } + } + pub struct PingBodyArgs<'a> { + pub payload: Option>>, + } + impl<'a> Default for PingBodyArgs<'a> { + #[inline] + fn default() -> Self { + PingBodyArgs { payload: None } + } + } + + pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, + start_: flatbuffers::WIPOffset, + } + impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> { + #[inline] + pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset>) { + self.fbb_ + .push_slot_always::>(PingBody::VT_PAYLOAD, payload); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> { + let start = _fbb.start_table(); + PingBodyBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } + } + + impl core::fmt::Debug for PingBody<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("PingBody"); + ds.field("payload", &self.payload()); + ds.finish() + } + } +} // pub mod models diff --git a/common/protos/src/lib.rs b/common/protos/src/lib.rs new file mode 100644 index 00000000..4ab5a438 --- /dev/null +++ b/common/protos/src/lib.rs @@ -0,0 +1,6 @@ +#![allow(unused_imports)] +#![allow(clippy::all)] +pub mod proto_gen; + +mod flatbuffers_generated; +pub use flatbuffers_generated::models::*; diff --git a/common/protos/src/models.fbs b/common/protos/src/models.fbs new file mode 100644 index 00000000..d6a771ec --- /dev/null +++ b/common/protos/src/models.fbs @@ -0,0 +1,5 @@ +namespace models; + +table PingBody { + payload: [ubyte]; +} \ No newline at end of file diff --git a/common/protos/src/node.proto b/common/protos/src/node.proto new file mode 100644 index 00000000..879eb97f --- /dev/null +++ b/common/protos/src/node.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; +package node_service; + +/* -------------------------------------------------------------------- */ +message PingRequest { + uint64 version = 1; + bytes body = 2; +} + +message PingResponse { + uint64 version = 1; + bytes body = 2; +} + +/* -------------------------------------------------------------------- */ + +service NodeService { + rpc Ping(PingRequest) returns (PingResponse) {}; +} \ No newline at end of file diff --git a/common/protos/src/proto_gen/mod.rs b/common/protos/src/proto_gen/mod.rs new file mode 100644 index 00000000..35d3fe1b --- /dev/null +++ b/common/protos/src/proto_gen/mod.rs @@ -0,0 +1 @@ +pub mod node_service; diff --git a/common/protos/src/proto_gen/node_service.rs b/common/protos/src/proto_gen/node_service.rs new file mode 100644 index 00000000..463aac82 --- /dev/null +++ b/common/protos/src/proto_gen/node_service.rs @@ -0,0 +1,250 @@ +// This file is @generated by prost-build. +/// -------------------------------------------------------------------- +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PingRequest { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(bytes = "vec", tag = "2")] + pub body: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PingResponse { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(bytes = "vec", tag = "2")] + pub body: ::prost::alloc::vec::Vec, +} +/// Generated client implementations. +pub mod node_service_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; + use tonic::codegen::*; + #[derive(Debug, Clone)] + pub struct NodeServiceClient { + inner: tonic::client::Grpc, + } + impl NodeServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl NodeServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor(inner: T, interceptor: F) -> NodeServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response<>::ResponseBody>, + >, + >>::Error: Into + Send + Sync, + { + NodeServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn ping( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Ping"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "Ping")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod node_service_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with NodeServiceServer. + #[async_trait] + pub trait NodeService: Send + Sync + 'static { + async fn ping( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct NodeServiceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl NodeServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for NodeServiceServer + where + T: NodeService, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/node_service.NodeService/Ping" => { + #[allow(non_camel_case_types)] + struct PingSvc(pub Arc); + impl tonic::server::UnaryService for PingSvc { + type Response = super::PingResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::ping(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = PingSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header(http::header::CONTENT_TYPE, tonic::metadata::GRPC_CONTENT_TYPE) + .body(empty_body()) + .unwrap()) + }), + } + } + } + impl Clone for NodeServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl tonic::server::NamedService for NodeServiceServer { + const NAME: &'static str = "node_service.NodeService"; + } +} diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml new file mode 100644 index 00000000..a1e30a55 --- /dev/null +++ b/e2e_test/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "e2e_test" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +flatbuffers.workspace = true +protos.workspace = true +tonic = { version = "0.12.1", features = ["gzip"] } +tokio = { workspace = true } \ No newline at end of file diff --git a/e2e_test/README.md b/e2e_test/README.md new file mode 100644 index 00000000..e69de29b diff --git a/e2e_test/src/lib.rs b/e2e_test/src/lib.rs new file mode 100644 index 00000000..07abfb17 --- /dev/null +++ b/e2e_test/src/lib.rs @@ -0,0 +1 @@ +mod reliant; diff --git a/e2e_test/src/reliant/README.md b/e2e_test/src/reliant/README.md new file mode 100644 index 00000000..b1a0f384 --- /dev/null +++ b/e2e_test/src/reliant/README.md @@ -0,0 +1 @@ +The test cases in this dir need to run the cluster \ No newline at end of file diff --git a/e2e_test/src/reliant/mod.rs b/e2e_test/src/reliant/mod.rs new file mode 100644 index 00000000..67d6e260 --- /dev/null +++ b/e2e_test/src/reliant/mod.rs @@ -0,0 +1 @@ +mod node_interact_test; diff --git a/e2e_test/src/reliant/node_interact_test.rs b/e2e_test/src/reliant/node_interact_test.rs new file mode 100644 index 00000000..b2f97e13 --- /dev/null +++ b/e2e_test/src/reliant/node_interact_test.rs @@ -0,0 +1,46 @@ +#![cfg(test)] + +use protos::{ + models::{PingBody, PingBodyBuilder}, + proto_gen::node_service::{node_service_client::NodeServiceClient, PingRequest, PingResponse}, +}; +use std::error::Error; +use tonic::Request; + +#[tokio::test] +async fn main() -> Result<(), Box> { + let mut fbb = flatbuffers::FlatBufferBuilder::new(); + let payload = fbb.create_vector(b"hello world"); + + let mut builder = PingBodyBuilder::new(&mut fbb); + builder.add_payload(payload); + let root = builder.finish(); + fbb.finish(root, None); + + let finished_data = fbb.finished_data(); + + let decoded_payload = flatbuffers::root::(finished_data); + assert!(decoded_payload.is_ok()); + + // 创建客户端 + let mut client = NodeServiceClient::connect("http://localhost:9000").await?; + + // 构造 PingRequest + let request = Request::new(PingRequest { + version: 1, + body: finished_data.to_vec(), + }); + + // 发送请求并获取响应 + let response: PingResponse = client.ping(request).await?.into_inner(); + + // 打印响应 + let ping_response_body = flatbuffers::root::(&response.body); + if let Err(e) = ping_response_body { + eprintln!("{}", e); + } else { + println!("ping_resp:body(flatbuffer): {:?}", ping_response_body); + } + + Ok(()) +} diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 5e5414f0..a155cc22 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -10,6 +10,24 @@ rust-version.workspace = true [dependencies] async-trait.workspace = true +bytes.workspace = true +clap.workspace = true +ecstore.workspace = true +flatbuffers.workspace = true +futures.workspace = true +futures-util.workspace = true +hyper.workspace = true +hyper-util.workspace = true +http.workspace = true +http-body.workspace = true +mime.workspace = true +netif.workspace = true +pin-project-lite.workspace = true +prost.workspace = true +prost-types.workspace = true +protos.workspace = true +protobuf.workspace = true +s3s.workspace = true tracing.workspace = true time = { workspace = true, features = ["parsing", "formatting"] } tokio = { workspace = true, features = [ @@ -18,22 +36,13 @@ tokio = { workspace = true, features = [ "net", "signal", ] } +tonic = { version = "0.12.1", features = ["gzip"] } +tonic-reflection.workspace = true +tower.workspace = true tracing-error.workspace = true -http.workspace = true -bytes.workspace = true -futures.workspace = true -futures-util.workspace = true +tracing-subscriber.workspace = true +transform-stream.workspace = true -ecstore = { path = "../ecstore" } -s3s = "0.10.0" -clap = { version = "4.5.7", features = ["derive"] } -tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] } -hyper-util = { version = "0.1.5", features = [ - "tokio", - "server-auto", - "server-graceful", -] } -mime = "0.3.17" -transform-stream = "0.3.0" -netif = "0.1.6" -# pin-utils = "0.1.0" +[build-dependencies] +prost-build.workspace = true +tonic-build.workspace = true \ No newline at end of file diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs new file mode 100644 index 00000000..6f9b2ae7 --- /dev/null +++ b/rustfs/src/grpc.rs @@ -0,0 +1,47 @@ +use tonic::{Request, Response, Status}; +use tracing::{debug, error, info}; + +use protos::{ + models::{PingBody, PingBodyBuilder}, + proto_gen::node_service::{ + node_service_server::{NodeService as Node, NodeServiceServer as NodeServer}, + PingRequest, PingResponse, + }, +}; + +#[derive(Debug)] +struct NodeService {} + +pub fn make_server() -> NodeServer { + NodeServer::new(NodeService {}) +} + +#[tonic::async_trait] +impl Node for NodeService { + async fn ping(&self, request: Request) -> Result, Status> { + debug!("PING"); + + let ping_req = request.into_inner(); + let ping_body = flatbuffers::root::(&ping_req.body); + if let Err(e) = ping_body { + error!("{}", e); + } else { + info!("ping_req:body(flatbuffer): {:?}", ping_body); + } + + let mut fbb = flatbuffers::FlatBufferBuilder::new(); + let payload = fbb.create_vector(b"hello, caller"); + + let mut builder = PingBodyBuilder::new(&mut fbb); + builder.add_payload(payload); + let root = builder.finish(); + fbb.finish(root, None); + + let finished_data = fbb.finished_data(); + + Ok(tonic::Response::new(PingResponse { + version: 1, + body: finished_data.to_vec(), + })) + } +} diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index d43b4ab5..113f81a7 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -1,13 +1,18 @@ mod config; +mod grpc; +mod service; mod storage; use clap::Parser; use ecstore::error::Result; +use grpc::make_server; use hyper_util::{ rt::{TokioExecutor, TokioIo}, server::conn::auto::Builder as ConnBuilder, + service::TowerToHyperService, }; use s3s::{auth::SimpleAuth, service::S3ServiceBuilder}; +use service::hybrid; use std::{io::IsTerminal, net::SocketAddr, str::FromStr}; use tokio::net::TcpListener; use tracing::{debug, info}; @@ -96,6 +101,8 @@ async fn run(opt: config::Opt) -> Result<()> { let hyper_service = service.into_shared(); + let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, make_server())); + let http_server = ConnBuilder::new(TokioExecutor::new()); let graceful = hyper_util::server::graceful::GracefulShutdown::new(); @@ -119,7 +126,7 @@ async fn run(opt: config::Opt) -> Result<()> { } }; - let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone()); + let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone()); let conn = graceful.watch(conn.into_owned()); tokio::spawn(async move { let _ = conn.await; diff --git a/rustfs/src/service.rs b/rustfs/src/service.rs new file mode 100644 index 00000000..ea1d4272 --- /dev/null +++ b/rustfs/src/service.rs @@ -0,0 +1,150 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::Future; +use http_body::Frame; +use hyper::body::Incoming; +use hyper::{Request, Response}; +use pin_project_lite::pin_project; +use tower::Service; + +type BoxError = Box; + +/// Generate a [`HybridService`] +pub(crate) fn hybrid(make_rest: MakeRest, grpc: Grpc) -> HybridService { + HybridService { rest: make_rest, grpc } +} + +/// The service that can serve both gRPC and REST HTTP Requests +#[derive(Clone)] +pub struct HybridService { + rest: Rest, + grpc: Grpc, +} + +impl Service> for HybridService +where + Rest: Service, Response = Response>, + Grpc: Service, Response = Response>, + Rest::Error: Into, + Grpc::Error: Into, +{ + type Response = Response>; + type Error = BoxError; + type Future = HybridFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.rest.poll_ready(cx) { + Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Pending => Poll::Pending, + }, + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Pending => Poll::Pending, + } + } + + /// When calling the service, gRPC is served if the HTTP request version is HTTP/2 + /// and if the Content-Type is "application/grpc"; otherwise, the request is served + /// as a REST request + fn call(&mut self, req: Request) -> Self::Future { + match (req.version(), req.headers().get(hyper::header::CONTENT_TYPE)) { + (hyper::Version::HTTP_2, Some(hv)) if hv.as_bytes().starts_with(b"application/grpc") => HybridFuture::Grpc { + grpc_future: self.grpc.call(req), + }, + _ => HybridFuture::Rest { + rest_future: self.rest.call(req), + }, + } + } +} + +pin_project! { + /// A hybrid HTTP body that will be used in the response type for the + /// [`HybridFuture`], i.e., the output of the [`HybridService`] + #[project = HybridBodyProj] + pub enum HybridBody { + Rest { + #[pin] + rest_body: RestBody + }, + Grpc { + #[pin] + grpc_body: GrpcBody + }, + } +} + +impl http_body::Body for HybridBody +where + RestBody: http_body::Body + Send + Unpin, + GrpcBody: http_body::Body + Send + Unpin, + RestBody::Error: Into, + GrpcBody::Error: Into, +{ + type Data = RestBody::Data; + type Error = BoxError; + + fn is_end_stream(&self) -> bool { + match self { + Self::Rest { rest_body } => rest_body.is_end_stream(), + Self::Grpc { grpc_body } => grpc_body.is_end_stream(), + } + } + + fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll, Self::Error>>> { + match self.project() { + HybridBodyProj::Rest { rest_body } => rest_body.poll_frame(cx).map_err(Into::into), + HybridBodyProj::Grpc { grpc_body } => grpc_body.poll_frame(cx).map_err(Into::into), + } + } + + fn size_hint(&self) -> http_body::SizeHint { + match self { + Self::Rest { rest_body } => rest_body.size_hint(), + Self::Grpc { grpc_body } => grpc_body.size_hint(), + } + } +} + +pin_project! { + /// A future that accepts an HTTP request as input and returns an HTTP + /// response as output for the [`HybridService`] + #[project = HybridFutureProj] + pub enum HybridFuture { + Rest { + #[pin] + rest_future: RestFuture, + }, + Grpc { + #[pin] + grpc_future: GrpcFuture, + } + } +} + +impl Future for HybridFuture +where + RestFuture: Future, RestError>>, + GrpcFuture: Future, GrpcError>>, + RestError: Into, + GrpcError: Into, +{ + type Output = Result>, BoxError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + HybridFutureProj::Rest { rest_future } => match rest_future.poll(cx) { + Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|rest_body| HybridBody::Rest { rest_body }))), + Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), + Poll::Pending => Poll::Pending, + }, + HybridFutureProj::Grpc { grpc_future } => match grpc_future.poll(cx) { + Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|grpc_body| HybridBody::Grpc { grpc_body }))), + Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), + Poll::Pending => Poll::Pending, + }, + } + } +}