通过tonic支持节点间通信(初步

This commit is contained in:
junxiang Mu
2024-08-27 11:40:00 +08:00
parent d183dd6e91
commit 0a373bc260
21 changed files with 1632 additions and 36 deletions

629
Cargo.lock generated
View File

@@ -17,6 +17,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "adler2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "ahash"
version = "0.7.8"
@@ -86,12 +92,40 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "anyhow"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "arrayvec"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.80"
@@ -124,6 +158,53 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "axum"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
dependencies = [
"async-trait",
"axum-core",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper 1.0.1",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper 0.1.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "backtrace"
version = "0.3.73"
@@ -134,11 +215,17 @@ dependencies = [
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"miniz_oxide 0.7.4",
"object",
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "base64-simd"
version = "0.8.0"
@@ -267,6 +354,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32c"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
dependencies = [
"rustc_version",
]
[[package]]
name = "crc32fast"
version = "1.4.2"
@@ -307,6 +403,16 @@ dependencies = [
"subtle",
]
[[package]]
name = "e2e_test"
version = "0.0.1"
dependencies = [
"flatbuffers",
"protos",
"tokio",
"tonic",
]
[[package]]
name = "ecstore"
version = "0.1.0"
@@ -346,12 +452,60 @@ dependencies = [
"xxhash-rust",
]
[[package]]
name = "either"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
[[package]]
name = "equivalent"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "fastrand"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flatbuffers"
version = "24.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f"
dependencies = [
"bitflags 1.3.2",
"rustc_version",
]
[[package]]
name = "flate2"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666"
dependencies = [
"crc32fast",
"miniz_oxide 0.8.0",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -510,7 +664,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http",
"indexmap",
"indexmap 2.2.6",
"slab",
"tokio",
"tokio-util",
@@ -627,6 +781,20 @@ dependencies = [
"pin-project-lite",
"smallvec",
"tokio",
"want",
]
[[package]]
name = "hyper-timeout"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
@@ -636,12 +804,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http",
"http-body",
"hyper",
"pin-project-lite",
"socket2",
"tokio",
"tower",
"tower-service",
"tracing",
]
[[package]]
@@ -654,6 +827,16 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
]
[[package]]
name = "indexmap"
version = "2.2.6"
@@ -679,6 +862,15 @@ version = "1.70.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800"
[[package]]
name = "itertools"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.11"
@@ -703,6 +895,12 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
[[package]]
name = "linux-raw-sys"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "lock_api"
version = "0.4.12"
@@ -737,6 +935,12 @@ dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "memchr"
version = "2.7.4"
@@ -764,6 +968,15 @@ dependencies = [
"adler",
]
[[package]]
name = "miniz_oxide"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
dependencies = [
"adler2",
]
[[package]]
name = "mio"
version = "0.8.11"
@@ -775,6 +988,12 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "netif"
version = "0.1.6"
@@ -839,6 +1058,12 @@ dependencies = [
"libc",
]
[[package]]
name = "numeric_cast"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf70ee2d9b1737d1836c20d9f8f96ec3901b2bf92128439db13237ddce9173a5"
[[package]]
name = "object"
version = "0.36.0"
@@ -965,6 +1190,36 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "petgraph"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
"indexmap 2.2.6",
]
[[package]]
name = "pin-project"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.14"
@@ -995,6 +1250,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro2"
version = "1.0.86"
@@ -1005,10 +1270,97 @@ dependencies = [
]
[[package]]
name = "quick-xml"
version = "0.31.0"
name = "prost"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1"
dependencies = [
"bytes",
"heck",
"itertools",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2"
dependencies = [
"prost",
]
[[package]]
name = "protobuf"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bcc343da15609eaecd65f8aa76df8dc4209d325131d8219358c0aaaebab0bf6"
dependencies = [
"once_cell",
"protobuf-support",
"thiserror",
]
[[package]]
name = "protobuf-support"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0766e3675a627c327e4b3964582594b0e8741305d628a98a5de75a1d15f99b9"
dependencies = [
"thiserror",
]
[[package]]
name = "protos"
version = "0.0.1"
dependencies = [
"flatbuffers",
"prost",
"prost-build",
"protobuf",
"tokio",
"tonic",
"tonic-build",
"tower",
]
[[package]]
name = "quick-xml"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc"
dependencies = [
"memchr",
"serde",
@@ -1119,6 +1471,21 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]]
name = "ring"
version = "0.17.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
dependencies = [
"cc",
"cfg-if",
"getrandom",
"libc",
"spin",
"untrusted",
"windows-sys 0.52.0",
]
[[package]]
name = "rmp"
version = "0.8.14"
@@ -1147,6 +1514,15 @@ version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "rustfs"
version = "0.1.0"
@@ -1155,21 +1531,95 @@ dependencies = [
"bytes",
"clap",
"ecstore",
"flatbuffers",
"futures",
"futures-util",
"http",
"http-body",
"hyper",
"hyper-util",
"mime",
"netif",
"pin-project-lite",
"prost",
"prost-build",
"prost-types",
"protobuf",
"protos",
"s3s",
"time",
"tokio",
"tonic",
"tonic-build",
"tonic-reflection",
"tower",
"tracing",
"tracing-error",
"tracing-subscriber",
"transform-stream",
]
[[package]]
name = "rustix"
version = "0.38.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
dependencies = [
"bitflags 2.6.0",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.52.0",
]
[[package]]
name = "rustls"
version = "0.23.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044"
dependencies = [
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-pemfile"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425"
dependencies = [
"base64",
"rustls-pki-types",
]
[[package]]
name = "rustls-pki-types"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
[[package]]
name = "rustls-webpki"
version = "0.102.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6"
[[package]]
name = "ryu"
version = "1.0.18"
@@ -1178,9 +1628,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "s3s"
version = "0.10.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67e6cdc8002708b435946eec39afa13c43e4288d1de6316a12816e4cfaaa6c2c"
checksum = "fa54e3b4b4791c8c62291516997866b4f265c3fcbfdbcdd0b8da62896fba8bfa"
dependencies = [
"arrayvec",
"async-trait",
@@ -1189,7 +1639,9 @@ dependencies = [
"bytes",
"bytestring",
"chrono",
"crc32c",
"crc32fast",
"digest",
"futures",
"hex-simd",
"hmac",
@@ -1202,6 +1654,7 @@ dependencies = [
"mime",
"nom",
"nugine-rust-utils",
"numeric_cast",
"pin-project-lite",
"quick-xml",
"serde",
@@ -1209,8 +1662,11 @@ dependencies = [
"sha1",
"sha2",
"smallvec",
"sync_wrapper 1.0.1",
"thiserror",
"time",
"tokio",
"tower",
"tracing",
"transform-stream",
"urlencoding",
@@ -1223,6 +1679,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "semver"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "serde"
version = "1.0.203"
@@ -1363,15 +1825,40 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.68"
version = "2.0.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9"
checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sync_wrapper"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
[[package]]
name = "tempfile"
version = "3.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fcd239983515c23a32fb82099f97d0b11b8c72f654ed659363a95c3dad7a53"
dependencies = [
"cfg-if",
"fastrand",
"once_cell",
"rustix",
"windows-sys 0.52.0",
]
[[package]]
name = "thiserror"
version = "1.0.61"
@@ -1477,6 +1964,17 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls",
"rustls-pki-types",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.15"
@@ -1501,12 +1999,104 @@ dependencies = [
"tokio",
]
[[package]]
name = "tonic"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"flate2",
"h2",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"rustls-pemfile",
"socket2",
"tokio",
"tokio-rustls",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]]
name = "tonic-reflection"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b742c83ad673e9ab5b4ce0981f7b9e8932be9d60e8682cbf9120494764dbc173"
dependencies = [
"prost",
"prost-types",
"tokio",
"tokio-stream",
"tonic",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap 1.9.3",
"pin-project",
"pin-project-lite",
"rand",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-service"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -1582,6 +2172,12 @@ dependencies = [
"futures-core",
]
[[package]]
name = "try-lock"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "typenum"
version = "1.17.0"
@@ -1609,6 +2205,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.2"
@@ -1667,6 +2269,15 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64"
[[package]]
name = "want"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e"
dependencies = [
"try-lock",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

View File

@@ -1,21 +1,42 @@
[workspace]
resolver = "2"
members = ["rustfs", "ecstore"]
members = ["rustfs", "ecstore", "e2e_test", "common/protos"]
[workspace.package]
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rustfs/rustfs"
rust-version = "1.75"
version = "0.0.1"
[workspace.dependencies]
async-trait = "0.1.80"
bytes = "1.6.0"
clap = { version = "4.5.7", features = ["derive"] }
ecstore = { path = "./ecstore" }
flatbuffers = "24.3.25"
futures = "0.3.30"
futures-util = "0.3.30"
hyper = "1.3.1"
hyper-util = { version = "0.1.5", features = [
"tokio",
"server-auto",
"server-graceful",
] }
http = "1.1.0"
http-body = "1.0.0"
mime = "0.3.17"
netif = "0.1.6"
pin-project-lite = "0.2"
# pin-utils = "0.1.0"
prost = "0.13.1"
prost-build = "0.13.1"
prost-types = "0.13.1"
protobuf = "3.2"
protos = { path = "./common/protos" }
s3s = { version = "0.10.1", default-features = true, features = ["tower"] }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
tracing = "0.1.40"
tracing-error = "0.2.0"
futures = "0.3.30"
bytes = "1.6.0"
http = "1.1.0"
thiserror = "1.0.61"
time = { version = "0.3.36", features = [
"std",
@@ -24,6 +45,12 @@ time = { version = "0.3.36", features = [
"macros",
"serde",
] }
async-trait = "0.1.80"
tokio = { version = "1.38.0", features = ["fs"] }
futures-util = "0.3.30"
tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.12.1", features = ["gzip"] }
tonic-build = "0.12.1"
tonic-reflection = "0.12"
tower = "0.4.13"
tracing = "0.1.40"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
transform-stream = "0.3.0"

17
common/protos/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "protos"
version.workspace = true
edition.workspace = true
[dependencies]
#async-backtrace = { workspace = true, optional = true }
flatbuffers = { workspace = true }
prost = { workspace = true }
protobuf = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, features = ["transport", "tls"] }
tower = { workspace = true }
[build-dependencies]
prost-build = { workspace = true }
tonic-build = { workspace = true }

258
common/protos/build.rs Normal file
View File

@@ -0,0 +1,258 @@
use std::{
cmp, env, fs,
io::Write,
path::{Path, PathBuf},
process::Command,
};
type AnyError = Box<dyn std::error::Error>;
const ENV_OUT_DIR: &str = "OUT_DIR";
const VERSION_PROTOBUF: Version = Version(27, 0, 0); // 27.0
const VERSION_FLATBUFFERS: Version = Version(24, 3, 25); // 24.3.25
/// Build protos if the major version of `flatc` or `protoc` is greater
/// or lesser than the expected version.
const ENV_BUILD_PROTOS: &str = "BUILD_PROTOS";
/// Path of `flatc` binary.
const ENV_FLATC_PATH: &str = "FLATC_PATH";
fn main() -> Result<(), AnyError> {
let version = protobuf_compiler_version()?;
let need_compile = match version.compare_ext(&VERSION_PROTOBUF) {
Ok(cmp::Ordering::Equal) => true,
Ok(_) => {
let version_err = Version::build_error_message(&version, &VERSION_PROTOBUF).unwrap();
println!("cargo:warning=Tool `protoc` {version_err}, skip compiling.");
false
}
Err(version_err) => {
// return Err(format!("Tool `protoc` {version_err}, please update it.").into());
println!("cargo:warning=Tool `protoc` {version_err}, please update it.");
false
}
};
if !need_compile {
return Ok(());
}
// path of proto file
let project_root_dir = env::current_dir()?;
let proto_dir = project_root_dir.join("src");
let proto_files = &["node.proto"];
let proto_out_dir = project_root_dir.join("src").join("proto_gen");
let flatbuffer_out_dir = project_root_dir.join("src").join("flatbuffers_generated");
let descriptor_set_path = PathBuf::from(env::var(ENV_OUT_DIR).unwrap()).join("proto-descriptor.bin");
tonic_build::configure()
.out_dir(proto_out_dir)
.file_descriptor_set_path(descriptor_set_path)
.protoc_arg("--experimental_allow_proto3_optional")
.compile_well_known_types(true)
.emit_rerun_if_changed(false)
.compile(proto_files, &[proto_dir.clone()])
.map_err(|e| format!("Failed to generate protobuf file: {e}."))?;
// protos/gen/mod.rs
let generated_mod_rs_path = project_root_dir.join("src").join("proto_gen").join("mod.rs");
let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?;
writeln!(&mut generated_mod_rs, "pub mod node_service;")?;
generated_mod_rs.flush()?;
let generated_mod_rs_path = project_root_dir.join("src").join("lib.rs");
let mut generated_mod_rs = fs::File::create(generated_mod_rs_path)?;
writeln!(&mut generated_mod_rs, "#![allow(unused_imports)]")?;
writeln!(&mut generated_mod_rs, "#![allow(clippy::all)]")?;
writeln!(&mut generated_mod_rs, "pub mod proto_gen;")?;
generated_mod_rs.flush()?;
let flatc_path = match env::var(ENV_FLATC_PATH) {
Ok(path) => {
println!("cargo:warning=Specified flatc path by environment {ENV_FLATC_PATH}={path}");
path
}
Err(_) => "flatc".to_string(),
};
// build src/protos/*.fbs files to src/protos/gen/
compile_flatbuffers_models(
&mut generated_mod_rs,
&flatc_path,
proto_dir.clone(),
flatbuffer_out_dir.clone(),
vec!["models"],
)?;
Ok(())
}
/// Compile proto/**.fbs files.
fn compile_flatbuffers_models<P: AsRef<Path>, S: AsRef<str>>(
generated_mod_rs: &mut fs::File,
flatc_path: &str,
in_fbs_dir: P,
out_rust_dir: P,
mod_names: Vec<S>,
) -> Result<(), AnyError> {
let version = flatbuffers_compiler_version(flatc_path)?;
let need_compile = match version.compare_ext(&VERSION_FLATBUFFERS) {
Ok(cmp::Ordering::Equal) => true,
Ok(_) => {
let version_err = Version::build_error_message(&version, &VERSION_FLATBUFFERS).unwrap();
println!("cargo:warning=Tool `{flatc_path}` {version_err}, skip compiling.");
false
}
Err(version_err) => {
return Err(format!("Tool `{flatc_path}` {version_err}, please update it.").into());
}
};
let fbs_dir = in_fbs_dir.as_ref();
let rust_dir = out_rust_dir.as_ref();
fs::create_dir_all(rust_dir)?;
// $rust_dir/mod.rs
let mut sub_mod_rs = fs::File::create(rust_dir.join("mod.rs"))?;
writeln!(generated_mod_rs)?;
writeln!(generated_mod_rs, "mod flatbuffers_generated;")?;
for mod_name in mod_names.iter() {
let mod_name = mod_name.as_ref();
writeln!(generated_mod_rs, "pub use flatbuffers_generated::{mod_name}::*;")?;
writeln!(&mut sub_mod_rs, "pub mod {mod_name};")?;
if need_compile {
let fbs_file_path = fbs_dir.join(format!("{mod_name}.fbs"));
let output = Command::new(flatc_path)
.arg("-o")
.arg(rust_dir)
.arg("--rust")
.arg("--gen-mutable")
.arg("--gen-onefile")
.arg("--gen-name-strings")
.arg("--filename-suffix")
.arg("")
.arg(&fbs_file_path)
.output()
.map_err(|e| format!("Failed to execute process of flatc: {e}"))?;
if !output.status.success() {
return Err(format!(
"Failed to generate file '{}' by flatc(path: '{flatc_path}'): {}.",
fbs_file_path.display(),
String::from_utf8_lossy(&output.stderr),
)
.into());
}
}
}
generated_mod_rs.flush()?;
sub_mod_rs.flush()?;
Ok(())
}
/// Run command `flatc --version` to get the version of flatc.
///
/// ```ignore
/// $ flatc --version
/// flatc version 24.3.25
/// ```
fn flatbuffers_compiler_version(flatc_path: impl AsRef<Path>) -> Result<Version, String> {
let flatc_path = flatc_path.as_ref();
Version::try_get(format!("{}", flatc_path.display()), |output| {
const PREFIX_OF_VERSION: &str = "flatc version ";
let output = output.trim();
if let Some(version) = output.strip_prefix(PREFIX_OF_VERSION) {
Ok(version.to_string())
} else {
Err(format!("Failed to get flatc version: {output}"))
}
})
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Version(u32, u32, u32);
impl Version {
fn try_get<F: FnOnce(&str) -> Result<String, String>>(exe: String, output_to_version_string: F) -> Result<Self, String> {
let cmd = format!("{exe} --version");
let output = std::process::Command::new(exe)
.arg("--version")
.output()
.map_err(|e| format!("Failed to execute `{cmd}`: {e}",))?;
let output_utf8 = String::from_utf8(output.stdout).map_err(|e| {
let output_lossy = String::from_utf8_lossy(e.as_bytes());
format!("Command `{cmd}` returned invalid UTF-8('{output_lossy}'): {e}")
})?;
if output.status.success() {
let version_string = output_to_version_string(&output_utf8)?;
Ok(version_string.parse::<Self>()?)
} else {
Err(format!("Failed to get version by command `{cmd}`: {output_utf8}"))
}
}
fn build_error_message(version: &Self, expected: &Self) -> Option<String> {
match version.compare_major_version(expected) {
cmp::Ordering::Equal => None,
cmp::Ordering::Greater => Some(format!("version({version}) is greater than version({expected})")),
cmp::Ordering::Less => Some(format!("version({version}) is lesser than version({expected})")),
}
}
fn compare_ext(&self, expected_version: &Self) -> Result<cmp::Ordering, String> {
match env::var(ENV_BUILD_PROTOS) {
Ok(build_protos) => {
if build_protos.is_empty() || build_protos == "0" {
Ok(self.compare_major_version(expected_version))
} else {
match self.compare_major_version(expected_version) {
cmp::Ordering::Equal => Ok(cmp::Ordering::Equal),
_ => Err(Self::build_error_message(self, expected_version).unwrap()),
}
}
}
Err(_) => Ok(self.compare_major_version(expected_version)),
}
}
fn compare_major_version(&self, other: &Self) -> cmp::Ordering {
self.0.cmp(&other.0)
}
}
impl std::str::FromStr for Version {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut version = [0_u32; 3];
for (i, v) in s.split('.').take(3).enumerate() {
version[i] = v.parse().map_err(|e| format!("Failed to parse version string '{s}': {e}"))?;
}
Ok(Version(version[0], version[1], version[2]))
}
}
impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}.{}", self.0, self.1, self.2)
}
}
/// Run command `protoc --version` to get the version of flatc.
///
/// ```ignore
/// $ protoc --version
/// libprotoc 27.0
/// ```
fn protobuf_compiler_version() -> Result<Version, String> {
Version::try_get("protoc".to_string(), |output| {
const PREFIX_OF_VERSION: &str = "libprotoc ";
let output = output.trim();
if let Some(version) = output.strip_prefix(PREFIX_OF_VERSION) {
Ok(version.to_string())
} else {
Err(format!("Failed to get protoc version: {output}"))
}
})
}

View File

@@ -0,0 +1 @@
pub mod models;

View File

@@ -0,0 +1,124 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

6
common/protos/src/lib.rs Normal file
View File

@@ -0,0 +1,6 @@
#![allow(unused_imports)]
#![allow(clippy::all)]
pub mod proto_gen;
mod flatbuffers_generated;
pub use flatbuffers_generated::models::*;

View File

@@ -0,0 +1,5 @@
namespace models;
table PingBody {
payload: [ubyte];
}

View File

@@ -0,0 +1,19 @@
syntax = "proto3";
package node_service;
/* -------------------------------------------------------------------- */
message PingRequest {
uint64 version = 1;
bytes body = 2;
}
message PingResponse {
uint64 version = 1;
bytes body = 2;
}
/* -------------------------------------------------------------------- */
service NodeService {
rpc Ping(PingRequest) returns (PingResponse) {};
}

View File

@@ -0,0 +1 @@
pub mod node_service;

View File

@@ -0,0 +1,250 @@
// This file is @generated by prost-build.
/// --------------------------------------------------------------------
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PingRequest {
#[prost(uint64, tag = "1")]
pub version: u64,
#[prost(bytes = "vec", tag = "2")]
pub body: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PingResponse {
#[prost(uint64, tag = "1")]
pub version: u64,
#[prost(bytes = "vec", tag = "2")]
pub body: ::prost::alloc::vec::Vec<u8>,
}
/// Generated client implementations.
pub mod node_service_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::http::Uri;
use tonic::codegen::*;
#[derive(Debug, Clone)]
pub struct NodeServiceClient<T> {
inner: tonic::client::Grpc<T>,
}
impl NodeServiceClient<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> NodeServiceClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
}
pub fn with_origin(inner: T, origin: Uri) -> Self {
let inner = tonic::client::Grpc::with_origin(inner, origin);
Self { inner }
}
pub fn with_interceptor<F>(inner: T, interceptor: F) -> NodeServiceClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody>,
>,
<T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error: Into<StdError> + Send + Sync,
{
NodeServiceClient::new(InterceptedService::new(inner, interceptor))
}
/// Compress requests with the given encoding.
///
/// This requires the server to support it otherwise it might respond with an
/// error.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.send_compressed(encoding);
self
}
/// Enable decompressing responses.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.accept_compressed(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
pub async fn ping(
&mut self,
request: impl tonic::IntoRequest<super::PingRequest>,
) -> std::result::Result<tonic::Response<super::PingResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Ping");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "Ping"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
pub mod node_service_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with NodeServiceServer.
#[async_trait]
pub trait NodeService: Send + Sync + 'static {
async fn ping(
&self,
request: tonic::Request<super::PingRequest>,
) -> std::result::Result<tonic::Response<super::PingResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct NodeServiceServer<T: NodeService> {
inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
impl<T: NodeService> NodeServiceServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
Self {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
max_decoding_message_size: None,
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
InterceptedService::new(Self::new(inner), interceptor)
}
/// Enable decompressing requests with the given encoding.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self
}
/// Compress responses with the given encoding, if the client supports it.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings.enable(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.max_decoding_message_size = Some(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.max_encoding_message_size = Some(limit);
self
}
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for NodeServiceServer<T>
where
T: NodeService,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
match req.uri().path() {
"/node_service.NodeService/Ping" => {
#[allow(non_camel_case_types)]
struct PingSvc<T: NodeService>(pub Arc<T>);
impl<T: NodeService> tonic::server::UnaryService<super::PingRequest> for PingSvc<T> {
type Response = super::PingResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::PingRequest>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as NodeService>::ping(&inner, request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = PingSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", tonic::Code::Unimplemented as i32)
.header(http::header::CONTENT_TYPE, tonic::metadata::GRPC_CONTENT_TYPE)
.body(empty_body())
.unwrap())
}),
}
}
}
impl<T: NodeService> Clone for NodeServiceServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
inner,
accept_compression_encodings: self.accept_compression_encodings,
send_compression_encodings: self.send_compression_encodings,
max_decoding_message_size: self.max_decoding_message_size,
max_encoding_message_size: self.max_encoding_message_size,
}
}
}
impl<T: NodeService> tonic::server::NamedService for NodeServiceServer<T> {
const NAME: &'static str = "node_service.NodeService";
}
}

