Fix tracing indexing when using separate stores

This commit is contained in:
mdecimus
2026-02-05 09:21:34 +01:00
parent f1974af7d6
commit 1b3ac0a6eb
3 changed files with 506 additions and 473 deletions

858
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -216,8 +216,8 @@ pub struct LegacyMessageData {
}
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug)]
pub struct LegacyMessageMetadata {
pub contents: Vec<LegacyMessageMetadataContents>,
pub struct LegacyMessageMetadata<'x> {
pub contents: Vec<LegacyMessageMetadataContents<'x>>,
pub blob_hash: BlobHash,
pub size: u32,
pub received_at: u64,
@@ -226,8 +226,8 @@ pub struct LegacyMessageMetadata {
pub raw_headers: Vec<u8>,
}
impl From<LegacyMessageMetadata> for MessageMetadata {
fn from(legacy: LegacyMessageMetadata) -> Self {
impl<'x> From<LegacyMessageMetadata<'x>> for MessageMetadata {
fn from(legacy: LegacyMessageMetadata<'x>) -> Self {
MessageMetadata {
blob_body_offset: legacy
.contents
@@ -251,14 +251,14 @@ impl From<LegacyMessageMetadata> for MessageMetadata {
}
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug)]
pub struct LegacyMessageMetadataContents {
pub struct LegacyMessageMetadataContents<'x> {
pub html_body: Vec<u16>,
pub text_body: Vec<u16>,
pub attachments: Vec<u16>,
pub parts: Vec<LegacyMessageMetadataPart>,
pub parts: Vec<LegacyMessageMetadataPart<'x>>,
}
impl From<LegacyMessageMetadataContents> for MessageMetadataContents {
impl<'x> From<LegacyMessageMetadataContents<'x>> for MessageMetadataContents {
fn from(contents: LegacyMessageMetadataContents) -> Self {
MessageMetadataContents {
html_body: contents.html_body.into_boxed_slice(),
@@ -270,8 +270,8 @@ impl From<LegacyMessageMetadataContents> for MessageMetadataContents {
}
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug)]
pub struct LegacyMessageMetadataPart {
pub headers: Vec<Header<'static>>,
pub struct LegacyMessageMetadataPart<'x> {
pub headers: Vec<Header<'x>>,
pub is_encoding_problem: bool,
pub body: LegacyMetadataPartType,
pub encoding: Encoding,
@@ -281,8 +281,8 @@ pub struct LegacyMessageMetadataPart {
pub offset_end: u32,
}
impl From<LegacyMessageMetadataPart> for MessageMetadataPart {
fn from(part: LegacyMessageMetadataPart) -> Self {
impl<'x> From<LegacyMessageMetadataPart<'x>> for MessageMetadataPart {
fn from(part: LegacyMessageMetadataPart<'x>) -> Self {
let flags = match part.encoding {
Encoding::None => 0,
Encoding::QuotedPrintable => PART_ENCODING_QP,

View File

@@ -380,48 +380,63 @@ impl ReindexIndexTask for Server {
}
}
SearchIndex::Tracing => {
let mut spans = Vec::new();
self.store()
.iterate(
IterateParams::new(
ValueKey::from(ValueClass::Telemetry(TelemetryClass::Span {
span_id: 0,
})),
ValueKey::from(ValueClass::Telemetry(TelemetryClass::Span {
span_id: u64::MAX,
})),
)
.no_values(),
|key, _| {
spans.push(key.deserialize_be_u64(0)?);
Ok(true)
},
)
.await
.caused_by(trc::location!())?;
// SPDX-SnippetBegin
// SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
// SPDX-License-Identifier: LicenseRef-SEL
let mut batch = BatchBuilder::new();
for span_id in spans {
batch
.with_account_id((span_id >> 32) as u32) // TODO: This is hacky, improve
.with_document(span_id as u32)
.set(
ValueClass::TaskQueue(TaskQueueClass::UpdateIndex {
due: TaskEpoch::now(),
index: SearchIndex::Tracing,
is_insert: true,
}),
vec![],
);
if batch.len() >= 2000 {
#[cfg(feature = "enterprise")]
if let Some(store) = self
.core
.enterprise
.as_ref()
.and_then(|e| e.trace_store.as_ref())
{
let mut spans = Vec::new();
store
.store
.iterate(
IterateParams::new(
ValueKey::from(ValueClass::Telemetry(TelemetryClass::Span {
span_id: 0,
})),
ValueKey::from(ValueClass::Telemetry(TelemetryClass::Span {
span_id: u64::MAX,
})),
)
.no_values(),
|key, _| {
spans.push(key.deserialize_be_u64(0)?);
Ok(true)
},
)
.await
.caused_by(trc::location!())?;
let mut batch = BatchBuilder::new();
for span_id in spans {
batch
.with_account_id((span_id >> 32) as u32) // TODO: This is hacky, improve
.with_document(span_id as u32)
.set(
ValueClass::TaskQueue(TaskQueueClass::UpdateIndex {
due: TaskEpoch::now(),
index: SearchIndex::Tracing,
is_insert: true,
}),
vec![],
);
if batch.len() >= 2000 {
self.core.storage.data.write(batch.build_all()).await?;
batch = BatchBuilder::new();
}
}
if !batch.is_empty() {
self.core.storage.data.write(batch.build_all()).await?;
batch = BatchBuilder::new();
}
}
if !batch.is_empty() {
self.core.storage.data.write(batch.build_all()).await?;
}
// SPDX-SnippetEnd
}
SearchIndex::File | SearchIndex::InMemory => (),
}
@@ -561,9 +576,17 @@ async fn build_tracing_span_document(
let Some(index_fields) = server.core.jmap.index_fields.get(&SearchIndex::Tracing) else {
return Ok(None);
};
let Some(store) = server
.core
.enterprise
.as_ref()
.and_then(|e| e.trace_store.as_ref())
else {
return Ok(None);
};
let span_id = ((account_id as u64) << 32) | document_id as u64;
let span = server.store().get_span(span_id).await?;
let span = store.store.get_span(span_id).await?;
if !span.is_empty() {
Ok(Some(build_span_document(span_id, span, index_fields)))