JMAP Registry API implementation - part 3

This commit is contained in:
mdecimus
2026-02-25 19:07:06 +01:00
parent c3586d9af6
commit f16e737221
47 changed files with 1775 additions and 1043 deletions

1
Cargo.lock generated
View File

@@ -6018,6 +6018,7 @@ dependencies = [
"ahash",
"hashify",
"jmap-tools",
"mail-auth 0.8.0",
"serde",
"serde_json",
"trc",

View File

@@ -124,7 +124,6 @@ pub const KV_QUOTA_BLOB: u8 = 11;
pub const KV_GREYLIST: u8 = 16;
pub const KV_LOCK_PURGE_ACCOUNT: u8 = 20;
pub const KV_LOCK_QUEUE_MESSAGE: u8 = 21;
pub const KV_LOCK_QUEUE_REPORT: u8 = 22;
pub const KV_LOCK_TASK: u8 = 23;
pub const KV_LOCK_HOUSEKEEPER: u8 = 24;
pub const KV_LOCK_DAV: u8 = 25;

View File

@@ -313,7 +313,12 @@ impl Family {
SUBSPACE_PROPERTY,
],
Family::Blob => &[SUBSPACE_BLOBS, SUBSPACE_BLOB_LINK],
Family::Registry => &[SUBSPACE_REGISTRY],
Family::Registry => &[
SUBSPACE_REGISTRY,
SUBSPACE_REGISTRY_IDX,
SUBSPACE_REGISTRY_IDX_GLOBAL,
SUBSPACE_DIRECTORY,
],
Family::Changelog => &[SUBSPACE_LOGS],
Family::Queue => &[SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUEUE_EVENT],
Family::Report => &[SUBSPACE_REPORT_OUT, SUBSPACE_REPORT_IN],

View File

@@ -9,7 +9,10 @@ use base64::engine::general_purpose;
use std::env;
use std::io::{self, Write};
use store::write::{AnyClass, AnyKey, BatchBuilder, ValueClass};
use store::{Deserialize, IterateParams, SUBSPACE_INDEXES, Store};
use store::{
Deserialize, IterateParams, SUBSPACE_INDEXES, SUBSPACE_REGISTRY_IDX,
SUBSPACE_REGISTRY_IDX_GLOBAL, Store,
};
const HELP: &str = concat!(
"Stalwart Server v",
@@ -72,7 +75,14 @@ pub async fn store_console(store: Store) {
key: to_key.collect::<Vec<_>>(),
},
)
.set_values(![SUBSPACE_INDEXES].contains(&from_subspace)),
.set_values(
![
SUBSPACE_INDEXES,
SUBSPACE_REGISTRY_IDX,
SUBSPACE_REGISTRY_IDX_GLOBAL,
]
.contains(&from_subspace),
),
|key, value| {
print!("{}", char::from(from_subspace));
print_escaped(key);

View File

@@ -14,7 +14,8 @@ use std::{
path::{Path, PathBuf},
};
use store::{
BlobStore, SUBSPACE_BLOBS, SUBSPACE_COUNTER, SUBSPACE_INDEXES, SUBSPACE_QUOTA, Store, U32_LEN,
BlobStore, SUBSPACE_BLOBS, SUBSPACE_COUNTER, SUBSPACE_INDEXES, SUBSPACE_QUOTA,
SUBSPACE_REGISTRY_IDX, SUBSPACE_REGISTRY_IDX_GLOBAL, Store, U32_LEN,
write::{AnyClass, BatchBuilder, ValueClass, key::DeserializeBigEndian},
};
use types::{collection::Collection, field::Field};
@@ -84,7 +85,7 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
}
}
}
SUBSPACE_INDEXES => {
SUBSPACE_INDEXES | SUBSPACE_REGISTRY_IDX | SUBSPACE_REGISTRY_IDX_GLOBAL => {
while let Some((key, _)) = reader.next() {
let account_id = key
.as_slice()

View File

@@ -267,9 +267,9 @@ impl RegistryGet for Server {
ObjectType::Log => {}
ObjectType::QueuedMessage => {}
ObjectType::Task => {}
ObjectType::ArfFeedbackReport => {}
ObjectType::DmarcReport => {}
ObjectType::TlsReport => {}
ObjectType::ArfExternalReport => {}
ObjectType::DmarcExternalReport => {}
ObjectType::TlsExternalReport => {}
ObjectType::DeletedItem => {}
ObjectType::Metric => {}
ObjectType::Trace => {}

View File

@@ -140,9 +140,9 @@ impl RegistrySet for Server {
ObjectType::Task => {}
// Move to registry?
ObjectType::ArfFeedbackReport => {}
ObjectType::DmarcReport => {}
ObjectType::TlsReport => {}
ObjectType::ArfExternalReport => {}
ObjectType::DmarcExternalReport => {}
ObjectType::TlsExternalReport => {}
ObjectType::DeletedItem => {}
ObjectType::Metric => {}
ObjectType::Trace => {}

View File

@@ -41,7 +41,7 @@ jemallocator = "0.5.0"
[features]
#default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis", "azure", "nats", "enterprise"]
default = ["rocks", "enterprise"]
default = ["rocks", "enterprise", "sqlite"]
sqlite = ["store/sqlite", "directory/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres", "directory/postgres"]

View File

@@ -13,6 +13,7 @@ hashify = "0.2.7"
ahash = { version = "0.8" }
jmap-tools = { version = "0.1" }
xxhash-rust = { version = "0.8.5", features = ["xxh3"] }
mail-auth = "0.8.0" # was a machine-local path dependency (/Users/me/code/mail-auth); Cargo.lock already pins mail-auth 0.8.0 — do not commit absolute local paths
[features]
test_mode = []

View File

@@ -8,3 +8,4 @@ pub mod jmap;
pub mod pickle;
pub mod schema;
pub mod types;
pub mod utils;

View File

@@ -4,7 +4,10 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use utils::map::vec_map::VecMap;
use utils::{
codec::leb128::{Leb128_, Leb128Reader, Leb128Writer},
map::vec_map::VecMap,
};
use crate::types::EnumImpl;
use std::collections::HashMap;
@@ -30,15 +33,23 @@ impl<'x> PickledStream<'x> {
}
pub fn read(&mut self) -> Option<u8> {
let byte = *self.data.get(self.pos)?;
self.pos += 1;
Some(byte)
self.data.get(self.pos).copied().inspect(|_| self.pos += 1)
}
pub fn read_leb128<T: Leb128_>(&mut self) -> Option<T> {
self.data
.get(self.pos..)
.and_then(|bytes| bytes.read_leb128())
.map(|(value, read_bytes)| {
self.pos += read_bytes;
value
})
}
pub fn read_bytes(&mut self, len: usize) -> Option<&'x [u8]> {
let bytes = self.data.get(self.pos..self.pos + len)?;
self.pos += len;
Some(bytes)
self.data.get(self.pos..self.pos + len).inspect(|_| {
self.pos += len;
})
}
pub fn eof(&self) -> bool {
@@ -48,53 +59,49 @@ impl<'x> PickledStream<'x> {
pub fn bytes(&self) -> &'x [u8] {
self.data
}
pub fn assert_version(&mut self, expected: u8) -> Option<u8> {
self.read().filter(|&version| version == expected)
}
}
impl Pickle for u16 {
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&self.to_be_bytes());
let _ = out.write_leb128(*self);
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut arr = [0u8; std::mem::size_of::<u16>()];
arr.copy_from_slice(stream.read_bytes(std::mem::size_of::<u16>())?);
Some(u16::from_be_bytes(arr))
stream.read_leb128()
}
}
impl Pickle for u64 {
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&self.to_be_bytes());
let _ = out.write_leb128(*self);
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut arr = [0u8; std::mem::size_of::<u64>()];
arr.copy_from_slice(stream.read_bytes(std::mem::size_of::<u64>())?);
Some(u64::from_be_bytes(arr))
stream.read_leb128()
}
}
impl Pickle for u32 {
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&self.to_be_bytes());
let _ = out.write_leb128(*self);
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut arr = [0u8; std::mem::size_of::<u32>()];
arr.copy_from_slice(stream.read_bytes(std::mem::size_of::<u32>())?);
Some(u32::from_be_bytes(arr))
stream.read_leb128()
}
}
impl Pickle for i64 {
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&self.to_be_bytes());
let _ = out.write_leb128(*self as u64);
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut arr = [0u8; std::mem::size_of::<i64>()];
arr.copy_from_slice(stream.read_bytes(std::mem::size_of::<i64>())?);
Some(i64::from_be_bytes(arr))
stream.read_leb128::<u64>().map(|v| v as i64)
}
}
@@ -126,27 +133,24 @@ impl Pickle for bool {
impl Pickle for String {
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&(self.len() as u32).to_be_bytes());
(self.len() as u32).pickle(out);
out.extend_from_slice(self.as_bytes());
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut len_arr = [0u8; std::mem::size_of::<u32>()];
len_arr.copy_from_slice(stream.read_bytes(std::mem::size_of::<u32>())?);
let bytes = stream.read_bytes(u32::from_be_bytes(len_arr) as usize)?;
String::from_utf8(bytes.to_vec()).ok()
u32::unpickle(stream)
.and_then(|len| stream.read_bytes(len as usize))
.and_then(|bytes| String::from_utf8(bytes.to_vec()).ok())
}
}
impl<T: EnumImpl> Pickle for T {
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&self.to_id().to_be_bytes());
self.to_id().pickle(out);
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut id_arr = [0u8; std::mem::size_of::<u16>()];
id_arr.copy_from_slice(stream.read_bytes(std::mem::size_of::<u16>())?);
Self::from_id(u16::from_be_bytes(id_arr))
u16::unpickle(stream).and_then(Self::from_id)
}
}
@@ -180,16 +184,14 @@ where
T: Pickle,
{
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&(self.len() as u32).to_be_bytes());
(self.len() as u32).pickle(out);
for item in self {
item.pickle(out);
}
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut len_arr = [0u8; 4];
len_arr.copy_from_slice(stream.read_bytes(4)?);
let len = u32::from_be_bytes(len_arr) as usize;
let len = u32::unpickle(stream)? as usize;
let mut vec = Vec::with_capacity(len);
for _ in 0..len {
vec.push(T::unpickle(stream)?);
@@ -205,7 +207,7 @@ where
S: std::hash::BuildHasher + Default,
{
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&(self.len() as u32).to_be_bytes());
(self.len() as u32).pickle(out);
for (key, value) in self {
key.pickle(out);
value.pickle(out);
@@ -213,9 +215,7 @@ where
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut len_arr = [0u8; 4];
len_arr.copy_from_slice(stream.read_bytes(4)?);
let len = u32::from_be_bytes(len_arr) as usize;
let len = u32::unpickle(stream)? as usize;
let mut map = HashMap::with_capacity_and_hasher(len, S::default());
for _ in 0..len {
let key = K::unpickle(stream)?;
@@ -232,7 +232,7 @@ where
V: Pickle,
{
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&(self.len() as u32).to_be_bytes());
(self.len() as u32).pickle(out);
for (key, value) in self {
key.pickle(out);
value.pickle(out);
@@ -240,9 +240,7 @@ where
}
fn unpickle(stream: &mut PickledStream<'_>) -> Option<Self> {
let mut len_arr = [0u8; 4];
len_arr.copy_from_slice(stream.read_bytes(4)?);
let len = u32::from_be_bytes(len_arr) as usize;
let len = u32::unpickle(stream)? as usize;
let mut map = VecMap::with_capacity(len);
for _ in 0..len {
let key = K::unpickle(stream)?;

View File

@@ -5,24 +5,14 @@
*/
use crate::{
pickle::Pickle,
schema::{
enums::{TracingLevel, TracingLevelOpt},
prelude::{
Account, Duration, GroupAccount, HttpAuth, NodeRange, Object, ObjectInner, Property,
Task, TaskStatus, TaskStatusPending, UTCDateTime, UserAccount,
},
prelude::{NodeRange, Object, ObjectInner, Property},
},
types::EnumImpl,
};
use std::{cmp::Ordering, fmt::Display};
use trc::TOTAL_EVENT_COUNT;
use utils::{
Client, HeaderMap,
cron::SimpleCron,
http::{build_http_client, build_http_headers},
map::vec_map::VecMap,
};
#[allow(clippy::derivable_impls)]
pub mod enums;
@@ -38,174 +28,11 @@ pub mod structs;
#[allow(clippy::derivable_impls)]
pub mod structs_impl;
impl From<prelude::Cron> for SimpleCron {
fn from(value: prelude::Cron) -> Self {
match value {
prelude::Cron::Daily(cron) => SimpleCron::Day {
hour: cron.hour as u32,
minute: cron.minute as u32,
},
prelude::Cron::Weekly(cron) => SimpleCron::Week {
day: cron.day as u32,
hour: cron.hour as u32,
minute: cron.minute as u32,
},
prelude::Cron::Hourly(cron) => SimpleCron::Hour {
minute: cron.minute as u32,
},
}
}
}
impl NodeRange {
pub fn contains(&self, node_id: u64) -> bool {
node_id >= self.from_node_id && node_id <= self.to_node_id
}
}
impl Account {
pub fn into_user(self) -> Option<UserAccount> {
if let Account::User(user) = self {
Some(user)
} else {
None
}
}
pub fn into_group(self) -> Option<GroupAccount> {
if let Account::Group(group) = self {
Some(group)
} else {
None
}
}
}
impl HttpAuth {
pub fn build_headers(
&self,
extra_headers: VecMap<String, String>,
content_type: Option<&str>,
) -> Result<HeaderMap, String> {
match self {
HttpAuth::Unauthenticated => {
build_http_headers(extra_headers, None, None, None, content_type)
}
HttpAuth::Basic(auth) => build_http_headers(
extra_headers,
auth.username.as_str().into(),
auth.secret.as_str().into(),
None,
content_type,
),
HttpAuth::Bearer(auth) => build_http_headers(
extra_headers,
None,
None,
auth.bearer_token.as_str().into(),
content_type,
),
}
}
pub fn build_http_client(
&self,
extra_headers: VecMap<String, String>,
content_type: Option<&str>,
timeout: Duration,
allow_invalid_certs: bool,
) -> Result<Client, String> {
match self {
HttpAuth::Unauthenticated => build_http_client(
extra_headers,
None,
None,
None,
content_type,
timeout.into_inner(),
allow_invalid_certs,
),
HttpAuth::Basic(auth) => build_http_client(
extra_headers,
auth.username.as_str().into(),
auth.secret.as_str().into(),
None,
content_type,
timeout.into_inner(),
allow_invalid_certs,
),
HttpAuth::Bearer(auth) => build_http_client(
extra_headers,
None,
None,
auth.bearer_token.as_str().into(),
content_type,
timeout.into_inner(),
allow_invalid_certs,
),
}
}
}
impl Task {
pub fn set_status(&mut self, status: TaskStatus) {
match self {
Task::IndexDocument(task) => task.status = status,
Task::UnindexDocument(task) => task.status = status,
Task::IndexTrace(task) => task.status = status,
Task::CalendarAlarmEmail(task) => task.status = status,
Task::CalendarAlarmNotification(task) => task.status = status,
Task::CalendarItipMessage(task) => task.status = status,
Task::MergeThreads(task) => task.status = status,
}
}
pub fn status(&self) -> &TaskStatus {
match self {
Task::IndexDocument(task) => &task.status,
Task::UnindexDocument(task) => &task.status,
Task::IndexTrace(task) => &task.status,
Task::CalendarAlarmEmail(task) => &task.status,
Task::CalendarAlarmNotification(task) => &task.status,
Task::CalendarItipMessage(task) => &task.status,
Task::MergeThreads(task) => &task.status,
}
}
pub fn attempt_number(&self) -> u64 {
match self.status() {
TaskStatus::Pending(_) => 0,
TaskStatus::Retry(status) => status.attempt_number,
TaskStatus::Failed(status) => status.failed_attempt_number,
}
}
pub fn due_timestamp(&self) -> u64 {
match self.status() {
TaskStatus::Pending(status) => status.due.timestamp() as u64,
TaskStatus::Retry(status) => status.due.timestamp() as u64,
TaskStatus::Failed(_) => u64::MAX,
}
}
}
impl TaskStatus {
pub fn now() -> Self {
let now = UTCDateTime::now();
TaskStatus::Pending(TaskStatusPending {
created_at: now,
due: now,
})
}
pub fn at(timestamp: i64) -> Self {
TaskStatus::Pending(TaskStatusPending {
due: UTCDateTime::from_timestamp(timestamp),
created_at: UTCDateTime::now(),
})
}
}
impl Display for Property {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
@@ -303,13 +130,3 @@ impl Object {
Object { inner, revision: 0 }
}
}
impl Pickle for Object {
fn pickle(&self, out: &mut Vec<u8>) {
Object::pickle(self, out);
}
fn unpickle(stream: &mut crate::pickle::PickledStream<'_>) -> Option<Self> {
Object::unpickle(stream)
}
}

View File

@@ -32,7 +32,7 @@ pub use utils::map::vec_map::VecMap;
#[derive(Debug, Clone)]
pub struct Object {
pub inner: ObjectInner,
pub revision: u32,
pub revision: u64,
}
#[derive(Debug)]

View File

@@ -125,15 +125,11 @@ impl FromStr for Duration {
impl Pickle for Duration {
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&(self.0.as_millis() as u64).to_be_bytes());
(self.0.as_millis() as u64).pickle(out);
}
fn unpickle(data: &mut PickledStream<'_>) -> Option<Self> {
let mut arr = [0u8; 8];
arr.copy_from_slice(data.read_bytes(8)?);
Some(Duration(std::time::Duration::from_millis(
u64::from_be_bytes(arr),
)))
u64::unpickle(data).map(|timestamp| Duration(std::time::Duration::from_millis(timestamp)))
}
}

View File

@@ -68,15 +68,11 @@ impl Default for ObjectId {
impl Pickle for Id {
fn pickle(&self, out: &mut Vec<u8>) {
out.extend_from_slice(&self.id().to_be_bytes());
self.id().pickle(out);
}
fn unpickle(data: &mut PickledStream<'_>) -> Option<Self> {
let mut arr = [0u8; std::mem::size_of::<u64>()];
arr.copy_from_slice(data.read_bytes(std::mem::size_of::<u64>())?);
let id = u64::from_be_bytes(arr);
Some(Id::new(id))
u64::unpickle(data).map(Id::new)
}
}

View File

@@ -75,14 +75,12 @@ impl AsRef<std::net::SocketAddr> for SocketAddr {
impl Pickle for SocketAddr {
fn pickle(&self, out: &mut Vec<u8>) {
self.0.ip().pickle(out);
out.extend_from_slice(&self.0.port().to_be_bytes());
self.0.port().pickle(out);
}
fn unpickle(data: &mut PickledStream<'_>) -> Option<Self> {
let ip = std::net::IpAddr::unpickle(data)?;
let mut port_bytes = [0u8; 2];
port_bytes.copy_from_slice(data.read_bytes(2)?);
let port = u16::from_be_bytes(port_bytes);
let port = u16::unpickle(data)?;
Some(SocketAddr(std::net::SocketAddr::new(ip, port)))
}
}

View File

@@ -0,0 +1,25 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::schema::prelude::{Account, GroupAccount, UserAccount};
impl Account {
    /// Consumes the account and yields the user payload, or `None` when the
    /// account is not a user account.
    pub fn into_user(self) -> Option<UserAccount> {
        match self {
            Account::User(user) => Some(user),
            _ => None,
        }
    }

    /// Consumes the account and yields the group payload, or `None` when the
    /// account is not a group account.
    pub fn into_group(self) -> Option<GroupAccount> {
        match self {
            Account::Group(group) => Some(group),
            _ => None,
        }
    }
}

View File

@@ -0,0 +1,27 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::schema::prelude::Cron;
use utils::cron::SimpleCron;
impl From<Cron> for SimpleCron {
fn from(value: Cron) -> Self {
match value {
Cron::Daily(cron) => SimpleCron::Day {
hour: cron.hour as u32,
minute: cron.minute as u32,
},
Cron::Weekly(cron) => SimpleCron::Week {
day: cron.day as u32,
hour: cron.hour as u32,
minute: cron.minute as u32,
},
Cron::Hourly(cron) => SimpleCron::Hour {
minute: cron.minute as u32,
},
}
}
}

View File

@@ -0,0 +1,78 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::schema::prelude::{Duration, HttpAuth};
use utils::{
Client, HeaderMap,
http::{build_http_client, build_http_headers},
map::vec_map::VecMap,
};
impl HttpAuth {
    /// Builds the HTTP header map for a request authenticated with this
    /// scheme.
    ///
    /// `extra_headers` are caller-supplied headers forwarded to
    /// `build_http_headers`; `content_type`, when present, is passed through
    /// as well. Returns an error string on failure.
    pub fn build_headers(
        &self,
        extra_headers: VecMap<String, String>,
        content_type: Option<&str>,
    ) -> Result<HeaderMap, String> {
        match self {
            // No credentials: only extra headers and content type are applied.
            HttpAuth::Unauthenticated => {
                build_http_headers(extra_headers, None, None, None, content_type)
            }
            // Basic auth: username + secret, no bearer token.
            HttpAuth::Basic(auth) => build_http_headers(
                extra_headers,
                auth.username.as_str().into(),
                auth.secret.as_str().into(),
                None,
                content_type,
            ),
            // Bearer auth: token only, no username/secret.
            HttpAuth::Bearer(auth) => build_http_headers(
                extra_headers,
                None,
                None,
                auth.bearer_token.as_str().into(),
                content_type,
            ),
        }
    }

    /// Builds an HTTP client carrying this scheme's credentials.
    ///
    /// Mirrors `build_headers`, additionally forwarding the request `timeout`
    /// (unwrapped via `into_inner`) and the `allow_invalid_certs` flag.
    // NOTE(review): `allow_invalid_certs` presumably relaxes TLS certificate
    // validation inside `build_http_client` — confirm there.
    pub fn build_http_client(
        &self,
        extra_headers: VecMap<String, String>,
        content_type: Option<&str>,
        timeout: Duration,
        allow_invalid_certs: bool,
    ) -> Result<Client, String> {
        match self {
            HttpAuth::Unauthenticated => build_http_client(
                extra_headers,
                None,
                None,
                None,
                content_type,
                timeout.into_inner(),
                allow_invalid_certs,
            ),
            HttpAuth::Basic(auth) => build_http_client(
                extra_headers,
                auth.username.as_str().into(),
                auth.secret.as_str().into(),
                None,
                content_type,
                timeout.into_inner(),
                allow_invalid_certs,
            ),
            HttpAuth::Bearer(auth) => build_http_client(
                extra_headers,
                None,
                None,
                auth.bearer_token.as_str().into(),
                content_type,
                timeout.into_inner(),
                allow_invalid_certs,
            ),
        }
    }
}

View File

@@ -0,0 +1,11 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
pub mod account;
pub mod cron;
pub mod http;
pub mod report;
pub mod task;

View File

@@ -0,0 +1,971 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::{
schema::{
enums,
prelude::{Property, UTCDateTime},
structs,
},
types::{index::IndexBuilder, ipaddr::IpAddr},
};
use mail_auth::report::{tlsrpt::*, *};
use std::borrow::Cow;
use types::id::Id;
/// Common indexing surface for externally-submitted reports (ARF, DMARC,
/// TLS-RPT), used to populate the registry search index.
pub trait ReportIndex {
    /// Free-text tokens worth indexing (domains, hostnames, addresses).
    fn text(&self) -> impl Iterator<Item = &str>;
    /// Tenant ids associated with this report.
    fn tenant_ids(&self) -> &[Id];
    /// Expiration time as a Unix timestamp in seconds.
    fn expires_at(&self) -> u64;
    /// Domains referenced by the report, reduced to the part after the last
    /// '@' when the value looks like an address (see the impls below).
    fn domains(&self) -> impl Iterator<Item = &str>;
    /// Feeds text tokens, tenant ids and the expiry into `index`.
    // NOTE(review): text() is indexed under Property::Domain while domains()
    // is not used here at all — confirm this is intentional and not a mix-up
    // between the two iterators.
    fn build_search_index<'x>(&'x self, index: &mut IndexBuilder<'x>) {
        for text in self.text() {
            index.text(Property::Domain, text);
        }
        for tenant_id in self.tenant_ids() {
            index.search(Property::MemberTenantId, tenant_id.id());
        }
        index.search(Property::ExpiresAt, self.expires_at());
    }
}
impl ReportIndex for structs::ArfExternalReport {
    /// Domains referenced by the ARF report: the reported domains, the DKIM
    /// domain, the original MAIL FROM, and the local recipients (`self.to`).
    /// Address-shaped values are reduced to the part after the last '@'.
    fn domains(&self) -> impl Iterator<Item = &str> {
        let report = &self.report;
        report
            .reported_domains
            .iter()
            .filter_map(|s| non_empty(s))
            .chain(
                [
                    report.dkim_domain.as_deref(),
                    report.original_mail_from.as_deref(),
                ]
                .into_iter()
                .flatten() // drop the absent Option fields
                .filter_map(non_empty),
            )
            .chain(self.to.iter().filter_map(|s| non_empty(s)))
            // keep only the domain part of user@domain values
            .map(|domain| domain.rsplit_once('@').map(|(_, d)| d).unwrap_or(domain))
    }
    /// Searchable text: like `domains` but additionally includes the
    /// reporting MTA, the original RCPT TO and the sender (`self.from`),
    /// without '@'-stripping.
    fn text(&self) -> impl Iterator<Item = &str> {
        let report = &self.report;
        report
            .reported_domains
            .iter()
            .filter_map(|s| non_empty(s))
            .chain(
                [
                    report.dkim_domain.as_deref(),
                    report.reporting_mta.as_deref(),
                    report.original_mail_from.as_deref(),
                    report.original_rcpt_to.as_deref(),
                ]
                .into_iter()
                .flatten()
                .filter_map(non_empty),
            )
            .chain(non_empty(&self.from))
    }
    fn tenant_ids(&self) -> &[Id] {
        &self.member_tenant_id
    }
    // Seconds-since-epoch; timestamp() returns i64, cast assumes a
    // non-negative (post-1970) expiry.
    fn expires_at(&self) -> u64 {
        self.expires_at.timestamp() as u64
    }
}
impl ReportIndex for structs::DmarcExternalReport {
    /// Domains referenced by the DMARC aggregate report: the published policy
    /// domain, per-record envelope-from/header-from and DKIM/SPF result
    /// domains, plus the local recipients. Address-shaped values are reduced
    /// to the part after the last '@'.
    fn domains(&self) -> impl Iterator<Item = &str> {
        let report = &self.report;
        non_empty(&report.policy_domain)
            .into_iter()
            // NOTE(review): this second non_empty re-checks a value that is
            // already non-empty — harmless but looks redundant.
            .filter_map(non_empty)
            .chain(report.records.iter().flat_map(|r| {
                non_empty(&r.envelope_from)
                    .into_iter()
                    .filter_map(non_empty)
                    .chain(non_empty(&r.header_from))
                    .chain(r.dkim_results.iter().filter_map(|d| non_empty(&d.domain)))
                    .chain(r.spf_results.iter().filter_map(|s| non_empty(&s.domain)))
            }))
            .chain(self.to.iter().filter_map(|s| non_empty(s)))
            // keep only the domain part of user@domain values
            .map(|domain| domain.rsplit_once('@').map(|(_, d)| d).unwrap_or(domain))
    }
    /// Searchable text: the reporter email, policy domain, per-record
    /// envelope-to/from, header-from and DKIM/SPF result domains, plus the
    /// sender (`self.from`), without '@'-stripping.
    fn text(&self) -> impl Iterator<Item = &str> {
        let report = &self.report;
        non_empty(&report.email)
            .into_iter()
            .filter_map(non_empty)
            .chain(non_empty(&report.policy_domain))
            .chain(report.records.iter().flat_map(|r| {
                r.envelope_to
                    .as_deref()
                    .into_iter()
                    .filter_map(non_empty)
                    .chain(non_empty(&r.envelope_from))
                    .chain(non_empty(&r.header_from))
                    .chain(r.dkim_results.iter().filter_map(|d| non_empty(&d.domain)))
                    .chain(r.spf_results.iter().filter_map(|s| non_empty(&s.domain)))
            }))
            .chain(non_empty(&self.from))
    }
    fn tenant_ids(&self) -> &[Id] {
        &self.member_tenant_id
    }
    // Seconds-since-epoch; cast assumes a post-1970 expiry.
    fn expires_at(&self) -> u64 {
        self.expires_at.timestamp() as u64
    }
}
impl ReportIndex for structs::TlsExternalReport {
    /// Domains referenced by the TLS-RPT report: each policy's domain and MX
    /// hosts, plus the local recipients. Address-shaped values are reduced to
    /// the part after the last '@'.
    fn domains(&self) -> impl Iterator<Item = &str> {
        let report = &self.report;
        report
            .policies
            .iter()
            .flat_map(|p| {
                non_empty(&p.policy_domain)
                    .into_iter()
                    .chain(p.mx_hosts.iter().filter_map(|s| non_empty(s)))
            })
            .chain(self.to.iter().filter_map(|s| non_empty(s)))
            // keep only the domain part of user@domain values
            .map(|domain| domain.rsplit_once('@').map(|(_, d)| d).unwrap_or(domain))
    }
    /// Searchable text: like `domains` per policy, additionally including the
    /// receiving MX hostname/HELO of every failure detail and the sender
    /// (`self.from`), without '@'-stripping.
    fn text(&self) -> impl Iterator<Item = &str> {
        let report = &self.report;
        report
            .policies
            .iter()
            .flat_map(|p| {
                non_empty(&p.policy_domain)
                    .into_iter()
                    .chain(p.mx_hosts.iter().filter_map(|s| non_empty(s)))
                    .chain(p.failure_details.iter().flat_map(|fd| {
                        non_empty_opt(&fd.receiving_mx_hostname)
                            .into_iter()
                            .chain(non_empty_opt(&fd.receiving_mx_helo))
                    }))
            })
            .chain(non_empty(&self.from))
    }
    fn tenant_ids(&self) -> &[Id] {
        &self.member_tenant_id
    }
    // Seconds-since-epoch; cast assumes a post-1970 expiry.
    fn expires_at(&self) -> u64 {
        self.expires_at.timestamp() as u64
    }
}
impl From<enums::DmarcAlignment> for Alignment {
fn from(value: enums::DmarcAlignment) -> Self {
match value {
enums::DmarcAlignment::Relaxed => Alignment::Relaxed,
enums::DmarcAlignment::Strict => Alignment::Strict,
enums::DmarcAlignment::Unspecified => Alignment::Unspecified,
}
}
}
impl From<Alignment> for enums::DmarcAlignment {
fn from(value: Alignment) -> Self {
match value {
Alignment::Relaxed => enums::DmarcAlignment::Relaxed,
Alignment::Strict => enums::DmarcAlignment::Strict,
Alignment::Unspecified => enums::DmarcAlignment::Unspecified,
}
}
}
impl From<enums::DmarcDisposition> for Disposition {
fn from(value: enums::DmarcDisposition) -> Self {
match value {
enums::DmarcDisposition::None => Disposition::None,
enums::DmarcDisposition::Quarantine => Disposition::Quarantine,
enums::DmarcDisposition::Reject => Disposition::Reject,
enums::DmarcDisposition::Unspecified => Disposition::Unspecified,
}
}
}
impl From<Disposition> for enums::DmarcDisposition {
fn from(value: Disposition) -> Self {
match value {
Disposition::None => enums::DmarcDisposition::None,
Disposition::Quarantine => enums::DmarcDisposition::Quarantine,
Disposition::Reject => enums::DmarcDisposition::Reject,
Disposition::Unspecified => enums::DmarcDisposition::Unspecified,
}
}
}
impl From<enums::DmarcActionDisposition> for ActionDisposition {
fn from(value: enums::DmarcActionDisposition) -> Self {
match value {
enums::DmarcActionDisposition::None => ActionDisposition::None,
enums::DmarcActionDisposition::Pass => ActionDisposition::Pass,
enums::DmarcActionDisposition::Quarantine => ActionDisposition::Quarantine,
enums::DmarcActionDisposition::Reject => ActionDisposition::Reject,
enums::DmarcActionDisposition::Unspecified => ActionDisposition::Unspecified,
}
}
}
impl From<ActionDisposition> for enums::DmarcActionDisposition {
fn from(value: ActionDisposition) -> Self {
match value {
ActionDisposition::None => enums::DmarcActionDisposition::None,
ActionDisposition::Pass => enums::DmarcActionDisposition::Pass,
ActionDisposition::Quarantine => enums::DmarcActionDisposition::Quarantine,
ActionDisposition::Reject => enums::DmarcActionDisposition::Reject,
ActionDisposition::Unspecified => enums::DmarcActionDisposition::Unspecified,
}
}
}
impl From<enums::DmarcResult> for DmarcResult {
fn from(value: enums::DmarcResult) -> Self {
match value {
enums::DmarcResult::Pass => DmarcResult::Pass,
enums::DmarcResult::Fail => DmarcResult::Fail,
enums::DmarcResult::Unspecified => DmarcResult::Unspecified,
}
}
}
impl From<DmarcResult> for enums::DmarcResult {
fn from(value: DmarcResult) -> Self {
match value {
DmarcResult::Pass => enums::DmarcResult::Pass,
DmarcResult::Fail => enums::DmarcResult::Fail,
DmarcResult::Unspecified => enums::DmarcResult::Unspecified,
}
}
}
impl From<enums::DmarcPolicyOverride> for PolicyOverride {
fn from(value: enums::DmarcPolicyOverride) -> Self {
match value {
enums::DmarcPolicyOverride::Forwarded => PolicyOverride::Forwarded,
enums::DmarcPolicyOverride::SampledOut => PolicyOverride::SampledOut,
enums::DmarcPolicyOverride::TrustedForwarder => PolicyOverride::TrustedForwarder,
enums::DmarcPolicyOverride::MailingList => PolicyOverride::MailingList,
enums::DmarcPolicyOverride::LocalPolicy => PolicyOverride::LocalPolicy,
enums::DmarcPolicyOverride::Other => PolicyOverride::Other,
}
}
}
impl From<PolicyOverride> for enums::DmarcPolicyOverride {
fn from(value: PolicyOverride) -> Self {
match value {
PolicyOverride::Forwarded => enums::DmarcPolicyOverride::Forwarded,
PolicyOverride::SampledOut => enums::DmarcPolicyOverride::SampledOut,
PolicyOverride::TrustedForwarder => enums::DmarcPolicyOverride::TrustedForwarder,
PolicyOverride::MailingList => enums::DmarcPolicyOverride::MailingList,
PolicyOverride::LocalPolicy => enums::DmarcPolicyOverride::LocalPolicy,
PolicyOverride::Other => enums::DmarcPolicyOverride::Other,
}
}
}
impl From<enums::DkimAuthResult> for DkimResult {
fn from(value: enums::DkimAuthResult) -> Self {
match value {
enums::DkimAuthResult::None => DkimResult::None,
enums::DkimAuthResult::Pass => DkimResult::Pass,
enums::DkimAuthResult::Fail => DkimResult::Fail,
enums::DkimAuthResult::Policy => DkimResult::Policy,
enums::DkimAuthResult::Neutral => DkimResult::Neutral,
enums::DkimAuthResult::TempError => DkimResult::TempError,
enums::DkimAuthResult::PermError => DkimResult::PermError,
}
}
}
impl From<DkimResult> for enums::DkimAuthResult {
fn from(value: DkimResult) -> Self {
match value {
DkimResult::None => enums::DkimAuthResult::None,
DkimResult::Pass => enums::DkimAuthResult::Pass,
DkimResult::Fail => enums::DkimAuthResult::Fail,
DkimResult::Policy => enums::DkimAuthResult::Policy,
DkimResult::Neutral => enums::DkimAuthResult::Neutral,
DkimResult::TempError => enums::DkimAuthResult::TempError,
DkimResult::PermError => enums::DkimAuthResult::PermError,
}
}
}
impl From<enums::SpfAuthResult> for SpfResult {
fn from(value: enums::SpfAuthResult) -> Self {
match value {
enums::SpfAuthResult::None => SpfResult::None,
enums::SpfAuthResult::Neutral => SpfResult::Neutral,
enums::SpfAuthResult::Pass => SpfResult::Pass,
enums::SpfAuthResult::Fail => SpfResult::Fail,
enums::SpfAuthResult::SoftFail => SpfResult::SoftFail,
enums::SpfAuthResult::TempError => SpfResult::TempError,
enums::SpfAuthResult::PermError => SpfResult::PermError,
}
}
}
impl From<SpfResult> for enums::SpfAuthResult {
fn from(value: SpfResult) -> Self {
match value {
SpfResult::None => enums::SpfAuthResult::None,
SpfResult::Neutral => enums::SpfAuthResult::Neutral,
SpfResult::Pass => enums::SpfAuthResult::Pass,
SpfResult::Fail => enums::SpfAuthResult::Fail,
SpfResult::SoftFail => enums::SpfAuthResult::SoftFail,
SpfResult::TempError => enums::SpfAuthResult::TempError,
SpfResult::PermError => enums::SpfAuthResult::PermError,
}
}
}
impl From<enums::SpfDomainScope> for SPFDomainScope {
fn from(value: enums::SpfDomainScope) -> Self {
match value {
enums::SpfDomainScope::Helo => SPFDomainScope::Helo,
enums::SpfDomainScope::MailFrom => SPFDomainScope::MailFrom,
enums::SpfDomainScope::Unspecified => SPFDomainScope::Unspecified,
}
}
}
impl From<SPFDomainScope> for enums::SpfDomainScope {
fn from(value: SPFDomainScope) -> Self {
match value {
SPFDomainScope::Helo => enums::SpfDomainScope::Helo,
SPFDomainScope::MailFrom => enums::SpfDomainScope::MailFrom,
SPFDomainScope::Unspecified => enums::SpfDomainScope::Unspecified,
}
}
}
impl From<structs::DmarcPolicyOverrideReason> for PolicyOverrideReason {
fn from(value: structs::DmarcPolicyOverrideReason) -> Self {
PolicyOverrideReason {
type_: value.class.into(),
comment: value.comment,
}
}
}
impl From<PolicyOverrideReason> for structs::DmarcPolicyOverrideReason {
fn from(value: PolicyOverrideReason) -> Self {
structs::DmarcPolicyOverrideReason {
class: value.type_.into(),
comment: value.comment,
}
}
}
impl From<structs::DmarcDkimResult> for DKIMAuthResult {
fn from(value: structs::DmarcDkimResult) -> Self {
DKIMAuthResult {
domain: value.domain,
selector: value.selector,
result: value.result.into(),
human_result: value.human_result,
}
}
}
impl From<DKIMAuthResult> for structs::DmarcDkimResult {
fn from(value: DKIMAuthResult) -> Self {
structs::DmarcDkimResult {
domain: value.domain,
selector: value.selector,
result: value.result.into(),
human_result: value.human_result,
}
}
}
impl From<structs::DmarcSpfResult> for SPFAuthResult {
fn from(value: structs::DmarcSpfResult) -> Self {
SPFAuthResult {
domain: value.domain,
scope: value.scope.into(),
result: value.result.into(),
human_result: value.human_result,
}
}
}
impl From<SPFAuthResult> for structs::DmarcSpfResult {
fn from(value: SPFAuthResult) -> Self {
structs::DmarcSpfResult {
domain: value.domain,
scope: value.scope.into(),
result: value.result.into(),
human_result: value.human_result,
}
}
}
impl From<structs::DmarcExtension> for Extension {
fn from(value: structs::DmarcExtension) -> Self {
Extension {
name: value.name,
definition: value.definition,
}
}
}
impl From<Extension> for structs::DmarcExtension {
fn from(value: Extension) -> Self {
structs::DmarcExtension {
name: value.name,
definition: value.definition,
}
}
}
// Converts a registry DMARC report record into the `mail-auth` aggregate
// report `Record`, fanning the flat registry fields out into the nested
// row / identifiers / auth_results structure of the XML report model.
impl From<structs::DmarcReportRecord> for Record {
    fn from(value: structs::DmarcReportRecord) -> Self {
        Record {
            row: Row {
                source_ip: value.source_ip.map(|ip| ip.into_inner()),
                // NOTE(review): u64 -> u32 narrowing; counts above
                // u32::MAX would truncate — confirm this is acceptable.
                count: value.count as u32,
                policy_evaluated: PolicyEvaluated {
                    disposition: value.evaluated_disposition.into(),
                    dkim: value.evaluated_dkim.into(),
                    spf: value.evaluated_spf.into(),
                    reason: value
                        .policy_override_reasons
                        .into_iter()
                        .map(Into::into)
                        .collect(),
                },
            },
            identifiers: Identifier {
                envelope_to: value.envelope_to,
                envelope_from: value.envelope_from,
                header_from: value.header_from,
            },
            auth_results: AuthResult {
                dkim: value.dkim_results.into_iter().map(Into::into).collect(),
                spf: value.spf_results.into_iter().map(Into::into).collect(),
            },
            extensions: value.extensions.into_iter().map(Into::into).collect(),
        }
    }
}
// Inverse conversion: flattens a `mail-auth` `Record` back into the
// registry's flat `DmarcReportRecord` storage form (count widened to u64).
impl From<Record> for structs::DmarcReportRecord {
    fn from(value: Record) -> Self {
        structs::DmarcReportRecord {
            count: value.row.count as u64,
            source_ip: value.row.source_ip.map(IpAddr),
            evaluated_disposition: value.row.policy_evaluated.disposition.into(),
            evaluated_dkim: value.row.policy_evaluated.dkim.into(),
            evaluated_spf: value.row.policy_evaluated.spf.into(),
            policy_override_reasons: value
                .row
                .policy_evaluated
                .reason
                .into_iter()
                .map(Into::into)
                .collect(),
            envelope_to: value.identifiers.envelope_to,
            envelope_from: value.identifiers.envelope_from,
            header_from: value.identifiers.header_from,
            dkim_results: value
                .auth_results
                .dkim
                .into_iter()
                .map(Into::into)
                .collect(),
            spf_results: value.auth_results.spf.into_iter().map(Into::into).collect(),
            extensions: value.extensions.into_iter().map(Into::into).collect(),
        }
    }
}
// Converts a registry DMARC aggregate report into the `mail-auth` `Report`
// model used for XML serialization. Datetimes become Unix-second u64s and
// the failure-reporting options are re-encoded as the DMARC `fo=` string.
impl From<structs::DmarcReport> for Report {
    fn from(value: structs::DmarcReport) -> Self {
        Report {
            // NOTE(review): f64 -> f32 narrowing; fine for version numbers
            // like 1.0 but loses precision in general — confirm intended.
            version: value.version as f32,
            report_metadata: ReportMetadata {
                org_name: value.org_name,
                email: value.email,
                extra_contact_info: value.extra_contact_info,
                report_id: value.report_id,
                date_range: mail_auth::report::DateRange {
                    // Pre-epoch timestamps (negative i64) would wrap here.
                    begin: value.date_range_begin.timestamp() as u64,
                    end: value.date_range_end.timestamp() as u64,
                },
                error: value.errors,
            },
            policy_published: PolicyPublished {
                domain: value.policy_domain,
                // Unparseable stored versions are silently dropped to None.
                version_published: value.policy_version.as_deref().and_then(|v| v.parse().ok()),
                adkim: value.policy_adkim.into(),
                aspf: value.policy_aspf.into(),
                p: value.policy_disposition.into(),
                sp: value.policy_subdomain_disposition.into(),
                testing: value.policy_testing_mode,
                fo: failure_reporting_options_to_fo(&value.policy_failure_reporting_options),
            },
            record: value.records.into_iter().map(Into::into).collect(),
            extensions: value.extensions.into_iter().map(Into::into).collect(),
        }
    }
}
// Inverse conversion: parses a `mail-auth` `Report` into the registry's
// storage form, decoding the `fo=` string back into enum options.
impl From<Report> for structs::DmarcReport {
    fn from(value: Report) -> Self {
        structs::DmarcReport {
            version: value.version as f64,
            date_range_begin: UTCDateTime::from_timestamp(
                value.report_metadata.date_range.begin as i64,
            ),
            date_range_end: UTCDateTime::from_timestamp(
                value.report_metadata.date_range.end as i64,
            ),
            email: value.report_metadata.email,
            errors: value.report_metadata.error,
            extensions: value.extensions.into_iter().map(Into::into).collect(),
            extra_contact_info: value.report_metadata.extra_contact_info,
            org_name: value.report_metadata.org_name,
            policy_adkim: value.policy_published.adkim.into(),
            policy_aspf: value.policy_published.aspf.into(),
            policy_disposition: value.policy_published.p.into(),
            policy_domain: value.policy_published.domain,
            policy_failure_reporting_options: fo_to_failure_reporting_options(
                &value.policy_published.fo,
            ),
            policy_subdomain_disposition: value.policy_published.sp.into(),
            policy_testing_mode: value.policy_published.testing,
            policy_version: value
                .policy_published
                .version_published
                .map(|v| v.to_string()),
            records: value.record.into_iter().map(Into::into).collect(),
            report_id: value.report_metadata.report_id,
        }
    }
}
impl From<enums::ArfAuthFailureType> for AuthFailureType {
fn from(value: enums::ArfAuthFailureType) -> Self {
match value {
enums::ArfAuthFailureType::Adsp => AuthFailureType::Adsp,
enums::ArfAuthFailureType::BodyHash => AuthFailureType::BodyHash,
enums::ArfAuthFailureType::Revoked => AuthFailureType::Revoked,
enums::ArfAuthFailureType::Signature => AuthFailureType::Signature,
enums::ArfAuthFailureType::Spf => AuthFailureType::Spf,
enums::ArfAuthFailureType::Dmarc => AuthFailureType::Dmarc,
enums::ArfAuthFailureType::Unspecified => AuthFailureType::Unspecified,
}
}
}
impl From<AuthFailureType> for enums::ArfAuthFailureType {
fn from(value: AuthFailureType) -> Self {
match value {
AuthFailureType::Adsp => enums::ArfAuthFailureType::Adsp,
AuthFailureType::BodyHash => enums::ArfAuthFailureType::BodyHash,
AuthFailureType::Revoked => enums::ArfAuthFailureType::Revoked,
AuthFailureType::Signature => enums::ArfAuthFailureType::Signature,
AuthFailureType::Spf => enums::ArfAuthFailureType::Spf,
AuthFailureType::Dmarc => enums::ArfAuthFailureType::Dmarc,
AuthFailureType::Unspecified => enums::ArfAuthFailureType::Unspecified,
}
}
}
impl From<enums::ArfDeliveryResult> for DeliveryResult {
fn from(value: enums::ArfDeliveryResult) -> Self {
match value {
enums::ArfDeliveryResult::Delivered => DeliveryResult::Delivered,
enums::ArfDeliveryResult::Spam => DeliveryResult::Spam,
enums::ArfDeliveryResult::Policy => DeliveryResult::Policy,
enums::ArfDeliveryResult::Reject => DeliveryResult::Reject,
enums::ArfDeliveryResult::Other => DeliveryResult::Other,
enums::ArfDeliveryResult::Unspecified => DeliveryResult::Unspecified,
}
}
}
impl From<DeliveryResult> for enums::ArfDeliveryResult {
fn from(value: DeliveryResult) -> Self {
match value {
DeliveryResult::Delivered => enums::ArfDeliveryResult::Delivered,
DeliveryResult::Spam => enums::ArfDeliveryResult::Spam,
DeliveryResult::Policy => enums::ArfDeliveryResult::Policy,
DeliveryResult::Reject => enums::ArfDeliveryResult::Reject,
DeliveryResult::Other => enums::ArfDeliveryResult::Other,
DeliveryResult::Unspecified => enums::ArfDeliveryResult::Unspecified,
}
}
}
impl From<enums::ArfFeedbackType> for FeedbackType {
fn from(value: enums::ArfFeedbackType) -> Self {
match value {
enums::ArfFeedbackType::Abuse => FeedbackType::Abuse,
enums::ArfFeedbackType::AuthFailure => FeedbackType::AuthFailure,
enums::ArfFeedbackType::Fraud => FeedbackType::Fraud,
enums::ArfFeedbackType::NotSpam => FeedbackType::NotSpam,
enums::ArfFeedbackType::Virus => FeedbackType::Virus,
enums::ArfFeedbackType::Other => FeedbackType::Other,
}
}
}
impl From<FeedbackType> for enums::ArfFeedbackType {
fn from(value: FeedbackType) -> Self {
match value {
FeedbackType::Abuse => enums::ArfFeedbackType::Abuse,
FeedbackType::AuthFailure => enums::ArfFeedbackType::AuthFailure,
FeedbackType::Fraud => enums::ArfFeedbackType::Fraud,
FeedbackType::NotSpam => enums::ArfFeedbackType::NotSpam,
FeedbackType::Virus => enums::ArfFeedbackType::Virus,
FeedbackType::Other => enums::ArfFeedbackType::Other,
}
}
}
impl From<enums::ArfIdentityAlignment> for IdentityAlignment {
fn from(value: enums::ArfIdentityAlignment) -> Self {
match value {
enums::ArfIdentityAlignment::None => IdentityAlignment::None,
enums::ArfIdentityAlignment::Spf => IdentityAlignment::Spf,
enums::ArfIdentityAlignment::Dkim => IdentityAlignment::Dkim,
enums::ArfIdentityAlignment::DkimSpf => IdentityAlignment::DkimSpf,
enums::ArfIdentityAlignment::Unspecified => IdentityAlignment::Unspecified,
}
}
}
impl From<IdentityAlignment> for enums::ArfIdentityAlignment {
fn from(value: IdentityAlignment) -> Self {
match value {
IdentityAlignment::None => enums::ArfIdentityAlignment::None,
IdentityAlignment::Spf => enums::ArfIdentityAlignment::Spf,
IdentityAlignment::Dkim => enums::ArfIdentityAlignment::Dkim,
IdentityAlignment::DkimSpf => enums::ArfIdentityAlignment::DkimSpf,
IdentityAlignment::Unspecified => enums::ArfIdentityAlignment::Unspecified,
}
}
}
// Converts a registry ARF feedback report into an owned (`'static`)
// `mail-auth` `Feedback`; every owned String is wrapped in `Cow::Owned`.
impl From<structs::ArfFeedbackReport> for Feedback<'static> {
    fn from(value: structs::ArfFeedbackReport) -> Self {
        Feedback {
            feedback_type: value.feedback_type.into(),
            arrival_date: value.arrival_date.map(|d| d.timestamp()),
            authentication_results: value
                .authentication_results
                .into_iter()
                .map(Cow::Owned)
                .collect(),
            incidents: value.incidents as u32,
            original_envelope_id: value.original_envelope_id.map(Cow::Owned),
            original_mail_from: value.original_mail_from.map(Cow::Owned),
            original_rcpt_to: value.original_rcpt_to.map(Cow::Owned),
            reported_domain: value.reported_domains.into_iter().map(Cow::Owned).collect(),
            reported_uri: value.reported_uris.into_iter().map(Cow::Owned).collect(),
            reporting_mta: value.reporting_mta.map(Cow::Owned),
            source_ip: value.source_ip.map(|ip| ip.into_inner()),
            user_agent: value.user_agent.map(Cow::Owned),
            version: value.version as u32,
            // An absent port is encoded as 0 here; the inverse conversion
            // below maps 0 back to None.
            source_port: value.source_port.unwrap_or(0) as u32,
            auth_failure: value.auth_failure.into(),
            delivery_result: value.delivery_result.into(),
            dkim_adsp_dns: value.dkim_adsp_dns.map(Cow::Owned),
            dkim_canonicalized_body: value.dkim_canonicalized_body.map(Cow::Owned),
            dkim_canonicalized_header: value.dkim_canonicalized_header.map(Cow::Owned),
            dkim_domain: value.dkim_domain.map(Cow::Owned),
            dkim_identity: value.dkim_identity.map(Cow::Owned),
            dkim_selector: value.dkim_selector.map(Cow::Owned),
            dkim_selector_dns: value.dkim_selector_dns.map(Cow::Owned),
            spf_dns: value.spf_dns.map(Cow::Owned),
            identity_alignment: value.identity_alignment.into(),
            message: value.message.map(Cow::Owned),
            headers: value.headers.map(Cow::Owned),
        }
    }
}
// Inverse conversion: takes ownership of every `Cow` (cloning borrowed data)
// and normalizes the source port — 0 and out-of-range values become None.
impl From<Feedback<'_>> for structs::ArfFeedbackReport {
    fn from(value: Feedback<'_>) -> Self {
        let port = value.source_port;
        structs::ArfFeedbackReport {
            arrival_date: value.arrival_date.map(UTCDateTime::from_timestamp),
            auth_failure: value.auth_failure.into(),
            authentication_results: value
                .authentication_results
                .into_iter()
                .map(|s| s.into_owned())
                .collect(),
            delivery_result: value.delivery_result.into(),
            dkim_adsp_dns: value.dkim_adsp_dns.map(|s| s.into_owned()),
            dkim_canonicalized_body: value.dkim_canonicalized_body.map(|s| s.into_owned()),
            dkim_canonicalized_header: value.dkim_canonicalized_header.map(|s| s.into_owned()),
            dkim_domain: value.dkim_domain.map(|s| s.into_owned()),
            dkim_identity: value.dkim_identity.map(|s| s.into_owned()),
            dkim_selector: value.dkim_selector.map(|s| s.into_owned()),
            dkim_selector_dns: value.dkim_selector_dns.map(|s| s.into_owned()),
            feedback_type: value.feedback_type.into(),
            headers: value.headers.map(|s| s.into_owned()),
            identity_alignment: value.identity_alignment.into(),
            incidents: value.incidents as u64,
            message: value.message.map(|s| s.into_owned()),
            original_envelope_id: value.original_envelope_id.map(|s| s.into_owned()),
            original_mail_from: value.original_mail_from.map(|s| s.into_owned()),
            original_rcpt_to: value.original_rcpt_to.map(|s| s.into_owned()),
            reported_domains: value
                .reported_domain
                .into_iter()
                .map(|s| s.into_owned())
                .collect(),
            reported_uris: value
                .reported_uri
                .into_iter()
                .map(|s| s.into_owned())
                .collect(),
            reporting_mta: value.reporting_mta.map(|s| s.into_owned()),
            source_ip: value.source_ip.map(IpAddr),
            // 0 is the "absent" sentinel used by the forward conversion;
            // values above the valid TCP port range are also rejected.
            source_port: if port == 0 || port > 65535 {
                None
            } else {
                Some(port as u64)
            },
            spf_dns: value.spf_dns.map(|s| s.into_owned()),
            user_agent: value.user_agent.map(|s| s.into_owned()),
            version: value.version as u64,
        }
    }
}
impl From<enums::TlsPolicyType> for PolicyType {
fn from(value: enums::TlsPolicyType) -> Self {
match value {
enums::TlsPolicyType::Tlsa => PolicyType::Tlsa,
enums::TlsPolicyType::Sts => PolicyType::Sts,
enums::TlsPolicyType::NoPolicyFound => PolicyType::NoPolicyFound,
enums::TlsPolicyType::Other => PolicyType::Other,
}
}
}
impl From<PolicyType> for enums::TlsPolicyType {
fn from(value: PolicyType) -> Self {
match value {
PolicyType::Tlsa => enums::TlsPolicyType::Tlsa,
PolicyType::Sts => enums::TlsPolicyType::Sts,
PolicyType::NoPolicyFound => enums::TlsPolicyType::NoPolicyFound,
PolicyType::Other => enums::TlsPolicyType::Other,
}
}
}
impl From<enums::TlsResultType> for ResultType {
fn from(value: enums::TlsResultType) -> Self {
match value {
enums::TlsResultType::StartTlsNotSupported => ResultType::StartTlsNotSupported,
enums::TlsResultType::CertificateHostMismatch => ResultType::CertificateHostMismatch,
enums::TlsResultType::CertificateExpired => ResultType::CertificateExpired,
enums::TlsResultType::CertificateNotTrusted => ResultType::CertificateNotTrusted,
enums::TlsResultType::ValidationFailure => ResultType::ValidationFailure,
enums::TlsResultType::TlsaInvalid => ResultType::TlsaInvalid,
enums::TlsResultType::DnssecInvalid => ResultType::DnssecInvalid,
enums::TlsResultType::DaneRequired => ResultType::DaneRequired,
enums::TlsResultType::StsPolicyFetchError => ResultType::StsPolicyFetchError,
enums::TlsResultType::StsPolicyInvalid => ResultType::StsPolicyInvalid,
enums::TlsResultType::StsWebpkiInvalid => ResultType::StsWebpkiInvalid,
enums::TlsResultType::Other => ResultType::Other,
}
}
}
impl From<ResultType> for enums::TlsResultType {
fn from(value: ResultType) -> Self {
match value {
ResultType::StartTlsNotSupported => enums::TlsResultType::StartTlsNotSupported,
ResultType::CertificateHostMismatch => enums::TlsResultType::CertificateHostMismatch,
ResultType::CertificateExpired => enums::TlsResultType::CertificateExpired,
ResultType::CertificateNotTrusted => enums::TlsResultType::CertificateNotTrusted,
ResultType::ValidationFailure => enums::TlsResultType::ValidationFailure,
ResultType::TlsaInvalid => enums::TlsResultType::TlsaInvalid,
ResultType::DnssecInvalid => enums::TlsResultType::DnssecInvalid,
ResultType::DaneRequired => enums::TlsResultType::DaneRequired,
ResultType::StsPolicyFetchError => enums::TlsResultType::StsPolicyFetchError,
ResultType::StsPolicyInvalid => enums::TlsResultType::StsPolicyInvalid,
ResultType::StsWebpkiInvalid => enums::TlsResultType::StsWebpkiInvalid,
ResultType::Other => enums::TlsResultType::Other,
}
}
}
/// Converts registry TLS failure details into the `mail-auth` TLS-RPT type.
impl From<structs::TlsFailureDetails> for FailureDetails {
    fn from(src: structs::TlsFailureDetails) -> Self {
        Self {
            result_type: src.result_type.into(),
            sending_mta_ip: src.sending_mta_ip.map(|ip| ip.into_inner()),
            receiving_mx_hostname: src.receiving_mx_hostname,
            receiving_mx_helo: src.receiving_mx_helo,
            receiving_ip: src.receiving_ip.map(|ip| ip.into_inner()),
            // NOTE(review): u64 -> u32 narrowing of the session count.
            failed_session_count: src.failed_session_count as u32,
            additional_information: src.additional_information,
            failure_reason_code: src.failure_reason_code,
        }
    }
}

/// Inverse conversion: `mail-auth` failure details back into the registry
/// storage form (session count widened back to u64).
impl From<FailureDetails> for structs::TlsFailureDetails {
    fn from(src: FailureDetails) -> Self {
        Self {
            result_type: src.result_type.into(),
            sending_mta_ip: src.sending_mta_ip.map(IpAddr),
            receiving_mx_hostname: src.receiving_mx_hostname,
            receiving_mx_helo: src.receiving_mx_helo,
            receiving_ip: src.receiving_ip.map(IpAddr),
            failed_session_count: src.failed_session_count as u64,
            additional_information: src.additional_information,
            failure_reason_code: src.failure_reason_code,
        }
    }
}
/// Converts a registry TLS report policy into the nested `mail-auth`
/// `Policy` (policy details + session summary + failure details).
impl From<structs::TlsReportPolicy> for Policy {
    fn from(src: structs::TlsReportPolicy) -> Self {
        let failure_details = src.failure_details.into_iter().map(Into::into).collect();
        Self {
            policy: PolicyDetails {
                policy_type: src.policy_type.into(),
                policy_string: src.policy_strings,
                policy_domain: src.policy_domain,
                mx_host: src.mx_hosts,
            },
            summary: Summary {
                // NOTE(review): u64 -> u32 narrowing of session totals.
                total_success: src.total_successful_sessions as u32,
                total_failure: src.total_failed_sessions as u32,
            },
            failure_details,
        }
    }
}

/// Inverse conversion: flattens a `mail-auth` `Policy` back into the
/// registry's flat storage form (totals widened to u64).
impl From<Policy> for structs::TlsReportPolicy {
    fn from(src: Policy) -> Self {
        Self {
            policy_type: src.policy.policy_type.into(),
            policy_strings: src.policy.policy_string,
            policy_domain: src.policy.policy_domain,
            mx_hosts: src.policy.mx_host,
            total_successful_sessions: src.summary.total_success as u64,
            total_failed_sessions: src.summary.total_failure as u64,
            failure_details: src.failure_details.into_iter().map(Into::into).collect(),
        }
    }
}
// Converts a registry TLS report into the `mail-auth` TLS-RPT report,
// collapsing the two stored datetimes into a `DateRange`.
impl From<structs::TlsReport> for TlsReport {
    fn from(value: structs::TlsReport) -> Self {
        TlsReport {
            organization_name: value.organization_name,
            date_range: mail_auth::report::tlsrpt::DateRange::from_timestamps(
                value.date_range_start.timestamp(),
                value.date_range_end.timestamp(),
            ),
            contact_info: value.contact_info,
            report_id: value.report_id,
            policies: value.policies.into_iter().map(Into::into).collect(),
        }
    }
}
// Inverse conversion: expands the `DateRange` back into the registry's
// start/end datetime pair.
impl From<TlsReport> for structs::TlsReport {
    fn from(value: TlsReport) -> Self {
        structs::TlsReport {
            organization_name: value.organization_name,
            date_range_start: UTCDateTime::from_timestamp(
                value.date_range.start_datetime.to_timestamp(),
            ),
            date_range_end: UTCDateTime::from_timestamp(
                value.date_range.end_datetime.to_timestamp(),
            ),
            contact_info: value.contact_info,
            report_id: value.report_id,
            policies: value.policies.into_iter().map(Into::into).collect(),
        }
    }
}
/// Encodes the registry failure-reporting options as a DMARC `fo=` tag
/// value: single-character tokens ("0", "1", "d", "s") joined by ':'.
/// Returns `None` when no options are set.
fn failure_reporting_options_to_fo(opts: &[enums::FailureReportingOption]) -> Option<String> {
    if opts.is_empty() {
        return None;
    }
    let tokens: Vec<&str> = opts
        .iter()
        .map(|opt| match opt {
            enums::FailureReportingOption::All => "0",
            enums::FailureReportingOption::Any => "1",
            enums::FailureReportingOption::DkimFailure => "d",
            enums::FailureReportingOption::SpfFailure => "s",
        })
        .collect();
    Some(tokens.join(":"))
}
/// Decodes a DMARC `fo=` tag value back into registry failure-reporting
/// options. Unknown tokens are silently skipped; a missing or empty value
/// yields an empty vector.
fn fo_to_failure_reporting_options(fo: &Option<String>) -> Vec<enums::FailureReportingOption> {
    let Some(value) = fo.as_deref().filter(|v| !v.is_empty()) else {
        return Vec::new();
    };
    value
        .split(':')
        .filter_map(|token| match token.trim() {
            "0" => Some(enums::FailureReportingOption::All),
            "1" => Some(enums::FailureReportingOption::Any),
            "d" => Some(enums::FailureReportingOption::DkimFailure),
            "s" => Some(enums::FailureReportingOption::SpfFailure),
            _ => None,
        })
        .collect()
}
/// Returns `Some(s)` unless the string slice is empty.
#[inline(always)]
fn non_empty(s: &str) -> Option<&str> {
    (!s.is_empty()).then_some(s)
}
/// Borrows the inner string as `&str`, treating both `None` and an empty
/// string as `None`.
#[inline(always)]
fn non_empty_opt(s: &Option<String>) -> Option<&str> {
    match s.as_deref() {
        Some(v) if !v.is_empty() => Some(v),
        _ => None,
    }
}

View File

@@ -0,0 +1,66 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::schema::prelude::{Task, TaskStatus, TaskStatusPending, UTCDateTime};
impl Task {
    /// Replaces the status of the inner task payload, whichever variant
    /// this task is.
    pub fn set_status(&mut self, status: TaskStatus) {
        match self {
            Task::IndexDocument(task) => task.status = status,
            Task::UnindexDocument(task) => task.status = status,
            Task::IndexTrace(task) => task.status = status,
            Task::CalendarAlarmEmail(task) => task.status = status,
            Task::CalendarAlarmNotification(task) => task.status = status,
            Task::CalendarItipMessage(task) => task.status = status,
            Task::MergeThreads(task) => task.status = status,
        }
    }

    /// Borrows the status of the inner task payload.
    pub fn status(&self) -> &TaskStatus {
        match self {
            Task::IndexDocument(task) => &task.status,
            Task::UnindexDocument(task) => &task.status,
            Task::IndexTrace(task) => &task.status,
            Task::CalendarAlarmEmail(task) => &task.status,
            Task::CalendarAlarmNotification(task) => &task.status,
            Task::CalendarItipMessage(task) => &task.status,
            Task::MergeThreads(task) => &task.status,
        }
    }

    /// Number of attempts made so far: 0 while still pending, otherwise
    /// the count recorded by the retry/failure status.
    pub fn attempt_number(&self) -> u64 {
        match self.status() {
            TaskStatus::Pending(_) => 0,
            TaskStatus::Retry(status) => status.attempt_number,
            TaskStatus::Failed(status) => status.failed_attempt_number,
        }
    }

    /// Unix timestamp (seconds) at which the task becomes due.
    /// Failed tasks return `u64::MAX`, i.e. they are never due again.
    pub fn due_timestamp(&self) -> u64 {
        match self.status() {
            TaskStatus::Pending(status) => status.due.timestamp() as u64,
            TaskStatus::Retry(status) => status.due.timestamp() as u64,
            TaskStatus::Failed(_) => u64::MAX,
        }
    }
}
impl TaskStatus {
    /// Creates a pending status whose due time is the current instant.
    pub fn now() -> Self {
        let created = UTCDateTime::now();
        TaskStatus::Pending(TaskStatusPending {
            due: created,
            created_at: created,
        })
    }

    /// Creates a pending status due at the given Unix timestamp (seconds),
    /// with the creation time set to now.
    pub fn at(timestamp: i64) -> Self {
        TaskStatus::Pending(TaskStatusPending {
            created_at: UTCDateTime::now(),
            due: UTCDateTime::from_timestamp(timestamp),
        })
    }
}

View File

@@ -4,24 +4,31 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use ahash::AHashMap;
use common::Server;
use ahash::{AHashMap, AHashSet};
use common::{Server, psl};
use mail_auth::{
flate2::read::GzDecoder,
report::{ActionDisposition, DmarcResult, Feedback, Report, tlsrpt::TlsReport},
zip,
};
use mail_parser::{Message, MimeHeaders, PartType};
use registry::{
pickle::Pickle,
schema::{
prelude::ObjectType,
structs::{ArfExternalReport, DmarcExternalReport, TlsExternalReport},
},
types::{EnumImpl, datetime::UTCDateTime, index::IndexBuilder},
utils::report::ReportIndex,
};
use std::{
borrow::Cow,
collections::hash_map::Entry,
io::{Cursor, Read},
};
use store::{
Serialize,
write::{Archiver, BatchBuilder, ReportClass, ValueClass, now},
};
use store::write::{BatchBuilder, RegistryClass, ValueClass, now};
use trc::IncomingReportEvent;
use types::id::Id;
enum Compression {
None,
@@ -41,16 +48,6 @@ struct ReportData<'x> {
data: &'x [u8],
}
#[derive(
rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, serde::Serialize, serde::Deserialize,
)]
pub struct IncomingReport<T> {
pub from: String,
pub to: Vec<String>,
pub subject: String,
pub report: T,
}
pub trait AnalyzeReport: Sync + Send {
fn analyze_report(&self, message: Message<'static>, session_id: u64);
}
@@ -274,50 +271,105 @@ impl AnalyzeReport for Server {
// Store report
if let Some(expires_in) = &core.core.smtp.report.analysis.store {
let expires = now() + expires_in.as_secs();
let id = core.inner.data.queue_id_gen.generate();
let item_id = core.inner.data.queue_id_gen.generate();
let mut batch = BatchBuilder::new();
match report {
Format::Dmarc(report) => {
batch.set(
ValueClass::Report(ReportClass::Dmarc { id, expires }),
Archiver::new(IncomingReport {
from,
to,
subject,
report,
})
.serialize()
.unwrap_or_default(),
);
let object_id = ObjectType::DmarcExternalReport.to_id();
let mut report = DmarcExternalReport {
from,
to,
subject,
member_tenant_id: vec![],
expires_at: UTCDateTime::from_timestamp(expires as i64),
received_at: UTCDateTime::now(),
report: report.into(),
};
report.member_tenant_id = tenant_ids(
&core,
report
.domains()
.filter_map(psl::domain_str)
.collect::<AHashSet<_>>(),
)
.await;
let mut index_builder = IndexBuilder::default();
report.build_search_index(&mut index_builder);
batch
.registry_index(object_id, item_id, index_builder.keys.iter(), true)
.set(
ValueClass::Registry(RegistryClass::Item {
object_id,
item_id,
}),
report.to_pickled_vec(),
);
}
Format::Tls(report) => {
batch.set(
ValueClass::Report(ReportClass::Tls { id, expires }),
Archiver::new(IncomingReport {
from,
to,
subject,
report,
})
.serialize()
.unwrap_or_default(),
);
let object_id = ObjectType::TlsExternalReport.to_id();
let mut report = TlsExternalReport {
from,
to,
subject,
member_tenant_id: vec![],
expires_at: UTCDateTime::from_timestamp(expires as i64),
received_at: UTCDateTime::now(),
report: report.into(),
};
report.member_tenant_id = tenant_ids(
&core,
report
.domains()
.filter_map(psl::domain_str)
.collect::<AHashSet<_>>(),
)
.await;
let mut index_builder = IndexBuilder::default();
report.build_search_index(&mut index_builder);
batch
.registry_index(object_id, item_id, index_builder.keys.iter(), true)
.set(
ValueClass::Registry(RegistryClass::Item {
object_id,
item_id,
}),
report.to_pickled_vec(),
);
}
Format::Arf(report) => {
batch.set(
ValueClass::Report(ReportClass::Arf { id, expires }),
Archiver::new(IncomingReport {
from,
to,
subject,
report,
})
.serialize()
.unwrap_or_default(),
);
let object_id = ObjectType::ArfExternalReport.to_id();
let mut report = ArfExternalReport {
from,
to,
subject,
member_tenant_id: vec![],
expires_at: UTCDateTime::from_timestamp(expires as i64),
received_at: UTCDateTime::now(),
report: report.into(),
};
report.member_tenant_id = tenant_ids(
&core,
report
.domains()
.filter_map(psl::domain_str)
.collect::<AHashSet<_>>(),
)
.await;
let mut index_builder = IndexBuilder::default();
report.build_search_index(&mut index_builder);
batch
.registry_index(object_id, item_id, index_builder.keys.iter(), true)
.set(
ValueClass::Registry(RegistryClass::Item {
object_id,
item_id,
}),
report.to_pickled_vec(),
);
}
}
if let Err(err) = core.core.storage.data.write(batch.build_all()).await {
trc::error!(
err.span_id(session_id)
@@ -332,6 +384,29 @@ impl AnalyzeReport for Server {
}
}
async fn tenant_ids(server: &Server, domains: AHashSet<&str>) -> Vec<Id> {
let mut tenant_ids = Vec::with_capacity(domains.len());
for domain in domains {
if let Some(tenant_id) = server
.domain(domain)
.await
.map_err(|err| {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to lookup domain")
);
})
.unwrap_or_default()
.and_then(|domain| domain.id_tenant)
.map(Id::from)
&& !tenant_ids.contains(&tenant_id)
{
tenant_ids.push(tenant_id);
}
}
tenant_ids
}
trait LogReport {
fn log(&self);
}
@@ -493,12 +568,3 @@ impl LogReport for Feedback<'_> {
);
}
}
impl<T> IncomingReport<T> {
pub fn has_domain(&self, domain: &[String]) -> bool {
self.to
.iter()
.any(|to| domain.iter().any(|d| to.ends_with(d.as_str())))
|| domain.iter().any(|d| self.from.ends_with(d.as_str()))
}
}

View File

@@ -25,7 +25,7 @@ use registry::schema::structs::Rate;
use std::{collections::hash_map::Entry, future::Future};
use store::{
Deserialize, IterateParams, Serialize, ValueKey,
write::{AlignedBytes, Archive, Archiver, BatchBuilder, QueueClass, ReportEvent, ValueClass},
write::{AlignedBytes, Archive, Archiver, BatchBuilder, QueueClass, ValueClass},
};
use trc::{AddContext, OutgoingReportEvent};
use utils::DomainPart;
@@ -318,7 +318,6 @@ pub trait DmarcReporting: Sync + Send {
serialized_size: Option<&mut serde_json::Serializer<SerializedSize>>,
span_id: u64,
) -> impl Future<Output = trc::Result<Option<Report>>> + Send;
fn delete_dmarc_report(&self, event: ReportEvent) -> impl Future<Output = ()> + Send;
fn schedule_dmarc(&self, event: Box<DmarcEvent>) -> impl Future<Output = ()> + Send;
}
@@ -576,47 +575,6 @@ impl DmarcReporting for Server {
Ok(Some(report))
}
async fn delete_dmarc_report(&self, event: ReportEvent) {
let from_key = ReportEvent {
due: event.due,
policy_hash: event.policy_hash,
seq_id: 0,
domain: event.domain.clone(),
};
let to_key = ReportEvent {
due: event.due,
policy_hash: event.policy_hash,
seq_id: u64::MAX,
domain: event.domain.clone(),
};
if let Err(err) = self
.core
.storage
.data
.delete_range(
ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportEvent(from_key))),
ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportEvent(to_key))),
)
.await
{
trc::error!(
err.caused_by(trc::location!())
.details("Failed to delete DMARC report")
);
return;
}
let mut batch = BatchBuilder::new();
batch.clear(ValueClass::Queue(QueueClass::DmarcReportHeader(event)));
if let Err(err) = self.core.storage.data.write(batch.build_all()).await {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to delete DMARC report")
);
}
}
async fn schedule_dmarc(&self, event: Box<DmarcEvent>) {
let created = event.interval.to_timestamp();
let deliver_at = created + event.interval.as_secs();

View File

@@ -21,7 +21,6 @@ use mail_auth::{
};
use mail_parser::DateTime;
use std::{future::Future, io, time::SystemTime};
use store::write::{ReportEvent, key::KeySerializer};
use tokio::io::{AsyncRead, AsyncWrite};
pub mod analysis;
@@ -320,27 +319,3 @@ impl io::Write for SerializedSize {
Ok(())
}
}
pub trait ReportLock {
fn tls_lock(&self) -> Vec<u8>;
fn dmarc_lock(&self) -> Vec<u8>;
}
impl ReportLock for ReportEvent {
fn tls_lock(&self) -> Vec<u8> {
KeySerializer::new(self.domain.len() + std::mem::size_of::<u64>() + 1)
.write(0u8)
.write(self.due)
.write(self.domain.as_bytes())
.finalize()
}
fn dmarc_lock(&self) -> Vec<u8> {
KeySerializer::new(self.domain.len() + (std::mem::size_of::<u64>() * 2) + 1)
.write(1u8)
.write(self.due)
.write(self.policy_hash)
.write(self.domain.as_bytes())
.finalize()
}
}

