Skip to content

Commit 1d82686

Browse files
committed
Begin adding health tracking integration, sync/bench harness support/fix double use of blob wrapper
1 parent 5e70dc5 commit 1d82686

File tree

7 files changed

+118
-78
lines changed

7 files changed

+118
-78
lines changed

.cursorignore

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Add directories or file patterns to ignore during indexing (e.g. foo/ or *.csv)
2+
*.txt
3+
*.dll
4+
*.pdb
5+
*.user
6+
*.png
7+
*.jpg
8+
*.jpeg
9+
*.bmp
10+
*.ico
11+
*.webp
12+
*.nupkg
13+
*.json
14+
*.info
15+
.idea/
16+
.vscode/
17+
bin/
18+
examples/
19+
NuGetPackages/
20+
obj/
21+
packages/
22+

.vscode/settings.json

+6-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,10 @@
55
],
66
"cSpell.ignoreWords": [
77
"sthree"
8-
]
8+
],
9+
"editor.fontSize": 14,
10+
"window.zoomLevel": 1,
11+
"window.zoomPerWindow": true,
12+
"editor.mouseWheelZoom": true,
13+
"terminal.integrated.mouseWheelZoom": true
914
}

src/Imazen.Routing/Engine/RoutingEngine.cs

+6
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ public bool MightHandleRequest<TQ>(string path, TQ query) where TQ : IReadOnlyQu
5858
}
5959

6060

