19
19
import java .nio .ByteBuffer ;
20
20
import java .util .ArrayList ;
21
21
import java .util .HashMap ;
22
- import java .util .LinkedList ;
23
22
import java .util .List ;
24
23
import java .util .Map ;
25
24
import java .util .Queue ;
26
25
import java .util .concurrent .CountDownLatch ;
26
+ import org .slf4j .Logger ;
27
+ import org .slf4j .LoggerFactory ;
28
+ import com .google .code .yanf4j .buffer .IoBuffer ;
29
+ import com .google .code .yanf4j .core .impl .FutureImpl ;
27
30
import net .rubyeye .xmemcached .MemcachedOptimizer ;
28
31
import net .rubyeye .xmemcached .buffer .BufferAllocator ;
29
32
import net .rubyeye .xmemcached .command .AssocCommandAware ;
43
46
import net .rubyeye .xmemcached .utils .ByteUtils ;
44
47
import net .rubyeye .xmemcached .utils .OpaqueGenerater ;
45
48
import net .rubyeye .xmemcached .utils .Protocol ;
46
- import org .slf4j .Logger ;
47
- import org .slf4j .LoggerFactory ;
48
- import com .google .code .yanf4j .buffer .IoBuffer ;
49
- import com .google .code .yanf4j .core .impl .FutureImpl ;
50
49
51
50
/**
52
- * Memcached command optimizer,merge single-get comands to multi-get command,merge ByteBuffers to
53
- * fit the socket's sendBufferSize etc.
54
- *
51
+ * Memcached command optimizer,merge single-get comands to multi-get command,
52
+ * merge ByteBuffers to fit the socket's sendBufferSize etc.
53
+ *
55
54
* @author dennis
56
55
*/
57
56
public class Optimizer implements OptimizerMBean , MemcachedOptimizer {
@@ -64,21 +63,21 @@ public class Optimizer implements OptimizerMBean, MemcachedOptimizer {
64
63
private static final Logger log = LoggerFactory .getLogger (Optimizer .class );
65
64
private Protocol protocol = Protocol .Binary ;
66
65
67
- public Optimizer (Protocol protocol ) {
66
+ public Optimizer (final Protocol protocol ) {
68
67
XMemcachedMbeanServer .getInstance ().registMBean (this , this .getClass ().getPackage ().getName ()
69
68
+ ":type=" + this .getClass ().getSimpleName () + "-" + MemcachedClientNameHolder .getName ());
70
69
this .protocol = protocol ;
71
70
}
72
71
73
- public void setBufferAllocator (BufferAllocator bufferAllocator ) {
72
+ public void setBufferAllocator (final BufferAllocator bufferAllocator ) {
74
73
75
74
}
76
75
77
76
public int getMergeFactor () {
78
77
return this .mergeFactor ;
79
78
}
80
79
81
- public void setMergeFactor (int mergeFactor ) {
80
+ public void setMergeFactor (final int mergeFactor ) {
82
81
if (this .mergeFactor != mergeFactor ) {
83
82
log .warn ("change mergeFactor from " + this .mergeFactor + " to " + mergeFactor );
84
83
}
@@ -90,7 +89,7 @@ public boolean isOptimizeGet() {
90
89
return this .optimiezeGet ;
91
90
}
92
91
93
- public void setOptimizeGet (boolean optimiezeGet ) {
92
+ public void setOptimizeGet (final boolean optimiezeGet ) {
94
93
log .warn (optimiezeGet ? "Enable merge get commands" : "Disable merge get commands" );
95
94
this .optimiezeGet = optimiezeGet ;
96
95
}
@@ -99,47 +98,45 @@ public boolean isOptimizeMergeBuffer() {
99
98
return this .optimiezeMergeBuffer ;
100
99
}
101
100
102
- public void setOptimizeMergeBuffer (boolean optimiezeMergeBuffer ) {
101
+ public void setOptimizeMergeBuffer (final boolean optimiezeMergeBuffer ) {
103
102
log .warn (optimiezeMergeBuffer ? "Enable merge buffers" : "Disable merge buffers" );
104
103
this .optimiezeMergeBuffer = optimiezeMergeBuffer ;
105
104
}
106
105
107
106
@ SuppressWarnings ("unchecked" )
108
107
public Command optimize (final Command currentCommand , final Queue writeQueue ,
109
- final Queue <Command > executingCmds , int sendBufferSize ) {
108
+ final Queue <Command > executingCmds , final int sendBufferSize ) {
110
109
Command optimiezeCommand = currentCommand ;
111
- optimiezeCommand = this .optimiezeGet (writeQueue , executingCmds , optimiezeCommand );
110
+ optimiezeCommand = optimiezeGet (writeQueue , executingCmds , optimiezeCommand );
111
+ optimiezeCommand = optimiezeSet (writeQueue , executingCmds , optimiezeCommand , sendBufferSize );
112
112
optimiezeCommand =
113
- this .optimiezeSet (writeQueue , executingCmds , optimiezeCommand , sendBufferSize );
114
- optimiezeCommand =
115
- this .optimiezeMergeBuffer (optimiezeCommand , writeQueue , executingCmds , sendBufferSize );
113
+ optimiezeMergeBuffer (optimiezeCommand , writeQueue , executingCmds , sendBufferSize );
116
114
return optimiezeCommand ;
117
115
}
118
116
119
117
/**
120
118
* merge buffers to fit socket's send buffer size
121
- *
119
+ *
122
120
* @param currentCommand
123
121
* @return
124
122
* @throws InterruptedException
125
123
*/
126
124
@ SuppressWarnings ("unchecked" )
127
125
public final Command optimiezeMergeBuffer (Command optimiezeCommand , final Queue writeQueue ,
128
- final Queue <Command > executingCmds , int sendBufferSize ) {
126
+ final Queue <Command > executingCmds , final int sendBufferSize ) {
129
127
if (log .isDebugEnabled ()) {
130
128
log .debug ("Optimieze merge buffer:" + optimiezeCommand .toString ());
131
129
}
132
130
if (this .optimiezeMergeBuffer
133
131
&& optimiezeCommand .getIoBuffer ().remaining () < sendBufferSize - 24 ) {
134
- optimiezeCommand =
135
- this .mergeBuffer (optimiezeCommand , writeQueue , executingCmds , sendBufferSize );
132
+ optimiezeCommand = mergeBuffer (optimiezeCommand , writeQueue , executingCmds , sendBufferSize );
136
133
}
137
134
return optimiezeCommand ;
138
135
}
139
136
140
137
/**
141
138
* Merge get operation to multi-get operation
142
- *
139
+ *
143
140
* @param currentCmd
144
141
* @param mergeCommands
145
142
* @return
@@ -151,18 +148,18 @@ public final Command optimiezeGet(final Queue writeQueue, final Queue<Command> e
151
148
if (optimiezeCommand .getCommandType () == CommandType .GET_ONE
152
149
|| optimiezeCommand .getCommandType () == CommandType .GETS_ONE ) {
153
150
if (this .optimiezeGet ) {
154
- optimiezeCommand = this . mergeGetCommands (optimiezeCommand , writeQueue , executingCmds ,
151
+ optimiezeCommand = mergeGetCommands (optimiezeCommand , writeQueue , executingCmds ,
155
152
optimiezeCommand .getCommandType ());
156
153
}
157
154
}
158
155
return optimiezeCommand ;
159
156
}
160
157
161
158
public final Command optimiezeSet (final Queue writeQueue , final Queue <Command > executingCmds ,
162
- Command optimiezeCommand , int sendBufferSize ) {
159
+ Command optimiezeCommand , final int sendBufferSize ) {
163
160
if (this .optimiezeSet && optimiezeCommand .getCommandType () == CommandType .SET
164
161
&& !optimiezeCommand .isNoreply () && this .protocol == Protocol .Binary ) {
165
- optimiezeCommand = this . mergeSetCommands (optimiezeCommand , writeQueue , executingCmds ,
162
+ optimiezeCommand = mergeSetCommands (optimiezeCommand , writeQueue , executingCmds ,
166
163
optimiezeCommand .getCommandType (), sendBufferSize );
167
164
}
168
165
return optimiezeCommand ;
@@ -177,7 +174,7 @@ private final Command mergeBuffer(final Command firstCommand, final Queue writeQ
177
174
return lastCommand ;
178
175
}
179
176
180
- final List <Command > commands = this . getLocalList ();
177
+ final List <Command > commands = getLocalList ();
181
178
final ByteBuffer firstBuffer = firstCommand .getIoBuffer ().buf ();
182
179
int totalBytes = firstBuffer .remaining ();
183
180
commands .add (firstCommand );
@@ -201,8 +198,7 @@ private final Command mergeBuffer(final Command firstCommand, final Queue writeQ
201
198
// if it is get_one command,try to merge get commands
202
199
if ((nextCmd .getCommandType () == CommandType .GET_ONE
203
200
|| nextCmd .getCommandType () == CommandType .GETS_ONE ) && this .optimiezeGet ) {
204
- nextCmd =
205
- this .mergeGetCommands (nextCmd , writeQueue , executingCmds , nextCmd .getCommandType ());
201
+ nextCmd = mergeGetCommands (nextCmd , writeQueue , executingCmds , nextCmd .getCommandType ());
206
202
}
207
203
208
204
commands .add (nextCmd );
@@ -269,17 +265,17 @@ public Object getResult() {
269
265
return new String (this .buf , 0 , this .count );
270
266
}
271
267
272
- public void visit (Command command ) {
268
+ public void visit (final Command command ) {
273
269
if (this .wasFirst ) {
274
- this . append (command .getKey ());
270
+ append (command .getKey ());
275
271
this .wasFirst = false ;
276
272
} else {
277
- this . append (" " );
278
- this . append (command .getKey ());
273
+ append (" " );
274
+ append (command .getKey ());
279
275
}
280
276
}
281
277
282
- private void expandCapacity (int minimumCapacity ) {
278
+ private void expandCapacity (final int minimumCapacity ) {
283
279
int newCapacity = (this .buf .length + 1 ) * 2 ;
284
280
if (newCapacity < 0 ) {
285
281
newCapacity = Integer .MAX_VALUE ;
@@ -291,14 +287,14 @@ private void expandCapacity(int minimumCapacity) {
291
287
this .buf = copy ;
292
288
}
293
289
294
- private void append (String str ) {
290
+ private void append (final String str ) {
295
291
int len = str .length ();
296
292
if (len == 0 ) {
297
293
return ;
298
294
}
299
295
int newCount = this .count + len ;
300
296
if (newCount > this .buf .length ) {
301
- this . expandCapacity (newCount );
297
+ expandCapacity (newCount );
302
298
}
303
299
str .getChars (0 , len , this .buf , this .count );
304
300
this .count = newCount ;
@@ -341,7 +337,7 @@ public Object getResult() {
341
337
return resultCommand ;
342
338
}
343
339
344
- public void visit (Command command ) {
340
+ public void visit (final Command command ) {
345
341
346
342
// Encode prev command
347
343
if (this .prevCommand != null ) {
@@ -422,7 +418,7 @@ public Object getResult() {
422
418
return resultCommand ;
423
419
}
424
420
425
- public void visit (Command command ) {
421
+ public void visit (final Command command ) {
426
422
// Encode prev command
427
423
if (this .prevCommand != null ) {
428
424
// first n-1 send getq command
@@ -450,10 +446,10 @@ public void finish() {
450
446
451
447
@ SuppressWarnings ("unchecked" )
452
448
private final Command mergeGetCommands (final Command currentCmd , final Queue writeQueue ,
453
- final Queue <Command > executingCmds , CommandType expectedCommandType ) {
449
+ final Queue <Command > executingCmds , final CommandType expectedCommandType ) {
454
450
Map <Object , Command > mergeCommands = null ;
455
451
int mergeCount = 1 ;
456
- final CommandCollector commandCollector = this . createGetCommandCollector ();
452
+ final CommandCollector commandCollector = createGetCommandCollector ();
457
453
currentCmd .setStatus (OperationStatus .WRITING );
458
454
459
455
commandCollector .visit (currentCmd );
@@ -480,10 +476,12 @@ private final Command mergeGetCommands(final Command currentCmd, final Queue wri
480
476
if (mergeCommands .containsKey (removedCommand .getKey ())) {
481
477
final AssocCommandAware mergedGetCommand =
482
478
(AssocCommandAware ) mergeCommands .get (removedCommand .getKey ());
483
- if (mergedGetCommand .getAssocCommands () == null ) {
484
- mergedGetCommand .setAssocCommands (new ArrayList <Command >(5 ));
479
+ List <Command > assocCommands = mergedGetCommand .getAssocCommands ();
480
+ if (assocCommands == null ) {
481
+ assocCommands = new ArrayList <Command >(5 );
482
+ mergedGetCommand .setAssocCommands (assocCommands );
485
483
}
486
- mergedGetCommand . getAssocCommands () .add (removedCommand );
484
+ assocCommands .add (removedCommand );
487
485
} else {
488
486
commandCollector .visit (nextCmd );
489
487
mergeCommands .put (removedCommand .getKey (), removedCommand );
@@ -500,8 +498,7 @@ private final Command mergeGetCommands(final Command currentCmd, final Queue wri
500
498
if (log .isDebugEnabled ()) {
501
499
log .debug ("Merge optimieze:merge " + mergeCount + " get commands" );
502
500
}
503
- return this .newMergedCommand (mergeCommands , mergeCount , commandCollector ,
504
- expectedCommandType );
501
+ return newMergedCommand (mergeCommands , mergeCount , commandCollector , expectedCommandType );
505
502
}
506
503
}
507
504
@@ -516,7 +513,8 @@ protected BinarySetQCollector initialValue() {
516
513
};
517
514
518
515
private final Command mergeSetCommands (final Command currentCmd , final Queue writeQueue ,
519
- final Queue <Command > executingCmds , CommandType expectedCommandType , int sendBufferSize ) {
516
+ final Queue <Command > executingCmds , final CommandType expectedCommandType ,
517
+ final int sendBufferSize ) {
520
518
int mergeCount = 1 ;
521
519
final CommandCollector commandCollector = BIN_SET_CMD_COLLECTOR_THREAD_LOCAL .get ().reset ();
522
520
currentCmd .setStatus (OperationStatus .WRITING );
@@ -582,7 +580,7 @@ private CommandCollector createGetCommandCollector() {
582
580
}
583
581
}
584
582
585
- private Command newMergedCommand (final Map <Object , Command > mergeCommands , int mergeCount ,
583
+ private Command newMergedCommand (final Map <Object , Command > mergeCommands , final int mergeCount ,
586
584
final CommandCollector commandCollector , final CommandType commandType ) {
587
585
if (this .protocol == Protocol .Text ) {
588
586
String resultKey = (String ) commandCollector .getResult ();
0 commit comments