View File

@@ -4,247 +4,26 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use super::{AggregateTimestamp, ReportLock, dmarc::DmarcReporting, tls::TlsReporting};
use crate::queue::spool::LOCK_EXPIRY;
use ahash::AHashMap;
use common::{BuildServer, Inner, KV_LOCK_QUEUE_REPORT, Server, ipc::ReportingEvent};
use std::{
future::Future,
sync::Arc,
time::{Duration, SystemTime},
};
use store::{
Deserialize, IterateParams, Store, ValueKey,
write::{BatchBuilder, QueueClass, ReportEvent, ValueClass, now},
};
use super::{dmarc::DmarcReporting, tls::TlsReporting};
use common::{BuildServer, Inner, ipc::ReportingEvent};
use std::sync::Arc;
use tokio::sync::mpsc;
pub const REPORT_REFRESH: Duration = Duration::from_secs(86400);
pub trait SpawnReport {
fn spawn(self, core: Arc<Inner>);
}
impl SpawnReport for mpsc::Receiver<ReportingEvent> {
fn spawn(mut self, inner: Arc<Inner>) {
tokio::spawn(async move {
let mut next_wake_up = REPORT_REFRESH;
let mut refresh_queue = true;
loop {
while let Some(event) = self.recv().await {
let server = inner.build_server();
if refresh_queue {
// Read events
let events = next_report_event(server.store()).await;
let now = now();
next_wake_up = events
.last()
.and_then(|e| {
e.due()
.filter(|due| *due > now)
.map(|due| Duration::from_secs(due - now))
})
.unwrap_or(REPORT_REFRESH);
if events
.first()
.and_then(|e| e.due())
.is_some_and(|due| due <= now)
{
let server_ = server.clone();
tokio::spawn(async move {
let mut tls_reports = AHashMap::new();
for report_event in events {
match report_event {
QueueClass::DmarcReportHeader(event) if event.due <= now => {
let lock_name = event.dmarc_lock();
if server_.try_lock_report(&lock_name).await {
server_.send_dmarc_aggregate_report(event).await;
server_.unlock_report(&lock_name).await;
}
}
QueueClass::TlsReportHeader(event) if event.due <= now => {
tls_reports
.entry(event.domain.clone())
.or_insert_with(Vec::new)
.push(event);
}
_ => (),
}
}
for (_, tls_report) in tls_reports {
let lock_name = tls_report.first().unwrap().tls_lock();
if server_.try_lock_report(&lock_name).await {
server_.send_tls_aggregate_report(tls_report).await;
server_.unlock_report(&lock_name).await;
}
}
});
}
}
match tokio::time::timeout(next_wake_up, self.recv()).await {
Ok(Some(event)) => {
refresh_queue = false;
match event {
ReportingEvent::Dmarc(event) => {
next_wake_up = std::cmp::min(
next_wake_up,
Duration::from_secs(event.interval.due().saturating_sub(now())),
);
server.schedule_dmarc(event).await;
}
ReportingEvent::Tls(event) => {
next_wake_up = std::cmp::min(
next_wake_up,
Duration::from_secs(event.interval.due().saturating_sub(now())),
);
server.schedule_tls(event).await;
}
ReportingEvent::Stop => break,
}
}
Ok(None) => break,
Err(_) => {
refresh_queue = true;
}
match event {
ReportingEvent::Dmarc(event) => server.schedule_dmarc(event).await,
ReportingEvent::Tls(event) => server.schedule_tls(event).await,
ReportingEvent::Stop => break,
}
}
});
}
}
async fn next_report_event(store: &Store) -> Vec<QueueClass> {
let now = now();
let from_key = ValueKey::from(ValueClass::Queue(QueueClass::DmarcReportHeader(
ReportEvent {
due: 0,
policy_hash: 0,
seq_id: 0,
domain: String::new(),
},
)));
let to_key = ValueKey::from(ValueClass::Queue(QueueClass::TlsReportHeader(
ReportEvent {
due: now + REPORT_REFRESH.as_secs(),
policy_hash: 0,
seq_id: 0,
domain: String::new(),
},
)));
let mut events = Vec::new();
let mut old_locks = Vec::new();
let result = store
.iterate(
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
let event = ReportEvent::deserialize(key)?;
// TODO - REMOVEME - Part of v0.11 migration
if event.seq_id == 0 {
old_locks.push(if *key.last().unwrap() == 0 {
QueueClass::DmarcReportHeader(event)
} else {
QueueClass::TlsReportHeader(event)
});
return Ok(true);
}
let do_continue = event.due <= now;
events.push(if *key.last().unwrap() == 0 {
QueueClass::DmarcReportHeader(event)
} else {
QueueClass::TlsReportHeader(event)
});
Ok(do_continue)
},
)
.await;
// TODO - REMOVEME - Part of v0.11 migration
if !old_locks.is_empty() {
let mut batch = BatchBuilder::new();
for event in old_locks {
batch.clear(ValueClass::Queue(event));
}
if let Err(err) = store.write(batch.build_all()).await {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to remove old report events")
);
}
}
if let Err(err) = result {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to read from store")
);
}
events
}
pub trait LockReport: Sync + Send {
fn try_lock_report(&self, lock: &[u8]) -> impl Future<Output = bool> + Send;
fn unlock_report(&self, lock: &[u8]) -> impl Future<Output = ()> + Send;
}
impl LockReport for Server {
    /// Tries to take the report lock for `key` with a fixed expiry.
    /// Emits a `Locked` event when the lock is already held, and logs
    /// (returning `false`) when the in-memory store errors out.
    async fn try_lock_report(&self, key: &[u8]) -> bool {
        let attempt = self
            .in_memory_store()
            .try_lock(KV_LOCK_QUEUE_REPORT, key, LOCK_EXPIRY)
            .await;
        match attempt {
            Ok(true) => true,
            Ok(false) => {
                // Lock is held elsewhere; record when the current holder expires.
                trc::event!(
                    OutgoingReport(trc::OutgoingReportEvent::Locked),
                    Expires = trc::Value::Timestamp(now() + LOCK_EXPIRY),
                    Key = key
                );
                false
            }
            Err(err) => {
                trc::error!(
                    err.details("Failed to lock report.")
                        .caused_by(trc::location!())
                );
                false
            }
        }
    }

