Merge branch 'main' into quorum

This commit is contained in:
weisd
2024-09-25 16:55:20 +08:00
5 changed files with 158 additions and 32 deletions

85
.github/workflows/e2e.yml vendored Normal file
View File

@@ -0,0 +1,85 @@
name: e2e
on:
push:
pull_request:
branches: [ "main" ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
timeout-minutes: 10
runs-on: ubuntu-latest
strategy:
matrix:
rust:
- stable
- nightly
steps:
- name: cache protoc bin
id: cache-protoc-action
uses: actions/cache@v3
env:
cache-name: cache-protoc-action-bin
with:
path: /usr/local/bin/protoc
key: ${{ runner.os }}-build-${{ env.cache-name }}-v0.0.1
- name: install protoc
if: steps.cache-protoc-action.outputs.cache-hit != 'true'
run: |
wget https://github.com/protocolbuffers/protobuf/releases/download/v27.0/protoc-27.0-linux-x86_64.zip
unzip protoc-27.0-linux-x86_64.zip -d protoc3
mv protoc3/bin/* /usr/local/bin/
chmod +x /usr/local/bin/protoc
rm -rf protoc-27.0-linux-x86_64.zip protoc3
- name: print protoc version
run: protoc --version
- name: cache flatc bin
id: cache-flatc-action
uses: actions/cache@v3
env:
cache-name: cache-flatc-action-bin
with:
path: /usr/local/bin/flatc
key: ${{ runner.os }}-build-${{ env.cache-name }}-v0.0.1
- name: install flatc
if: steps.cache-flatc-action.outputs.cache-hit != 'true'
run: |
wget https://github.com/google/flatbuffers/releases/download/v24.3.25/Linux.flatc.binary.g++-13.zip
unzip Linux.flatc.binary.g++-13.zip
mv flatc /usr/local/bin/
chmod +x /usr/local/bin/flatc
rm -rf Linux.flatc.binary.g++-13.zip
- name: toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ matrix.rust }}
# components: rustfmt, clippy
- name: checkout
uses: actions/checkout@v2
- name: run fs
working-directory: .
run: |
make e2e-server > /dev/null &
make probe-e2e
- name: e2e test
run: cargo test -p e2e_test --lib
- name: cache cargo
uses: actions/cache@v3
env:
cache-name: cache-cargo
with:
path: ~/.cargo
key: ${{ runner.os }}-build-${{ env.cache-name }}-v0.0.1

View File

@@ -16,7 +16,6 @@ jobs:
matrix:
rust:
- stable
- beta
- nightly
steps:
@@ -59,20 +58,24 @@ jobs:
chmod +x /usr/local/bin/flatc
rm -rf Linux.flatc.binary.g++-13.zip
- name: toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ matrix.rust }}
# components: rustfmt, clippy
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: ${{ matrix.rust }}
override: true
components: rustfmt, clippy
- name: cargo build
run: cargo build
- uses: actions-rs/cargo@v1
with:
command: build
- name: cargo test
run: cargo test --all --exclude e2e_test
- uses: actions-rs/cargo@v1
- name: cache cargo
uses: actions/cache@v3
env:
cache-name: cache-cargo
with:
command: test
args: --all
path: ~/.cargo
key: ${{ runner.os }}-build-${{ env.cache-name }}-v0.0.1

View File

@@ -21,3 +21,11 @@ start:
.PHONY: stop
stop:
$(DOCKER_CLI) stop $(CONTAINER_NAME)
.PHONY: e2e-server
e2e-server:
sh $(shell pwd)/scripts/run.sh
.PHONY: probe-e2e
probe-e2e:
sh $(shell pwd)/scripts/probe.sh

View File

@@ -281,31 +281,46 @@ impl StorageAPI for Sets {
}
}
let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
let mut jhs = Vec::with_capacity(semaphore.available_permits());
// let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
// let mut jhs = Vec::with_capacity(semaphore.available_permits());
// for (k, v) in set_obj_map {
// let disks = self.get_disks(k);
// let semaphore = semaphore.clone();
// let opts = opts.clone();
// let bucket = bucket.to_string();
// let jh = tokio::spawn(async move {
// let _permit = semaphore.acquire().await.unwrap();
// let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
// disks.delete_objects(&bucket, objs, opts).await
// });
// jhs.push(jh);
// }
// let mut results = Vec::with_capacity(jhs.len());
// for jh in jhs {
// results.push(jh.await?.unwrap());
// }
// for (dobjects, errs) in results {
// del_objects.extend(dobjects);
// del_errs.extend(errs);
// }
// TODO: 并发
for (k, v) in set_obj_map {
let disks = self.get_disks(k);
let semaphore = semaphore.clone();
let opts = opts.clone();
let bucket = bucket.to_string();
let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?;
let jh = tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
disks.delete_objects(&bucket, objs, opts).await
});
jhs.push(jh);
}
for (i, err) in errs.into_iter().enumerate() {
let obj = v.get(i).unwrap();
let mut results = Vec::with_capacity(jhs.len());
for jh in jhs {
results.push(jh.await?.unwrap());
}
del_errs[obj.orig_idx] = err;
for (dobjects, errs) in results {
del_objects.extend(dobjects);
del_errs.extend(errs);
del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone();
}
}
Ok((del_objects, del_errs))

15
scripts/probe.sh Normal file
View File

@@ -0,0 +1,15 @@
#!/bin/sh
# Please use POSIX Shell
# https://www.grymoire.com/Unix/Sh.html
IP=127.0.0.1
PORT=9000
while true; do
nc -zv ${IP} ${PORT}
if [ "$?" -eq "0" ]; then
exit 0
fi
sleep 2
done