/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."

#include "includes.h"

static void note_txn_closing (TOKUTXN txn);
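// Report progress to the client's poll callback (if one is registered) with a
// snapshot of how many rollback entries have been processed so far.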
void
toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
    if (txn->progress_poll_fun) {
        TOKU_TXN_PROGRESS_S progress = {
            .entries_total         = txn->num_rollentries,
            .entries_processed     = txn->num_rollentries_processed,
            .is_commit             = is_commit,
            .stalled_on_checkpoint = stall_for_checkpoint};
        txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra);
    }
}
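// Commit a single rollback entry, dispatching on its type to the matching
// toku_commit_* handler.  Progress is polled every 1024 entries.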
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void *yieldv, LSN lsn) {
    int r = 0;
    rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv, lsn);
    txn->num_rollentries_processed++;
    if (txn->num_rollentries_processed % 1024 == 0)
        toku_poll_txn_progress_function(txn, TRUE, FALSE);
    return r;
}
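// Roll back (abort) a single rollback entry, dispatching on its type to the
// matching toku_rollback_* handler.  Progress is polled every 1024 entries.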
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void *yieldv, LSN lsn) {
    int r = 0;
    rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv, lsn);
    txn->num_rollentries_processed++;
    if (txn->num_rollentries_processed % 1024 == 0)
        toku_poll_txn_progress_function(txn, FALSE, FALSE);
    return r;
}
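// A txn has an "in progress" rollback log if current_rollback names a real
// block; it has spilled logs if the spilled tail does.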
static inline int
txn_has_inprogress_rollback_log(TOKUTXN txn) {
    return txn->current_rollback.b != ROLLBACK_NONE.b;
}

static inline int
txn_has_spilled_rollback_logs(TOKUTXN txn) {
    return txn->spilled_rollback_tail.b != ROLLBACK_NONE.b;
}
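// Cachetable unpin-and-remove callback: when a rollback node is removed from
// the cachetable, return its blocknum to the header's blocktable.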
static void rollback_unpin_remove_callback(CACHEKEY* cachekey, BOOL for_checkpoint, void* extra) {
    struct brt_header* h = extra;
    toku_free_blocknum(
        h->blocktable,
        cachekey,
        h,
        for_checkpoint
        );
}
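// Remove a rollback node from the cachetable and free its blocknum, clearing
// the txn's cached pin if it pointed at this node.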
int
toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
    int r;
    CACHEFILE cf = txn->logger->rollback_cachefile;
    struct brt_header *h = toku_cachefile_get_userdata(cf);
    if (txn->pinned_inprogress_rollback_log == log) {
        txn->pinned_inprogress_rollback_log = NULL;
    }
    r = toku_cachetable_unpin_and_remove (cf, log->thislogname, rollback_unpin_remove_callback, h);
    assert(r==0);
    return r;
}
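// Effect: Walk this txn's rollback logs from newest to oldest, apply func
// (commit or abort) to every entry, and delete each log node as it is
// consumed.  Yields the ydb lock periodically so a long-running commit or
// abort does not starve other threads.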
static int
toku_apply_txn (TOKUTXN txn, YIELDF yield, void *yieldv, LSN lsn,
                apply_rollback_item func) {
    int r = 0;
    // do the commit/abort calls and free everything
    // we do the commit/abort calls in reverse order too.
    struct roll_entry *item;
    //printf("%s:%d abort\n", __FILE__, __LINE__);
    int count = 0;
    BLOCKNUM next_log = ROLLBACK_NONE;
    uint32_t next_log_hash = 0;
    BOOL is_current = FALSE;
    if (txn_has_inprogress_rollback_log(txn)) {
        next_log      = txn->current_rollback;
        next_log_hash = txn->current_rollback_hash;
        is_current = TRUE;
    }
    else if (txn_has_spilled_rollback_logs(txn)) {
        next_log      = txn->spilled_rollback_tail;
        next_log_hash = txn->spilled_rollback_tail_hash;
    }
    uint64_t last_sequence = txn->num_rollback_nodes;
    BOOL found_head = FALSE;
    while (next_log.b != ROLLBACK_NONE.b) {
        ROLLBACK_LOG_NODE log;
        //pin log
        r = toku_get_and_pin_rollback_log(txn, txn->txnid64, last_sequence-1, next_log, next_log_hash, &log);
        assert(r==0);
        r = toku_maybe_prefetch_older_rollback_log(txn, log);
        assert(r==0);
        last_sequence = log->sequence;
        if (func) {
            while ((item = log->newest_logentry)) {
                log->newest_logentry = item->prev;
                r = func(txn, item, yield, yieldv, lsn);
                if (r!=0) return r;
                count++;
                // We occasionally yield here to prevent transactions
                // from hogging the log.  This yield will allow other
                // threads to grab the ydb lock.  However, we don't
                // want any transaction doing more than one log
                // operation to always yield the ydb lock, as it must
                // wait for the ydb lock to be released to proceed.
                if (count % 8 == 0) {
                    yield(NULL, NULL, yieldv);
                }
            }
        }
        if (next_log.b == txn->spilled_rollback_head.b) {
            assert(!found_head);
            found_head = TRUE;
            assert(log->sequence == 0);
        }
        next_log      = log->older;
        next_log_hash = log->older_hash;
        {
            //Clean up transaction structure to prevent
            //toku_txn_close from double-freeing
            if (is_current) {
                txn->current_rollback      = ROLLBACK_NONE;
                txn->current_rollback_hash = 0;
                is_current = FALSE;
            }
            else {
                txn->spilled_rollback_tail      = next_log;
                txn->spilled_rollback_tail_hash = next_log_hash;
            }
            if (found_head) {
                assert(next_log.b == ROLLBACK_NONE.b);
                txn->spilled_rollback_head      = next_log;
                txn->spilled_rollback_head_hash = next_log_hash;
            }
        }
        //Unpins log
        r = toku_delete_rollback_log(txn, log);
        assert(r==0);
    }
    return r;
}
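// OMT heaviside function: the OMT stores TXNIDs directly; compare one against
// the TXNID being searched for.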
int
toku_find_xid_by_xid (OMTVALUE v, void *xidv) {
    TXNID xid = (TXNID) v;
    TXNID xidfind = (TXNID) xidv;
    if (xid < xidfind) return -1;
    if (xid > xidfind) return +1;
    return 0;
}
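// OMT heaviside function: compare an XID_PAIR's first xid against the TXNID
// being searched for.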
int
toku_find_pair_by_xid (OMTVALUE v, void *xidv) {
    XID_PAIR pair = v;
    TXNID *xidfind = xidv;
    if (pair->xid1 < *xidfind) return -1;
    if (pair->xid1 > *xidfind) return +1;
    return 0;
}
// For each xid on the closing txn's live list, find the corresponding entry in the reverse live list.
// There must be one.
// If the second xid in the pair is not the xid of the closing transaction, then the second xid must be newer
// than the closing txn, and there is nothing to be done (except to assert the invariant).
// If the second xid in the pair is the xid of the closing transaction, then we need to find the next oldest
// txn.  If the live_xid is in the live list of the next oldest txn, then set the next oldest txn as the
// second xid in the pair, otherwise delete the entry from the reverse live list.
static int
live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void *txnv) {
    TOKUTXN txn = txnv;
    TXNID xid = txn->txnid64;     // xid of txn that is closing
    TXNID *live_xid = live_xidv;  // xid on closing txn's live list
    OMTVALUE pairv;
    XID_PAIR pair;
    uint32_t idx;
    int r;
    OMT reverse = txn->logger->live_list_reverse;
    r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx);
    invariant(r==0);
    pair = pairv;
    invariant(pair->xid1 == *live_xid); //sanity check
    if (pair->xid2 == xid) {
        //There is a record that needs to be either deleted or updated
        TXNID olderxid;
        OMTVALUE olderv;
        uint32_t olderidx;
        OMT snapshot = txn->logger->snapshot_txnids;
        BOOL should_delete = TRUE;
        // find the youngest txn in snapshot that is older than xid
        r = toku_omt_find(snapshot, toku_find_xid_by_xid, (OMTVALUE) xid, -1, &olderv, &olderidx);
        if (r==0) {
            //There is an older txn
            olderxid = (TXNID) olderv;
            invariant(olderxid < xid);
            if (olderxid >= *live_xid) {
                //Older txn is new enough, we need to update.
                pair->xid2 = olderxid;
                should_delete = FALSE;
            }
        }
        else {
            invariant(r==DB_NOTFOUND);
        }
        if (should_delete) {
            //Delete record
            toku_free(pair);
            r = toku_omt_delete_at(reverse, idx);
            invariant(r==0);
        }
    }
    else {
        invariant(pair->xid2 > xid);
    }
    return r;
}
// When a txn ends, update the reverse live list.  To do that, examine each xid in this (closing) txn's live list.
static int
live_list_reverse_note_txn_end(TOKUTXN txn) {
    int r;
    r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_end_iter, txn);
    invariant(r==0);
    return r;
}
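// Effect: Remove the closing txn from the logger's live-txn lists (and, if it
// took a snapshot, from the snapshot structures), update the oldest-living-xid
// bookkeeping, and notify every BRT the txn touched.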
void toku_rollback_txn_close (TOKUTXN txn) {
    assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
    assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
    assert(txn->current_rollback.b == ROLLBACK_NONE.b);
    int r;
    TOKULOGGER logger = txn->logger;
    r = toku_pthread_mutex_lock(&logger->txn_list_lock); assert_zero(r);
    {
        {
            //Remove txn from list (omt) of live transactions
            OMTVALUE txnagain;
            u_int32_t idx;
            r = toku_omt_find_zero(logger->live_txns, find_xid, txn, &txnagain, &idx);
            assert(r==0);
            assert(txn==txnagain);
            r = toku_omt_delete_at(logger->live_txns, idx);
            assert(r==0);
        }
        if (txn->parent==NULL) {
            OMTVALUE txnagain;
            u_int32_t idx;
            //Remove txn from list of live root txns
            r = toku_omt_find_zero(logger->live_root_txns, find_xid, txn, &txnagain, &idx);
            assert(r==0);
            assert(txn==txnagain);
            r = toku_omt_delete_at(logger->live_root_txns, idx);
            assert(r==0);
        }
        //
        // If this txn created a snapshot, make the necessary modifications to the list of snapshot
        // txnids and to live_list_reverse.  The order of operations is important: we must remove the
        // txnid from the list of snapshot txnids first, because root snapshot transactions appear in
        // their own live lists.  If we did not remove it first, then when we went to modify
        // live_list_reverse we would never remove the (id, id) entry.
        //
        if (txn->snapshot_type != TXN_SNAPSHOT_NONE && (txn->parent==NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD)) {
            {
                u_int32_t idx;
                OMTVALUE v;
                //Remove the txnid from snapshot_txnids
                r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
                invariant(r==0);
                TXNID xid = (TXNID) v;
                invariant(xid == txn->txnid64);
                r = toku_omt_delete_at(logger->snapshot_txnids, idx);
                invariant(r==0);
            }
            live_list_reverse_note_txn_end(txn);
            {
                //Free memory used for the local live root txns list
                invariant(toku_omt_size(txn->live_root_txn_list) > 0);
                OMTVALUE v;
                //The live list is stored as a single array of txnids; freeing element 0 frees the whole array.
                r = toku_omt_fetch(txn->live_root_txn_list, 0, &v);
                invariant(r==0);
                toku_free(v);
                toku_omt_destroy(&txn->live_root_txn_list);
            }
        }
    }
    r = toku_pthread_mutex_unlock(&logger->txn_list_lock); assert_zero(r);
    assert(logger->oldest_living_xid <= txn->txnid64);
    if (txn->txnid64 == logger->oldest_living_xid) {
        OMTVALUE oldest_txnv;
        r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
        if (r==0) {
            TOKUTXN oldest_txn = oldest_txnv;
            assert(oldest_txn != txn); // We just removed it
            assert(oldest_txn->txnid64 > logger->oldest_living_xid); //Must be newer than the previous oldest
            logger->oldest_living_xid = oldest_txn->txnid64;
            logger->oldest_living_starttime = oldest_txn->starttime;
        }
        else {
            //No living transactions
            assert(r==EINVAL);
            logger->oldest_living_xid = TXNID_NONE_LIVING;
            logger->oldest_living_starttime = 0;
        }
    }
    note_txn_closing(txn);
}
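// Allocate memory for a rollback entry out of the log's memarena.  The memory
// is reclaimed in bulk when the arena is closed or its buffers are moved to a
// parent txn's log.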
void* toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
    return malloc_in_memarena(log->rollentry_arena, size);
}

void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len) {
    void *r = toku_malloc_in_rollback(log, len);
    memcpy(r, v, len);
    return r;
}
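// When a child txn commits into its parent, make sure the parent knows about
// every brt the child touched, and hand the "created/locked when empty" and
// "suppressed recovery logs" markers up to the parent.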
static int note_brt_used_in_txns_parent(OMTVALUE brtv, u_int32_t UU(index), void *txnv) {
    TOKUTXN child = txnv;
    TOKUTXN parent = child->parent;
    BRT brt = brtv;
    int r = toku_txn_note_brt(parent, brt);
    if (r==0 &&
        brt->h->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
        //Pass magic "no rollback needed" flag to parent.
        brt->h->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
    }
    if (r==0 &&
        brt->h->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
        //Pass magic "no recovery needed" flag to parent.
        brt->h->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
    }
    return r;
}
//Commit each entry in the rollback log.
//If the transaction has a parent, it just promotes its information to its parent.
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void *yieldv, LSN lsn) {
    int r = 0;
    if (txn->parent!=0) {
        // First we must put a rollinclude entry into the parent if we spilled
        if (txn_has_spilled_rollback_logs(txn)) {
            uint64_t num_nodes = txn->num_rollback_nodes;
            if (txn_has_inprogress_rollback_log(txn)) {
                num_nodes--; //Don't count the in-progress rollback log.
            }
            r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes,
                                                      txn->spilled_rollback_head, txn->spilled_rollback_head_hash,
                                                      txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash);
            if (r!=0) return r;
            //Remove ownership from child.
            txn->spilled_rollback_head      = ROLLBACK_NONE;
            txn->spilled_rollback_head_hash = 0;
            txn->spilled_rollback_tail      = ROLLBACK_NONE;
            txn->spilled_rollback_tail_hash = 0;
        }
        if (txn_has_inprogress_rollback_log(txn)) {
            ROLLBACK_LOG_NODE parent_log;
            //Pin parent log
            r = toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
            assert(r==0);
            ROLLBACK_LOG_NODE child_log;
            //Pin child log
            r = toku_get_and_pin_rollback_log(txn, txn->txnid64, txn->num_rollback_nodes - 1,
                                              txn->current_rollback, txn->current_rollback_hash,
                                              &child_log);
            assert(r==0);
            // Append the child's entry list to the front of the parent's.
            if (child_log->oldest_logentry) {
                // There are some entries, so link them in.
                child_log->oldest_logentry->prev = parent_log->newest_logentry;
                if (!parent_log->oldest_logentry) {
                    parent_log->oldest_logentry = child_log->oldest_logentry;
                }
                parent_log->newest_logentry = child_log->newest_logentry;
                parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount;
                txn->parent->rollentry_raw_count         += txn->rollentry_raw_count;
                child_log->rollentry_resident_bytecount = 0;
            }
            if (parent_log->oldest_logentry==NULL) {
                parent_log->oldest_logentry = child_log->oldest_logentry;
            }
            child_log->newest_logentry = child_log->oldest_logentry = 0;
            // Put all the memarena data into the parent.
            if (memarena_total_size_in_use(child_log->rollentry_arena) > 0) {
                // If there are no bytes to move, then just leave things alone, and let the memory be reclaimed when the txn is closed.
                memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
            }
            //Delete child log (unpins child_log)
            r = toku_delete_rollback_log(txn, child_log);
            assert(r==0);
            txn->current_rollback      = ROLLBACK_NONE;
            txn->current_rollback_hash = 0;
            r = toku_maybe_spill_rollbacks(txn->parent, parent_log); //unpins parent_log
            assert(r==0);
        }
        // Note the open brts; the omts must be merged
        r = toku_omt_iterate(txn->open_brts, note_brt_used_in_txns_parent, txn);
        assert(r==0);
        // Merge the list of headers that must be checkpointed before commit
        while (!toku_list_empty(&txn->checkpoint_before_commit)) {
            struct toku_list *list = toku_list_pop(&txn->checkpoint_before_commit);
            toku_list_push(&txn->parent->checkpoint_before_commit, list);
        }
        //If this transaction needs an fsync (if it commits), save that in the
        //parent, since the commit really happens in the root txn.
        txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
        txn->parent->num_rollentries       += txn->num_rollentries;
    } else {
        r = toku_apply_txn(txn, yield, yieldv, lsn, toku_commit_rollback_item);
        assert(r==0);
    }
    return r;
}
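// Effect: Abort the txn: drop any pending checkpoint-before-commit work and
// apply every rollback entry with the abort handler.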
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void *yieldv, LSN lsn) {
    int r;
    //Empty the list
    while (!toku_list_empty(&txn->checkpoint_before_commit)) {
        toku_list_pop(&txn->checkpoint_before_commit);
    }
    r = toku_apply_txn(txn, yield, yieldv, lsn, toku_abort_rollback_item);
    assert(r==0);
    return r;
}
static inline PAIR_ATTR make_rollback_pair_attr(long size) {
    PAIR_ATTR result = {
        .size = size,
        .nonleaf_size = 0,
        .leaf_size = 0,
        .rollback_size = size,
        .cache_pressure_size = 0,
        .is_valid = TRUE
    };
    return result;
}
// Return the in-memory footprint of a rollback log node (the node struct plus
// its memarena), packaged as a PAIR_ATTR for the cachetable.
static PAIR_ATTR
rollback_memory_size(ROLLBACK_LOG_NODE log) {
    size_t size = sizeof(*log);
    size += memarena_total_memory_size(log->rollentry_arena);
    return make_rollback_pair_attr(size);
}
static void
toku_rollback_log_free(ROLLBACK_LOG_NODE *log_p) {
    ROLLBACK_LOG_NODE log = *log_p;
    *log_p = NULL; //Sanitize
    // Clean up the rollback memory
    memarena_close(&log->rollentry_arena);
    toku_free(log);
}
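// Cachetable flush callback for rollback nodes: serialize the node to disk if
// requested (recording a panic in the header on any write error), and free the
// in-memory node if the cachetable is evicting it.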
static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
                                          void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
                                          BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone)) {
    int r;
    ROLLBACK_LOG_NODE log = rollback_v;
    struct brt_header *h = extraargs;
    assert(h->cf == cachefile);
    assert(log->thislogname.b==logname.b);
    if (write_me && !h->panic) {
        int n_workitems, n_threads;
        toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
        r = toku_serialize_rollback_log_to(fd, log->thislogname, log, h, n_workitems, n_threads, for_checkpoint);
        if (r) {
            if (h->panic==0) {
                char *e = strerror(r);
                int l = 200 + strlen(e);
                char s[l];
                h->panic = r;
                snprintf(s, l-1, "While writing data to disk, error %d (%s)", r, e);
                h->panic_string = toku_strdup(s);
            }
        }
    }
    *new_size = size;
    if (!keep_me) {
        toku_rollback_log_free(&log);
    }
}
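// Cachetable fetch callback for rollback nodes: deserialize the node from the
// rollback cachefile and report its in-memory size.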
static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
                                         void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
    int r;
    struct brt_header *h = extraargs;
    assert(h->cf == cachefile);
    ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
    r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
    if (r==0) {
        *sizep = rollback_memory_size(*result);
    }
    return r;
}
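// Partial-eviction estimate: rollback nodes are never partially evicted, so
// report zero freeable bytes at cheap cost.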
static void toku_rollback_pe_est_callback(
    void* rollback_v,
    void* UU(disk_data),
    long* bytes_freed_estimate,
    enum partial_eviction_cost *cost,
    void* UU(write_extraargs)
    )
{
    assert(rollback_v != NULL);
    *bytes_freed_estimate = 0;
    *cost = PE_CHEAP;
}
// callback for partially evicting a cachetable entry; nothing can be evicted
// from a rollback node, so the attr passes through unchanged
static int toku_rollback_pe_callback (
    void *rollback_v,
    PAIR_ATTR old_attr,
    PAIR_ATTR* new_attr,
    void* UU(extraargs)
    )
{
    assert(rollback_v != NULL);
    *new_attr = old_attr;
    return 0;
}
static BOOL toku_rollback_pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
    return FALSE;
}

static int toku_rollback_pf_callback(void* UU(brtnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
    // should never be called, given that toku_rollback_pf_req_callback always returns false
    assert(FALSE);
    return 0;
}
static int toku_rollback_cleaner_callback (
    void* UU(brtnode_pv),
    BLOCKNUM UU(blocknum),
    u_int32_t UU(fullhash),
    void* UU(extraargs)
    )
{
    assert(FALSE);
    return 0;
}
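// Bundle the rollback-specific cachetable write callbacks, with the header as
// the extra argument passed to each of them.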
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(struct brt_header* h) {
    CACHETABLE_WRITE_CALLBACK wc;
    wc.flush_callback   = toku_rollback_flush_callback;
    wc.pe_est_callback  = toku_rollback_pe_est_callback;
    wc.pe_callback      = toku_rollback_pe_callback;
    wc.cleaner_callback = toku_rollback_cleaner_callback;
    wc.clone_callback   = NULL;
    wc.write_extraargs  = h;
    return wc;
}
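// Create and pin a fresh rollback log node for txn: allocate a blocknum for
// it, put it in the cachetable, and make it the txn's current rollback log.
// 'older' links the new node to the previous one in the txn's chain.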
static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t older_hash, ROLLBACK_LOG_NODE *result) {
    ROLLBACK_LOG_NODE MALLOC(log);
    assert(log);
    int r;
    CACHEFILE cf = txn->logger->rollback_cachefile;
    struct brt_header *h = toku_cachefile_get_userdata(cf);
    log->layout_version                = BRT_LAYOUT_VERSION;
    log->layout_version_original       = BRT_LAYOUT_VERSION;
    log->layout_version_read_from_disk = BRT_LAYOUT_VERSION;
    log->dirty = TRUE;
    log->txnid = txn->txnid64;
    log->sequence = txn->num_rollback_nodes++;
    toku_allocate_blocknum(h->blocktable, &log->thislogname, h);
    log->thishash   = toku_cachetable_hash(cf, log->thislogname);
    log->older      = older;
    log->older_hash = older_hash;
    log->oldest_logentry = NULL;
    log->newest_logentry = NULL;
    log->rollentry_arena = memarena_create();
    log->rollentry_resident_bytecount = 0;
    *result = log;
    r = toku_cachetable_put(cf, log->thislogname, log->thishash,
                            log, rollback_memory_size(log),
                            get_write_callbacks_for_rollback_log(h));
    assert(r==0);
    txn->current_rollback      = log->thislogname;
    txn->current_rollback_hash = log->thishash;
    txn->pinned_inprogress_rollback_log = log;
    return 0;
}
int
toku_unpin_inprogress_rollback_log(TOKUTXN txn) {
    if (txn->pinned_inprogress_rollback_log) {
        return toku_rollback_log_unpin(txn, txn->pinned_inprogress_rollback_log);
    }
    else {
        return 0;
    }
}
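// Unpin a rollback node in the cachetable, clearing the txn's cached pin if
// it pointed at this node.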
int
toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
    int r;
    CACHEFILE cf = txn->logger->rollback_cachefile;
    if (txn->pinned_inprogress_rollback_log == log) {
        txn->pinned_inprogress_rollback_log = NULL;
    }
    r = toku_cachetable_unpin(cf, log->thislogname, log->thishash,
                              (enum cachetable_dirty)log->dirty, rollback_memory_size(log));
    assert(r==0);
    return r;
}
//Requires: log is pinned
//          log is current
//After:
//  log is unpinned if a spill happened
//  There may be no current rollback log afterward (if it spilled)
int toku_maybe_spill_rollbacks (TOKUTXN txn, ROLLBACK_LOG_NODE log) {
    int r = 0;
    if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
        assert(log->thislogname.b == txn->current_rollback.b);
        //spill
        if (!txn_has_spilled_rollback_logs(txn)) {
            //First spill.  Copy to head.
            txn->spilled_rollback_head      = txn->current_rollback;
            txn->spilled_rollback_head_hash = txn->current_rollback_hash;
        }
        //Unconditionally copy to tail.  The old tail does not need to be cached anymore.
        txn->spilled_rollback_tail      = txn->current_rollback;
        txn->spilled_rollback_tail_hash = txn->current_rollback_hash;
        txn->current_rollback      = ROLLBACK_NONE;
        txn->current_rollback_hash = 0;
        //Unpin
        r = toku_rollback_log_unpin(txn, log);
        assert(r==0);
    }
    return r;
}
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
int find_xid (OMTVALUE v, void *txnv) {
    TOKUTXN txn = v;
    TOKUTXN txnfind = txnv;
    if (txn->txnid64 < txnfind->txnid64) return -1;
    if (txn->txnid64 > txnfind->txnid64) return +1;
    return 0;
}
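// Heaviside function to find a BRT by filenum; the brt pointers themselves
// break ties between handles that share a filenum.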
static int find_filenum (OMTVALUE v, void *brtv) {
    BRT brt     = v;
    BRT brtfind = brtv;
    FILENUM fnum     = toku_cachefile_filenum(brt->cf);
    FILENUM fnumfind = toku_cachefile_filenum(brtfind->cf);
    if (fnum.fileid < fnumfind.fileid) return -1;
    if (fnum.fileid > fnumfind.fileid) return +1;
    if (brt < brtfind) return -1;
    if (brt > brtfind) return +1;
    return 0;
}
//Notify a transaction that it has touched a brt.
int toku_txn_note_brt (TOKUTXN txn, BRT brt) {
    OMTVALUE txnv;
    u_int32_t index;
    // Does brt already know about transaction txn?
    int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index);
    if (r==0) {
        // It's already there.
        assert((TOKUTXN)txnv==txn);
        return 0;
    }
    // Otherwise it's not there.
    // Insert reference to transaction into brt
    r = toku_omt_insert_at(brt->txns, txn, index);
    assert(r==0);
    // Insert reference to brt into transaction
    r = toku_omt_insert(txn->open_brts, brt, find_filenum, brt, 0);
    assert(r==0);
    return 0;
}
struct swap_brt_extra {
    BRT live;
    BRT zombie;
};

static int swap_brt (OMTVALUE txnv, u_int32_t UU(idx), void *extra) {
    struct swap_brt_extra *info = extra;
    TOKUTXN txn = txnv;
    OMTVALUE zombie_again = NULL;
    u_int32_t index;
    int r;
    r = toku_txn_note_brt(txn, info->live); //Add new brt.
    assert(r==0);
    r = toku_omt_find_zero(txn->open_brts, find_filenum, info->zombie, &zombie_again, &index);
    assert(r==0);
    assert((void*)zombie_again==info->zombie);
    r = toku_omt_delete_at(txn->open_brts, index); //Delete old brt.
    assert(r==0);
    return 0;
}
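// Effect: Redirect every txn that references the zombie brt to the live brt,
// transferring checkpoint responsibility if necessary, then close the zombie
// immediately.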
int toku_txn_note_swap_brt (BRT live, BRT zombie) {
    if (zombie->pinned_by_checkpoint) {
        //Swap checkpoint responsibility.
        assert(!live->pinned_by_checkpoint); //Pin only uses one brt.
        live->pinned_by_checkpoint = 1;
        zombie->pinned_by_checkpoint = 0;
    }
    struct swap_brt_extra swap = {.live = live, .zombie = zombie};
    int r = toku_omt_iterate(zombie->txns, swap_brt, &swap);
    assert(r==0);
    toku_omt_clear(zombie->txns);
    //Close immediately.
    assert(zombie->close_db);
    assert(!toku_brt_zombie_needed(zombie));
    r = zombie->close_db(zombie->db, zombie->close_flags, false, ZERO_LSN);
    return r;
}
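// Remove one txn's reference to a brt from the txn's open_brts list; used as
// an iterator when a brt is closed.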
static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) {
    TOKUTXN txn = txnv;
    BRT brt = brtv;
    OMTVALUE brtv_again = NULL;
    u_int32_t index;
    int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index);
    assert(r==0);
    assert((void*)brtv_again==brtv);
    r = toku_omt_delete_at(txn->open_brts, index);
    assert(r==0);
    return 0;
}
int toku_txn_note_close_brt (BRT brt) {
    assert(toku_omt_size(brt->txns)==0);
    int r = toku_omt_iterate(brt->txns, remove_brt, brt);
    assert(r==0);
    return 0;
}
static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv)
// Effect: This function is called on every open BRT that a transaction used.
//         This function removes the transaction from that BRT.
{
    BRT brt = brtv;
    TOKUTXN txn = txnv;
    OMTVALUE txnv_again = NULL;
    u_int32_t index;
    int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index);
    assert(r==0);
    assert((void*)txnv_again==txnv);
    r = toku_omt_delete_at(brt->txns, index);
    assert(r==0);
    if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) {
        brt->h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
        brt->h->root_that_created_or_locked_when_empty  = TXNID_NONE;
    }
    if (txn->txnid64==brt->h->txnid_that_suppressed_recovery_logs) {
        brt->h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
    }
    if (!toku_brt_zombie_needed(brt) && brt->was_closed) {
        //Close immediately.
        assert(brt->close_db);
        r = brt->close_db(brt->db, brt->close_flags, false, ZERO_LSN);
    }
    return r;
}
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
    toku_omt_iterate(txn->open_brts, remove_txn, txn);
}
// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count)
{
    *raw_count = txn->rollentry_raw_count;
    return 0;
}
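// Look up a live TOKUTXN by xid among the transactions that have touched this
// brt, using a stack-allocated stand-in txn as the search key.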
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) {
    struct tokutxn fake_txn; fake_txn.txnid64 = xid;
    u_int32_t index;
    OMTVALUE txnv;
    int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index);
    if (r == 0) *txnptr = txnv;
    return r;
}
int
toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
    //Currently processing 'log'.  Prefetch the next (older) log node.
    BLOCKNUM name = log->older;
    int r = 0;
    if (name.b != ROLLBACK_NONE.b) {
        uint32_t hash = log->older_hash;
        CACHEFILE cf = txn->logger->rollback_cachefile;
        struct brt_header *h = toku_cachefile_get_userdata(cf);
        BOOL doing_prefetch = FALSE;
        r = toku_cachefile_prefetch(cf, name, hash,
                                    get_write_callbacks_for_rollback_log(h),
                                    toku_rollback_fetch_callback,
                                    toku_rollback_pf_req_callback,
                                    toku_rollback_pf_callback,
                                    h,
                                    &doing_prefetch);
        assert(r==0);
    }
    return r;
}
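// Pin the rollback node named by 'name', reusing the already-pinned
// in-progress node when possible, and sanity-check that the pinned node has
// the expected txnid and sequence number.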
int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLOCKNUM name, uint32_t hash, ROLLBACK_LOG_NODE *result) {
    BOOL save_inprogress_node = FALSE;
    assert(name.b != ROLLBACK_NONE.b);
    int r = 0;
    ROLLBACK_LOG_NODE log = NULL;
    if (name.b == txn->current_rollback.b) {
        assert(hash == txn->current_rollback_hash);
        log = txn->pinned_inprogress_rollback_log;
        save_inprogress_node = TRUE;
    }
    if (!log) {
        CACHEFILE cf = txn->logger->rollback_cachefile;
        void *log_v;
        struct brt_header *h = toku_cachefile_get_userdata(cf);
        r = toku_cachetable_get_and_pin(cf, name, hash,
                                        &log_v, NULL,
                                        get_write_callbacks_for_rollback_log(h),
                                        toku_rollback_fetch_callback,
                                        toku_rollback_pf_req_callback,
                                        toku_rollback_pf_callback,
                                        TRUE, // may_modify_value
                                        h
                                        );
        assert(r==0);
        log = (ROLLBACK_LOG_NODE)log_v;
    }
    if (r==0) {
        assert(log->thislogname.b == name.b);
        assert(log->txnid == xid);
        assert(log->sequence == sequence);
        if (save_inprogress_node) {
            txn->pinned_inprogress_rollback_log = log;
        }
        *result = log;
    }
    return r;
}
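// Get (pinned) the rollback log that a new entry should be appended to,
// creating a new in-progress log if the txn does not have one.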
int toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *result) {
    invariant(txn->state == TOKUTXN_LIVE); // #3258
    int r;
    ROLLBACK_LOG_NODE log;
    if (txn_has_inprogress_rollback_log(txn)) {
        r = toku_get_and_pin_rollback_log(txn, txn->txnid64, txn->num_rollback_nodes-1,
                                          txn->current_rollback, txn->current_rollback_hash, &log);
        assert(r==0);
    }
    else {
        //Generate a new one.
        //The tail will be ROLLBACK_NONE if this is the very first rollback log.
        r = toku_create_new_rollback_log(txn, txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash, &log);
        assert(r==0);
    }
    if (r==0) {
        assert(log->txnid == txn->txnid64);
        assert(log->thislogname.b != ROLLBACK_NONE.b);
        *result = log;
    }
    return r;
}