15
e2e_test/Cargo.toml Normal file
View File

@@ -0,0 +1,15 @@
[package]
name = "e2e_test"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flatbuffers.workspace = true
protos.workspace = true
tonic = { version = "0.12.1", features = ["gzip"] }
tokio = { workspace = true }

0
e2e_test/README.md Normal file
View File

1
e2e_test/src/lib.rs Normal file
View File

@@ -0,0 +1 @@
mod reliant;

View File

@@ -0,0 +1 @@
The test cases in this dir need to run the cluster

View File

@@ -0,0 +1 @@
mod node_interact_test;

View File

@@ -0,0 +1,46 @@
#![cfg(test)]
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{node_service_client::NodeServiceClient, PingRequest, PingResponse},
};
use std::error::Error;
use tonic::Request;
#[tokio::test]
async fn main() -> Result<(), Box<dyn Error>> {
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let payload = fbb.create_vector(b"hello world");
let mut builder = PingBodyBuilder::new(&mut fbb);
builder.add_payload(payload);
let root = builder.finish();
fbb.finish(root, None);
let finished_data = fbb.finished_data();
let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
assert!(decoded_payload.is_ok());
// 创建客户端
let mut client = NodeServiceClient::connect("http://localhost:9000").await?;
// 构造 PingRequest
let request = Request::new(PingRequest {
version: 1,
body: finished_data.to_vec(),
});
// 发送请求并获取响应
let response: PingResponse = client.ping(request).await?.into_inner();
// 打印响应
let ping_response_body = flatbuffers::root::<PingBody>(&response.body);
if let Err(e) = ping_response_body {
eprintln!("{}", e);
} else {
println!("ping_resp:body(flatbuffer): {:?}", ping_response_body);
}
Ok(())
}

