Skip to content

Commit 9d33e26

Browse files
authored
stability: add blockWriter case for inserting data (#321)
* add blockWriter case to insert data
1 parent fb7184c commit 9d33e26

File tree

6 files changed

+376
-1
lines changed

6 files changed

+376
-1
lines changed

tests/actions.go

+26-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
3535
"github.com/pingcap/tidb-operator/pkg/controller"
3636
"github.com/pingcap/tidb-operator/pkg/label"
37+
"github.com/pingcap/tidb-operator/tests/pkg/blockwriter"
38+
"github.com/pingcap/tidb-operator/tests/pkg/util"
3739
"k8s.io/api/apps/v1beta1"
3840
batchv1 "k8s.io/api/batch/v1"
3941
corev1 "k8s.io/api/core/v1"
@@ -43,6 +45,13 @@ import (
4345
"k8s.io/client-go/kubernetes"
4446
)
4547

48+
const (
49+
defaultTableNum int = 64
50+
defaultConcurrency = 512
51+
defaultBatchSize = 100
52+
defaultRawSize = 100
53+
)
54+
4655
func NewOperatorActions(cli versioned.Interface, kubeCli kubernetes.Interface, logDir string) OperatorActions {
4756
return &operatorActions{
4857
cli: cli,
@@ -138,6 +147,7 @@ type TidbClusterInfo struct {
138147
InsertBatchSize string
139148
Resources map[string]string
140149
Args map[string]string
150+
blockWriter *blockwriter.BlockWriterCase
141151
Monitor bool
142152
}
143153

@@ -242,6 +252,14 @@ func (oa *operatorActions) DeployTidbCluster(info *TidbClusterInfo) error {
242252
info.Namespace, info.ClusterName, err, string(res))
243253
}
244254

255+
// init blockWriter case
256+
info.blockWriter = blockwriter.NewBlockWriterCase(blockwriter.Config{
257+
TableNum: defaultTableNum,
258+
Concurrency: defaultConcurrency,
259+
BatchSize: defaultBatchSize,
260+
RawSize: defaultRawSize,
261+
})
262+
245263
return nil
246264
}
247265

@@ -369,10 +387,17 @@ func (oa *operatorActions) CheckTidbClusterStatus(info *TidbClusterInfo) error {
369387
}
370388

371389
func (oa *operatorActions) BeginInsertDataTo(info *TidbClusterInfo) error {
372-
return nil
390+
dsn := getDSN(info.Namespace, info.ClusterName, "test", info.Password)
391+
db, err := util.OpenDB(dsn, defaultConcurrency)
392+
if err != nil {
393+
return err
394+
}
395+
396+
return info.blockWriter.Start(db)
373397
}
374398

375399
func (oa *operatorActions) StopInsertDataTo(info *TidbClusterInfo) error {
400+
info.blockWriter.Stop()
376401
return nil
377402
}
378403

tests/cmd/e2e/main.go

+6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ package main
1515

1616
import (
1717
"flag"
18+
"net/http"
19+
_ "net/http/pprof"
1820

1921
"github.com/golang/glog"
2022
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
@@ -36,6 +38,10 @@ func main() {
3638
logs.InitLogs()
3739
defer logs.FlushLogs()
3840

41+
go func() {
42+
glog.Info(http.ListenAndServe("localhost:6060", nil))
43+
}()
44+
3945
cfg, err := rest.InClusterConfig()
4046
if err != nil {
4147
glog.Fatalf("failed to get config: %v", err)

tests/pkg/blockWriter/blockWriter.go

+273
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
// Copyright 2019 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.package spec
13+
14+
package blockwriter
15+
16+
import (
17+
"context"
18+
"database/sql"
19+
"fmt"
20+
"math/rand"
21+
"strings"
22+
"sync"
23+
"sync/atomic"
24+
"time"
25+
26+
"github.com/golang/glog"
27+
"github.com/pingcap/tidb-operator/tests/pkg/util"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
)
30+
31+
const (
32+
queryChanSize int = 10000
33+
)
34+
35+
// BlockWriterCase is for concurrent writing blocks.
36+
type BlockWriterCase struct {
37+
cfg Config
38+
bws []*blockWriter
39+
40+
isRunning uint32
41+
isInit uint32
42+
stopChan chan struct{}
43+
44+
sync.RWMutex
45+
}
46+
47+
// Config defines the config of BlockWriterCase
48+
type Config struct {
49+
TableNum int
50+
Concurrency int
51+
BatchSize int
52+
RawSize int
53+
}
54+
55+
type blockWriter struct {
56+
rawSize int
57+
values []string
58+
batchSize int
59+
}
60+
61+
// NewBlockWriterCase returns the BlockWriterCase.
62+
func NewBlockWriterCase(cfg Config) *BlockWriterCase {
63+
c := &BlockWriterCase{
64+
cfg: cfg,
65+
stopChan: make(chan struct{}, 1),
66+
}
67+
68+
if c.cfg.TableNum < 1 {
69+
c.cfg.TableNum = 1
70+
}
71+
c.initBlocks()
72+
73+
return c
74+
}
75+
76+
func (c *BlockWriterCase) initBlocks() {
77+
c.bws = make([]*blockWriter, c.cfg.Concurrency)
78+
for i := 0; i < c.cfg.Concurrency; i++ {
79+
c.bws[i] = c.newBlockWriter()
80+
}
81+
}
82+
83+
func (c *BlockWriterCase) newBlockWriter() *blockWriter {
84+
return &blockWriter{
85+
rawSize: c.cfg.RawSize,
86+
values: make([]string, c.cfg.BatchSize),
87+
batchSize: c.cfg.BatchSize,
88+
}
89+
}
90+
91+
func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []string, wg *sync.WaitGroup) {
92+
defer func() {
93+
glog.Infof("[%s] [action: generate Query] stopped", c)
94+
wg.Done()
95+
}()
96+
97+
for {
98+
tableN := rand.Intn(c.cfg.TableNum)
99+
var index string
100+
if tableN > 0 {
101+
index = fmt.Sprintf("%d", tableN)
102+
}
103+
104+
var querys []string
105+
for i := 0; i < 100; i++ {
106+
values := make([]string, c.cfg.BatchSize)
107+
for i := 0; i < c.cfg.BatchSize; i++ {
108+
blockData := util.RandString(c.cfg.RawSize)
109+
values[i] = fmt.Sprintf("('%s')", blockData)
110+
}
111+
112+
querys = append(querys, fmt.Sprintf(
113+
"INSERT INTO block_writer%s(raw_bytes) VALUES %s",
114+
index, strings.Join(values, ",")))
115+
}
116+
117+
select {
118+
case <-ctx.Done():
119+
return
120+
default:
121+
if len(queryChan) < queryChanSize {
122+
queryChan <- querys
123+
} else {
124+
glog.Infof("[%s] [action: generate Query] query channel is full, sleep 10 seconds", c)
125+
util.Sleep(ctx, 10*time.Second)
126+
}
127+
}
128+
}
129+
}
130+
131+
func (bw *blockWriter) batchExecute(db *sql.DB, query string) error {
132+
_, err := db.Exec(query)
133+
if err != nil {
134+
glog.Errorf("[block_writer] exec sql [%s] failed, err: %v", query, err)
135+
return err
136+
}
137+
138+
return nil
139+
}
140+
141+
func (bw *blockWriter) run(ctx context.Context, db *sql.DB, queryChan chan []string) {
142+
for {
143+
select {
144+
case <-ctx.Done():
145+
return
146+
default:
147+
}
148+
149+
querys, ok := <-queryChan
150+
if !ok {
151+
// No more query
152+
return
153+
}
154+
155+
for _, query := range querys {
156+
select {
157+
case <-ctx.Done():
158+
return
159+
default:
160+
if err := bw.batchExecute(db, query); err != nil {
161+
glog.Fatal(err)
162+
}
163+
}
164+
}
165+
}
166+
}
167+
168+
// Initialize inits case
169+
func (c *BlockWriterCase) initialize(db *sql.DB) error {
170+
glog.Infof("[%s] start to init...", c)
171+
defer func() {
172+
atomic.StoreUint32(&c.isInit, 1)
173+
glog.Infof("[%s] init end...", c)
174+
}()
175+
176+
for i := 0; i < c.cfg.TableNum; i++ {
177+
var s string
178+
if i > 0 {
179+
s = fmt.Sprintf("%d", i)
180+
}
181+
182+
tmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS block_writer%s %s", s, `
183+
(
184+
id BIGINT NOT NULL AUTO_INCREMENT,
185+
raw_bytes BLOB NOT NULL,
186+
PRIMARY KEY (id)
187+
)`)
188+
189+
err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
190+
_, err := db.Exec(tmt)
191+
if err != nil {
192+
glog.Warningf("[%s] exec sql [%s] failed, err: %v, retry...", c, tmt, err)
193+
return false, nil
194+
}
195+
196+
return true, nil
197+
})
198+
199+
if err != nil {
200+
glog.Errorf("[%s] exec sql [%s] failed, err: %v", c, tmt, err)
201+
return err
202+
}
203+
}
204+
205+
return nil
206+
}
207+
208+
// Start starts to run cases
209+
func (c *BlockWriterCase) Start(db *sql.DB) error {
210+
if !atomic.CompareAndSwapUint32(&c.isRunning, 0, 1) {
211+
err := fmt.Errorf("[%s] is running, you can't start it again", c)
212+
glog.Error(err)
213+
return err
214+
}
215+
216+
defer func() {
217+
c.RLock()
218+
glog.Infof("[%s] stopped", c)
219+
atomic.SwapUint32(&c.isRunning, 0)
220+
}()
221+
222+
if c.isInit == 0 {
223+
if err := c.initialize(db); err != nil {
224+
return err
225+
}
226+
}
227+
228+
glog.Infof("[%s] start to execute case...", c)
229+
230+
var wg sync.WaitGroup
231+
232+
ctx, cancel := context.WithCancel(context.Background())
233+
234+
queryChan := make(chan []string, queryChanSize)
235+
236+
for i := 0; i < c.cfg.Concurrency; i++ {
237+
wg.Add(1)
238+
go func(i int) {
239+
defer wg.Done()
240+
c.bws[i].run(ctx, db, queryChan)
241+
}(i)
242+
}
243+
244+
wg.Add(1)
245+
go c.generateQuery(ctx, queryChan, &wg)
246+
247+
loop:
248+
for {
249+
select {
250+
case <-c.stopChan:
251+
glog.Infof("[%s] stoping...", c)
252+
cancel()
253+
break loop
254+
default:
255+
util.Sleep(context.Background(), 2*time.Second)
256+
}
257+
}
258+
259+
wg.Wait()
260+
close(queryChan)
261+
262+
return nil
263+
}
264+
265+
// Stop stops cases
266+
func (c *BlockWriterCase) Stop() {
267+
c.stopChan <- struct{}{}
268+
}
269+
270+
// String implements fmt.Stringer interface.
271+
func (c *BlockWriterCase) String() string {
272+
return "block_writer"
273+
}

tests/pkg/util/db.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package util
2+
3+
import (
4+
"database/sql"
5+
6+
"github.com/golang/glog"
7+
)
8+
9+
// OpenDB opens db
10+
func OpenDB(dsn string, maxIdleConns int) (*sql.DB, error) {
11+
db, err := sql.Open("mysql", dsn)
12+
if err != nil {
13+
return nil, err
14+
}
15+
16+
db.SetMaxIdleConns(maxIdleConns)
17+
glog.Info("DB opens successfully")
18+
return db, nil
19+
}

0 commit comments

Comments
 (0)