more cleanups/fixes: factor out AR slot/event/watchdog helpers, make the staging size internal to the pipeline, downgrade fallback log warnings to debug

This commit is contained in:
Scott Cutler
2026-04-24 15:41:18 -07:00
parent 892b2e388d
commit b34adf4805
4 changed files with 110 additions and 109 deletions

.gitattributes

@@ -1,6 +1,3 @@
# Force LF line endings everywhere (prevent Windows CRLF conversion).
* text=auto eol=lf
# Treat the generated single-file WebUI build as binary for diff purposes.
# Git's pack-file delta compression still works (byte-level), but this prevents
# git diff from printing the entire minified file on every change.
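A minimal sketch of the kind of entry such a comment accompanies (the path here is hypothetical, not taken from this repository):

    # hypothetical example: suppress text diffs for a generated bundle
    path/to/webui/index.html -diff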


@@ -233,6 +233,10 @@ static __global__ void ggml_cuda_ar_kernel(
// in-flight depth (single digits in practice) while keeping init cost low.
static constexpr int GGML_CUDA_AR_POOL_SIZE = 128;
// Maximum chunk size (bytes per GPU) handled by one internal kernel launch.
// Larger tensors are reduced by issuing multiple chunked launches.
static constexpr size_t GGML_CUDA_AR_MAX_BYTES = 256 * 1024; // 256 KB
// Byte spacing between adjacent arrival ints. 128 bytes (two cache lines)
// ensures the arrival slots for the two GPUs never share a cache line,
// preventing false-sharing stalls on the polling GPU.
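As an illustration of the layout that spacing implies (the 128-byte figure comes from the comment above; the names in this sketch are illustrative, not the source's):

    // Illustrative sketch only: one padded arrival int per (slot, rank).
    static constexpr size_t AR_ARRIVAL_STRIDE = 128; // two cache lines
    static inline size_t ar_arrival_offset(int slot, int rank, int n_devices) {
        // adjacent ranks sit 128 bytes apart and never share a cache line
        return ((size_t) slot * n_devices + rank) * AR_ARRIVAL_STRIDE;
    }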
@@ -279,6 +283,41 @@ static int * ggml_cuda_ar_arrival_ptr(const ggml_cuda_ar_pipeline * p, int slot,
return reinterpret_cast<int *>(p->arrival + offset);
}
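// Hand out the next ring slot. On the second and later laps over the pool,
// synchronise on the slot's ker events before resetting its arrival ints:
// the event and arrival pools wrap in lock-step, so this guarantees the
// kernels that last used the slot have finished.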
static int ggml_cuda_ar_acquire_slot(ggml_cuda_ar_pipeline * p) {
const int slot = static_cast<int>(p->call_count % GGML_CUDA_AR_POOL_SIZE);
const bool pool_lapped = p->call_count >= GGML_CUDA_AR_POOL_SIZE;
p->call_count++;
if (pool_lapped) {
for (int i = 0; i < p->n_devices; ++i) {
ggml_cuda_set_device(p->devices[i]);
CUDA_CHECK(cudaEventSynchronize(p->ev_pool[i][slot].ker));
}
}
for (int i = 0; i < p->n_devices; ++i) {
*ggml_cuda_ar_arrival_ptr(p, slot, i) = 0;
}
return slot;
}
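// Bridge this rank's compute stream into the AR stream: record ev.app on the
// compute stream ("upstream done"), then make the AR stream wait on it.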
static void ggml_cuda_ar_wait_for_compute(
ggml_cuda_ar_pipeline * p, ggml_backend_cuda_context * cuda_ctx, int rank, int slot) {
ggml_cuda_ar_event_slot & ev = p->ev_pool[rank][slot];
CUDA_CHECK(cudaEventRecord(ev.app, cuda_ctx->stream()));
CUDA_CHECK(cudaStreamWaitEvent(p->streams[rank], ev.app));
}
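// Record ev.ker after this rank's chunk kernel on the AR stream; after the
// last chunk, the compute stream waits on it so downstream ops see the fully
// reduced tensor.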
static void ggml_cuda_ar_record_chunk_done(
ggml_cuda_ar_pipeline * p, ggml_backend_cuda_context * cuda_ctx, int rank, int slot, bool last_chunk) {
ggml_cuda_ar_event_slot & ev = p->ev_pool[rank][slot];
CUDA_CHECK(cudaEventRecord(ev.ker, p->streams[rank]));
if (last_chunk) {
CUDA_CHECK(cudaStreamWaitEvent(cuda_ctx->stream(), ev.ker));
}
}
// ---------------------------------------------------------------------------
// Background watchdog thread — monitors per-GPU debug ring buffers for new
// bailout records. The kernel writes a record when it hits the spin limit;
@@ -315,14 +354,51 @@ static void ggml_cuda_ar_wdog_thread(ggml_cuda_ar_pipeline * p) {
std::this_thread::sleep_for(std::chrono::milliseconds(GGML_CUDA_AR_WDOG_POLL_MS));
}
}
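// Allocate the pinned per-GPU debug rings, read GGML_CUDA_AR_MAX_SPIN from
// the environment (0 when unset), and start the watchdog thread. Returns
// false if any ring allocation fails.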
static bool ggml_cuda_ar_wdog_init(ggml_cuda_ar_pipeline * p) {
for (int i = 0; i < p->n_devices; ++i) {
if (cudaHostAlloc(reinterpret_cast<void **>(&p->debug_ring[i]),
sizeof(ggml_cuda_ar_debug_ring),
cudaHostAllocPortable) != cudaSuccess) {
GGML_LOG_ERROR("%s: cudaHostAlloc for debug ring failed on device %d\n",
__func__, p->devices[i]);
return false;
}
memset(p->debug_ring[i], 0, sizeof(ggml_cuda_ar_debug_ring));
}
const char * spin_env = getenv("GGML_CUDA_AR_MAX_SPIN");
p->wdog_max_spin = (spin_env && spin_env[0]) ? atoi(spin_env) : 0;
GGML_LOG_INFO("%s: AR watchdog enabled — max_spin=%d "
"(set GGML_CUDA_AR_MAX_SPIN=<n> to adjust)\n",
__func__, p->wdog_max_spin);
p->wdog_stop.store(false);
p->wdog_thr = std::thread(ggml_cuda_ar_wdog_thread, p);
return true;
}
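// Signal the watchdog thread to stop and join it. The thread only reads
// pinned host memory, so stopping is safe before any GPU teardown.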
static void ggml_cuda_ar_wdog_stop(ggml_cuda_ar_pipeline * p) {
p->wdog_stop.store(true);
if (p->wdog_thr.joinable()) {
p->wdog_thr.join();
}
}
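// Free the pinned debug rings. Call only after the watchdog thread has been
// joined, since it polls these buffers.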
static void ggml_cuda_ar_wdog_free(ggml_cuda_ar_pipeline * p) {
for (int i = 0; i < p->n_devices; ++i) {
if (p->debug_ring[i]) {
cudaFreeHost(p->debug_ring[i]);
}
}
}
#endif // GGML_CUDA_AR_WATCHDOG
// ---------------------------------------------------------------------------
// Init / free
// ---------------------------------------------------------------------------
ggml_cuda_ar_pipeline * ggml_cuda_ar_pipeline_init(
const int * devices, size_t n_devices, size_t max_bytes) {
ggml_cuda_ar_pipeline * ggml_cuda_ar_pipeline_init(const int * devices, size_t n_devices) {
if ((n_devices != 2) || (n_devices > GGML_CUDA_MAX_DEVICES)) {
return nullptr;
@@ -386,49 +462,28 @@ ggml_cuda_ar_pipeline * ggml_cuda_ar_pipeline_init(
memset(p->arrival, 0, arrival_bytes);
// Per-device pinned staging buffers.
p->buf_bytes = max_bytes;
p->buf_bytes = GGML_CUDA_AR_MAX_BYTES;
for (int i = 0; i < n_devices; ++i) {
if (cudaHostAlloc(&p->host_buf[i], max_bytes, cudaHostAllocPortable) != cudaSuccess) {
if (cudaHostAlloc(&p->host_buf[i], p->buf_bytes, cudaHostAllocPortable) != cudaSuccess) {
GGML_LOG_ERROR("%s: cudaHostAlloc for staging failed (%zu bytes)\n",
__func__, max_bytes);
__func__, p->buf_bytes);
ggml_cuda_ar_pipeline_free(p);
return nullptr;
}
memset(p->host_buf[i], 0, max_bytes);
memset(p->host_buf[i], 0, p->buf_bytes);
}
#if GGML_CUDA_AR_WATCHDOG
// Per-GPU debug ring buffers: written by the kernel on spin-limit bailout,
// polled by the background watchdog thread. Each ring is pinned host
// memory accessed only by its owning GPU (single-GPU host atomics OK).
{
for (int i = 0; i < n_devices; ++i) {
if (cudaHostAlloc(reinterpret_cast<void **>(&p->debug_ring[i]),
sizeof(ggml_cuda_ar_debug_ring),
cudaHostAllocPortable) != cudaSuccess) {
GGML_LOG_ERROR("%s: cudaHostAlloc for debug ring failed on device %d\n",
__func__, p->devices[i]);
ggml_cuda_ar_pipeline_free(p);
return nullptr;
}
memset(p->debug_ring[i], 0, sizeof(ggml_cuda_ar_debug_ring));
}
const char * spin_env = getenv("GGML_CUDA_AR_MAX_SPIN");
p->wdog_max_spin = (spin_env && spin_env[0]) ? atoi(spin_env) : 0;
GGML_LOG_INFO("%s: AR watchdog enabled — max_spin=%d "
"(set GGML_CUDA_AR_MAX_SPIN=<n> to adjust)\n",
__func__, p->wdog_max_spin);
p->wdog_stop.store(false);
p->wdog_thr = std::thread(ggml_cuda_ar_wdog_thread, p);
if (!ggml_cuda_ar_wdog_init(p)) {
ggml_cuda_ar_pipeline_free(p);
return nullptr;
}
#endif
GGML_LOG_INFO("%s: initialized AllReduce pipeline: %d GPUs, "
"%zu KB staging per GPU\n",
__func__, n_devices, max_bytes >> 10);
__func__, n_devices, p->buf_bytes >> 10);
return p;
}
@@ -440,10 +495,7 @@ void ggml_cuda_ar_pipeline_free(ggml_cuda_ar_pipeline * p) {
#if GGML_CUDA_AR_WATCHDOG
// Stop the watchdog thread first — it only reads pinned host memory,
// no GPU resources, so this is safe and returns within ~1ms.
p->wdog_stop.store(true);
if (p->wdog_thr.joinable()) {
p->wdog_thr.join();
}
ggml_cuda_ar_wdog_stop(p);
#endif
// Drain all in-flight kernels before tearing down resources.
@@ -475,11 +527,7 @@ void ggml_cuda_ar_pipeline_free(ggml_cuda_ar_pipeline * p) {
cudaFreeHost(p->arrival);
}
#if GGML_CUDA_AR_WATCHDOG
for (int i = 0; i < p->n_devices; ++i) {
if (p->debug_ring[i]) {
cudaFreeHost(p->debug_ring[i]);
}
}
ggml_cuda_ar_wdog_free(p);
#endif
delete p;
}
@@ -495,31 +543,18 @@ bool ggml_cuda_ar_allreduce(
GGML_ASSERT(p != nullptr);
const int n = p->n_devices;
// Only the 2-GPU path is implemented; fall back for larger communicators.
if (n != 2) {
return false;
}
GGML_ASSERT(n == 2);
const ggml_type type = tensors[0]->type;
const size_t type_size = ggml_type_size(type);
// Only float, half, and bfloat16 tensors are handled by the kernel.
if (type != GGML_TYPE_F32 && type != GGML_TYPE_F16 && type != GGML_TYPE_BF16) {
return false;
}
GGML_ASSERT(type == GGML_TYPE_F32 || type == GGML_TYPE_F16 || type == GGML_TYPE_BF16);
const int64_t ne = ggml_nelements(tensors[0]);
if (ne == 0) {
return true;
}
if (p->buf_bytes < type_size) {
return false;
}
GGML_ASSERT(ne > 0);
GGML_ASSERT(p->buf_bytes >= type_size);
const size_t max_chunk_elems = p->buf_bytes / type_size;
GGML_ASSERT(max_chunk_elems > 0);
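// Example: with the 256 KB staging buffer and F32 (4 bytes per element),
// max_chunk_elems = 65536, so a 1M-element tensor takes 16 chunked launches.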
// Insert chunked kernels into each GPU's existing compute stream via events:
// record(app, compute_stream) — capture "upstream done"
@@ -532,36 +567,17 @@ bool ggml_cuda_ar_allreduce(
const size_t chunk_elems = remaining_elems < max_chunk_elems ? remaining_elems : max_chunk_elems;
const size_t chunk_bytes = chunk_elems * type_size;
// Cycle through the event pool. On the second pass through the ring,
// synchronise on the slot's ker event before touching arrival ints —
// the event and arrival pools wrap in lock-step so this guarantees the
// kernels which last used this slot have finished.
const int slot = static_cast<int>(p->call_count % GGML_CUDA_AR_POOL_SIZE);
const bool pool_lapped = p->call_count >= GGML_CUDA_AR_POOL_SIZE;
p->call_count++;
if (pool_lapped) {
for (int i = 0; i < n; ++i) {
ggml_cuda_set_device(p->devices[i]);
CUDA_CHECK(cudaEventSynchronize(p->ev_pool[i][slot].ker));
}
}
// Reset the arrival ints for this slot before any kernel can read them.
for (int i = 0; i < n; ++i) {
*ggml_cuda_ar_arrival_ptr(p, slot, i) = 0;
}
const int slot = ggml_cuda_ar_acquire_slot(p);
const bool last_chunk = chunk_start + (int64_t) chunk_elems == ne;
for (int i = 0; i < n; ++i) {
const int peer = 1 - i; // valid for n == 2 only
ggml_cuda_set_device(p->devices[i]);
auto * cuda_ctx = static_cast<ggml_backend_cuda_context *>(backends[i]->context);
ggml_cuda_ar_event_slot & ev = p->ev_pool[i][slot];
const bool compute = (tensors[i]->flags & GGML_TENSOR_FLAG_COMPUTE) != 0;
if (chunk_start == 0) {
CUDA_CHECK(cudaEventRecord(ev.app, cuda_ctx->stream()));
CUDA_CHECK(cudaStreamWaitEvent(p->streams[i], ev.app));
ggml_cuda_ar_wait_for_compute(p, cuda_ctx, i, slot);
}
char * data = static_cast<char *>(tensors[i]->data) + chunk_start * (int64_t) type_size;
@@ -600,10 +616,7 @@ bool ggml_cuda_ar_allreduce(
#undef GGML_CUDA_AR_WDOG_EXTRA_ARGS
CUDA_CHECK(cudaGetLastError());
CUDA_CHECK(cudaEventRecord(ev.ker, p->streams[i]));
if (chunk_start + (int64_t) chunk_elems == ne) {
CUDA_CHECK(cudaStreamWaitEvent(cuda_ctx->stream(), ev.ker));
}
ggml_cuda_ar_record_chunk_done(p, cuda_ctx, i, slot, last_chunk);
}
}


@@ -5,21 +5,14 @@
#include <cstddef>
// Maximum chunk size (bytes per GPU) handled by one internal kernel launch.
// Larger tensors are reduced by issuing multiple chunked launches.
static constexpr size_t GGML_CUDA_AR_MAX_BYTES = 256 * 1024; // 256 KB
// Opaque pipeline context — owns all pinned buffers, streams, and events.
struct ggml_cuda_ar_pipeline;
// Allocate a pipeline for n_devices GPUs.
// devices[] holds the CUDA device IDs in rank order.
// max_bytes is the staging buffer size per device, which also bounds the
// per-launch chunk size; tensors larger than this are reduced via multiple
// chunked launches.
// Returns nullptr on allocation failure.
ggml_cuda_ar_pipeline * ggml_cuda_ar_pipeline_init(
const int * devices, size_t n_devices, size_t max_bytes);
const int * devices, size_t n_devices);
// Release all resources owned by the pipeline.
void ggml_cuda_ar_pipeline_free(ggml_cuda_ar_pipeline * pipeline);
@@ -27,9 +20,8 @@ void ggml_cuda_ar_pipeline_free(ggml_cuda_ar_pipeline * pipeline);
// Execute an in-place AllReduce (sum) across tensors[0..n_devices-1].
// tensors[i] must live on the device managed by backends[i] and be
// contiguous F32, F16, or BF16.
// Returns true on success. Returns false when the tensor type or size is
// outside the currently supported range; the caller should fall back to
// another provider (NCCL or the meta-backend CPU reduce).
// Preconditions are checked by the CUDA comm dispatcher before calling this.
// Returns true once the reduction work has been enqueued successfully.
bool ggml_cuda_ar_allreduce(
ggml_cuda_ar_pipeline * pipeline,
ggml_backend_t * backends,

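A minimal usage sketch for the API above (the trailing tensors parameter is assumed from the doc comment, since the declaration is truncated here; setup and error handling are elided):

    // Hypothetical call sequence; backends[] and tensors[] are created elsewhere.
    int dev_ids[2] = {0, 1};
    ggml_cuda_ar_pipeline * p = ggml_cuda_ar_pipeline_init(dev_ids, /*n_devices=*/2);
    if (p != nullptr) {
        // tensors[i] lives on the device managed by backends[i]:
        // contiguous F32/F16/BF16, same shape on both ranks.
        ggml_cuda_ar_allreduce(p, backends, tensors);
        ggml_cuda_ar_pipeline_free(p);
    }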

@@ -1223,8 +1223,7 @@ static void * ggml_backend_cuda_comm_init(ggml_backend_t * backends, size_t n_ba
// Try to allocate the internal pipeline unless the user forced NCCL.
if (!force_nccl) {
ret->ar_pipeline = ggml_cuda_ar_pipeline_init(
ret->dev_ids.data(), n_backends, GGML_CUDA_AR_MAX_BYTES);
ret->ar_pipeline = ggml_cuda_ar_pipeline_init(ret->dev_ids.data(), n_backends);
if (ret->ar_pipeline == nullptr) {
// Clear any sticky CUDA error from the failed init so it can't
// leak into a later NCCL call.
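The elided reset presumably amounts to draining CUDA's per-thread error state, along these lines (a sketch, not the verbatim source):

    // cudaGetLastError() both returns and clears the last recorded error.
    (void) cudaGetLastError();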
@@ -1336,7 +1335,7 @@ enum ggml_cuda_comm_allreduce_result {
static ggml_cuda_comm_allreduce_result ggml_backend_cuda_comm_try_allreduce_internal(
ggml_backend_cuda_comm_context * comm_ctx, struct ggml_tensor ** tensors) {
if (comm_ctx->ar_pipeline == nullptr) {
GGML_LOG_WARN("%s: internal unsupported: pipeline unavailable\n", __func__);
GGML_LOG_DEBUG("%s: internal unsupported: pipeline unavailable\n", __func__);
return GGML_CUDA_COMM_ALLREDUCE_UNSUPPORTED;
}
@@ -1348,12 +1347,12 @@ static ggml_cuda_comm_allreduce_result ggml_backend_cuda_comm_try_allreduce_inte
const ggml_type type = tensors[0]->type;
if (n_backends != 2) {
GGML_LOG_WARN("%s: internal unsupported: n_backends=%zu\n", __func__, n_backends);
GGML_LOG_DEBUG("%s: internal unsupported: n_backends=%zu\n", __func__, n_backends);
return GGML_CUDA_COMM_ALLREDUCE_UNSUPPORTED;
}
if (type != GGML_TYPE_F32 && type != GGML_TYPE_F16 && type != GGML_TYPE_BF16) {
GGML_LOG_WARN("%s: internal unsupported: type=%d\n", __func__, (int) type);
GGML_LOG_DEBUG("%s: internal unsupported: type=%d\n", __func__, (int) type);
return GGML_CUDA_COMM_ALLREDUCE_UNSUPPORTED;
}
@@ -1372,14 +1371,14 @@ static ggml_cuda_comm_allreduce_result ggml_backend_cuda_comm_try_allreduce_inte
return GGML_CUDA_COMM_ALLREDUCE_FAILED;
}
if (!ggml_is_contiguously_allocated(tensors[i])) {
GGML_LOG_WARN("%s: internal unsupported: tensor[%zu] is not contiguously allocated: ne=%" PRId64 " nbytes=%zu packed=%zu type=%d\n",
__func__, i, ne, ggml_nbytes(tensors[i]),
(size_t) ne * ggml_type_size(type) / ggml_blck_size(type), (int) type);
GGML_LOG_DEBUG("%s: internal unsupported: tensor[%zu] is not contiguously allocated: ne=%" PRId64 " nbytes=%zu packed=%zu type=%d\n",
__func__, i, ne, ggml_nbytes(tensors[i]),
(size_t) ne * ggml_type_size(type) / ggml_blck_size(type), (int) type);
return GGML_CUDA_COMM_ALLREDUCE_UNSUPPORTED;
}
if (((uintptr_t) tensors[i]->data & 0xF) != 0) {
GGML_LOG_WARN("%s: internal unsupported: tensor[%zu] data pointer is not 16-byte aligned: %p type=%d ne=%" PRId64 "\n",
__func__, i, tensors[i]->data, (int) type, ne);
GGML_LOG_DEBUG("%s: internal unsupported: tensor[%zu] data pointer is not 16-byte aligned: %p type=%d ne=%" PRId64 "\n",
__func__, i, tensors[i]->data, (int) type, ne);
return GGML_CUDA_COMM_ALLREDUCE_UNSUPPORTED;
}
}