View File

@@ -10,6 +10,24 @@ rust-version.workspace = true
[dependencies]
async-trait.workspace = true
bytes.workspace = true
clap.workspace = true
ecstore.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
hyper.workspace = true
hyper-util.workspace = true
http.workspace = true
http-body.workspace = true
mime.workspace = true
netif.workspace = true
pin-project-lite.workspace = true
prost.workspace = true
prost-types.workspace = true
protos.workspace = true
protobuf.workspace = true
s3s.workspace = true
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting"] }
tokio = { workspace = true, features = [
@@ -18,22 +36,13 @@ tokio = { workspace = true, features = [
"net",
"signal",
] }
tonic = { version = "0.12.1", features = ["gzip"] }
tonic-reflection.workspace = true
tower.workspace = true
tracing-error.workspace = true
http.workspace = true
bytes.workspace = true
futures.workspace = true
futures-util.workspace = true
tracing-subscriber.workspace = true
transform-stream.workspace = true
ecstore = { path = "../ecstore" }
s3s = "0.10.0"
clap = { version = "4.5.7", features = ["derive"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
hyper-util = { version = "0.1.5", features = [
"tokio",
"server-auto",
"server-graceful",
] }
mime = "0.3.17"
transform-stream = "0.3.0"
netif = "0.1.6"
# pin-utils = "0.1.0"
[build-dependencies]
prost-build.workspace = true
tonic-build.workspace = true

47
rustfs/src/grpc.rs Normal file
View File

@@ -0,0 +1,47 @@
use tonic::{Request, Response, Status};
use tracing::{debug, error, info};
use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::{NodeService as Node, NodeServiceServer as NodeServer},
PingRequest, PingResponse,
},
};
#[derive(Debug)]
struct NodeService {}
pub fn make_server() -> NodeServer<impl Node> {
NodeServer::new(NodeService {})
}
#[tonic::async_trait]
impl Node for NodeService {
async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
debug!("PING");
let ping_req = request.into_inner();
let ping_body = flatbuffers::root::<PingBody>(&ping_req.body);
if let Err(e) = ping_body {
error!("{}", e);
} else {
info!("ping_req:body(flatbuffer): {:?}", ping_body);
}
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let payload = fbb.create_vector(b"hello, caller");
let mut builder = PingBodyBuilder::new(&mut fbb);
builder.add_payload(payload);
let root = builder.finish();
fbb.finish(root, None);
let finished_data = fbb.finished_data();
Ok(tonic::Response::new(PingResponse {
version: 1,
body: finished_data.to_vec(),
}))
}
}

View File

@@ -1,13 +1,18 @@
mod config;
mod grpc;
mod service;
mod storage;
use clap::Parser;
use ecstore::error::Result;
use grpc::make_server;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ConnBuilder,
service::TowerToHyperService,
};
use s3s::{auth::SimpleAuth, service::S3ServiceBuilder};
use service::hybrid;
use std::{io::IsTerminal, net::SocketAddr, str::FromStr};
use tokio::net::TcpListener;
use tracing::{debug, info};
@@ -96,6 +101,8 @@ async fn run(opt: config::Opt) -> Result<()> {
let hyper_service = service.into_shared();
let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, make_server()));
let http_server = ConnBuilder::new(TokioExecutor::new());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
@@ -119,7 +126,7 @@ async fn run(opt: config::Opt) -> Result<()> {
}
};
let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;

