-
-
Notifications
You must be signed in to change notification settings - Fork 326
/
Copy pathAmazonQldbDataProvider.cs
191 lines (173 loc) · 8.05 KB
/
AmazonQldbDataProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
using Amazon.IonDotnet.Builders;
using Amazon.IonDotnet.Tree;
using Amazon.IonDotnet.Tree.Impl;
using Amazon.QLDB.Driver;
using Amazon.QLDBSession.Model;
using Audit.AmazonQLDB.ConfigurationApi;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Audit.Core;
using Newtonsoft.Json;
namespace Audit.AmazonQLDB.Providers
{
/// <summary>
/// Amazon QLDB data provider for Audit.NET. Store the audit events into Amazon QLDB tables.
/// </summary>
public class AmazonQldbDataProvider : AuditDataProvider
{
/// <summary>
/// Top-level attributes to be added to the event and document before saving.
/// </summary>
public Dictionary<string, Func<AuditEvent, object>> CustomAttributes { get; set; } = new Dictionary<string, Func<AuditEvent, object>>();
///// <summary>
///// Factory that creates the QLDB Driver.
///// </summary>
public Lazy<IAsyncQldbDriver> QldbDriver { get; set; }
/// <summary>
/// The table name to use when saving an audit event in the QLDB table.
/// </summary>
public Setting<string> TableName { get; set; }
/// <summary>
/// Gets or sets the JSON serializer settings.
/// </summary>
public JsonSerializerSettings JsonSettings { get; set; } = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
};
/// <summary>
/// Creates a new AmazonQLDB data provider using the given driver.
/// </summary>
/// <param name="driver">The Amazon QLDB driver instance.</param>
public AmazonQldbDataProvider(IAsyncQldbDriver driver)
{
QldbDriver = new Lazy<IAsyncQldbDriver>(() => driver);
}
/// <summary>
/// Creates a new AmazonQLDB data provider using the given driver.
/// </summary>
/// <param name="driver">The Amazon QLDB driver instance.</param>
public AmazonQldbDataProvider(AsyncQldbDriver driver)
{
QldbDriver = new Lazy<IAsyncQldbDriver>(() => driver);
}
/// <summary>
/// Creates a new AmazonQLDB data provider.
/// </summary>
public AmazonQldbDataProvider()
{
}
/// <summary>
/// Creates a new AmazonQLDB data provider with the given configuration options.
/// </summary>
public AmazonQldbDataProvider(Action<IAmazonQldbProviderConfigurator> config)
{
var amazonQldbProviderConfigurator = new AmazonQldbProviderConfigurator();
if (config != null)
{
config.Invoke(amazonQldbProviderConfigurator);
TableName = amazonQldbProviderConfigurator._tableConfigurator._tableName;
CustomAttributes = amazonQldbProviderConfigurator._tableConfigurator._attrConfigurator?._attributes;
}
}
public override object CloneValue<T>(T value, AuditEvent auditEvent)
{
if (value == null)
{
return null;
}
if (value is string)
{
return value;
}
return JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value, JsonSettings), value.GetType(), JsonSettings);
}
/// <summary>
/// Inserts an event into AmazonQLDB
/// </summary>
public override object InsertEvent(AuditEvent auditEvent) => InsertEventAsync(auditEvent).GetAwaiter().GetResult();
/// <summary>
/// Asynchronously inserts an event into AmazonQLDB
/// </summary>
public override async Task<object> InsertEventAsync(AuditEvent auditEvent, CancellationToken cancellationToken = default)
{
var driver = QldbDriver.Value;
var tableName = GetTableName(auditEvent);
IIonValue inserted = null;
await driver.Execute(async txn =>
{
var json = JsonConvert.SerializeObject(auditEvent, JsonSettings);
var insertInto = $@"INSERT INTO {tableName} VALUE ?";
try
{
inserted = await (await txn.Execute(insertInto, IonLoader.Default.Load(json))).FirstAsync(cancellationToken);
}
catch (BadRequestException e) when (e.Message.Contains($"No such variable named '{tableName}'"))
{
await txn.Execute($"CREATE TABLE {tableName}");
inserted = await(await txn.Execute(insertInto, IonLoader.Default.Load(json))).FirstAsync(cancellationToken);
}
}, cancellationToken);
var insertDocumentId = inserted?.GetField("documentId").StringValue;
return (insertDocumentId, tableName);
}
/// <summary>
/// Replaces an event into AmazonQLDB
/// </summary>
public override void ReplaceEvent(object eventId, AuditEvent auditEvent) => ReplaceEventAsync(eventId, auditEvent).GetAwaiter().GetResult();
/// <summary>
/// Asynchronously replaces an event into AmazonQLDB
/// </summary>
public override Task ReplaceEventAsync(object eventId, AuditEvent auditEvent, CancellationToken cancellationToken = default)
{
var driver = QldbDriver.Value;
var (insertDocumentId, tableName) = (ValueTuple<string, string>)eventId;
return driver.Execute(trx => trx.Execute(
$@"UPDATE {tableName} AS e BY eid
SET e = ?
WHERE eid = ?",
IonLoader.Default.Load(JsonConvert.SerializeObject(auditEvent, JsonSettings)), new ValueFactory().NewString(insertDocumentId)), cancellationToken);
}
/// <summary>
/// Gets an audit event from its primary key
/// </summary>
/// <typeparam name="T">The audit event type</typeparam>
/// <param name="eventId">The event ID to retrieve.
/// Must be a Primitive, a AmazonQLDBEntry or an array of any of these two types. The first (or only) element must be the Hash key, and the second element is the range key.
/// </param>
public override T GetEvent<T>(object eventId) => GetFromQldb<T>(eventId).GetAwaiter().GetResult();
/// <summary>
/// Asynchronously gets an audit event from its primary key
/// </summary>
/// <typeparam name="T">The audit event type</typeparam>
/// <param name="eventId">The event ID to retrieve.
/// Must be a Primitive, a AmazonQLDBEntry or an array of any of these two types. The first (or only) element must be the Hash key, and the second element is the range key.
/// </param>
/// <param name="cancellationToken">The Cancellation Token.</param>
public override Task<T> GetEventAsync<T>(object eventId, CancellationToken cancellationToken = default) => GetFromQldb<T>(eventId, cancellationToken);
private async Task<T> GetFromQldb<T>(object eventId, CancellationToken cancellationToken = default) where T : AuditEvent
{
var driver = QldbDriver.Value;
var (insertDocumentId, tableName) = (ValueTuple<string, string>)eventId;
IIonValue selectedEvent = null;
await driver.Execute(async trx =>
{
selectedEvent = await (await trx.Execute(
$@"SELECT e.*
FROM {tableName} AS e BY eid
WHERE eid = ?",
new ValueFactory().NewString(insertDocumentId))).FirstAsync(cancellationToken);
}, cancellationToken);
var json = selectedEvent.ToPrettyString();
var selectedAuditEvent = JsonConvert.DeserializeObject<T>(json, JsonSettings);
return selectedAuditEvent;
}
private string GetTableName(AuditEvent auditEvent)
{
return TableName.GetValue(auditEvent) ?? auditEvent.GetType().Name;
}
}
}