/* TODO: keys are currently assumed to be <= sizeof(void *). The key target is never freed. */
#include <urcu-defer.h>
#include <arch_atomic.h>
#include <urcu/jhash.h>
/*
 * Maximum number of hash table buckets: 256M on 64-bit.
 * Should take about 512MB max if we assume 1 node per 4 buckets.
 */
#define MAX_HT_BUCKETS ((256 << 10) / sizeof(void *))

#define NODE_STOLEN (1 << 0)
	struct rcu_ht_node *next;
	struct rcu_ht_node **tbl;
	void (*free_fct)(void *data);	/* fct to free data */
	pthread_mutex_t resize_mutex;	/* resize mutex: add/del mutex */
	int resize_ongoing;		/* fast-path resize check */
struct rcu_ht *ht_new(ht_hash_fct hash_fct, void (*free_fct)(void *data),
		      unsigned long init_size, uint32_t keylen,
	ht = calloc(1, sizeof(struct rcu_ht));
	ht->hash_fct = hash_fct;
	ht->free_fct = free_fct;
	ht->size = init_size;	/* shared */
	ht->hashseed = hashseed;
	/* this mutex should not nest in read-side C.S. */
	pthread_mutex_init(&ht->resize_mutex, NULL);
	ht->resize_ongoing = 0;	/* shared */
	ht->tbl = calloc(init_size, sizeof(struct rcu_ht_node *));
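/*
 * Illustrative usage sketch (not from the original file). It assumes the
 * trailing ht_new() parameter is the hash seed, as the ht->hashseed
 * assignment above suggests, that the calling thread is an RCU-registered
 * reader, and that small keys are passed encoded in the pointer itself
 * (see the TODO note at the top of the file).
 */
#if 0	/* example only */
static void example_usage(void)
{
	struct rcu_ht *ht;
	void *data;

	ht = ht_new(ht_jhash, free, 256, sizeof(void *), 0x42);
	ht_add(ht, (void *)0x2a, malloc(16));	/* key 0x2a travels in the pointer */
	data = ht_lookup(ht, (void *)0x2a);	/* returns the malloc'd data */
	ht_delete(ht, (void *)0x2a);		/* queues free() of the data after a GP */
	ht_destroy(ht);
}
#endif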
void *ht_lookup(struct rcu_ht *ht, void *key)

	struct rcu_ht_node *node;

	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size;
	smp_read_barrier_depends();	/* read size before links */
	node = rcu_dereference(ht->tbl[hash]);

		if (node->key == key) {
		node = rcu_dereference(node->next);
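/*
 * Sketch of the complete lookup loop (illustration only, reconstructed
 * around the fragments above; variable names are illustrative): the whole
 * bucket traversal runs under the RCU read lock and every link is loaded
 * through rcu_dereference().
 */
#if 0	/* example only */
	rcu_read_lock();
	node = rcu_dereference(ht->tbl[hash]);
	for (;;) {
		if (!node) {
			data = NULL;		/* key not present */
			break;
		}
		if (node->key == key) {
			data = node->data;	/* found it */
			break;
		}
		node = rcu_dereference(node->next);
	}
	rcu_read_unlock();
#endif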
/*
 * Will re-try until either:
 * - The key is already there (-EEXIST)
 * - We successfully add the key at the head of a table bucket.
 */
int ht_add(struct rcu_ht *ht, void *key, void *data)

	struct rcu_ht_node *node, *old_head, *new_head;

	new_head = calloc(1, sizeof(struct rcu_ht_node));
	new_head->data = data;
	/*
	 * Here comes the fun and tricky part: add at the head of the bucket
	 * with a cmpxchg. Hold the RCU read lock between the moment the
	 * bucket head is read and the node traversal (to find duplicates).
	 * This ensures the head pointer has not been reclaimed when the
	 * cmpxchg is done. Always adding at the head means that if a new
	 * item has been added concurrently, the cmpxchg fails and we retry,
	 * so we never add duplicates.
	 */
	if (unlikely(LOAD_SHARED(ht->resize_ongoing))) {
		/*
		 * Wait for resize to complete before continuing.
		 */
		ret = pthread_mutex_lock(&ht->resize_mutex);
		ret = pthread_mutex_unlock(&ht->resize_mutex);
	hash = ht->hash_fct(key, ht->keylen, ht->hashseed)
		% LOAD_SHARED(ht->size);

	old_head = node = rcu_dereference(ht->tbl[hash]);
		if (node->key == key) {
		node = rcu_dereference(node->next);

	new_head->next = old_head;
	if (rcu_cmpxchg_pointer(&ht->tbl[hash], old_head, new_head) != old_head)

	/* restart loop, release and re-take the read lock to be kind to GP */
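/*
 * Condensed sketch of the add-at-head retry pattern described above
 * (illustration only; error handling and the resize check are omitted).
 * The duplicate scan and the head swap happen under the same RCU read
 * lock, so the old head cannot be reclaimed before the cmpxchg runs.
 */
#if 0	/* example only */
retry:
	rcu_read_lock();
	old_head = node = rcu_dereference(ht->tbl[hash]);
	for (; node; node = rcu_dereference(node->next)) {
		if (node->key == key) {
			free(new_head);		/* duplicate: give up with -EEXIST */
			ret = -EEXIST;
			goto end;
		}
	}
	new_head->next = old_head;
	if (rcu_cmpxchg_pointer(&ht->tbl[hash], old_head, new_head) != old_head) {
		/* Lost the race against another add: be kind to the GP and retry. */
		rcu_read_unlock();
		goto retry;
	}
	ret = 0;
end:
	rcu_read_unlock();
#endif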
/*
 * Restart until we successfully remove the entry, or no entry is left
 * ((void *)(unsigned long)-ENOENT).
 * Deal with concurrent stealers by doing an extra verification pass to check
 * that no element in the list is still pointing to the stolen element.
 * This could happen if two concurrent steals of consecutive objects are
 * executed: a pointer to an object being stolen could be saved by the
 * concurrent stealer of the previous object.
 * Also, given that in this precise scenario another stealer may also want to
 * delete the doubly-referenced object, use a "stolen" flag to let only one
 * stealer delete the object.
 */
void *ht_steal(struct rcu_ht *ht, void *key)

	struct rcu_ht_node **prev, *node, *del_node = NULL;
	if (unlikely(LOAD_SHARED(ht->resize_ongoing))) {
		/*
		 * Wait for resize to complete before continuing.
		 */
		ret = pthread_mutex_lock(&ht->resize_mutex);
		ret = pthread_mutex_unlock(&ht->resize_mutex);
	hash = ht->hash_fct(key, ht->keylen, ht->hashseed)
		% LOAD_SHARED(ht->size);

	prev = &ht->tbl[hash];
	node = rcu_dereference(*prev);
		if (node->key == key) {
		node = rcu_dereference(*prev);
	/*
	 * Another concurrent thread stole it? If so, let it deal with
	 * this. Assume NODE_STOLEN is the only flag. If this changes,
	 * read flags before cmpxchg.
	 */
	if (cmpxchg(&node->flags, 0, NODE_STOLEN) != 0)
	/* Found it! Pointer to object is in "prev". */
	if (rcu_cmpxchg_pointer(prev, node, node->next) == node)
	/*
	 * From this point on, we own the node. Note that there can still be
	 * concurrent RCU readers using it. We can free it outside of the
	 * read lock, after a grace period.
	 */
	data = del_node->data;
	call_rcu(free, del_node);
	data = (void *)(unsigned long)-ENOENT;

	/* restart loop, release and re-take the read lock to be kind to GP */
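/*
 * Usage sketch (illustration only): ht_steal() unlinks the entry and hands
 * the data pointer back to the caller, who becomes responsible for it;
 * ht_delete() below instead queues the data for ht->free_fct after a grace
 * period. consume_and_free() is a hypothetical caller-side helper.
 */
#if 0	/* example only */
	void *stolen = ht_steal(ht, key);

	if (stolen != (void *)(unsigned long)-ENOENT)
		consume_and_free(stolen);	/* caller now owns the data */
#endif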
int ht_delete(struct rcu_ht *ht, void *key)

	data = ht_steal(ht, key);
	if (data && data != (void *)(unsigned long)-ENOENT) {
		call_rcu(ht->free_fct, data);
/* Delete all old elements. Allow concurrent writer accesses. */
int ht_delete_all(struct rcu_ht *ht)

	struct rcu_ht_node **prev, *node, *inext;
	/*
	 * Mutual exclusion with resize operations, but let add/steal execute
	 * concurrently. This is OK because we operate only on the heads.
	 */
	ret = pthread_mutex_lock(&ht->resize_mutex);
	for (i = 0; i < ht->size; i++) {
		/*
		 * Cut the head. After that, we own the first element.
		 */
		node = rcu_xchg_pointer(prev, NULL);
		/*
		 * We manage a list shared with concurrent writers and readers.
		 * Note that a concurrent add may or may not be deleted by us,
		 * depending on whether it arrives before or after the head is
		 * cut. "node" points to our first node. Remove first elements
		 */

			inext = rcu_xchg_pointer(prev, NULL);
306 * "node" is the first element of the list we have cut.
307 * We therefore own it, no concurrent writer may delete
308 * it. There can only be concurrent lookups. Concurrent
309 * add can only be done on a bucket head, but we've cut
310 * it already. inext is also owned by us, because we
311 * have exchanged it for "NULL". It will therefore be
312 * safe to use it after a G.P.
316 call_rcu(ht
->free_fct
, node
->data
);
317 call_rcu(free
, node
);
	ret = pthread_mutex_unlock(&ht->resize_mutex);
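/*
 * Condensed sketch of the per-bucket destruction above (illustration only):
 * cut each bucket head with an atomic exchange so later adds land on an
 * empty bucket, then walk the detached chain, exchanging each ->next for
 * NULL before queueing the node and its data for reclamation after a GP.
 */
#if 0	/* example only */
	for (i = 0; i < ht->size; i++) {
		node = rcu_xchg_pointer(&ht->tbl[i], NULL);	/* we own the chain */
		while (node) {
			inext = rcu_xchg_pointer(&node->next, NULL);
			call_rcu(ht->free_fct, node->data);	/* data freed after a GP */
			call_rcu(free, node);			/* then the node itself */
			node = inext;
		}
	}
#endif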
/*
 * Should only be called when no concurrent readers or writers can
 * possibly access the table.
 */
int ht_destroy(struct rcu_ht *ht)

	ret = ht_delete_all(ht);
static void ht_resize_grow(struct rcu_ht *ht)

	unsigned long i, new_size, old_size;
	struct rcu_ht_node **new_tbl, **old_tbl;
	struct rcu_ht_node *node, *new_node, *tmp;
	if (old_size == MAX_HT_BUCKETS)

	new_size = old_size << 1;
	new_tbl = calloc(new_size, sizeof(struct rcu_ht_node *));

	for (i = 0; i < old_size; i++) {
		/*
		 * Re-hash each entry, insert in new table.
		 * It's important that a reader looking for a key _will_ find it
		 * if it's in the table.
		 * Copy each node. (just the node, not ->data)
		 */
		hash = ht->hash_fct(node->key, ht->keylen, ht->hashseed)
		new_node = malloc(sizeof(struct rcu_ht_node));
		new_node->key = node->key;
		new_node->data = node->data;
		new_node->next = new_tbl[hash];	/* add to head */
		new_tbl[hash] = new_node;
	smp_wmb();	/* write links and table before changing size */
	STORE_SHARED(ht->size, new_size);

	/* Ensure all concurrent lookups use new size and table */

	for (i = 0; i < old_size; i++) {
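/*
 * Ordering note (illustration only): per the comments above and in
 * ht_lookup(), the grow path intends to publish the new links and table
 * before the new size, while the lookup path reads the size before
 * following links, so a reader that computes its bucket with new_size
 * should also see the fully populated new table.
 *
 *	resize (writer)				lookup (reader)
 *	fill new_tbl[], install new table	hash = ... % ht->size;
 *	smp_wmb();				smp_read_barrier_depends();
 *	STORE_SHARED(ht->size, new_size);	node = rcu_dereference(ht->tbl[hash]);
 */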
static void ht_resize_shrink(struct rcu_ht *ht)

	unsigned long i, new_size;
	struct rcu_ht_node **new_tbl;
	struct rcu_ht_node **prev, *node;
	new_size = ht->size >> 1;

	for (i = 0; i < new_size; i++) {
		/* Link the end of chain i with the first entry of chain i + new_size */

		*prev = ht->tbl[i + new_size];
	smp_wmb();	/* write links before changing size */
	STORE_SHARED(ht->size, new_size);

	/* Ensure all concurrent lookups use new size */

	new_tbl = realloc(ht->tbl, new_size * sizeof(struct rcu_ht_node *));
	/* shrinking, pointers should not move */
	assert(new_tbl == ht->tbl);
/*
 * growth: > 0 doubles the table size, < 0 halves it.
 */
void ht_resize(struct rcu_ht *ht, int growth)

	ret = pthread_mutex_lock(&ht->resize_mutex);
	STORE_SHARED(ht->resize_ongoing, 1);

	/* All add/remove are waiting on the mutex. */

		ht_resize_shrink(ht);
	STORE_SHARED(ht->resize_ongoing, 0);
	ret = pthread_mutex_unlock(&ht->resize_mutex);
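/*
 * Sketch of the writer fast-path check that pairs with this resize
 * handshake (illustration only): ht_add()/ht_steal() test resize_ongoing
 * outside their fast path and, if a resize is in flight, drop the read
 * lock and park on the resize mutex until it completes, then retry.
 */
#if 0	/* example only */
	if (unlikely(LOAD_SHARED(ht->resize_ongoing))) {
		rcu_read_unlock();
		/* Wait for resize to complete before continuing. */
		ret = pthread_mutex_lock(&ht->resize_mutex);
		assert(!ret);
		ret = pthread_mutex_unlock(&ht->resize_mutex);
		assert(!ret);
		continue;	/* the enclosing retry loop re-takes the read lock */
	}
#endif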
/*
 * Expects keys of size <= sizeof(void *) to be encoded in the pointer itself.
 */
uint32_t ht_jhash(void *key, uint32_t length, uint32_t initval)

	if (length <= sizeof(void *))

	ret = jhash(vkey, length, initval);
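/*
 * Usage sketch (illustration only): with a length <= sizeof(void *), the
 * key value travels encoded in the pointer itself and ht_jhash() hashes
 * the pointer's own bytes; with a larger length, the pointer presumably
 * refers to the key bytes to hash.
 */
#if 0	/* example only */
	uint32_t h_small, h_big;
	char big_key[64] = "a key wider than a pointer";

	h_small = ht_jhash((void *)0x2a, sizeof(void *), 0x42);	/* key in the pointer */
	h_big = ht_jhash(big_key, sizeof(big_key), 0x42);	/* key behind the pointer */
#endif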