    /// Drops the report lock for `key`; a store failure is only logged
    /// since the lock expires on its own after `LOCK_EXPIRY`.
    async fn unlock_report(&self, key: &[u8]) {
        let removed = self
            .in_memory_store()
            .remove_lock(KV_LOCK_QUEUE_REPORT, key)
            .await;
        if let Err(err) = removed {
            trc::error!(
                err.details("Failed to unlock event.")
                    .caused_by(trc::location!())
            );
        }
    }
}
/// Converts a duration into an absolute Unix timestamp offset from "now".
pub trait ToTimestamp {
    /// Returns the current Unix time (seconds) plus `self`'s length in
    /// seconds; the current time falls back to `0` if the system clock
    /// reports a moment before the Unix epoch.
    fn to_timestamp(&self) -> u64;
}

impl ToTimestamp for Duration {
    fn to_timestamp(&self) -> u64 {
        let unix_now = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs(),
            // Clock is set before the epoch; treat "now" as zero.
            Err(_) => 0,
        };
        unix_now + self.as_secs()
    }
}
/// Spawns a background task that processes this report event.
/// Consumes `self` and runs against the shared server state in `core`.
pub trait SpawnReport {
    fn spawn(self, core: Arc<Inner>);
}

View File

@@ -6,7 +6,7 @@
use super::{AggregateTimestamp, SerializedSize};
use crate::{queue::RecipientDomain, reporting::SmtpReporting};
use ahash::AHashMap;
use ahash::{AHashMap, AHashSet};
use common::{
Server, USER_AGENT,
config::smtp::{
@@ -23,12 +23,22 @@ use mail_auth::{
},
};
use mail_parser::DateTime;
use registry::{
pickle::Pickle,
schema::{
enums::TlsPolicyType,
prelude::{ObjectType, Property},
structs::{Task, TlsFailureDetails, TlsInternalReport, TlsReport, TlsReportPolicy},
},
types::{EnumImpl, datetime::UTCDateTime},
};
use reqwest::header::CONTENT_TYPE;
use std::fmt::Write;
use std::{collections::hash_map::Entry, future::Future, sync::Arc, time::Duration};
use store::{
Deserialize, IterateParams, Serialize, ValueKey,
write::{AlignedBytes, Archive, Archiver, BatchBuilder, QueueClass, ReportEvent, ValueClass},
Deserialize, IterateParams, ValueKey,
registry::RegistryQuery,
write::{AlignedBytes, Archive, Archiver, BatchBuilder, QueueClass, RegistryClass, ValueClass},
};
use trc::{AddContext, OutgoingReportEvent};
@@ -61,7 +71,6 @@ pub trait TlsReporting: Sync + Send {
span_id: u64,
) -> impl Future<Output = trc::Result<Option<TlsReport>>> + Send;
fn schedule_tls(&self, event: Box<TlsEvent>) -> impl Future<Output = ()> + Send;
fn delete_tls_report(&self, events: Vec<ReportEvent>) -> impl Future<Output = ()> + Send;
}
impl TlsReporting for Server {
@@ -397,42 +406,112 @@ impl TlsReporting for Server {
}
async fn schedule_tls(&self, event: Box<TlsEvent>) {
let created = event.interval.to_timestamp();
let deliver_at = created + event.interval.as_secs();
let mut report_event = ReportEvent {
due: deliver_at,
policy_hash: event.policy.to_hash(),
seq_id: created,
domain: event.domain,
// Find the report by domain name
let mut batch = BatchBuilder::new();
let object_id = ObjectType::TlsInternalReport.to_id();
let item_id;
let report = match self
.registry()
.query::<AHashSet<u64>>(
RegistryQuery::new(ObjectType::TlsInternalReport)
.equal(Property::Domain, event.domain.clone()),
)
.await
.map(|ids| ids.into_iter().next())
{
Ok(Some(item_id_)) => {
match self
.store()
.get_value::<TlsInternalReport>(ValueKey::from(ValueClass::Registry(
RegistryClass::Item {
object_id,
item_id: item_id_,
},
)))
.await
{
Ok(Some(report)) => {
item_id = item_id_;
Some(report)
}
Ok(None) => {
batch.clear(ValueClass::Registry(RegistryClass::Index {
index_id: Property::Domain.to_id(),
object_id,
item_id: item_id_,
key: event.domain.as_bytes().to_vec(),
}));
item_id = self.inner.data.queue_id_gen.generate();
None
}
Err(err) => {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to query registry for TLS report")
);
return;
}
}
}
Ok(None) => {
item_id = self.inner.data.queue_id_gen.generate();
None
}
Err(err) => {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to query registry for TLS report")
);
return;
}
};
// Write policy if missing
let mut builder = BatchBuilder::new();
if self
.core
.storage
.data
.get_value::<()>(ValueKey::from(ValueClass::Queue(
QueueClass::TlsReportHeader(report_event.clone()),
)))
.await
.unwrap_or_default()
.is_none()
// Generate policy if missing
let mut report = if let Some(report) = report {
report
} else {
batch.set(
ValueClass::Registry(RegistryClass::Index {
index_id: Property::Domain.to_id(),
object_id,
item_id,
key: event.domain.as_bytes().to_vec(),
}),
vec![],
);
let todo = "schedule task";
TlsInternalReport {
created_at: UTCDateTime::now(),
deliver_at: UTCDateTime::from_timestamp(
(event.interval.to_timestamp() + event.interval.as_secs()) as i64,
),
domain: event.domain,
..Default::default()
}
};
let policy_hash = event.policy.to_hash();
let policy = if let Some(policy) = report
.policy_identifiers
.iter()
.position(|id| *id == policy_hash)
.and_then(|idx| report.report.policies.get_mut(idx))
{
policy
} else {
// Serialize report
let mut policy = PolicyDetails {
policy_type: PolicyType::NoPolicyFound,
policy_string: vec![],
policy_domain: report_event.domain.clone(),
mx_host: vec![],
let mut policy = TlsReportPolicy {
policy_type: TlsPolicyType::NoPolicyFound,
policy_domain: report.domain.clone(),
..Default::default()
};
match event.policy {
common::ipc::PolicyType::Tlsa(tlsa) => {
policy.policy_type = PolicyType::Tlsa;
policy.policy_type = TlsPolicyType::Tlsa;
if let Some(tlsa) = tlsa {
for entry in &tlsa.entries {
policy.policy_string.push(format!(
policy.policy_strings.push(format!(
"{} {} {} {}",
if entry.is_end_entity { 3 } else { 2 },
i32::from(entry.is_spki),
@@ -449,10 +528,10 @@ impl TlsReporting for Server {
}
}
common::ipc::PolicyType::Sts(sts) => {
policy.policy_type = PolicyType::Sts;
policy.policy_type = TlsPolicyType::Sts;
if let Some(sts) = sts {
policy.policy_string.push("version: STSv1".to_string());
policy.policy_string.push(format!(
policy.policy_strings.push("version: STSv1".to_string());
policy.policy_strings.push(format!(
"mode: {}",
match sts.mode {
Mode::Enforce => "enforce",
@@ -461,113 +540,65 @@ impl TlsReporting for Server {
}
));
policy
.policy_string
.policy_strings
.push(format!("max_age: {}", sts.max_age));
for mx in &sts.mx {
let mx = match mx {
MxPattern::Equals(mx) => mx.to_string(),
MxPattern::StartsWith(mx) => format!("*.{mx}"),
};
policy.policy_string.push(format!("mx: {mx}"));
policy.mx_host.push(mx);
policy.policy_strings.push(format!("mx: {mx}"));
policy.mx_hosts.push(mx);
}
}
}
_ => (),
}
// Create report entry
let entry = TlsFormat {
rua: event.tls_record.rua.clone(),
policy,
records: vec![],
};
// Write report
builder.set(
ValueClass::Queue(QueueClass::TlsReportHeader(report_event.clone())),
match Archiver::new(entry).serialize() {
Ok(data) => data.to_vec(),
Err(err) => {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to serialize TLS report")
);
return;
for rua in &event.tls_record.rua {
match rua {
ReportUri::Mail(mail) => {
if !report.mail_rua.contains(mail) {
report.mail_rua.push(mail.clone());
}
}
},
);
ReportUri::Http(uri) => {
if !report.http_rua.contains(uri) {
report.http_rua.push(uri.clone());
}
}
}
}
report.policy_identifiers.push(policy_hash);
report.report.policies.push(policy);
report.report.policies.last_mut().unwrap()
};
// Add failure details
if let Some(failure) = event.failure.map(TlsFailureDetails::from) {
if let Some(idx) = policy.failure_details.iter().position(|d| d == &failure) {
policy.failure_details[idx].failed_session_count += 1;
} else {
policy.failure_details.push(failure);
}
policy.total_failed_sessions += 1;
} else {
policy.total_successful_sessions += 1;
}
// Write entry
report_event.seq_id = self.inner.data.queue_id_gen.generate();
builder.set(
ValueClass::Queue(QueueClass::TlsReportEvent(report_event)),
match Archiver::new(event.failure).serialize() {
Ok(data) => data.to_vec(),
Err(err) => {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to serialize TLS report")
);
return;
}
},
batch.set(
ValueClass::Registry(RegistryClass::Item { object_id, item_id }),
report.to_pickled_vec(),
);
if let Err(err) = self.core.storage.data.write(builder.build_all()).await {
if let Err(err) = self.core.storage.data.write(batch.build_all()).await {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to write TLS report")
);
}
}
async fn delete_tls_report(&self, events: Vec<ReportEvent>) {
let mut batch = BatchBuilder::new();
for event in events {
let from_key = ReportEvent {
due: event.due,
policy_hash: event.policy_hash,
seq_id: 0,
domain: event.domain.clone(),
};
let to_key = ReportEvent {
due: event.due,
policy_hash: event.policy_hash,
seq_id: u64::MAX,
domain: event.domain.clone(),
};
// Remove report events
if let Err(err) = self
.core
.storage
.data
.delete_range(
ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(from_key))),
ValueKey::from(ValueClass::Queue(QueueClass::TlsReportEvent(to_key))),
)
.await
{
trc::error!(
err.caused_by(trc::location!())
.details("Failed to delete TLS reports")
);
return;
}
// Remove report header
batch.clear(ValueClass::Queue(QueueClass::TlsReportHeader(event)));
}
if let Err(err) = self.core.storage.data.write(batch.build_all()).await {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to delete TLS reports")
);
}
}
}

View File

@@ -203,7 +203,7 @@ impl SpamClassifier for Server {
.iterate(
IterateParams::new(from_key, to_key).descending(),
|key, value| {
let id = key.deserialize_be_u64(U16_LEN + 1)?;
let id = key.deserialize_be_u64(U16_LEN)?;
let sample = SpamTrainingSample::deserialize(value)?;
let until = sample.expires_at.timestamp() as u64;

View File

@@ -38,8 +38,10 @@ impl FdbStore {
let trx = self.read_trx().await?;
match read_chunked_value(&key, &trx, true).await? {
ChunkedValue::Single(bytes) => U::deserialize(&bytes).map(Some),
ChunkedValue::Chunked { bytes, .. } => U::deserialize_owned(bytes).map(Some),
ChunkedValue::Single(bytes) => U::deserialize_with_key(&key, &bytes).map(Some),
ChunkedValue::Chunked { bytes, .. } => {
U::deserialize_owned_with_key(&key, bytes).map(Some)
}
ChunkedValue::None => Ok(None),
}
}

View File

@@ -93,6 +93,7 @@ impl MysqlStore {
SUBSPACE_IN_MEMORY_VALUE,
SUBSPACE_PROPERTY,
SUBSPACE_REGISTRY,
SUBSPACE_DIRECTORY,
SUBSPACE_QUEUE_MESSAGE,
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
@@ -124,7 +125,11 @@ impl MysqlStore {
.await
.map_err(into_error)?;
for table in [SUBSPACE_INDEXES] {
for table in [
SUBSPACE_INDEXES,
SUBSPACE_REGISTRY_IDX,
SUBSPACE_REGISTRY_IDX_GLOBAL,
] {
let table = char::from(table);
conn.query_drop(format!(
"CREATE TABLE IF NOT EXISTS {table} (

View File

@@ -28,7 +28,7 @@ impl MysqlStore {
.map_err(into_error)
.and_then(|r| {
if let Some(r) = r {
Ok(Some(U::deserialize_owned(r)?))
Ok(Some(U::deserialize_owned_with_key(&key, r)?))
} else {
Ok(None)
}

View File

@@ -13,7 +13,7 @@ use crate::{
},
*,
};
use ::registry::schema::structs;
use ::registry::schema::{enums::PostgreSqlRecyclingMethod, structs};
use deadpool::managed::Object;
use deadpool_postgres::{Config, Manager, ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
use tokio_postgres::NoTls;
@@ -30,7 +30,11 @@ impl PostgresStore {
cfg.connect_timeout = config.timeout.map(|t| t.into_inner());
cfg.options = config.options;
cfg.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Clean,
recycling_method: match config.pool_recycling_method {
PostgreSqlRecyclingMethod::Fast => RecyclingMethod::Fast,
PostgreSqlRecyclingMethod::Verified => RecyclingMethod::Verified,
PostgreSqlRecyclingMethod::Clean => RecyclingMethod::Clean,
},
});
if let Some(max_conn) = config.pool_max_connections {
cfg.pool = PoolConfig::new(max_conn as usize).into();
@@ -101,6 +105,7 @@ impl PostgresStore {
SUBSPACE_REPORT_IN,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
SUBSPACE_DIRECTORY,
SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TELEMETRY_METRIC,
] {
@@ -118,7 +123,11 @@ impl PostgresStore {
.map_err(into_error)?;
}
for table in [SUBSPACE_INDEXES] {
for table in [
SUBSPACE_INDEXES,
SUBSPACE_REGISTRY_IDX,
SUBSPACE_REGISTRY_IDX_GLOBAL,
] {
let table = char::from(table);
conn.execute(
&format!(

View File

@@ -30,7 +30,7 @@ impl PostgresStore {
.map_err(into_error)
.and_then(|r| {
if let Some(r) = r {
Ok(Some(U::deserialize(r.get(0))?))
Ok(Some(U::deserialize_with_key(&key, r.get(0))?))
} else {
Ok(None)
}

View File

@@ -61,8 +61,9 @@ impl RocksDbStore {
SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_SEARCH_INDEX,
SUBSPACE_SPAM_SAMPLES,
LEGACY_SUBSPACE_BITMAP_ID,
LEGACY_SUBSPACE_BITMAP_TAG,
SUBSPACE_REGISTRY_IDX,
SUBSPACE_REGISTRY_IDX_GLOBAL,
SUBSPACE_DIRECTORY,
LEGACY_SUBSPACE_BITMAP_TEXT,
LEGACY_SUBSPACE_FTS_INDEX,
] {

View File

@@ -17,15 +17,17 @@ impl RocksDbStore {
{
let db = self.db.clone();
self.spawn_worker(move || {
let subspace = &[key.subspace()];
let key = key.serialize(0);
db.get_pinned_cf(
&db.cf_handle(std::str::from_utf8(&[key.subspace()]).unwrap())
&db.cf_handle(unsafe { std::str::from_utf8_unchecked(subspace.as_slice()) })
.unwrap(),
key.serialize(0),
&key,
)
.map_err(into_error)
.and_then(|value| {
if let Some(value) = value {
U::deserialize(&value).map(Some)
U::deserialize_with_key(&key, &value).map(Some)
} else {
Ok(None)
}

View File

@@ -6,13 +6,11 @@
use super::DocumentSet;
use crate::{
Deserialize, IterateParams, Key, QueryResult, SUBSPACE_COUNTER, SUBSPACE_DELETED_ITEMS,
SUBSPACE_INDEXES, SUBSPACE_LOGS, Store, U32_LEN, Value, ValueKey,
Deserialize, IterateParams, Key, QueryResult, SUBSPACE_COUNTER, SUBSPACE_INDEXES,
SUBSPACE_LOGS, Store, U32_LEN, Value, ValueKey,
write::{
AnyClass, AnyKey, AssignedIds, Batch, BatchBuilder, Operation, ReportClass, ValueClass,
ValueOp,
AnyClass, AnyKey, AssignedIds, Batch, BatchBuilder, Operation, ValueClass, ValueOp,
key::{DeserializeBigEndian, KeySerializer},
now,
},
};
use compact_str::ToCompactString;
@@ -189,36 +187,6 @@ impl Store {
}
pub async fn purge_store(&self) -> trc::Result<()> {
// Delete expired reports
let now = now();
self.delete_range(
ValueKey::from(ValueClass::Report(ReportClass::Dmarc { id: 0, expires: 0 })),
ValueKey::from(ValueClass::Report(ReportClass::Dmarc {
id: u64::MAX,
expires: now,
})),
)
.await
.caused_by(trc::location!())?;
self.delete_range(
ValueKey::from(ValueClass::Report(ReportClass::Tls { id: 0, expires: 0 })),
ValueKey::from(ValueClass::Report(ReportClass::Tls {
id: u64::MAX,
expires: now,
})),
)
.await
.caused_by(trc::location!())?;
self.delete_range(
ValueKey::from(ValueClass::Report(ReportClass::Arf { id: 0, expires: 0 })),
ValueKey::from(ValueClass::Report(ReportClass::Arf {
id: u64::MAX,
expires: now,
})),
)
.await
.caused_by(trc::location!())?;
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.purge_store().await,

View File

@@ -29,18 +29,29 @@ pub use roaring;
use utils::snowflake::SnowflakeIdGenerator;
pub use xxhash_rust;
use crate::backend::{elastic::ElasticSearchStore, meili::MeiliSearchStore};
use ahash::{AHashMap, AHashSet};
use backend::{fs::FsStore, http::HttpStore, memory::StaticMemoryStore};
use std::{borrow::Cow, path::PathBuf, sync::Arc};
use write::ValueClass;
use crate::backend::{elastic::ElasticSearchStore, meili::MeiliSearchStore};
pub trait Deserialize: Sized + Sync + Send {
fn deserialize(bytes: &[u8]) -> trc::Result<Self>;
#[inline(always)]
fn deserialize_owned(bytes: Vec<u8>) -> trc::Result<Self> {
Self::deserialize(&bytes)
}
#[inline(always)]
fn deserialize_with_key(_: &[u8], bytes: &[u8]) -> trc::Result<Self> {
Self::deserialize(bytes)
}
#[inline(always)]
fn deserialize_owned_with_key(key: &[u8], bytes: Vec<u8>) -> trc::Result<Self> {
Self::deserialize_with_key(key, &bytes)
}
}
pub trait Serialize {
@@ -105,7 +116,9 @@ pub const SUBSPACE_IN_MEMORY_VALUE: u8 = b'm';
pub const SUBSPACE_IN_MEMORY_COUNTER: u8 = b'y';
pub const SUBSPACE_PROPERTY: u8 = b'p';
pub const SUBSPACE_REGISTRY: u8 = b's';
pub const SUBSPACE_REGISTRY_DIRECTORY: u8 = b'd';
pub const SUBSPACE_REGISTRY_IDX: u8 = b'b';
pub const SUBSPACE_REGISTRY_IDX_GLOBAL: u8 = b'c';
pub const SUBSPACE_DIRECTORY: u8 = b'd';
pub const SUBSPACE_QUEUE_MESSAGE: u8 = b'e';
pub const SUBSPACE_QUEUE_EVENT: u8 = b'q';
pub const SUBSPACE_QUOTA: u8 = b'u';
@@ -118,8 +131,6 @@ pub const SUBSPACE_DELETED_ITEMS: u8 = b'j';
pub const SUBSPACE_SPAM_SAMPLES: u8 = b'w';
// TODO: Remove in v1.0
pub const LEGACY_SUBSPACE_BITMAP_ID: u8 = b'b';
pub const LEGACY_SUBSPACE_BITMAP_TAG: u8 = b'c';
pub const LEGACY_SUBSPACE_BITMAP_TEXT: u8 = b'v';
pub const LEGACY_SUBSPACE_FTS_INDEX: u8 = b'g';

View File

@@ -10,7 +10,7 @@ use crate::{
write::{AnyClass, RegistryClass, ValueClass, key::KeySerializer},
};
use registry::{
pickle::{Pickle, PickledStream},
pickle::PickledStream,
schema::prelude::Object,
types::{EnumImpl, ObjectImpl, id::ObjectId},
};
@@ -107,22 +107,19 @@ impl RegistryStore {
.ctx(trc::Key::Key, key)
})?;
let mut stream = PickledStream::new(value);
let _ = u16::unpickle(&mut stream);
let (object, revision) = T::unpickle(&mut stream)
.and_then(|item| u32::unpickle(&mut stream).map(|rev| (item, rev)))
.ok_or_else(|| {
trc::EventType::Registry(trc::RegistryEvent::DeserializationError)
.into_err()
.caused_by(trc::location!())
.id(id)
.details(object_type.as_str())
.ctx(trc::Key::Value, value)
})?;
let object = T::unpickle(&mut stream).ok_or_else(|| {
trc::EventType::Registry(trc::RegistryEvent::DeserializationError)
.into_err()
.caused_by(trc::location!())
.id(id)
.details(object_type.as_str())
.ctx(trc::Key::Value, value)
})?;
results.push(RegistryObject {
id: ObjectId::new(object_type, Id::new(id)),
object,
revision,
revision: xxhash_rust::xxh3::xxh3_64(value),
});
Ok(true)

View File

@@ -8,7 +8,7 @@ use crate::{RegistryStore, RegistryStoreInner, Store};
use ahash::AHashMap;
use parking_lot::RwLock;
use registry::{
schema::prelude::{OBJ_SINGLETON, Object, ObjectType},
schema::prelude::{OBJ_SINGLETON, Object, ObjectInner, ObjectType},
types::{EnumImpl, id::ObjectId},
};
use serde_json::{Map, Value, map::Entry};
@@ -48,9 +48,10 @@ impl RegistryStoreInner {
));
}
if local_registry
.insert(ObjectId::new(object_type, Id::new(id)), Object::deserialize(object_type, value).map_err(|err| {
.insert(ObjectId::new(object_type, Id::new(id)), ObjectInner::deserialize(object_type, value).map_err(|err| {
format!("{error_msg}: Failed to parse object {key:?} with id {id}: {err}")
}).and_then(|obj| {
}).and_then(|inner| {
let obj = Object { inner, revision: 0 };
let mut errors = Vec::new();
obj.validate(&mut errors);
if errors.is_empty() {
@@ -76,11 +77,12 @@ impl RegistryStoreInner {
} else if local_registry
.insert(
ObjectId::new(object_type, Id::singleton()),
Object::deserialize(object_type, object)
ObjectInner::deserialize(object_type, object)
.map_err(|err| {
format!("{error_msg}: Failed to parse object {key:?}: {err}")
})
.and_then(|obj| {
.and_then(|inner| {
let obj = Object { inner, revision: 0 };
let mut errors = Vec::new();
obj.validate(&mut errors);
if errors.is_empty() {

View File

@@ -17,8 +17,8 @@ use crate::{
use registry::{
pickle::{Pickle, PickledStream},
schema::{
prelude::{Object, ObjectType, Property},
structs::{DeletedItem, SpamTrainingSample, Task},
prelude::{Object, ObjectInner, ObjectType, Property},
structs::{DeletedItem, DmarcInternalReport, SpamTrainingSample, Task, TlsInternalReport},
},
types::{EnumImpl, ObjectImpl, id::ObjectId},
};
@@ -27,14 +27,12 @@ use types::id::Id;
pub struct RegistryObject<T: ObjectImpl> {
pub id: ObjectId,
pub object: T,
pub revision: u32,
pub revision: u64,
}
pub struct RegistryQuery {
pub object_type: ObjectType,
pub filters: Vec<RegistryFilter>,
pub account_id: Option<u32>,
pub tenant_id: Option<u32>,
}
pub struct RegistryFilter {
@@ -61,14 +59,21 @@ pub enum RegistryFilterValue {
}
impl Deserialize for Object {
fn deserialize(bytes: &[u8]) -> trc::Result<Self> {
let mut stream = PickledStream::new(bytes);
Object::unpickle(&mut stream).ok_or_else(|| {
trc::EventType::Registry(trc::RegistryEvent::DeserializationError)
.into_err()
.caused_by(trc::location!())
.ctx(trc::Key::Value, bytes)
})
fn deserialize_with_key(key: &[u8], bytes: &[u8]) -> trc::Result<Self> {
let revision = xxhash_rust::xxh3::xxh3_64(bytes);
ObjectType::from_id(key.deserialize_be_u16(0)?)
.and_then(|object_id| ObjectInner::unpickle(object_id, &mut PickledStream::new(bytes)))
.map(|inner| Object { revision, inner })
.ok_or_else(|| {
trc::EventType::Registry(trc::RegistryEvent::DeserializationError)
.into_err()
.caused_by(trc::location!())
.ctx(trc::Key::Value, bytes)
})
}
fn deserialize(_: &[u8]) -> trc::Result<Self> {
unreachable!("Object deserialization requires the object type from the key")
}
}
@@ -108,6 +113,30 @@ impl Deserialize for DeletedItem {
}
}
// Decodes a pickled `TlsInternalReport` from its raw registry value bytes.
impl Deserialize for TlsInternalReport {
    fn deserialize(bytes: &[u8]) -> trc::Result<Self> {
        let mut stream = PickledStream::new(bytes);
        // `unpickle` returns `None` on malformed input; surface that as a
        // registry deserialization error carrying the offending bytes.
        TlsInternalReport::unpickle(&mut stream).ok_or_else(|| {
            trc::EventType::Registry(trc::RegistryEvent::DeserializationError)
                .into_err()
                .caused_by(trc::location!())
                .ctx(trc::Key::Value, bytes)
        })
    }
}
// Decodes a pickled `DmarcInternalReport` from its raw registry value bytes.
impl Deserialize for DmarcInternalReport {
    fn deserialize(bytes: &[u8]) -> trc::Result<Self> {
        let mut stream = PickledStream::new(bytes);
        // `unpickle` returns `None` on malformed input; surface that as a
        // registry deserialization error carrying the offending bytes.
        DmarcInternalReport::unpickle(&mut stream).ok_or_else(|| {
            trc::EventType::Registry(trc::RegistryEvent::DeserializationError)
                .into_err()
                .caused_by(trc::location!())
                .ctx(trc::Key::Value, bytes)
        })
    }
}
impl SerializeInfallible for ObjectId {
fn serialize(&self) -> Vec<u8> {
KeySerializer::new(U16_LEN + U64_LEN)

View File

@@ -5,7 +5,7 @@
*/
use crate::{
IterateParams, RegistryStore, SUBSPACE_REGISTRY, Store, U16_LEN, U64_LEN, ValueKey,
IterateParams, RegistryStore, SUBSPACE_REGISTRY_IDX, Store, U16_LEN, U64_LEN, ValueKey,
registry::{RegistryFilter, RegistryFilterOp, RegistryFilterValue, RegistryQuery},
write::{
AnyClass, RegistryClass, ValueClass,
@@ -49,41 +49,15 @@ impl RegistryStore {
}
}
return Ok(results);
}
let mut results = if (flags & OBJ_FILTER_ACCOUNT != 0)
&& let Some(account_id) = query.account_id
{
range_to_set::<T>(
&self.0.store,
query.object_type,
Property::AccountId.to_id(),
&account_id.to_be_bytes(),
RegistryFilterOp::Equal,
)
.await?
} else if (flags & OBJ_FILTER_TENANT != 0)
&& let Some(tenant_id) = query.tenant_id
{
range_to_set::<T>(
&self.0.store,
query.object_type,
Property::MemberTenantId.to_id(),
&tenant_id.to_be_bytes(),
RegistryFilterOp::Equal,
)
.await?
} else {
all_ids::<T>(&self.0.store, query.object_type).await?
};
if !results.has_items() || query.filters.is_empty() {
return Ok(results);
} else if query.filters.is_empty() {
return all_ids::<T>(&self.0.store, query.object_type).await;
}
let mut u64_buffer;
let mut u16_buffer;
let mut bool_buffer = [0u8; 1];
let mut results = T::default();
for filter in query.filters {
if filter.op == RegistryFilterOp::TextMatch {
if let RegistryFilterValue::String(text) = filter.value {
@@ -120,7 +94,11 @@ impl RegistryStore {
}
}
results.intersect(&matches);
if !results.has_items() {
results = matches;
} else {
results.intersect(&matches);
}
} else {
return Err(trc::EventType::Registry(trc::RegistryEvent::NotSupported)
.into_err()
@@ -150,7 +128,11 @@ impl RegistryStore {
)
.await?;
results.intersect(&result);
if !results.has_items() {
results = result;
} else {
results.intersect(&result);
}
}
if !results.has_items() {
@@ -201,23 +183,40 @@ impl RegistryQuery {
Self {
object_type,
filters: Vec::new(),
account_id: None,
tenant_id: None,
}
}
pub fn with_account(mut self, account_id: u32) -> Self {
self.account_id = Some(account_id);
if self.object_type.flags() & OBJ_FILTER_ACCOUNT != 0 {
let filter = RegistryFilter::equal(Property::AccountId, account_id);
if self.filters.is_empty() {
self.filters.push(filter);
} else {
self.filters.insert(0, filter);
}
}
self
}
pub fn with_account_opt(mut self, account_id: Option<u32>) -> Self {
self.account_id = account_id;
self
pub fn with_account_opt(self, account_id: Option<u32>) -> Self {
if let Some(account_id) = account_id {
self.with_account(account_id)
} else {
self
}
}
pub fn with_tenant(mut self, tenant_id: Option<u32>) -> Self {
self.tenant_id = tenant_id;
if let Some(tenant_id) = tenant_id
&& self.object_type.flags() & OBJ_FILTER_TENANT != 0
{
let filter = RegistryFilter::equal(Property::MemberTenantId, tenant_id);
if self.filters.is_empty() {
self.filters.push(filter);
} else {
self.filters.insert(0, filter);
}
}
self
}
@@ -387,7 +386,9 @@ async fn all_ids<T: RegistryQueryResults>(store: &Store, object: ObjectType) ->
.no_values()
.ascending(),
|key, _| {
bm.push(key.deserialize_be_u64(key.len() - U64_LEN)?);
if key.len() == U64_LEN + U16_LEN {
bm.push(key.deserialize_be_u64(key.len() - U64_LEN)?);
}
Ok(true)
},
@@ -426,9 +427,8 @@ async fn range_to_set<T: RegistryQueryResults>(
};
let begin = ValueKey::from(ValueClass::Any(AnyClass {
subspace: SUBSPACE_REGISTRY,
key: KeySerializer::new((U16_LEN * 2) + U64_LEN + 1 + from_value.len())
.write(2u8)
subspace: SUBSPACE_REGISTRY_IDX,
key: KeySerializer::new((U16_LEN * 2) + U64_LEN + from_value.len())
.write(object_id)
.write(from_index_id)
.write(from_value)
@@ -436,9 +436,8 @@ async fn range_to_set<T: RegistryQueryResults>(
.finalize(),
}));
let end = ValueKey::from(ValueClass::Any(AnyClass {
subspace: SUBSPACE_REGISTRY,
key: KeySerializer::new((U16_LEN * 2) + U64_LEN + 1 + end_value.len())
.write(2u8)
subspace: SUBSPACE_REGISTRY_IDX,
key: KeySerializer::new((U16_LEN * 2) + U64_LEN + end_value.len())
.write(object_id)
.write(end_index_id)
.write(end_value)
@@ -447,8 +446,7 @@ async fn range_to_set<T: RegistryQueryResults>(
}));
let mut bm = T::default();
let prefix = KeySerializer::new((U16_LEN * 2) + 1)
.write(2u8)
let prefix = KeySerializer::new(U16_LEN * 2)
.write(object_id)
.write(index_id)
.finalize();

View File

@@ -5,8 +5,8 @@
*/
use crate::{
IterateParams, RegistryStore, SUBSPACE_REGISTRY, SerializeInfallible, U16_LEN, U64_LEN,
ValueKey,
IterateParams, RegistryStore, SUBSPACE_REGISTRY_IDX_GLOBAL, SerializeInfallible, U16_LEN,
U64_LEN, ValueKey,
write::{
AnyClass, BatchBuilder, RegistryClass, ValueClass,
assert::AssertValue,
@@ -147,7 +147,7 @@ impl RegistryStore {
item_id = id.id();
batch.assert_value(
ValueClass::Registry(RegistryClass::Item { object_id, item_id }),
AssertValue::U32(old_object.revision),
AssertValue::Hash(old_object.revision),
);
}
RegistryWriteOp::Delete { object_id, object } => {
@@ -317,7 +317,7 @@ impl RegistryStore {
// It's pickle time!
let mut out = Vec::with_capacity(256);
object.pickle(&mut out);
object.inner.pickle(&mut out);
// Build batch
if write_id {
@@ -379,23 +379,23 @@ impl RegistryStore {
// Validate relationships
let mut linked = Vec::new();
let key = KeySerializer::new(U64_LEN + U16_LEN + 1)
.write(1u8)
.write(0u8)
.write(object_type_id)
.write(item_id)
.finalize();
let prefix_len = key.len();
let from_key = ValueKey::from(ValueClass::Any(AnyClass {
subspace: SUBSPACE_REGISTRY,
subspace: SUBSPACE_REGISTRY_IDX_GLOBAL,
key,
}));
let key = KeySerializer::new((U64_LEN * 2) + U16_LEN + 1)
.write(1u8)
.write(0u8)
.write(object_type_id)
.write(item_id)
.write(u64::MAX)
.finalize();
let to_key = ValueKey::from(ValueClass::Any(AnyClass {
subspace: SUBSPACE_REGISTRY,
subspace: SUBSPACE_REGISTRY_IDX_GLOBAL,
key,
}));
self.0
@@ -444,7 +444,7 @@ impl RegistryStore {
object_id: object_type_id,
item_id,
}),
AssertValue::U32(object.revision),
AssertValue::Hash(object.revision),
)
.clear(ValueClass::Registry(RegistryClass::Item {
object_id: object_type_id,
@@ -546,13 +546,13 @@ impl RegistryClass {
}
impl BatchBuilder {
fn registry_index<'x>(
pub fn registry_index<'x>(
&mut self,
object_id: u16,
item_id: u64,
index_keys: impl Iterator<Item = &'x IndexKey<'x>>,
is_set: bool,
) {
) -> &mut Self {
for key in index_keys {
if is_set {
self.set(
@@ -565,6 +565,7 @@ impl BatchBuilder {
)));
}
}
self
}
}

View File

@@ -11,6 +11,7 @@ use crate::{U32_LEN, U64_LEN};
pub enum AssertValue {
U32(u32),
U64(u64),
Hash(u64),
Archive(ArchiveVersion),
Some,
None,
@@ -66,6 +67,7 @@ impl AssertValue {
AssertValue::U64(v) => bytes
.get(bytes.len() - U64_LEN..)
.is_some_and(|b| b == v.to_be_bytes()),
AssertValue::Hash(v) => xxhash_rust::xxh3::xxh3_64(bytes) == *v,
AssertValue::Archive(v) => match v {
ArchiveVersion::Versioned { hash, .. } => bytes
.get(bytes.len() - U32_LEN - U64_LEN - 1..bytes.len() - U64_LEN - 1)

View File

@@ -5,17 +5,16 @@
*/
use super::{
AnyKey, BlobOp, InMemoryClass, QueueClass, ReportClass, ReportEvent, TaskQueueClass,
TelemetryClass, ValueClass,
AnyKey, BlobOp, InMemoryClass, QueueClass, TaskQueueClass, TelemetryClass, ValueClass,
};
use crate::{
Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, SUBSPACE_ACL, SUBSPACE_BLOB_LINK,
SUBSPACE_COUNTER, SUBSPACE_DELETED_ITEMS, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_IN_MEMORY_VALUE,
SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_PROPERTY, SUBSPACE_QUEUE_EVENT,
SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA, SUBSPACE_REGISTRY, SUBSPACE_REGISTRY_DIRECTORY,
SUBSPACE_REPORT_IN, SUBSPACE_REPORT_OUT, SUBSPACE_SEARCH_INDEX, SUBSPACE_SPAM_SAMPLES,
SUBSPACE_TASK_QUEUE, SUBSPACE_TELEMETRY_METRIC, SUBSPACE_TELEMETRY_SPAN, U16_LEN, U32_LEN,
U64_LEN, ValueKey, WITH_SUBSPACE,
IndexKey, IndexKeyPrefix, Key, LogKey, SUBSPACE_ACL, SUBSPACE_BLOB_LINK, SUBSPACE_COUNTER,
SUBSPACE_DELETED_ITEMS, SUBSPACE_DIRECTORY, SUBSPACE_IN_MEMORY_COUNTER,
SUBSPACE_IN_MEMORY_VALUE, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_PROPERTY,
SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA, SUBSPACE_REGISTRY,
SUBSPACE_REGISTRY_IDX, SUBSPACE_REGISTRY_IDX_GLOBAL, SUBSPACE_REPORT_IN, SUBSPACE_REPORT_OUT,
SUBSPACE_SEARCH_INDEX, SUBSPACE_SPAM_SAMPLES, SUBSPACE_TASK_QUEUE, SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_TELEMETRY_SPAN, U16_LEN, U32_LEN, U64_LEN, ValueKey, WITH_SUBSPACE,
write::{
BlobLink, IndexPropertyClass, RegistryClass, SearchIndex, SearchIndexId, SearchIndexType,
},
@@ -110,52 +109,34 @@ impl KeySerialize for u64 {
impl DeserializeBigEndian for &[u8] {
fn deserialize_be_u16(&self, index: usize) -> trc::Result<u16> {
self.get(index..index + U16_LEN)
.and_then(|bytes| bytes.try_into().ok())
.ok_or_else(|| {
trc::StoreEvent::DataCorruption
.caused_by(trc::location!())
.ctx(trc::Key::Value, *self)
})
.and_then(|bytes| {
bytes.try_into().map_err(|_| {
trc::StoreEvent::DataCorruption
.caused_by(trc::location!())
.ctx(trc::Key::Value, *self)
})
})
.map(u16::from_be_bytes)
}
fn deserialize_be_u32(&self, index: usize) -> trc::Result<u32> {
self.get(index..index + U32_LEN)
.and_then(|bytes| bytes.try_into().ok())
.ok_or_else(|| {
trc::StoreEvent::DataCorruption
.caused_by(trc::location!())
.ctx(trc::Key::Value, *self)
})
.and_then(|bytes| {
bytes.try_into().map_err(|_| {
trc::StoreEvent::DataCorruption
.caused_by(trc::location!())
.ctx(trc::Key::Value, *self)
})
})
.map(u32::from_be_bytes)
}
fn deserialize_be_u64(&self, index: usize) -> trc::Result<u64> {
self.get(index..index + U64_LEN)
.and_then(|bytes| bytes.try_into().ok())
.ok_or_else(|| {
trc::StoreEvent::DataCorruption
.caused_by(trc::location!())
.ctx(trc::Key::Value, *self)
})
.and_then(|bytes| {
bytes.try_into().map_err(|_| {
trc::StoreEvent::DataCorruption
.caused_by(trc::location!())
.ctx(trc::Key::Value, *self)
})
})
.map(u64::from_be_bytes)
}
}
@@ -320,46 +301,44 @@ impl ValueClass {
InMemoryClass::Counter(key) => serializer.write(key.as_slice()),
},
ValueClass::Registry(registry) => match registry {
RegistryClass::Item { object_id, item_id } => serializer
.write(0u8)
.write(*object_id)
.write_leb128(*item_id),
RegistryClass::Reference {
to_object_id,
to_item_id,
from_object_id,
from_item_id,
} => serializer
.write(1u8)
.write(*to_object_id)
.write_leb128(*to_item_id)
.write(*from_object_id)
.write_leb128(*from_item_id),
RegistryClass::Item { object_id, item_id } => {
serializer.write(*object_id).write_leb128(*item_id)
}
RegistryClass::Id { object_id, item_id } => {
serializer.write(*object_id).write(*item_id)
}
RegistryClass::Index {
index_id,
object_id,
item_id,
key,
} => serializer
.write(2u8)
.write(*object_id)
.write(*index_id)
.write(key.as_slice())
.write(*item_id),
RegistryClass::Reference {
to_object_id,
to_item_id,
from_object_id,
from_item_id,
} => serializer
.write(0u8)
.write(*to_object_id)
.write(*to_item_id)
.write(*from_object_id)
.write_leb128(*from_item_id),
RegistryClass::IndexGlobal {
index_id,
object_id,
item_id,
key,
} => serializer
.write(3u8)
.write(1u8)
.write(*index_id)
.write(key.as_slice())
.write(*object_id)
.write(*item_id),
RegistryClass::Id { object_id, item_id } => {
serializer.write(4u8).write(*object_id).write(*item_id)
}
RegistryClass::IdCounter { object_id } => serializer.write(*object_id),
},
ValueClass::Queue(queue) => match queue {
@@ -368,46 +347,9 @@ impl ValueClass {
.write(event.due)
.write(event.queue_id)
.write(event.queue_name.as_slice()),
QueueClass::DmarcReportHeader(event) => serializer
.write(0u8)
.write(event.due)
.write(event.domain.as_bytes())
.write(event.policy_hash)
.write(event.seq_id)
.write(0u8),
QueueClass::TlsReportHeader(event) => serializer
.write(0u8)
.write(event.due)
.write(event.domain.as_bytes())
.write(event.policy_hash)
.write(event.seq_id)
.write(1u8),
QueueClass::DmarcReportEvent(event) => serializer
.write(1u8)
.write(event.due)
.write(event.domain.as_bytes())
.write(event.policy_hash)
.write(event.seq_id),
QueueClass::TlsReportEvent(event) => serializer
.write(2u8)
.write(event.due)
.write(event.domain.as_bytes())
.write(event.policy_hash)
.write(event.seq_id),
QueueClass::QuotaCount(key) => serializer.write(0u8).write(key.as_slice()),
QueueClass::QuotaSize(key) => serializer.write(1u8).write(key.as_slice()),
},
ValueClass::Report(report) => match report {
ReportClass::Tls { id, expires } => {
serializer.write(0u8).write(*expires).write(*id)
}
ReportClass::Dmarc { id, expires } => {
serializer.write(1u8).write(*expires).write(*id)
}
ReportClass::Arf { id, expires } => {
serializer.write(2u8).write(*expires).write(*id)
}
},
ValueClass::Telemetry(telemetry) => match telemetry {
TelemetryClass::Span { span_id } => serializer.write(*span_id),
TelemetryClass::Metric {
@@ -544,6 +486,13 @@ const REG_OAUTH_CLIENT: u16 = ObjectType::OAuthClient as u16;
const REG_MAILING_LIST: u16 = ObjectType::MailingList as u16;
const REG_MASKED_EMAIL: u16 = ObjectType::MaskedEmail as u16;
const REG_PUBLIC_KEY: u16 = ObjectType::PublicKey as u16;
const REG_TRACE: u16 = ObjectType::Trace as u16;
const REG_METRIC: u16 = ObjectType::Metric as u16;
const REPORT_EXTERNAL_ARF: u16 = ObjectType::ArfExternalReport as u16;
const REPORT_EXTERNAL_DMARC: u16 = ObjectType::DmarcExternalReport as u16;
const REPORT_EXTERNAL_TLS: u16 = ObjectType::TlsExternalReport as u16;
const REPORT_INTERNAL_DMARC: u16 = ObjectType::DmarcInternalReport as u16;
const REPORT_INTERNAL_TLS: u16 = ObjectType::TlsInternalReport as u16;
impl ValueClass {
pub fn serialized_size(&self) -> usize {
@@ -556,11 +505,10 @@ impl ValueClass {
ValueClass::Acl(_) => U32_LEN * 3 + 2,
ValueClass::InMemory(InMemoryClass::Counter(v) | InMemoryClass::Key(v)) => v.len(),
ValueClass::Registry(registry) => match registry {
RegistryClass::Item { .. } => U16_LEN + U64_LEN + 2,
RegistryClass::Item { .. } => U16_LEN + U64_LEN + 1,
RegistryClass::Reference { .. } => ((U16_LEN + U64_LEN) * 2) + 2,
RegistryClass::Index { key, .. } | RegistryClass::IndexGlobal { key, .. } => {
(U16_LEN * 2) + U64_LEN + key.len() + 2
}
RegistryClass::Index { key, .. } => (U16_LEN * 2) + U64_LEN + key.len() + 1,
RegistryClass::IndexGlobal { key, .. } => (U16_LEN * 2) + U64_LEN + key.len() + 2,
RegistryClass::Id { .. } => U16_LEN + U64_LEN + 1,
RegistryClass::IdCounter { .. } => U16_LEN + 1,
},
@@ -582,15 +530,8 @@ impl ValueClass {
ValueClass::Queue(q) => match q {
QueueClass::Message(_) => U64_LEN,
QueueClass::MessageEvent(_) => U64_LEN * 3,
QueueClass::DmarcReportEvent(event) | QueueClass::TlsReportEvent(event) => {
event.domain.len() + U64_LEN * 3
}
QueueClass::DmarcReportHeader(event) | QueueClass::TlsReportHeader(event) => {
event.domain.len() + (U64_LEN * 3) + 1
}
QueueClass::QuotaCount(v) | QueueClass::QuotaSize(v) => v.len(),
},
ValueClass::Report(_) => U64_LEN * 2 + 1,
ValueClass::Telemetry(telemetry) => match telemetry {
TelemetryClass::Span { .. } => U64_LEN + 1,
TelemetryClass::Metric { .. } => U64_LEN * 2 + 1,
@@ -626,22 +567,23 @@ impl ValueClass {
BlobOp::Commit { .. } | BlobOp::Link { .. } => SUBSPACE_BLOB_LINK,
},
ValueClass::Registry(registry) => match registry {
RegistryClass::Item { object_id, .. }
| RegistryClass::Id { object_id, .. }
| RegistryClass::Index { object_id, .. }
| RegistryClass::Reference {
to_object_id: object_id,
..
} => match *object_id {
RegistryClass::Item { object_id, .. } => match *object_id {
REG_ACCOUNT | REG_DOMAIN | REG_TENANT | REG_ROLE | REG_OAUTH_CLIENT
| REG_MAILING_LIST | REG_MASKED_EMAIL | REG_PUBLIC_KEY => {
SUBSPACE_REGISTRY_DIRECTORY
}
| REG_MAILING_LIST | REG_MASKED_EMAIL | REG_PUBLIC_KEY => SUBSPACE_DIRECTORY,
REG_DELETED_ITEM => SUBSPACE_DELETED_ITEMS,
REG_SPAM_SAMPLE => SUBSPACE_SPAM_SAMPLES,
REG_TRACE => SUBSPACE_TELEMETRY_SPAN,
REG_METRIC => SUBSPACE_TELEMETRY_METRIC,
REPORT_EXTERNAL_ARF | REPORT_EXTERNAL_DMARC | REPORT_EXTERNAL_TLS => {
SUBSPACE_REPORT_IN
}
REPORT_INTERNAL_DMARC | REPORT_INTERNAL_TLS => SUBSPACE_REPORT_OUT,
_ => SUBSPACE_REGISTRY,
},
RegistryClass::IndexGlobal { .. } => SUBSPACE_REGISTRY,
RegistryClass::Id { .. } | RegistryClass::Index { .. } => SUBSPACE_REGISTRY_IDX,
RegistryClass::Reference { .. } | RegistryClass::IndexGlobal { .. } => {
SUBSPACE_REGISTRY_IDX_GLOBAL
}
RegistryClass::IdCounter { .. } => SUBSPACE_COUNTER,
},
ValueClass::InMemory(lookup) => match lookup {
@@ -651,13 +593,8 @@ impl ValueClass {
ValueClass::Queue(queue) => match queue {
QueueClass::Message(_) => SUBSPACE_QUEUE_MESSAGE,
QueueClass::MessageEvent(_) => SUBSPACE_QUEUE_EVENT,
QueueClass::DmarcReportHeader(_)
| QueueClass::TlsReportHeader(_)
| QueueClass::DmarcReportEvent(_)
| QueueClass::TlsReportEvent(_) => SUBSPACE_REPORT_OUT,
QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_) => SUBSPACE_QUOTA,
},
ValueClass::Report(_) => SUBSPACE_REPORT_IN,
ValueClass::Telemetry(telemetry) => match telemetry {
TelemetryClass::Span { .. } => SUBSPACE_TELEMETRY_SPAN,
TelemetryClass::Metric { .. } => SUBSPACE_TELEMETRY_METRIC,
@@ -707,25 +644,6 @@ impl From<BlobOp> for ValueClass {
}
}
// Parses a `ReportEvent` back out of a serialized report key.
//
// NOTE(review): the offsets imply this key layout — a 1-byte prefix,
// the big-endian `due` timestamp, the variable-length UTF-8 domain,
// then `policy_hash` and `seq_id` (U64_LEN each) and one trailing
// type byte. Confirm against the matching key serializer.
impl Deserialize for ReportEvent {
    fn deserialize(key: &[u8]) -> trc::Result<Self> {
        Ok(ReportEvent {
            // `due` sits immediately after the 1-byte key prefix.
            due: key.deserialize_be_u64(1)?,
            // The fixed-size fields are addressed from the END of the key,
            // so the variable-length domain needs no length prefix:
            // policy_hash is followed by seq_id and one trailing byte.
            policy_hash: key.deserialize_be_u64(key.len() - (U64_LEN * 2 + 1))?,
            seq_id: key.deserialize_be_u64(key.len() - (U64_LEN + 1))?,
            // Everything between prefix+due and the trailing fixed fields
            // is the domain name; it must decode as valid UTF-8, otherwise
            // the key is reported as corrupted.
            domain: key
                .get(U64_LEN + 1..key.len() - (U64_LEN * 2 + 1))
                .and_then(|domain| std::str::from_utf8(domain).ok())
                .map(|s| s.to_string())
                .ok_or_else(|| {
                    trc::StoreEvent::DataCorruption
                        .caused_by(trc::location!())
                        .ctx(trc::Key::Key, key)
                })?,
        })
    }
}
impl SearchIndex {
pub fn to_u8(&self) -> u8 {
match self {

View File

@@ -172,7 +172,6 @@ pub enum ValueClass {
Blob(BlobOp),
Registry(RegistryClass),
Queue(QueueClass),
Report(ReportClass),
Telemetry(TelemetryClass),
SearchIndex(SearchIndexClass),
Any(AnyClass),
@@ -285,21 +284,10 @@ pub enum RegistryClass {
pub enum QueueClass {
    /// A queued message; the payload is the message's queue id.
    Message(u64),
    /// A scheduled event for a queued message (serialized as
    /// due + queue_id + queue_name, per the key writer).
    MessageEvent(QueueEvent),
    // Aggregate report bookkeeping. NOTE(review): presumably `*Header`
    // entries describe a pending report and `*Event` entries the records
    // feeding into it — confirm against the report scheduler.
    DmarcReportHeader(ReportEvent),
    DmarcReportEvent(ReportEvent),
    TlsReportHeader(ReportEvent),
    TlsReportEvent(ReportEvent),
    // Quota counters stored in the quota subspace; the payload is the
    // pre-serialized quota key.
    QuotaCount(Vec<u8>),
    QuotaSize(Vec<u8>),
}
/// Category of a stored incoming report (kept in the report-in subspace).
/// Keys serialize as (type tag, `expires`, `id`), so entries sort by
/// expiration first — presumably to support expiry-based purging.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum ReportClass {
    Tls { id: u64, expires: u64 },
    Dmarc { id: u64, expires: u64 },
    Arf { id: u64, expires: u64 },
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum TelemetryClass {
Span {
@@ -319,14 +307,6 @@ pub struct QueueEvent {
pub queue_name: [u8; 8],
}
/// Identifies one aggregate report entry in the outgoing report queue;
/// these fields are the components of its storage key.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub struct ReportEvent {
    // Due timestamp; serialized big-endian at the start of the key so
    // entries sort by due time.
    pub due: u64,
    // Hash identifying the policy this report was generated under.
    pub policy_hash: u64,
    // Sequence id disambiguating reports with the same domain/policy.
    pub seq_id: u64,
    // Domain the report pertains to (variable-length middle of the key).
    pub domain: String,
}
#[derive(Debug, PartialEq, Eq, Hash, Default)]
pub enum ValueOp {
Set(Vec<u8>),
@@ -486,16 +466,6 @@ impl AssignedIds {
}
}
impl QueueClass {
    /// Returns the due timestamp of a report-header entry; every other
    /// queue class carries no schedule and yields `None`.
    pub fn due(&self) -> Option<u64> {
        match self {
            QueueClass::DmarcReportHeader(event) | QueueClass::TlsReportHeader(event) => {
                Some(event.due)
            }
            _ => None,
        }
    }
}
impl<T: AsRef<[u8]>> AsRef<[u8]> for Archive<T> {
fn as_ref(&self) -> &[u8] {
self.inner.as_ref()

View File

@@ -182,6 +182,7 @@ impl_unsigned_leb128!(u64, [0, 7, 14, 21, 28, 35, 42, 49, 56, 63]);
impl_unsigned_leb128!(usize, [0, 7, 14, 21, 28, 35, 42, 49, 56, 63]);
impl Leb128Writer for Vec<u8> {
#[inline(always)]
fn write_leb128<T: Leb128_>(&mut self, value: T) -> std::io::Result<usize> {
T::to_leb128_writer(value, self)
}

View File

@@ -37,6 +37,9 @@ pub async fn store_destroy(store: &Store) {
SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_SEARCH_INDEX,
SUBSPACE_REGISTRY_IDX,
SUBSPACE_REGISTRY_IDX_GLOBAL,
SUBSPACE_DIRECTORY,
] {
if subspace == SUBSPACE_SEARCH_INDEX && store.is_pg_or_mysql() {
continue;
@@ -247,6 +250,9 @@ pub async fn store_assert_is_empty(store: &Store, blob_store: BlobStore, include
(SUBSPACE_TELEMETRY_SPAN, true),
(SUBSPACE_TELEMETRY_METRIC, true),
(SUBSPACE_SEARCH_INDEX, true),
(SUBSPACE_REGISTRY_IDX, false),
(SUBSPACE_REGISTRY_IDX_GLOBAL, false),
(SUBSPACE_DIRECTORY, true),
] {
if (subspace == SUBSPACE_SEARCH_INDEX && store.is_pg_or_mysql())
//|| (subspace == directory && !include_directory)