-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreplay.go
120 lines (96 loc) · 2.44 KB
/
replay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Package replay provides a framework for replaying a timestamped
// sequence of events in real time, keeping the events' relative spacing
// and optionally scaling it to run faster or slower.
package replay
import (
"log"
"time"
)
// An Event represents an individual event found in the source.
type Event interface {
	// TS returns the time this event occurred.
	TS() time.Time
}
// A Source supplies the events to replay (e.g. a log reader).
type Source interface {
	// Next returns the next event from the source, or nil if there are
	// no more events.
	Next() Event
}
// An Action is the work to perform on each event.
type Action interface {
	// Process handles a single event.
	Process(ev Event)
}
// Replay is the primary replay type. Build a new one with New.
type Replay struct {
	// timeScale converts between real time and event time: elapsed real
	// time is multiplied by it to obtain elapsed event time.
	timeScale float64
	// firstEvent is the timestamp of the first event of the current run.
	firstEvent time.Time
	// realStart is the clock reading taken when the current run started.
	realStart time.Time
	// now returns the current time; New sets it to time.Now.
	now func() time.Time
	// sleep pauses for the given duration; New sets it to time.Sleep.
	sleep func(time.Duration)
}
// timeOffset reports, in real time, how far the replay currently is from
// the moment at which eventTime is due.
//
// A positive result is the real time still to wait before eventTime is
// due; zero means the replay is exactly on schedule; a negative result
// means the replay is running behind by that amount.
func (r *Replay) timeOffset(eventTime time.Time) time.Duration {
	// Real time spent since the run began, projected onto the event
	// timeline by the time scale.
	realElapsed := r.now().Sub(r.realStart)
	scaledElapsed := time.Duration(float64(realElapsed) * r.timeScale)

	// Position of this event on the event timeline, relative to the first.
	target := eventTime.Sub(r.firstEvent)

	// The gap left on the event timeline, converted back to real time.
	remaining := float64(target-scaledElapsed) / r.timeScale
	return time.Duration(remaining)
}
// syncTime pauses until eventTime is due.
//
// It asks timeOffset how much real time remains before eventTime and
// passes that to r.sleep. When the offset is zero or negative (on time or
// already behind) it returns without sleeping.
func (r *Replay) syncTime(eventTime time.Time) {
	wait := r.timeOffset(eventTime)
	if wait <= 0 {
		return
	}
	r.sleep(wait)
}
// functionAction adapts a plain function to the Action interface.
type functionAction func(Event)

// Process calls the wrapped function with ev.
func (fn functionAction) Process(ev Event) { fn(ev) }

// FunctionAction returns an Action whose Process method calls f.
func FunctionAction(f func(Event)) Action {
	var act functionAction = f
	return act
}
// functionSource adapts a plain function to the Source interface.
type functionSource func() Event

// Next returns whatever the wrapped function returns.
func (fn functionSource) Next() Event { return fn() }

// FunctionSource returns a Source whose Next method calls f.
func FunctionSource(f func() Event) Source {
	var src functionSource = f
	return src
}
// CollectionSource returns a Source that yields the elements of evs in
// order, one per call to Next, and nil on every call after the last
// element has been handed out.
//
// The slice is not copied; the returned Source reads from the caller's
// backing array as it advances.
func CollectionSource(evs []Event) Source {
	cursor := 0
	return FunctionSource(func() Event {
		if cursor >= len(evs) {
			return nil
		}
		ev := evs[cursor]
		cursor++
		return ev
	})
}
// New creates a new Replay with time scaled to the given amount.
//
// scale is the ratio of event time to real time: with a scale of 2 events
// are replayed twice as fast as their timestamps are spaced, with 0.5 at
// half speed. The returned Replay uses time.Now and time.Sleep as its
// clock.
//
// scale must be > 0. New logs and panics if it is zero, negative or NaN.
func New(scale float64) *Replay {
	// Written as !(scale > 0) rather than scale <= 0 so that NaN, which
	// compares false against everything, is rejected as well.
	if !(scale > 0) {
		log.Panic("Timescale must be > 0")
	}
	return &Replay{timeScale: scale, now: time.Now, sleep: time.Sleep}
}
// Run the replay.
//
// Run pulls events from s until Next returns nil and passes each one to
// action. The clock reading taken when Run starts and the first event's
// timestamp form the origin of the replay. Before an event is processed
// Run waits until that event is due, so the first event is processed
// immediately and later events keep their (scaled) relative spacing. When
// the replay is behind schedule no waiting happens and events are
// processed back to back.
//
// Run stores the origin on r, so a Replay should not be shared by
// overlapping calls to Run.
//
// Returns the amount of time we were "off" of the target: the offset
// between the last event's due time and the clock once the source is
// exhausted. It is 0 for an empty source and negative when the run
// finished behind schedule.
func (r *Replay) Run(s Source, action Action) time.Duration {
	event := s.Next()
	if event == nil {
		return 0
	}

	r.realStart = r.now()
	r.firstEvent = event.TS()

	eventTime := r.firstEvent
	for ; event != nil; event = s.Next() {
		eventTime = event.TS()
		// Wait for the event's slot before acting on it. Processing first
		// would fire every event one inter-event gap too early.
		r.syncTime(eventTime)
		action.Process(event)
	}

	return r.timeOffset(eventTime)
}