150
rustfs/src/service.rs Normal file
View File

@@ -0,0 +1,150 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Future;
use http_body::Frame;
use hyper::body::Incoming;
use hyper::{Request, Response};
use pin_project_lite::pin_project;
use tower::Service;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Generate a [`HybridService`]
pub(crate) fn hybrid<MakeRest, Grpc>(make_rest: MakeRest, grpc: Grpc) -> HybridService<MakeRest, Grpc> {
HybridService { rest: make_rest, grpc }
}
/// The service that can serve both gRPC and REST HTTP Requests
#[derive(Clone)]
pub struct HybridService<Rest, Grpc> {
rest: Rest,
grpc: Grpc,
}
impl<Rest, Grpc, RestBody, GrpcBody> Service<Request<Incoming>> for HybridService<Rest, Grpc>
where
Rest: Service<Request<Incoming>, Response = Response<RestBody>>,
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
Rest::Error: Into<BoxError>,
Grpc::Error: Into<BoxError>,
{
type Response = Response<HybridBody<RestBody, GrpcBody>>;
type Error = BoxError;
type Future = HybridFuture<Rest::Future, Grpc::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.rest.poll_ready(cx) {
Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
}
}
/// When calling the service, gRPC is served if the HTTP request version is HTTP/2
/// and if the Content-Type is "application/grpc"; otherwise, the request is served
/// as a REST request
fn call(&mut self, req: Request<Incoming>) -> Self::Future {
match (req.version(), req.headers().get(hyper::header::CONTENT_TYPE)) {
(hyper::Version::HTTP_2, Some(hv)) if hv.as_bytes().starts_with(b"application/grpc") => HybridFuture::Grpc {
grpc_future: self.grpc.call(req),
},
_ => HybridFuture::Rest {
rest_future: self.rest.call(req),
},
}
}
}
pin_project! {
/// A hybrid HTTP body that will be used in the response type for the
/// [`HybridFuture`], i.e., the output of the [`HybridService`]
#[project = HybridBodyProj]
pub enum HybridBody<RestBody, GrpcBody> {
Rest {
#[pin]
rest_body: RestBody
},
Grpc {
#[pin]
grpc_body: GrpcBody
},
}
}
impl<RestBody, GrpcBody> http_body::Body for HybridBody<RestBody, GrpcBody>
where
RestBody: http_body::Body + Send + Unpin,
GrpcBody: http_body::Body<Data = RestBody::Data> + Send + Unpin,
RestBody::Error: Into<BoxError>,
GrpcBody::Error: Into<BoxError>,
{
type Data = RestBody::Data;
type Error = BoxError;
fn is_end_stream(&self) -> bool {
match self {
Self::Rest { rest_body } => rest_body.is_end_stream(),
Self::Grpc { grpc_body } => grpc_body.is_end_stream(),
}
}
fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.project() {
HybridBodyProj::Rest { rest_body } => rest_body.poll_frame(cx).map_err(Into::into),
HybridBodyProj::Grpc { grpc_body } => grpc_body.poll_frame(cx).map_err(Into::into),
}
}
fn size_hint(&self) -> http_body::SizeHint {
match self {
Self::Rest { rest_body } => rest_body.size_hint(),
Self::Grpc { grpc_body } => grpc_body.size_hint(),
}
}
}
pin_project! {
/// A future that accepts an HTTP request as input and returns an HTTP
/// response as output for the [`HybridService`]
#[project = HybridFutureProj]
pub enum HybridFuture<RestFuture, GrpcFuture> {
Rest {
#[pin]
rest_future: RestFuture,
},
Grpc {
#[pin]
grpc_future: GrpcFuture,
}
}
}
impl<RestFuture, GrpcFuture, RestBody, GrpcBody, RestError, GrpcError> Future for HybridFuture<RestFuture, GrpcFuture>
where
RestFuture: Future<Output = Result<Response<RestBody>, RestError>>,
GrpcFuture: Future<Output = Result<Response<GrpcBody>, GrpcError>>,
RestError: Into<BoxError>,
GrpcError: Into<BoxError>,
{
type Output = Result<Response<HybridBody<RestBody, GrpcBody>>, BoxError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
HybridFutureProj::Rest { rest_future } => match rest_future.poll(cx) {
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|rest_body| HybridBody::Rest { rest_body }))),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Pending => Poll::Pending,
},
HybridFutureProj::Grpc { grpc_future } => match grpc_future.poll(cx) {
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|grpc_body| HybridBody::Grpc { grpc_body }))),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Pending => Poll::Pending,
},
}
}
}