61+
/// <summary>
62+
/// Errors if the route isn't a cachable blob; returns null if there's no match.
63+
/// </summary>
64+
/// <param name="request"></param>
65+
/// <param name="cancellationToken"></param>
66+
/// <returns></returns>
6167
public async ValueTask<CodeResult<ICacheableBlobPromise>?> RouteToPromiseAsync(MutableRequest request, CancellationToken cancellationToken = default)
6268
{
6369
// log info about the request

src/Imazen.Routing/Promises/Pipelines/CacheEngine.cs

+64-44
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Imazen.Abstractions.Resulting;
99
using Imazen.Common.Concurrency;
1010
using Imazen.Common.Concurrency.BoundedTaskCollection;
11+
using Imazen.Routing.Health;
1112
using Imazen.Routing.Helpers;
1213
using Imazen.Routing.HttpAbstractions;
1314
using Imazen.Routing.Requests;
@@ -21,6 +22,7 @@ namespace Imazen.Routing.Promises.Pipelines;
2122
public class CacheEngine: IBlobPromisePipeline
2223
{
2324

25+
2426
public CacheEngine(IBlobPromisePipeline? next, CacheEngineOptions options)
2527
{
2628
Options = options;
@@ -30,6 +32,7 @@ public CacheEngine(IBlobPromisePipeline? next, CacheEngineOptions options)
3032
{
3133
Locks = new AsyncLockProvider();
3234
}
35+
HealthTracker = options.HealthTracker as CacheHealthTracker;
3336
}
3437

3538
public async ValueTask<CodeResult<ICacheableBlobPromise>> GetFinalPromiseAsync(ICacheableBlobPromise promise, IBlobRequestRouter router,
@@ -49,7 +52,7 @@ public async ValueTask<CodeResult<ICacheableBlobPromise>> GetFinalPromiseAsync(I
4952
return CodeResult<ICacheableBlobPromise>.Ok(
5053
new ServerlessCachePromise(wrappedPromise.FinalRequest, wrappedPromise, this));
5154
}
52-
55+
private CacheHealthTracker? HealthTracker { get; }
5356
private IBlobPromisePipeline? Next { get; }
5457
private AsyncLockProvider? Locks { get; }
5558

@@ -70,7 +73,7 @@ private async Task FinishUpload(IBlobCacheRequest cacheReq, ICacheableBlobPromis
7073
await Task.WhenAll(Options.SaveToCaches.Select(x => x.CachePut(cacheEventDetails, cancellationToken)));
7174
}
7275

73-
public async ValueTask<CodeResult<IBlobWrapper>> Fetch(ICacheableBlobPromise promise,IBlobRequestRouter router, CancellationToken cancellationToken = default)
76+
internal async ValueTask<CodeResult<IBlobWrapper>> Fetch(ICacheableBlobPromise promise,IBlobRequestRouter router, CancellationToken cancellationToken = default)
7477
{
7578
if (!promise.ReadyToWriteCacheKeyBasisData)
7679
{
@@ -95,16 +98,15 @@ public async ValueTask<CodeResult<IBlobWrapper>> Fetch(ICacheableBlobPromise pro
9598
return await FetchInner(cacheRequest, promise, router, cancellationToken);
9699
}
97100
}
98-
99101

100-
101-
public async ValueTask<CodeResult<IBlobWrapper>> FetchInner(IBlobCacheRequest cacheRequest, ICacheableBlobPromise promise, IBlobRequestRouter router, CancellationToken cancellationToken = default)
102+
103+
private async ValueTask<CodeResult<IBlobWrapper>> FetchInner(IBlobCacheRequest cacheRequest, ICacheableBlobPromise promise, IBlobRequestRouter router, CancellationToken cancellationToken = default)
102104
{
103105
// First check the upload queue.
104106
if (Options.UploadQueue?.TryGet(cacheRequest.CacheKeyHashString, out var uploadTask) == true)
105107
{
106108
Options.Logger.LogTrace("Located requested resource from the upload queue {CacheKeyHashString}", cacheRequest.CacheKeyHashString);
107-
return CodeResult<IBlobWrapper>.Ok(uploadTask.Blob);
109+
return CodeResult<IBlobWrapper>.Ok(uploadTask.Blob.ForkReference());
108110
}
109111
// Then check the caches
110112
List<KeyValuePair<IBlobCache,Task<CacheFetchResult>>>? allFetchAttempts = null;
@@ -266,9 +268,9 @@ private void LogFetchTaskStatus(bool isFresh,
266268
LogFetchTaskStatus(isFresh, cacheHit, fetchTasks);
267269
}
268270

269-
if (Options.UploadQueue == null)
271+
if (Options.UploadQueue == null && !Options.DelayRequestUntilUploadsComplete)
270272
{
271-
Log.LogWarning("No upload queue configured");
273+
Log.LogWarning("No upload queue configured, and synchronous mode disabled. Not saving to any caches.");
272274
return null;
273275
}
274276

@@ -323,23 +325,27 @@ private void LogFetchTaskStatus(bool isFresh,
323325

324326

325327

326-
private async Task BufferAndEnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper blob, bool isFresh,
328+
private async ValueTask BufferAndEnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper blob, bool isFresh,
327329
IBlobCache? cacheHit, List<KeyValuePair<IBlobCache, Task<CacheFetchResult>>>? fetchTasks, CancellationToken bufferCancellationToken = default)
328330
{
329-
if (EnqueueSaveToCaches(cacheRequest, blob, isFresh, cacheHit, fetchTasks))
331+
// Here's also a good place to handle the cachefetchresults; \
332+
// HealthTracker[cacheHit].ReportBehavior();
333+
//
334+
335+
if (await EnqueueSaveToCaches(cacheRequest, blob, isFresh, cacheHit, fetchTasks))
330336
{
331337
await blob.EnsureReusable(bufferCancellationToken);
332338
Log.LogTrace("Called EnsureReusable on {CacheKeyHashString}", cacheRequest.CacheKeyHashString);
333339
}
334340
}
335341

336-
private bool EnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper mainBlob, bool isFresh,
342+
private ValueTask<bool> EnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper mainBlob, bool isFresh,
337343
IBlobCache? cacheHit, List<KeyValuePair<IBlobCache,Task<CacheFetchResult>>>? fetchTasks)
338344
{
339345

340346
var cachesToSaveTo = GetUploadCacheCandidates(isFresh, ref cacheHit, fetchTasks);
341347

342-
if (cachesToSaveTo == null || Options.UploadQueue == null) return false; // Nothing to do
348+
if (cachesToSaveTo == null || (Options.UploadQueue == null && !Options.DelayRequestUntilUploadsComplete)) return new ValueTask<bool>(false); // Nothing to do
343349

344350
var blob = mainBlob.ForkReference();
345351
mainBlob.IndicateInterest();
@@ -360,39 +366,56 @@ private bool EnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper ma
360366
throw new InvalidOperationException();
361367
}
362368

363-
364-
var enqueueResult = Options.UploadQueue.Queue(new BlobTaskItem(cacheRequest.CacheKeyHashString,blob), async (taskItem, cancellationToken) =>
369+
var blobTaskItem = new BlobTaskItem(cacheRequest.CacheKeyHashString,blob);
370+
371+
Task<PutResult[]> BulkUploader(BlobTaskItem taskItem, CancellationToken cancellationToken)
365372
{
366373
// We need to dispose of the blob wrapper after all uploads are complete.
367374
using (taskItem.Blob)
368375
{
369-
var tasks = cachesToSaveTo.Select(async cache =>
370-
{
371-
var waitingInQueue = DateTime.UtcNow - taskItem.JobCreatedAt;
376+
var tasks = cachesToSaveTo.Select(PerCacheUpload)
377+
.ToArray();
378+
return Task.WhenAll(tasks);
372379

373-
var sw = Stopwatch.StartNew();
374-
try
375-
{
376-
Log.LogTrace("[put started] CachePut {key} to {CacheName}", taskItem.UniqueKey,
377-
cache.UniqueName);
378-
var result = await cache.CachePut(eventDetails, cancellationToken);
379-
sw.Stop();
380-
var r = new PutResult(cache, eventDetails, result, null, sw.Elapsed, waitingInQueue);
381-
LogPutResult(r);
382-
return r;
383-
}
384-
catch (Exception e)
385-
{
386-
sw.Stop();
387-
var r = new PutResult(cache, eventDetails, null, e, sw.Elapsed, waitingInQueue);
388-
LogPutResult(r);
389-
return r;
390-
}
380+
async Task<PutResult> PerCacheUpload(IBlobCache cache)
381+
{
382+
var waitingInQueue = DateTime.UtcNow - taskItem.JobCreatedAt;
383+
384+
var sw = Stopwatch.StartNew();
385+
try
386+
{
387+
Log.LogTrace("[put started] CachePut {key} to {CacheName}", taskItem.UniqueKey, cache.UniqueName);
388+
var result = await cache.CachePut(eventDetails, cancellationToken);
389+
sw.Stop();
390+
var r = new PutResult(cache, eventDetails, result, null, sw.Elapsed, waitingInQueue);
391+
LogPutResult(r);
392+
return r;
391393
}
392-
).ToArray();
393-
HandleUploadAnswers(await Task.WhenAll(tasks));
394+
catch (Exception e)
395+
{
396+
sw.Stop();
397+
var r = new PutResult(cache, eventDetails, null, e, sw.Elapsed, waitingInQueue);
398+
LogPutResult(r);
399+
return r;
400+
}
401+
}
394402
}
395-
});
403+
}
404+
405+
async Task BulkUploaderAsync(BlobTaskItem item, CancellationToken ct) => HandleUploadAnswers(await BulkUploader(item, ct));
406+
407+
if (Options.DelayRequestUntilUploadsComplete)
408+
{
409+
var uploadTask = BulkUploader(blobTaskItem, default);
410+
var finalTask = uploadTask.ContinueWith(t =>
411+
{
412+
HandleUploadAnswers(t.Result);
413+
return true;
414+
}, TaskContinuationOptions.ExecuteSynchronously);
415+
return new ValueTask<bool>(finalTask);
416+
}
417+
if (Options.UploadQueue == null) return new ValueTask<bool>(false);
418+
var enqueueResult = Options.UploadQueue.Queue(blobTaskItem, BulkUploaderAsync);
396419
if (enqueueResult == TaskEnqueueResult.QueueFull)
397420
{
398421
Log.LogWarning("[CACHE PUT ERROR] Upload queue is full, not enqueuing {CacheKeyHashString} for upload to {Caches}", cacheRequest.CacheKeyHashString, string.Join(", ", cachesToSaveTo.Select(x => x.UniqueName)));
@@ -409,7 +432,8 @@ private bool EnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper ma
409432
{
410433
Log.LogTrace("Enqueued {CacheKeyHashString} for upload to {Caches}", cacheRequest.CacheKeyHashString, string.Join(", ", cachesToSaveTo.Select(x => x.UniqueName)));
411434
}
412-
return enqueueResult == TaskEnqueueResult.Enqueued;
435+
436+
return new ValueTask<bool>(enqueueResult == TaskEnqueueResult.Enqueued);
413437
}
414438
record struct PutResult(IBlobCache Cache, CacheEventDetails EventDetails, CodeResult? Result, Exception? Exception, TimeSpan Executing, TimeSpan Waiting);
415439

@@ -432,13 +456,9 @@ private void LogPutResult(PutResult result)
432456

433457
private void HandleUploadAnswers(PutResult[] results)
434458
{
435-
//TODO?
459+
//TODO? Cache put failures should probably affect health??
436460

437461
}
438-
439-
440-
441-
442462
}
443463

444464
internal record ServerlessCachePromise(IRequestSnapshot FinalRequest, ICacheableBlobPromise FreshPromise, CacheEngine CacheEngine): ICacheableBlobPromise

src/Imazen.Routing/Promises/Pipelines/CacheEngineOptions.cs

+10-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Imazen.Abstractions.Blobs;
33
using Imazen.Abstractions.Logging;
44
using Imazen.Common.Concurrency.BoundedTaskCollection;
5+
using Imazen.Routing.Health;
56
using Imazen.Routing.Requests;
67

78
namespace Imazen.Routing.Promises.Pipelines;
@@ -20,15 +21,21 @@ public CacheEngineOptions(List<IBlobCache> simultaneousFetchAndPut, BoundedTaskC
2021
// Each cache group is a list of caches that can be queried in parallel
2122
public required List<List<IBlobCache>> SeriesOfCacheGroups { get; init; }
2223

24+
// All caches we want to enable writing to
2325
public required List<IBlobCache> SaveToCaches { get; init; }
2426

25-
[Obsolete("Use the parameterized one local to the request")]
26-
public IBlobRequestRouter? RequestRouter { get; init; }
27-
27+
// TODO: maybe remove?
2828
public required IReusableBlobFactory BlobFactory { get; init; }
2929

3030
public required BoundedTaskCollection<BlobTaskItem>? UploadQueue { get; init; }
3131

32+
public required object? HealthTracker { get; init; }
33+
34+
/// <summary>
35+
/// Disables background upload queue, and instead uploads blobs immediately before responding to the client.
36+
/// </summary>
37+
public bool DelayRequestUntilUploadsComplete { get; init; }
38+
3239
public required IReLogger Logger { get; init; }
3340

3441
/// <summary>
@@ -40,6 +47,4 @@ public CacheEngineOptions(List<IBlobCache> simultaneousFetchAndPut, BoundedTaskC
4047
/// How long to wait for fetching and generation of the same request by another thread.
4148
/// </summary>
4249
public int LockTimeoutMs { get; init; } = 2000;
43-
44-
4550
}

src/Imazen.Routing/Promises/Pipelines/PipelineBuilder.cs

-26
This file was deleted.

src/Imazen.Routing/Serving/ImageServer.cs

+10-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Imazen.Common.Licensing;
1313
using Imazen.Routing.Caching;
1414
using Imazen.Routing.Engine;
15+
using Imazen.Routing.Health;
1516
using Imazen.Routing.Helpers;
1617
using Imazen.Routing.HttpAbstractions;
1718
using Imazen.Routing.Layers;
@@ -37,6 +38,7 @@ internal class ImageServer<TRequest, TResponse, TContext> : IImageServer<TReques
3738
private readonly BoundedTaskCollection<BlobTaskItem> uploadQueue;
3839
private readonly bool shutdownRegisteredServices;
3940
private readonly IImageServerContainer container;
41+
private readonly CacheHealthTracker cacheHealthTracker;
4042
public ImageServer(IImageServerContainer container,
4143
ILicenseChecker licenseChecker,
4244
LicenseOptions licenseOptions,
@@ -95,11 +97,15 @@ public ImageServer(IImageServerContainer container,
9597
}
9698

9799
var allCachesExceptMemory = allCaches?.Where(c => c != memoryCache)?.ToList();
100+
101+
cacheHealthTracker = container.GetService<CacheHealthTracker>();
102+
cacheHealthTracker ??= new CacheHealthTracker(logger);
98103

99104
var watermarkingLogic = container.GetService<WatermarkingLogicOptions>() ??
100105
new WatermarkingLogicOptions(null, null);
101106
var sourceCacheOptions = new CacheEngineOptions
102107
{
108+
HealthTracker = cacheHealthTracker,
103109
SeriesOfCacheGroups =
104110
[
105111
..new[] { [memoryCache], allCachesExceptMemory ?? [] }
@@ -304,16 +310,18 @@ await SmallHttpResponse.Text(404, "The specified resource does not exist.\r\n" +
304310

305311
}
306312

307-
public Task StartAsync(CancellationToken cancellationToken)
313+
public async Task StartAsync(CancellationToken cancellationToken)
308314
{
309-
return uploadQueue.StartAsync(cancellationToken);
315+
await uploadQueue.StartAsync(cancellationToken);
316+
await cacheHealthTracker.StartAsync(cancellationToken);
310317
}
311318

312319
public async Task StopAsync(CancellationToken cancellationToken)
313320
{
314321
//TODO: error handling or no?
315322
//await uploadCancellationTokenSource.CancelAsync();
316323
await uploadQueue.StopAsync(cancellationToken);
324+
await cacheHealthTracker.StopAsync(cancellationToken);
317325
if (shutdownRegisteredServices)
318326
{
319327
var services = this.container.GetInstanceOfEverythingLocal<IHostedService>();

0 commit comments

Comments
 (0)