-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMemcachedClient.cs
755 lines (682 loc) · 35 KB
/
MemcachedClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
//Copyright (c) 2007-2008 Henrik Schröder, Oliver Kofoed Pedersen
//Permission is hereby granted, free of charge, to any person
//obtaining a copy of this software and associated documentation
//files (the "Software"), to deal in the Software without
//restriction, including without limitation the rights to use,
//copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the
//Software is furnished to do so, subject to the following
//conditions:
//The above copyright notice and this permission notice shall be
//included in all copies or substantial portions of the Software.
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
//OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
//HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
//WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
//FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
//OTHER DEALINGS IN THE SOFTWARE.
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Configuration;
using System.Globalization;
using System.Text;
namespace BeIT.MemCached{
/// <summary>
/// Memcached client main class.
/// Use the static methods Setup and GetInstance to setup and get an instance of the client for use.
/// </summary>
public class MemcachedClient {
#region Static fields and methods.
//Registry of all configured client instances, keyed by instance name.
private static Dictionary<string, MemcachedClient> instances = new Dictionary<string, MemcachedClient>();
//Logger obtained for the MemcachedClient type.
private static LogAdapter logger = LogAdapter.GetLogger(typeof(MemcachedClient));
/// <summary>
/// Creates a new client instance and registers it under the given name.
/// </summary>
/// <param name="name">The name of the instance.</param>
/// <param name="servers">A list of memcached servers in standard notation: host:port.
/// If port is omitted, the default value of 11211 is used.
/// Both IP addresses and host names are accepted, for example:
/// "localhost", "127.0.0.1", "cache01.example.com:12345", "127.0.0.1:12345", etc.</param>
/// <exception cref="ConfigurationErrorsException">An instance with this name has already been registered,
/// or the constructor rejected the name or the server list.</exception>
public static void Setup(string name, string[] servers) {
    bool alreadyConfigured = instances.ContainsKey(name);
    if (alreadyConfigured) {
        string message = "Trying to configure MemcachedClient instance \"" + name + "\" twice.";
        throw new ConfigurationErrorsException(message);
    }
    //Construct first, register second: a constructor failure leaves the registry untouched.
    MemcachedClient client = new MemcachedClient(name, servers);
    instances[name] = client;
}
/// <summary>
/// Reports whether a MemcachedClient instance with the given name is present in the instance registry.
/// Only the registry is consulted; the configuration file is not read by this method.
/// </summary>
/// <param name="name">The name of the instance to look for.</param>
/// <returns>True if an instance is registered under the name, otherwise false.</returns>
public static bool Exists(string name) {
    MemcachedClient ignored;
    return instances.TryGetValue(name, out ignored);
}
//Cached reference to the instance named "default"; assigned by the first successful call to GetInstance().
private static MemcachedClient defaultInstance = null;
/// <summary>
/// Static method for getting the default instance named "default".
/// The instance is looked up through GetInstance(string) the first time and cached afterwards.
/// </summary>
/// <returns>The instance named "default".</returns>
public static MemcachedClient GetInstance() {
    MemcachedClient cached = defaultInstance;
    if (cached == null) {
        cached = GetInstance("default");
        defaultInstance = cached;
    }
    return cached;
}
/// <summary>
/// Static method for getting a named instance.
/// Instances that have been set up programmatically are checked first.
/// If there is none, the "beitmemcached" section of the standard config file is consulted:
/// when it holds a non-empty entry for the name, that entry is split on commas into a server list,
/// the instance is set up from it, and the new instance is returned.
/// If that also fails, an exception is thrown.
/// </summary>
/// <param name="name">The name of the instance.</param>
/// <returns>The named instance.</returns>
/// <exception cref="ConfigurationErrorsException">No instance with this name is registered and no
/// configuration entry for it could be found.</exception>
public static MemcachedClient GetInstance(string name) {
    MemcachedClient existing;
    if (instances.TryGetValue(name, out existing)) {
        return existing;
    }

    NameValueCollection section = ConfigurationManager.GetSection("beitmemcached") as NameValueCollection;
    string serverList = (section == null) ? null : section.Get(name);
    if (String.IsNullOrEmpty(serverList)) {
        throw new ConfigurationErrorsException("Unable to find MemcachedClient instance \"" + name + "\".");
    }

    Setup(name, serverList.Split(','));
    return GetInstance(name);
}
#endregion
#region Fields, constructors, and private methods.
/// <summary>
/// The name that was passed to the constructor of this instance.
/// </summary>
public readonly string Name;
//The server pool that every command of this client is executed through.
private readonly ServerPool serverPool;
/// <summary>
/// If you specify a key prefix, it will be prepended to all keys before they are sent to the memcached server.
/// The key prefix is not used when calculating which server a key belongs to.
/// </summary>
public string KeyPrefix {
    get {
        return keyPrefix;
    }
    set {
        keyPrefix = value;
    }
}
private string keyPrefix = "";
/// <summary>
/// How long, in milliseconds, the client should wait for data to be sent to and received from the server.
/// The value is stored on the server pool; the documented default is 2000.
/// </summary>
public int SendReceiveTimeout {
    get {
        return serverPool.SendReceiveTimeout;
    }
    set {
        serverPool.SendReceiveTimeout = value;
    }
}
/// <summary>
/// The min pool size determines the number of sockets the socket pool will keep.
/// Note that no sockets will be created on startup, only on use, so the socket pool will only
/// contain this amount of sockets if the amount of simultaneous requests goes above it.
/// The value is stored on the server pool; the documented default is 5.
/// </summary>
/// <exception cref="ConfigurationErrorsException">The new value is larger than MaxPoolSize.</exception>
public uint MinPoolSize {
    get {
        return serverPool.MinPoolSize;
    }
    set {
        bool withinLimit = value <= MaxPoolSize;
        if (!withinLimit) {
            throw new ConfigurationErrorsException("MinPoolSize (" + value + ") may not be larger than the MaxPoolSize (" + MaxPoolSize + ").");
        }
        serverPool.MinPoolSize = value;
    }
}
/// <summary>
/// The max pool size determines how large the socket connection pool is allowed to grow.
/// There can be more sockets in use than this amount, but when the extra sockets are returned, they will be destroyed.
/// The value is stored on the server pool; the documented default is 10.
/// </summary>
/// <exception cref="ConfigurationErrorsException">The new value is smaller than MinPoolSize.</exception>
public uint MaxPoolSize {
    get {
        return serverPool.MaxPoolSize;
    }
    set {
        bool withinLimit = value >= MinPoolSize;
        if (!withinLimit) {
            throw new ConfigurationErrorsException("MaxPoolSize (" + value + ") may not be smaller than the MinPoolSize (" + MinPoolSize + ").");
        }
        serverPool.MaxPoolSize = value;
    }
}
/// <summary>
/// If the pool contains more than the minimum amount of sockets, and a socket is returned that is older than this recycle age,
/// that socket will be destroyed instead of put back in the pool. This allows the pool to shrink back to the min pool size
/// after a peak in usage.
/// The value is stored on the server pool; the documented default is 30 minutes.
/// </summary>
public TimeSpan SocketRecycleAge {
    get {
        return serverPool.SocketRecycleAge;
    }
    set {
        serverPool.SocketRecycleAge = value;
    }
}
//Backing field for CompressionThreshold, initialised to 128 * 1024 bytes (128 KB).
private uint compressionThreshold = 128 * 1024;
/// <summary>
/// Size threshold in bytes that is passed to the serializer whenever a value is stored.
/// According to the original documentation, an object larger than the threshold is compressed before being stored
/// and transparently decompressed when retrieved, and only strings, byte arrays and objects can be compressed
/// (the serializer itself is not part of this file).
/// The default value is 131072 bytes = 128 KB.
/// </summary>
public uint CompressionThreshold {
    get {
        return compressionThreshold;
    }
    set {
        compressionThreshold = value;
    }
}
/// <summary>
/// Private constructor; instances are created through the static Setup method.
/// Stores the name and creates the server pool for the given hosts.
/// </summary>
/// <param name="name">The name of the instance; may not be null or empty.</param>
/// <param name="hosts">The memcached hosts to build the server pool from; may not be null or empty.</param>
/// <exception cref="ConfigurationErrorsException">The name is null or empty, or the host list is null or empty.</exception>
private MemcachedClient(string name, string[] hosts) {
    bool nameMissing = String.IsNullOrEmpty(name);
    if (nameMissing) {
        throw new ConfigurationErrorsException("Name of MemcachedClient instance cannot be empty.");
    }

    bool hostsMissing = (hosts == null) || (hosts.Length == 0);
    if (hostsMissing) {
        throw new ConfigurationErrorsException("Cannot configure MemcachedClient with empty list of hosts.");
    }

    this.Name = name;
    this.serverPool = new ServerPool(hosts);
}
/// <summary>
/// Private key hashing method that uses the modified FNV hash.
/// The key is validated with checkKey first, then its UTF-8 bytes are hashed.
/// </summary>
/// <param name="key">The key to hash.</param>
/// <returns>The first four bytes of the computed hash, read as an unsigned 32-bit integer.</returns>
private uint hash(string key) {
    checkKey(key);
    byte[] keyBytes = Encoding.UTF8.GetBytes(key);
    byte[] digest = new ModifiedFNV1_32().ComputeHash(keyBytes);
    return BitConverter.ToUInt32(digest, 0);
}
/// <summary>
/// Private hashing method for user-supplied hash values.
/// The bytes of the supplied value are run through the modified FNV hash.
/// </summary>
/// <param name="hashvalue">The user-supplied hash value to hash.</param>
/// <returns>The first four bytes of the computed hash, read as an unsigned 32-bit integer.</returns>
private uint hash(uint hashvalue) {
    byte[] valueBytes = BitConverter.GetBytes(hashvalue);
    byte[] digest = new ModifiedFNV1_32().ComputeHash(valueBytes);
    return BitConverter.ToUInt32(digest, 0);
}
/// <summary>
/// Private multi-hashing method. Each key is hashed (and thereby validated) with hash(string).
/// </summary>
/// <param name="keys">An array of keys to hash.</param>
/// <returns>An array of hashes, one per key and in the same order.</returns>
private uint[] hash(string[] keys) {
    uint[] hashes = new uint[keys.Length];
    int index = 0;
    foreach (string key in keys) {
        hashes[index] = hash(key);
        index++;
    }
    return hashes;
}
/// <summary>
/// Private multi-hashing method for user-supplied hash values.
/// </summary>
/// <param name="hashvalues">An array of user-supplied hash values to hash.</param>
/// <returns>An array of hashes, one per supplied value and in the same order.</returns>
private uint[] hash(uint[] hashvalues) {
    uint[] hashes = new uint[hashvalues.Length];
    int index = 0;
    foreach (uint hashvalue in hashvalues) {
        hashes[index] = hash(hashvalue);
        index++;
    }
    return hashes;
}
/// <summary>
/// Private key-checking method.
/// Throws an exception if the key does not conform to memcached protocol requirements:
/// It may not be null or empty, it may not be longer than 250 characters, and it may not contain
/// whitespace or control characters (any character up to and including the space character, or DEL).
/// </summary>
/// <param name="key">The key to check.</param>
/// <exception cref="ArgumentNullException">The key is null.</exception>
/// <exception cref="ArgumentException">The key is empty, too long, or contains a forbidden character.</exception>
private void checkKey(string key) {
    if (key == null) {
        //The single-string constructor would treat the text as the parameter name, so name the parameter explicitly.
        throw new ArgumentNullException("key", "Key may not be null.");
    }
    if (key.Length == 0) {
        throw new ArgumentException("Key may not be empty.");
    }
    if (key.Length > 250) {
        throw new ArgumentException("Key may not be longer than 250 characters.");
    }
    foreach (char c in key) {
        //Everything from NUL up to and including space is whitespace or a control character; 0x7F is DEL.
        if (c <= ' ' || c == '\u007f') {
            throw new ArgumentException("Key may not contain whitespace or control characters.");
        }
    }
}
//Start of the Unix epoch (1970-01-01 00:00:00 UTC), the reference point for getUnixTime.
private static DateTime epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
/// <summary>
/// Private Unix-time converter.
/// Converts the given time to universal time and returns the number of whole seconds
/// between the Unix epoch and that time.
/// </summary>
/// <param name="datetime">The time to convert.</param>
/// <returns>The number of seconds since the Unix epoch, truncated to an int.</returns>
private static int getUnixTime(DateTime datetime) {
    DateTime utc = datetime.ToUniversalTime();
    TimeSpan sinceEpoch = utc.Subtract(epoch);
    return (int)sinceEpoch.TotalSeconds;
}
#endregion
#region Set, Add, and Replace.
/// <summary>
/// The largest expiry, in seconds, that the memcached server still interprets as relative to the
/// current time (30 days). Larger values are interpreted by the server as absolute Unix timestamps.
/// </summary>
private const int maxRelativeExpirySeconds = 60 * 60 * 24 * 30;
/// <summary>
/// Converts a relative expiry into the integer that is sent to the server as the expiry time.
/// Expiries of up to 30 days are sent as a plain number of seconds. Longer expiries are converted to
/// an absolute Unix timestamp (now + expiry), because the server would otherwise read the number of
/// seconds as a timestamp in the distant past and expire the item immediately.
/// </summary>
/// <param name="expiry">The relative expiry.</param>
/// <returns>The value to send as the expiry time.</returns>
private static int getRelativeExpiry(TimeSpan expiry) {
    double seconds = expiry.TotalSeconds;
    if (seconds <= maxRelativeExpirySeconds) {
        return (int)seconds;
    }
    //Work in doubles and clamp, so that very large TimeSpans overflow neither DateTime nor int.
    double absolute = (DateTime.UtcNow - epoch).TotalSeconds + seconds;
    return absolute >= int.MaxValue ? int.MaxValue : (int)absolute;
}
/// <summary>
/// This method corresponds to the "set" command in the memcached protocol.
/// It will unconditionally set the given key to the given value.
/// Using the overloads it is possible to specify an expiry time, either relative as a TimeSpan or
/// absolute as a DateTime. It is also possible to specify a custom hash to override server selection.
/// Relative expiries longer than 30 days are sent to the server as an absolute time.
/// This method returns true if the value was successfully set.
/// </summary>
public bool Set(string key, object value) { return store("set", key, true, value, hash(key), 0); }
public bool Set(string key, object value, uint hash) { return store("set", key, false, value, this.hash(hash), 0); }
public bool Set(string key, object value, TimeSpan expiry) { return store("set", key, true, value, hash(key), getRelativeExpiry(expiry)); }
public bool Set(string key, object value, uint hash, TimeSpan expiry) { return store("set", key, false, value, this.hash(hash), getRelativeExpiry(expiry)); }
public bool Set(string key, object value, DateTime expiry) { return store("set", key, true, value, hash(key), getUnixTime(expiry)); }
public bool Set(string key, object value, uint hash, DateTime expiry) { return store("set", key, false, value, this.hash(hash), getUnixTime(expiry)); }
/// <summary>
/// This method corresponds to the "add" command in the memcached protocol.
/// It will set the given key to the given value only if the key does not already exist.
/// Using the overloads it is possible to specify an expiry time, either relative as a TimeSpan or
/// absolute as a DateTime. It is also possible to specify a custom hash to override server selection.
/// Relative expiries longer than 30 days are sent to the server as an absolute time.
/// This method returns true if the value was successfully added.
/// </summary>
public bool Add(string key, object value) { return store("add", key, true, value, hash(key), 0); }
public bool Add(string key, object value, uint hash) { return store("add", key, false, value, this.hash(hash), 0); }
public bool Add(string key, object value, TimeSpan expiry) { return store("add", key, true, value, hash(key), getRelativeExpiry(expiry)); }
public bool Add(string key, object value, uint hash, TimeSpan expiry) { return store("add", key, false, value, this.hash(hash), getRelativeExpiry(expiry)); }
public bool Add(string key, object value, DateTime expiry) { return store("add", key, true, value, hash(key), getUnixTime(expiry)); }
public bool Add(string key, object value, uint hash, DateTime expiry) { return store("add", key, false, value, this.hash(hash), getUnixTime(expiry)); }
/// <summary>
/// This method corresponds to the "replace" command in the memcached protocol.
/// It will set the given key to the given value only if the key already exists.
/// Using the overloads it is possible to specify an expiry time, either relative as a TimeSpan or
/// absolute as a DateTime. It is also possible to specify a custom hash to override server selection.
/// Relative expiries longer than 30 days are sent to the server as an absolute time.
/// This method returns true if the value was successfully replaced.
/// </summary>
public bool Replace(string key, object value) { return store("replace", key, true, value, hash(key), 0); }
public bool Replace(string key, object value, uint hash) { return store("replace", key, false, value, this.hash(hash), 0); }
public bool Replace(string key, object value, TimeSpan expiry) { return store("replace", key, true, value, hash(key), getRelativeExpiry(expiry)); }
public bool Replace(string key, object value, uint hash, TimeSpan expiry) { return store("replace", key, false, value, this.hash(hash), getRelativeExpiry(expiry)); }
public bool Replace(string key, object value, DateTime expiry) { return store("replace", key, true, value, hash(key), getUnixTime(expiry)); }
public bool Replace(string key, object value, uint hash, DateTime expiry) { return store("replace", key, false, value, this.hash(hash), getUnixTime(expiry)); }
/// <summary>
/// This method corresponds to the "append" command in the memcached protocol.
/// It will append the given value to the given key, if the key already exists.
/// Modifying a key with this command will not change its expiry time.
/// Using the overload it is possible to specify a custom hash to override server selection.
/// This method returns true if the server answered that the value was stored.
/// </summary>
public bool Append(string key, object value) { return store("append", key, true, value, hash(key)); }
public bool Append(string key, object value, uint hash) { return store("append", key, false, value, this.hash(hash)); }
/// <summary>
/// This method corresponds to the "prepend" command in the memcached protocol.
/// It will prepend the given value to the given key, if the key already exists.
/// Modifying a key with this command will not change its expiry time.
/// Using the overload it is possible to specify a custom hash to override server selection.
/// This method returns true if the server answered that the value was stored.
/// </summary>
public bool Prepend(string key, object value) { return store("prepend", key, true, value, hash(key)); }
public bool Prepend(string key, object value, uint hash) { return store("prepend", key, false, value, this.hash(hash)); }
/// <summary>
/// The possible outcomes of a CheckAndSet call.
/// </summary>
public enum CasResult {
    /// <summary>The server answered STORED.</summary>
    Stored = 0,
    /// <summary>The server gave any answer other than STORED, EXISTS or NOT_FOUND, or no answer could be obtained.</summary>
    NotStored = 1,
    /// <summary>The server answered EXISTS.</summary>
    Exists = 2,
    /// <summary>The server answered NOT_FOUND.</summary>
    NotFound = 3
}
/// <summary>
/// This method corresponds to the "cas" command in the memcached protocol.
/// It sends the value together with the given cas unique value, as obtained from the Gets method,
/// and reports the server's answer as a CasResult.
/// Using the overloads it is possible to specify an expiry time, either relative as a TimeSpan or
/// absolute as a DateTime. It is also possible to specify a custom hash to override server selection.
/// Relative expiries longer than 30 days are sent to the server as an absolute time.
/// </summary>
public CasResult CheckAndSet(string key, object value, ulong unique) { return store(key, true, value, hash(key), 0, unique); }
public CasResult CheckAndSet(string key, object value, uint hash, ulong unique) { return store(key, false, value, this.hash(hash), 0, unique); }
public CasResult CheckAndSet(string key, object value, TimeSpan expiry, ulong unique) { return store(key, true, value, hash(key), getRelativeExpiry(expiry), unique); }
public CasResult CheckAndSet(string key, object value, uint hash, TimeSpan expiry, ulong unique) { return store(key, false, value, this.hash(hash), getRelativeExpiry(expiry), unique); }
public CasResult CheckAndSet(string key, object value, DateTime expiry, ulong unique) { return store(key, true, value, hash(key), getUnixTime(expiry), unique); }
public CasResult CheckAndSet(string key, object value, uint hash, DateTime expiry, ulong unique) { return store(key, false, value, this.hash(hash), getUnixTime(expiry), unique); }
/// <summary>
/// Private overload for the Set, Add and Replace commands.
/// Forwards to the common store method with a cas unique value of 0.
/// </summary>
/// <returns>True if the response starts with "STORED".</returns>
private bool store(string command, string key, bool keyIsChecked, object value, uint hash, int expiry) {
    string response = store(command, key, keyIsChecked, value, hash, expiry, 0);
    return response.StartsWith("STORED");
}
/// <summary>
/// Private overload for the Append and Prepend commands.
/// Forwards to the common store method with an expiry of 0 and a cas unique value of 0.
/// </summary>
/// <returns>True if the response starts with "STORED".</returns>
private bool store(string command, string key, bool keyIsChecked, object value, uint hash) {
    string response = store(command, key, keyIsChecked, value, hash, 0, 0);
    return response.StartsWith("STORED");
}
/// <summary>
/// Private overload for the Cas command.
/// Sends a "cas" command through the common store method and maps the response onto a CasResult:
/// "STORED" to Stored, "EXISTS" to Exists, "NOT_FOUND" to NotFound and anything else to NotStored.
/// </summary>
private CasResult store(string key, bool keyIsChecked, object value, uint hash, int expiry, ulong unique) {
    string response = store("cas", key, keyIsChecked, value, hash, expiry, unique);

    CasResult outcome = CasResult.NotStored;
    if (response.StartsWith("STORED")) {
        outcome = CasResult.Stored;
    } else if (response.StartsWith("EXISTS")) {
        outcome = CasResult.Exists;
    } else if (response.StartsWith("NOT_FOUND")) {
        outcome = CasResult.NotFound;
    }
    return outcome;
}
/// <summary>
/// Private common store method.
/// Serializes the value, builds the command line for the given command, writes the command line,
/// the serialized bytes and a trailing line break to the socket, and returns the server's response.
/// </summary>
/// <param name="command">One of "set", "add", "replace", "append", "prepend" or "cas".</param>
/// <param name="key">The key, without the key prefix.</param>
/// <param name="keyIsChecked">False if the key still has to be validated with checkKey.</param>
/// <param name="value">The object to store.</param>
/// <param name="hash">The hash that is passed to the server pool for this command.</param>
/// <param name="expiry">The expiry value to send; not sent for "append" and "prepend".</param>
/// <param name="unique">The cas unique value; only sent for "cas".</param>
/// <returns>The response read from the socket; an empty string if serialization failed.
/// The empty string is also the default value handed to the server pool.</returns>
private string store(string command, string key, bool keyIsChecked, object value, uint hash, int expiry, ulong unique) {
    if (!keyIsChecked) {
        checkKey(key);
    }

    return serverPool.Execute<string>(hash, "", delegate(PooledSocket socket) {
        //Serialize the object; the datatype marker ends up in the flags field of the command line.
        SerializedType type;
        byte[] payload;
        try {
            payload = Serializer.Serialize(value, out type, CompressionThreshold);
        } catch (Exception e) {
            //A value that cannot be serialized is logged and reported as an empty response.
            logger.Error("Error serializing object for key '" + key + "'.", e);
            return "";
        }

        //Build the command line; an unknown command leaves it empty.
        string target = command + " " + keyPrefix + key;
        string commandline = "";
        if (command == "set" || command == "add" || command == "replace") {
            commandline = target + " " + (ushort)type + " " + expiry + " " + payload.Length + "\r\n";
        } else if (command == "append" || command == "prepend") {
            commandline = target + " 0 0 " + payload.Length + "\r\n";
        } else if (command == "cas") {
            commandline = target + " " + (ushort)type + " " + expiry + " " + payload.Length + " " + unique + "\r\n";
        }

        //Write the command line and the serialized object, then read the answer.
        socket.Write(commandline);
        socket.Write(payload);
        socket.Write("\r\n");
        return socket.ReadResponse();
    });
}
#endregion
#region Get
/// <summary>
/// This method corresponds to the "get" command in the memcached protocol.
/// It will return the value for the given key. It will return null if the key did not exist,
/// or if it was unable to retrieve the value.
/// Use the overload to specify a custom hash to override server selection.
/// </summary>
public object Get(string key) {
    ulong ignoredUnique;
    return get("get", key, true, hash(key), out ignoredUnique);
}
public object Get(string key, uint hash) {
    ulong ignoredUnique;
    return get("get", key, false, this.hash(hash), out ignoredUnique);
}
/// <summary>
/// This method corresponds to the "gets" command in the memcached protocol.
/// It works exactly like the Get method, but it will also return the cas unique value for the item.
/// </summary>
public object Gets(string key, out ulong unique) {
    return get("gets", key, true, hash(key), out unique);
}
public object Gets(string key, uint hash, out ulong unique) {
    return get("gets", key, false, this.hash(hash), out unique);
}
/// <summary>
/// Private single-key implementation of the "get" and "gets" commands.
/// Sends the command for the prefixed key, reads at most one value and, if a value was returned,
/// also reads the trailing END line.
/// </summary>
/// <param name="command">"get" or "gets".</param>
/// <param name="key">The key, without the key prefix.</param>
/// <param name="keyIsChecked">False if the key still has to be validated with checkKey.</param>
/// <param name="hash">The hash that is passed to the server pool for this command.</param>
/// <param name="unique">Receives the cas unique value read for the item, or 0 if none was read.</param>
/// <returns>The retrieved value, or null, which is also the default value handed to the server pool.</returns>
private object get(string command, string key, bool keyIsChecked, uint hash, out ulong unique) {
    if (!keyIsChecked) {
        checkKey(key);
    }

