1
+ using Imazen . Abstractions . BlobCache ;
2
+ using Imazen . Abstractions . Blobs ;
3
+ using Imazen . Abstractions . Logging ;
4
+ using Imazen . Abstractions . Resulting ;
5
+ using Imazen . Common . Concurrency . BoundedTaskCollection ;
6
+ using Imazen . Routing . Caching ;
7
+ using Imazen . Routing . Health ;
8
+ using Imazen . Routing . Requests ;
9
+ using Microsoft . Extensions . Hosting ;
10
+
11
+ namespace Imazen . Routing . Promises . Pipelines ;
12
+
13
+ public record BlobCachingTestHarnessOptions (
14
+ long ? MaxUploadQueueBytes ,
15
+ MemoryCacheOptions ? MemoryCacheOptions ,
16
+ bool DelayRequestUntilUploadsComplete ,
17
+ List < List < IBlobCache > > SeriesOfCacheGroups ,
18
+ List < IBlobCache > SaveToCaches ,
19
+ Func < IRequestSnapshot , CancellationToken , ValueTask < CodeResult < IBlobWrapper > > > BlobProvider ,
20
+ LatencyTrackingZone BlobProviderLatencyZone ,
21
+ IReLogger Logger ,
22
+ bool LockByUniqueRequest ,
23
+ bool ShutdownServices )
24
+ {
25
+ public static BlobCachingTestHarnessOptions TestSingleCacheSync ( IBlobCache cache , Func < IRequestSnapshot , CancellationToken , ValueTask < CodeResult < IBlobWrapper > > > blobProvider , IReLogger logger )
26
+ {
27
+ return new BlobCachingTestHarnessOptions (
28
+ null ,
29
+ null ,
30
+ true ,
31
+ new List < List < IBlobCache > > { new List < IBlobCache > { cache } } ,
32
+ new List < IBlobCache > { cache } ,
33
+ blobProvider ,
34
+ new LatencyTrackingZone ( "TestBlobProvider" , 10000 , true ) ,
35
+ logger ,
36
+ false ,
37
+ true
38
+ ) ;
39
+ }
40
+ }
41
+
42
+
43
+ public class BlobCachingTestHarness : IHostedService
44
+ {
45
+ BlobCachingTestHarnessOptions options ;
46
+ BlobPipelineHarness blobPipelineHarness ;
47
+ BoundedTaskCollection < BlobTaskItem > ? uploadQueue ;
48
+ CacheHealthTracker cacheHealthTracker ;
49
+ CancellationTokenSource CancellationTokenSource { get ; } = new CancellationTokenSource ( ) ;
50
+ public BlobCachingTestHarness ( BlobCachingTestHarnessOptions options )
51
+ {
52
+ this . options = options ;
53
+ if ( options . MaxUploadQueueBytes != null )
54
+ {
55
+ uploadQueue = new BoundedTaskCollection < BlobTaskItem > ( options . MaxUploadQueueBytes . Value , CancellationTokenSource ) ;
56
+ // Now ensure caches wait for uploads to write before shutting down.
57
+ foreach ( var c in options . SaveToCaches )
58
+ c . Initialize ( new BlobCacheSupportData ( ( ) => uploadQueue ! . AwaitAllCurrentTasks ( ) ) ) ;
59
+ }
60
+ cacheHealthTracker = new CacheHealthTracker ( options . Logger ) ;
61
+ var cacheEngineOptions = new CacheEngineOptions
62
+ {
63
+ HealthTracker = cacheHealthTracker ,
64
+ SeriesOfCacheGroups = options . SeriesOfCacheGroups ,
65
+ SaveToCaches = options . SaveToCaches ,
66
+ Logger = options . Logger ,
67
+ UploadQueue = uploadQueue ,
68
+ DelayRequestUntilUploadsComplete = options . DelayRequestUntilUploadsComplete ,
69
+ LockByUniqueRequest = options . LockByUniqueRequest ,
70
+ BlobFactory = new SimpleReusableBlobFactory ( )
71
+ } ;
72
+ var cacheEngine = new CacheEngine ( null , cacheEngineOptions ) ;
73
+ blobPipelineHarness = new BlobPipelineHarness ( new BlobPipelineHarnessOptions (
74
+ cacheEngine ,
75
+ options . BlobProvider ,
76
+ options . Logger ,
77
+ options . BlobProviderLatencyZone ) ) ;
78
+
79
+ }
80
+
81
+ public async ValueTask < CodeResult < IBlobWrapper > > RequestBlobWrapper ( string path , string query = "" ,
82
+ CancellationToken cancellationToken = default )
83
+ {
84
+ return await blobPipelineHarness . RequestBlobWrapper ( path , query , cancellationToken ) ;
85
+ }
86
+
87
+ public Task StartAsync ( CancellationToken cancellationToken )
88
+ {
89
+ return Task . CompletedTask ;
90
+ }
91
+
92
+ public async Task StopAsync ( CancellationToken cancellationToken )
93
+ {
94
+ if ( uploadQueue != null )
95
+ {
96
+ await uploadQueue . StopAsync ( cancellationToken ) ;
97
+ }
98
+
99
+ await cacheHealthTracker . StopAsync ( cancellationToken ) ;
100
+ if ( options . ShutdownServices )
101
+ {
102
+ var allCaches = options . SeriesOfCacheGroups . SelectMany ( x => x ) . Concat ( options . SaveToCaches ) ;
103
+ foreach ( var cache in allCaches )
104
+ {
105
+ if ( cache is IHostedService service )
106
+ {
107
+ await service . StopAsync ( cancellationToken ) ;
108
+ }
109
+ }
110
+ }
111
+ }
112
+
113
+ public async Task AwaitEnqueuedTasks ( )
114
+ {
115
+ if ( uploadQueue != null )
116
+ {
117
+ await uploadQueue . AwaitAllCurrentTasks ( ) ;
118
+ }
119
+ }
120
+ }
0 commit comments