|
|
|
@ -876,13 +876,23 @@ static void |
|
|
|
deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, const toku::comparator &cmp) { |
|
|
|
int r; |
|
|
|
int n_in_this_buffer = rbuf_int(rbuf); |
|
|
|
int32_t *fresh_offsets = NULL, *stale_offsets = NULL; |
|
|
|
int32_t *broadcast_offsets = NULL; |
|
|
|
int32_t *fresh_offsets = nullptr, *stale_offsets = nullptr; |
|
|
|
int32_t *broadcast_offsets = nullptr; |
|
|
|
int nfresh = 0, nstale = 0; |
|
|
|
int nbroadcast_offsets = 0; |
|
|
|
XMALLOC_N(n_in_this_buffer, stale_offsets); |
|
|
|
XMALLOC_N(n_in_this_buffer, fresh_offsets); |
|
|
|
XMALLOC_N(n_in_this_buffer, broadcast_offsets); |
|
|
|
|
|
|
|
// Only sort buffers if we have a valid comparison function. In certain scenarios,
|
|
|
|
// like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
|
|
|
|
// for simple inspection and don't actually require that the message buffers are
|
|
|
|
// properly sorted. This is very ugly, but correct.
|
|
|
|
const bool sort_buffers = cmp.valid(); |
|
|
|
|
|
|
|
if (sort_buffers) { |
|
|
|
XMALLOC_N(n_in_this_buffer, stale_offsets); |
|
|
|
XMALLOC_N(n_in_this_buffer, fresh_offsets); |
|
|
|
XMALLOC_N(n_in_this_buffer, broadcast_offsets); |
|
|
|
} |
|
|
|
|
|
|
|
bnc->msg_buffer.resize(rbuf->size + 64); |
|
|
|
for (int i = 0; i < n_in_this_buffer; i++) { |
|
|
|
bytevec key; ITEMLEN keylen; |
|
|
|
@ -896,21 +906,24 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, const tok |
|
|
|
xids_create_from_buffer(rbuf, &xids); |
|
|
|
rbuf_bytes(rbuf, &key, &keylen); /* Returns a pointer into the rbuf. */ |
|
|
|
rbuf_bytes(rbuf, &val, &vallen); |
|
|
|
int32_t *dest; |
|
|
|
if (ft_msg_type_applies_once(type)) { |
|
|
|
if (is_fresh) { |
|
|
|
dest = &fresh_offsets[nfresh]; |
|
|
|
nfresh++; |
|
|
|
int32_t *dest = nullptr; |
|
|
|
if (sort_buffers) { |
|
|
|
if (ft_msg_type_applies_once(type)) { |
|
|
|
if (is_fresh) { |
|
|
|
dest = &fresh_offsets[nfresh]; |
|
|
|
nfresh++; |
|
|
|
} else { |
|
|
|
dest = &stale_offsets[nstale]; |
|
|
|
nstale++; |
|
|
|
} |
|
|
|
} else if (ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)) { |
|
|
|
dest = &broadcast_offsets[nbroadcast_offsets]; |
|
|
|
nbroadcast_offsets++; |
|
|
|
} else { |
|
|
|
dest = &stale_offsets[nstale]; |
|
|
|
nstale++; |
|
|
|
abort(); |
|
|
|
} |
|
|
|
} else if (ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)) { |
|
|
|
dest = &broadcast_offsets[nbroadcast_offsets]; |
|
|
|
nbroadcast_offsets++; |
|
|
|
} else { |
|
|
|
abort(); |
|
|
|
} |
|
|
|
|
|
|
|
// TODO: Function to parse stuff out of an rbuf into an FT_MSG
|
|
|
|
DBT k, v; |
|
|
|
FT_MSG_S msg = { |
|
|
|
@ -922,17 +935,19 @@ deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, const tok |
|
|
|
} |
|
|
|
invariant(rbuf->ndone == rbuf->size); |
|
|
|
|
|
|
|
struct toku_msg_buffer_key_msn_cmp_extra extra(cmp, &bnc->msg_buffer); |
|
|
|
r = toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp>::mergesort_r(fresh_offsets, nfresh, extra); |
|
|
|
assert_zero(r); |
|
|
|
bnc->fresh_message_tree.destroy(); |
|
|
|
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer); |
|
|
|
r = toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp>::mergesort_r(stale_offsets, nstale, extra); |
|
|
|
assert_zero(r); |
|
|
|
bnc->stale_message_tree.destroy(); |
|
|
|
bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, n_in_this_buffer); |
|
|
|
bnc->broadcast_list.destroy(); |
|
|
|
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast_offsets, n_in_this_buffer); |
|
|
|
if (sort_buffers) { |
|
|
|
struct toku_msg_buffer_key_msn_cmp_extra extra(cmp, &bnc->msg_buffer); |
|
|
|
r = toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp>::mergesort_r(fresh_offsets, nfresh, extra); |
|
|
|
assert_zero(r); |
|
|
|
bnc->fresh_message_tree.destroy(); |
|
|
|
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer); |
|
|
|
r = toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp>::mergesort_r(stale_offsets, nstale, extra); |
|
|
|
assert_zero(r); |
|
|
|
bnc->stale_message_tree.destroy(); |
|
|
|
bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, n_in_this_buffer); |
|
|
|
bnc->broadcast_list.destroy(); |
|
|
|
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast_offsets, n_in_this_buffer); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// effect: deserialize a single message from rbuf and enqueue the result into the given message buffer
|
|
|
|
@ -1750,19 +1765,27 @@ deserialize_and_upgrade_internal_node(FTNODE node, |
|
|
|
MSN highest_msn; |
|
|
|
highest_msn.msn = 0; |
|
|
|
|
|
|
|
// Only sort buffers if we have a valid comparison function. In certain scenarios,
|
|
|
|
// like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
|
|
|
|
// for simple inspection and don't actually require that the message buffers are
|
|
|
|
// properly sorted. This is very ugly, but correct.
|
|
|
|
const bool sort_buffers = bfe->ft->cmp.valid(); |
|
|
|
|
|
|
|
// Deserialize de-compressed buffers.
|
|
|
|
for (int i = 0; i < node->n_children; ++i) { |
|
|
|
NONLEAF_CHILDINFO bnc = BNC(node, i); |
|
|
|
int n_in_this_buffer = rbuf_int(rb); // 22. node count
|
|
|
|
|
|
|
|
int32_t *fresh_offsets = NULL; |
|
|
|
int32_t *broadcast_offsets = NULL; |
|
|
|
int32_t *fresh_offsets = nullptr; |
|
|
|
int32_t *broadcast_offsets = nullptr; |
|
|
|
int nfresh = 0; |
|
|
|
int nbroadcast_offsets = 0; |
|
|
|
|
|
|
|
// We skip 'stale' offsets for upgraded nodes.
|
|
|
|
XMALLOC_N(n_in_this_buffer, fresh_offsets); |
|
|
|
XMALLOC_N(n_in_this_buffer, broadcast_offsets); |
|
|
|
if (sort_buffers) { |
|
|
|
XMALLOC_N(n_in_this_buffer, fresh_offsets); |
|
|
|
XMALLOC_N(n_in_this_buffer, broadcast_offsets); |
|
|
|
} |
|
|
|
|
|
|
|
// Atomically decrement the header's MSN count by the number
|
|
|
|
// of messages in the buffer.
|
|
|
|
@ -1785,15 +1808,17 @@ deserialize_and_upgrade_internal_node(FTNODE node, |
|
|
|
rbuf_bytes(rb, &val, &vallen); // 26. value
|
|
|
|
|
|
|
|
// <CER> can we factor this out?
|
|
|
|
int32_t *dest; |
|
|
|
if (ft_msg_type_applies_once(type)) { |
|
|
|
dest = &fresh_offsets[nfresh]; |
|
|
|
nfresh++; |
|
|
|
} else if (ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)) { |
|
|
|
dest = &broadcast_offsets[nbroadcast_offsets]; |
|
|
|
nbroadcast_offsets++; |
|
|
|
} else { |
|
|
|
abort(); |
|
|
|
int32_t *dest = nullptr; |
|
|
|
if (sort_buffers) { |
|
|
|
if (ft_msg_type_applies_once(type)) { |
|
|
|
dest = &fresh_offsets[nfresh]; |
|
|
|
nfresh++; |
|
|
|
} else if (ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type)) { |
|
|
|
dest = &broadcast_offsets[nbroadcast_offsets]; |
|
|
|
nbroadcast_offsets++; |
|
|
|
} else { |
|
|
|
abort(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Increment our MSN, the last message should have the
|
|
|
|
@ -1809,14 +1834,16 @@ deserialize_and_upgrade_internal_node(FTNODE node, |
|
|
|
xids_destroy(&xids); |
|
|
|
} |
|
|
|
|
|
|
|
struct toku_msg_buffer_key_msn_cmp_extra extra(bfe->ft->cmp, &bnc->msg_buffer); |
|
|
|
typedef toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp> key_msn_sort; |
|
|
|
r = key_msn_sort::mergesort_r(fresh_offsets, nfresh, extra); |
|
|
|
assert_zero(r); |
|
|
|
bnc->fresh_message_tree.destroy(); |
|
|
|
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer); |
|
|
|
bnc->broadcast_list.destroy(); |
|
|
|
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast_offsets, n_in_this_buffer); |
|
|
|
if (sort_buffers) { |
|
|
|
struct toku_msg_buffer_key_msn_cmp_extra extra(bfe->ft->cmp, &bnc->msg_buffer); |
|
|
|
typedef toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp> key_msn_sort; |
|
|
|
r = key_msn_sort::mergesort_r(fresh_offsets, nfresh, extra); |
|
|
|
assert_zero(r); |
|
|
|
bnc->fresh_message_tree.destroy(); |
|
|
|
bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, n_in_this_buffer); |
|
|
|
bnc->broadcast_list.destroy(); |
|
|
|
bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast_offsets, n_in_this_buffer); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Assign the highest msn from our upgrade message buffers
|
|
|
|
|