    //An out parameter cannot be captured by the delegate, so the value is carried out through a local.
    ulong capturedUnique = 0;
    object value = serverPool.Execute<object>(hash, null, delegate(PooledSocket socket) {
        socket.Write(command + " " + keyPrefix + key + "\r\n");

        object fetchedValue;
        string returnedKey; //Receives the key reported by the server; kept separate so the key parameter is never overwritten.
        ulong fetchedUnique;
        if (readValue(socket, out fetchedValue, out returnedKey, out fetchedUnique)) {
            socket.ReadLine(); //Read the trailing END.
        }
        capturedUnique = fetchedUnique;
        return fetchedValue;
    });

    unique = capturedUnique;
    return value;
}
/// <summary>
/// This method executes a multi-get. It will group the keys by server and execute a single get
/// for each server, and combine the results. The returned object[] will have the same size as
/// the given key array, and contain either null or a value at each position according to
/// the key on that position.
/// Use the overload to specify custom hashes, one per key, to override server selection.
/// </summary>
public object[] Get(string[] keys) {
    ulong[] ignoredUniques;
    return get("get", keys, true, hash(keys), out ignoredUniques);
}
public object[] Get(string[] keys, uint[] hashes) {
    ulong[] ignoredUniques;
    return get("get", keys, false, hash(hashes), out ignoredUniques);
}
/// <summary>
/// This method does a multi-gets. It functions exactly like the multi-get method, but it will
/// also return an array of cas unique values as an out parameter.
/// </summary>
public object[] Gets(string[] keys, out ulong[] uniques) {
    return get("gets", keys, true, hash(keys), out uniques);
}
public object[] Gets(string[] keys, uint[] hashes, out ulong[] uniques) {
    return get("gets", keys, false, hash(hashes), out uniques);
}
/// <summary>
/// Private multi-key implementation of the "get" and "gets" commands.
/// Groups the keys by socket pool, sends one request per pool and places every returned value
/// at all positions of the key it belongs to.
/// </summary>
/// <param name="command">"get" or "gets".</param>
/// <param name="keys">The keys, without the key prefix.</param>
/// <param name="keysAreChecked">False if the keys still have to be validated with checkKey.</param>
/// <param name="hashes">One hash per key, used to select the socket pool for that key.</param>
/// <param name="uniques">Receives one cas unique value per key; 0 where none was read.</param>
/// <returns>An array of the same length as keys, with null at positions for which no value was read.</returns>
/// <exception cref="ArgumentException">keys or hashes is null, or the arrays differ in length.</exception>
private object[] get(string command, string[] keys, bool keysAreChecked, uint[] hashes, out ulong[] uniques) {
    //Check arguments.
    if (keys == null || hashes == null) {
        throw new ArgumentException("Keys and hashes arrays must not be null.");
    }
    if (keys.Length != hashes.Length) {
        throw new ArgumentException("Keys and hashes arrays must be of the same length.");
    }
    uniques = new ulong[keys.Length];

    //Avoid going through the server grouping if there's only one key.
    if (keys.Length == 1) {
        return new object[] { get(command, keys[0], keysAreChecked, hashes[0], out uniques[0]) };
    }

    //Check keys.
    if (!keysAreChecked) {
        for (int i = 0; i < keys.Length; i++) {
            checkKey(keys[i]);
        }
    }

    //Read the prefix once, so that the keys that are sent and the keys that are looked up always agree.
    string prefix = keyPrefix;

    //Group the keys/hashes by server(pool). The inner dictionary is keyed by the prefixed key,
    //because that is the form in which the server reports the key back in its VALUE lines.
    Dictionary<SocketPool, Dictionary<string, List<int>>> dict = new Dictionary<SocketPool, Dictionary<string, List<int>>>();
    for (int i = 0; i < keys.Length; i++) {
        Dictionary<string, List<int>> getsForServer;
        SocketPool pool = serverPool.GetSocketPool(hashes[i]);
        if (!dict.TryGetValue(pool, out getsForServer)) {
            dict[pool] = getsForServer = new Dictionary<string, List<int>>();
        }

        string prefixedKey = prefix + keys[i];
        List<int> positions;
        if (!getsForServer.TryGetValue(prefixedKey, out positions)) {
            getsForServer[prefixedKey] = positions = new List<int>();
        }
        positions.Add(i);
    }

    //Get the values
    object[] returnValues = new object[keys.Length];
    ulong[] _uniques = new ulong[keys.Length];
    foreach (KeyValuePair<SocketPool, Dictionary<string, List<int>>> kv in dict) {
        serverPool.Execute(kv.Key, delegate(PooledSocket socket) {
            //Build the get request
            StringBuilder getRequest = new StringBuilder(command);
            foreach (KeyValuePair<string, List<int>> key in kv.Value) {
                getRequest.Append(" ");
                getRequest.Append(key.Key);
            }
            getRequest.Append("\r\n");

            //Send get request
            socket.Write(getRequest.ToString());

            //Read values, one by one
            object gottenObject;
            string gottenKey;
            ulong unique;
            while (readValue(socket, out gottenObject, out gottenKey, out unique)) {
                //A key that was never requested is skipped, so the remaining values are still read.
                List<int> positions;
                if (kv.Value.TryGetValue(gottenKey, out positions)) {
                    foreach (int position in positions) {
                        returnValues[position] = gottenObject;
                        _uniques[position] = unique;
                    }
                }
            }
        });
    }
    uniques = _uniques;
    return returnValues;
}
/// <summary>
/// Private method for reading results of the "get" command.
/// Reads one response line. If it is a VALUE line, the announced number of bytes is read from the
/// socket and deserialized; any other line (such as the terminating END) ends the result list.
/// </summary>
/// <param name="socket">The socket to read from.</param>
/// <param name="value">Receives the deserialized value; null if deserialization failed or no value was read.</param>
/// <param name="key">Receives the key exactly as reported by the server; null if no value was read.</param>
/// <param name="unique">Receives the cas unique value if the server sent one, otherwise 0.</param>
/// <returns>True if a VALUE line was read, false otherwise.</returns>
private bool readValue(PooledSocket socket, out object value, out string key, out ulong unique) {
    string response = socket.ReadResponse();
    string[] parts = response.Split(' '); //Result line from server: "VALUE <key> <flags> <bytes> <cas unique>"
    if (parts[0] != "VALUE") {
        key = null;
        value = null;
        unique = 0;
        return false;
    }

    key = parts[1];
    SerializedType type = (SerializedType)Enum.Parse(typeof(SerializedType), parts[2]);
    byte[] bytes = new byte[Convert.ToUInt32(parts[3], CultureInfo.InvariantCulture)];
    //The cas unique is only present in answers to "gets"; it is parsed with the invariant culture like the byte count.
    if (parts.Length > 4) {
        unique = Convert.ToUInt64(parts[4], CultureInfo.InvariantCulture);
    } else {
        unique = 0;
    }

    socket.Read(bytes);
    socket.SkipUntilEndOfLine(); //Skip the trailing \r\n
    try {
        value = Serializer.DeSerialize(bytes, type);
    } catch (Exception e) {
        //If deserialization fails, return null
        value = null;
        logger.Error("Error deserializing object for key '" + key + "' of type " + type + ".", e);
    }
    return true;
}
#endregion
#region Delete
/// <summary>
/// This method corresponds to the "delete" command in the memcache protocol.
/// It will immediately delete the given key and corresponding value.
/// Use the overloads to specify an amount of time the item should be in the delete queue on the server,
/// or to specify a custom hash to override server selection.
/// A TimeSpan delay is sent as a number of seconds, a DateTime delay as a Unix timestamp.
/// This method returns true if the server answered that the item was deleted.
/// </summary>
public bool Delete(string key) {
    return delete(key, true, hash(key), 0);
}
public bool Delete(string key, uint hash) {
    return delete(key, false, this.hash(hash), 0);
}
public bool Delete(string key, TimeSpan delay) {
    return delete(key, true, hash(key), (int)delay.TotalSeconds);
}
public bool Delete(string key, uint hash, TimeSpan delay) {
    return delete(key, false, this.hash(hash), (int)delay.TotalSeconds);
}
public bool Delete(string key, DateTime delay) {
    return delete(key, true, hash(key), getUnixTime(delay));
}
public bool Delete(string key, uint hash, DateTime delay) {
    return delete(key, false, this.hash(hash), getUnixTime(delay));
}
/// <summary>
/// Private implementation of the "delete" command.
/// </summary>
/// <param name="key">The key, without the key prefix.</param>
/// <param name="keyIsChecked">False if the key still has to be validated with checkKey.</param>
/// <param name="hash">The hash that is passed to the server pool for this command.</param>
/// <param name="time">The time argument to append to the command; omitted when it is 0.</param>
/// <returns>True if the response starts with "DELETED"; false is also the default value handed to the server pool.</returns>
private bool delete(string key, bool keyIsChecked, uint hash, int time) {
    if (!keyIsChecked) {
        checkKey(key);
    }

    return serverPool.Execute<bool>(hash, false, delegate(PooledSocket socket) {
        string commandline = "delete " + keyPrefix + key;
        if (time != 0) {
            commandline = commandline + " " + time;
        }
        commandline = commandline + "\r\n";

        socket.Write(commandline);
        string response = socket.ReadResponse();
        return response.StartsWith("DELETED");
    });
}
#endregion
#region Increment Decrement
/// <summary>
/// This method sets the key to the given value, and stores it in a format such that the methods
/// Increment and Decrement can be used successfully on it, i.e. decimal representation of a 64-bit unsigned integer.
/// Using the overloads it is possible to specify an expiry time, either relative as a TimeSpan or
/// absolute as a DateTime. It is also possible to specify a custom hash to override server selection.
/// The custom hash is handed to Set unchanged, because Set applies the hash function itself; this way the
/// counter is stored on the same server that GetCounter, Increment and Decrement select for the same hash.
/// This method returns true if the counter was successfully set.
/// </summary>
public bool SetCounter(string key, ulong value) { return Set(key, value.ToString(CultureInfo.InvariantCulture)); }
public bool SetCounter(string key, ulong value, uint hash) { return Set(key, value.ToString(CultureInfo.InvariantCulture), hash); }
public bool SetCounter(string key, ulong value, TimeSpan expiry) { return Set(key, value.ToString(CultureInfo.InvariantCulture), expiry); }
public bool SetCounter(string key, ulong value, uint hash, TimeSpan expiry) { return Set(key, value.ToString(CultureInfo.InvariantCulture), hash, expiry); }
public bool SetCounter(string key, ulong value, DateTime expiry) { return Set(key, value.ToString(CultureInfo.InvariantCulture), expiry); }
public bool SetCounter(string key, ulong value, uint hash, DateTime expiry) { return Set(key, value.ToString(CultureInfo.InvariantCulture), hash, expiry); }
/// <summary>
/// This method returns the value for the given key as a ulong?, a nullable 64-bit unsigned integer.
/// It returns null if the item did not exist, was not stored properly as per the SetCounter method, or
/// if it was not able to successfully retrieve the item.
/// Use the overload to specify a custom hash to override server selection.
/// </summary>
public ulong? GetCounter(string key) { return getCounter(key, true, hash(key)); }
public ulong? GetCounter(string key, uint hash) { return getCounter(key, false, this.hash(hash)); }
/// <summary>
/// Private single-key counter retrieval: gets the item and parses it as a decimal unsigned integer.
/// The invariant culture is used for parsing, matching the way SetCounter formats the value.
/// </summary>
private ulong? getCounter(string key, bool keyIsChecked, uint hash) {
    ulong unique;
    string text = get("get", key, keyIsChecked, hash, out unique) as string;

    //Anything that is not a string, or not a number, yields null.
    ulong parsedLong;
    if (ulong.TryParse(text, NumberStyles.Integer, CultureInfo.InvariantCulture, out parsedLong)) {
        return parsedLong;
    }
    return null;
}
/// <summary>
/// Multi-key version of GetCounter. The returned array has one entry per retrieved item, in key order;
/// an entry is null if the item was not retrieved as a string holding a decimal unsigned integer.
/// Use the overload to specify custom hashes, one per key, to override server selection.
/// </summary>
public ulong?[] GetCounter(string[] keys) { return getCounter(keys, true, hash(keys)); }
public ulong?[] GetCounter(string[] keys, uint[] hashes) { return getCounter(keys, false, hash(hashes)); }
/// <summary>
/// Private multi-key counter retrieval: performs a multi-get and parses every result with the
/// invariant culture, matching the way SetCounter formats the value.
/// </summary>
private ulong?[] getCounter(string[] keys, bool keysAreChecked, uint[] hashes) {
    //Fetch first, so that invalid arguments are reported by the multi-get's own argument checks.
    ulong[] uniques;
    object[] values = get("get", keys, keysAreChecked, hashes, out uniques);

    ulong?[] results = new ulong?[values.Length];
    for (int i = 0; i < values.Length; i++) {
        ulong parsedLong;
        if (ulong.TryParse(values[i] as string, NumberStyles.Integer, CultureInfo.InvariantCulture, out parsedLong)) {
            results[i] = parsedLong;
        } else {
            results[i] = null;
        }
    }
    return results;
}
/// <summary>
/// This method corresponds to the "incr" command in the memcached protocol.
/// It will increase the item with the given value and return the new value.
/// It will return null if the item did not exist, was not stored properly as per the SetCounter method, or
/// if it was not able to successfully retrieve the item.
/// </summary>
public ulong? Increment(string key, ulong value) { return incrementDecrement("incr", key, true, value, hash(key)); }
public ulong? Increment(string key, ulong value, uint hash) { return incrementDecrement("incr", key, false, value, this.hash(hash)); }
/// <summary>
/// This method corresponds to the "decr" command in the memcached protocol.
/// It will decrease the item with the given value and return the new value. If the new value would be
/// less than 0, it will be set to 0, and the method will return 0.
/// It will return null if the item did not exist, was not stored properly as per the SetCounter method, or
/// if it was not able to successfully retrieve the item.
/// </summary>
public ulong? Decrement(string key, ulong value) { return incrementDecrement("decr", key, true, value, hash(key)); }
/// <summary>
/// Overload of Decrement that takes a caller-supplied hash value. The value is passed through
/// this.hash before use, and the key is flagged as not yet checked so that it is validated
/// before the "decr" command is sent.
/// </summary>
public ulong? Decrement(string key, ulong value, uint hash) {
    uint hashedValue = this.hash(hash);
    return incrementDecrement("decr", key, false, value, hashedValue);
}
/// <summary>
/// Shared implementation of Increment and Decrement. Sends "&lt;cmd&gt; &lt;keyPrefix&gt;&lt;key&gt; &lt;value&gt;"
/// through serverPool.Execute using the given hash, and returns the number the server answers with.
/// The key is validated with checkKey first unless keyIsChecked is true.
/// Returns null when the reply is not a number (for example "NOT_FOUND" or an error line),
/// or when Execute hands back its default value.
/// </summary>
private ulong? incrementDecrement(string cmd, string key, bool keyIsChecked, ulong value, uint hash) {
    if (!keyIsChecked) {
        checkKey(key);
    }
    return serverPool.Execute<ulong?>(hash, null, delegate(PooledSocket socket) {
        string command = cmd + " " + keyPrefix + key + " " + value + "\r\n";
        socket.Write(command);
        string response = socket.ReadResponse().TrimEnd('\0', '\r', '\n');

        // Parse defensively instead of using Convert.ToUInt64: a non-numeric reply is an expected
        // outcome (missing item, item not stored as a counter) and maps to null, in the same way
        // getCounter treats unparsable values, rather than raising a FormatException.
        ulong newValue;
        return ulong.TryParse(response, out newValue) ? (ulong?)newValue : null;
    });
}
#endregion
#region Flush All
/// <summary>
/// Issues the memcached "flush_all" command. The command is sent to every server, which
/// deletes all items on all servers.
/// The overloads accept a delay for the flush. When the staggered parameter is true, the delay
/// grows per server: the first flushes after delay*0, the second after delay*1, the third after
/// delay*2, and so on. When it is false, every server flushes after the same delay.
/// Returns true if the command was successful on all servers.
/// This parameterless overload flushes with a zero delay and no staggering.
/// </summary>
public bool FlushAll() {
    TimeSpan noDelay = TimeSpan.Zero;
    return FlushAll(noDelay, false);
}
/// <summary>
/// Issues "flush_all" on every server with the same delay for all of them (no staggering).
/// Returns the result of FlushAll(delay, false).
/// </summary>
public bool FlushAll(TimeSpan delay) {
    const bool staggered = false;
    return FlushAll(delay, staggered);
}
/// <summary>
/// Sends "flush_all" to every pool in serverPool.HostList.
/// The delay is truncated to whole seconds. When staggered is true, each server that is
/// successfully contacted gets the delay multiplied by the number of servers contacted before it
/// (0, 1, 2, ...); otherwise every server gets the same delay.
/// Returns true only if every server answered "OK". A server for which the command could not be
/// executed counts as a failure.
/// </summary>
public bool FlushAll(TimeSpan delay, bool staggered) {
    bool noerrors = true;
    uint count = 0;
    foreach (SocketPool pool in serverPool.HostList) {
        // Use the Execute<bool> overload with a default of false, the same mechanism Status()
        // uses to detect dead servers. With the void overload, a server whose delegate never ran
        // would leave noerrors untouched and be reported as flushed.
        bool flushed = serverPool.Execute<bool>(pool, false, delegate(PooledSocket socket) {
            uint delaySeconds = (staggered ? (uint)delay.TotalSeconds * count : (uint)delay.TotalSeconds);
            //Funnily enough, "flush_all 0" has no effect, you have to send "flush_all" to flush immediately.
            socket.Write("flush_all " + (delaySeconds == 0 ? "" : delaySeconds.ToString()) + "\r\n");
            bool ok = socket.ReadResponse().StartsWith("OK");
            // Counted once a reply has been read, whatever the reply was.
            count++;
            return ok;
        });
        if (!flushed) {
            noerrors = false;
        }
    }
    return noerrors;
}
#endregion
#region Stats
/// <summary>
/// Issues the memcached "stats" command on every server.
/// The result maps each server's host to a Dictionary holding the statistics that server returned.
/// </summary>
public Dictionary<string, Dictionary<string, string>> Stats() {
    Dictionary<string, Dictionary<string, string>> statsByHost = new Dictionary<string, Dictionary<string, string>>();
    foreach (SocketPool pool in serverPool.HostList) {
        string host = pool.Host;
        Dictionary<string, string> serverStats = stats(pool);
        statsByHost.Add(host, serverStats);
    }
    return statsByHost;
}
/// <summary>
/// This method corresponds to the "stats" command in the memcached protocol.
/// It will send the stats command to the server that corresponds to the given key, hash or host,
/// and return a Dictionary containing the results of the command.
/// This overload selects the server from the key's own hash, the same value the keyed commands
/// (for example Increment and Decrement) use.
/// </summary>
public Dictionary<string, string> Stats(string key) {
    // Look the pool up with hash(key) directly. Delegating to Stats(uint) would run the value
    // through this.hash(uint) a second time, which is meant for caller-supplied hashes and could
    // select a different server from the one the key's commands are sent to.
    return stats(serverPool.GetSocketPool(hash(key)));
}
/// <summary>
/// Issues the "stats" command on the server selected by a caller-supplied hash value.
/// The value is passed through this.hash before the socket pool is looked up.
/// </summary>
public Dictionary<string, string> Stats(uint hash) {
    uint hashedValue = this.hash(hash);
    SocketPool pool = serverPool.GetSocketPool(hashedValue);
    return stats(pool);
}
/// <summary>
/// Issues the "stats" command on the server identified by the given host.
/// Returns null when the pool lookup for that host yields null.
/// </summary>
public Dictionary<string, string> StatsByHost(string host) {
    SocketPool pool = serverPool.GetSocketPool(host);
    return stats(pool);
}
/// <summary>
/// Sends "stats" to the given pool and collects the reply lines into a Dictionary, using the
/// second space-separated token of each line as the key and the rest of the line as the value.
/// Reading stops at the first line that starts with "END".
/// Returns null when pool is null.
/// </summary>
private Dictionary<string, string> stats(SocketPool pool) {
    if (pool == null) {
        return null;
    }
    Dictionary<string, string> result = new Dictionary<string, string>();
    serverPool.Execute(pool, delegate(PooledSocket socket) {
        socket.Write("stats\r\n");
        string line;
        while (!(line = socket.ReadResponse().TrimEnd('\0', '\r', '\n')).StartsWith("END")) {
            // Split into at most three parts so that a value containing spaces is kept whole
            // instead of being cut off at its first space.
            string[] s = line.Split(new char[] { ' ' }, 3);
            // NOTE(review): a line with fewer than three parts still fails on the index access
            // below, as before; how that surfaces depends on Execute - confirm.
            result.Add(s[1], s[2]);
        }
    });
    return result;
}
#endregion
#region Status
/// <summary>
/// Retrieves the status from the server pool. The connection to every server is checked, and
/// usage statistics of each server's socket pool are returned, keyed by the server's host.
/// </summary>
public Dictionary<string, Dictionary<string, string>> Status() {
    Dictionary<string, Dictionary<string, string>> statusByHost = new Dictionary<string, Dictionary<string, string>>();
    foreach (SocketPool pool in serverPool.HostList) {
        Dictionary<string, string> entry = new Dictionary<string, string>();

        // The delegate only reports success; a false result is the default handed back by Execute.
        bool alive = serverPool.Execute<bool>(pool, false, delegate { return true; });
        entry["Status"] = alive ? "Ok" : ("Dead, next retry at: " + pool.DeadEndPointRetryTime);

        entry["Sockets in pool"] = pool.Poolsize.ToString();
        entry["Acquired sockets"] = pool.Acquired.ToString();
        entry["Sockets reused"] = pool.ReusedSockets.ToString();
        entry["New sockets created"] = pool.NewSockets.ToString();
        entry["New sockets failed"] = pool.FailedNewSockets.ToString();
        entry["Sockets died in pool"] = pool.DeadSocketsInPool.ToString();
        entry["Sockets died on return"] = pool.DeadSocketsOnReturn.ToString();
        entry["Dirty sockets on return"] = pool.DirtySocketsOnReturn.ToString();

        statusByHost.Add(pool.Host, entry);
    }
    return statusByHost;
}
#endregion
}
}