You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

909 lines
33 KiB

  1. /* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
  2. #ident "$Id: brt.c 43396 2012-05-11 17:24:47Z zardosht $"
  3. #ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
  4. #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
  5. #include "includes.h"
  6. #include <brt-cachetable-wrappers.h>
  7. void
  8. toku_brt_header_suppress_rollbacks(struct brt_header *h, TOKUTXN txn) {
  9. TXNID txnid = toku_txn_get_txnid(txn);
  10. assert(h->txnid_that_created_or_locked_when_empty == TXNID_NONE ||
  11. h->txnid_that_created_or_locked_when_empty == txnid);
  12. h->txnid_that_created_or_locked_when_empty = txnid;
  13. TXNID rootid = toku_txn_get_root_txnid(txn);
  14. assert(h->root_that_created_or_locked_when_empty == TXNID_NONE ||
  15. h->root_that_created_or_locked_when_empty == rootid);
  16. h->root_that_created_or_locked_when_empty = rootid;
  17. }
  18. void
  19. toku_reset_root_xid_that_created(struct brt_header* h, TXNID new_root_xid_that_created) {
  20. // Reset the root_xid_that_created field to the given value.
  21. // This redefines which xid created the dictionary.
  22. // hold lock around setting and clearing of dirty bit
  23. // (see cooperative use of dirty bit in brtheader_begin_checkpoint())
  24. toku_brtheader_lock (h);
  25. h->root_xid_that_created = new_root_xid_that_created;
  26. h->dirty = 1;
  27. toku_brtheader_unlock (h);
  28. }
  29. static void
  30. brtheader_destroy(struct brt_header *h) {
  31. if (!h->panic) assert(!h->checkpoint_header);
  32. //header and checkpoint_header have same Blocktable pointer
  33. //cannot destroy since it is still in use by CURRENT
  34. if (h->type == BRTHEADER_CHECKPOINT_INPROGRESS) h->blocktable = NULL;
  35. else {
  36. assert(h->type == BRTHEADER_CURRENT);
  37. toku_blocktable_destroy(&h->blocktable);
  38. if (h->descriptor.dbt.data) toku_free(h->descriptor.dbt.data);
  39. if (h->cmp_descriptor.dbt.data) toku_free(h->cmp_descriptor.dbt.data);
  40. toku_brtheader_destroy_treelock(h);
  41. toku_omt_destroy(&h->txns);
  42. }
  43. }
  44. // Make a copy of the header for the purpose of a checkpoint
  45. static void
  46. brtheader_copy_for_checkpoint(struct brt_header *h, LSN checkpoint_lsn) {
  47. assert(h->type == BRTHEADER_CURRENT);
  48. assert(h->checkpoint_header == NULL);
  49. assert(h->panic==0);
  50. struct brt_header* XMALLOC(ch);
  51. *ch = *h; //Do a shallow copy
  52. ch->type = BRTHEADER_CHECKPOINT_INPROGRESS; //Different type
  53. //printf("checkpoint_lsn=%" PRIu64 "\n", checkpoint_lsn.lsn);
  54. ch->checkpoint_lsn = checkpoint_lsn;
  55. ch->panic_string = NULL;
  56. //ch->blocktable is SHARED between the two headers
  57. h->checkpoint_header = ch;
  58. }
  59. static void
  60. brtheader_free(struct brt_header *h) {
  61. brtheader_destroy(h);
  62. toku_free(h);
  63. }
  64. void
  65. toku_brtheader_free (struct brt_header *h) {
  66. brtheader_free(h);
  67. }
  68. void
  69. toku_brtheader_init_treelock(struct brt_header* h) {
  70. int r = toku_pthread_mutex_init(&h->tree_lock, NULL); assert(r == 0);
  71. }
  72. void
  73. toku_brtheader_destroy_treelock(struct brt_header* h) {
  74. int r = toku_pthread_mutex_destroy(&h->tree_lock); assert(r == 0);
  75. }
  76. void
  77. toku_brtheader_grab_treelock(struct brt_header* h) {
  78. int r = toku_pthread_mutex_lock(&h->tree_lock); assert(r == 0);
  79. }
  80. void
  81. toku_brtheader_release_treelock(struct brt_header* h) {
  82. int r = toku_pthread_mutex_unlock(&h->tree_lock); assert(r == 0);
  83. }
  84. /////////////////////////////////////////////////////////////////////////
// Start of functions that are callbacks to the cachefile
  86. //
  87. // maps to cf->log_fassociate_during_checkpoint
  88. static int
  89. brtheader_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
  90. struct brt_header *h = header_v;
  91. char* fname_in_env = toku_cachefile_fname_in_env(cf);
  92. BYTESTRING bs = { strlen(fname_in_env), // don't include the NUL
  93. fname_in_env };
  94. TOKULOGGER logger = toku_cachefile_logger(cf);
  95. FILENUM filenum = toku_cachefile_filenum (cf);
  96. int r = toku_log_fassociate(logger, NULL, 0, filenum, h->flags, bs);
  97. return r;
  98. }
  99. // maps to cf->log_suppress_rollback_during_checkpoint
  100. static int
  101. brtheader_log_suppress_rollback_during_checkpoint (CACHEFILE cf, void *header_v) {
  102. int r = 0;
  103. struct brt_header *h = header_v;
  104. TXNID xid = h->txnid_that_created_or_locked_when_empty;
  105. if (xid != TXNID_NONE) {
  106. //Only log if useful.
  107. TOKULOGGER logger = toku_cachefile_logger(cf);
  108. FILENUM filenum = toku_cachefile_filenum (cf);
  109. r = toku_log_suppress_rollback(logger, NULL, 0, filenum, xid);
  110. }
  111. return r;
  112. }
  113. // Maps to cf->begin_checkpoint_userdata
  114. // Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...).
  115. //Has access to fd (it is protected).
  116. static int
  117. brtheader_begin_checkpoint (LSN checkpoint_lsn, void *header_v) {
  118. struct brt_header *h = header_v;
  119. int r = h->panic;
  120. if (r==0) {
  121. // hold lock around copying and clearing of dirty bit
  122. toku_brtheader_lock (h);
  123. assert(h->type == BRTHEADER_CURRENT);
  124. assert(h->checkpoint_header == NULL);
  125. brtheader_copy_for_checkpoint(h, checkpoint_lsn);
  126. h->dirty = 0; // this is only place this bit is cleared (in currentheader)
  127. // on_disk_stats includes on disk changes since last checkpoint,
  128. // so checkpoint_staging_stats now includes changes for checkpoint in progress.
  129. h->checkpoint_staging_stats = h->on_disk_stats;
  130. toku_block_translation_note_start_checkpoint_unlocked(h->blocktable);
  131. toku_brtheader_unlock (h);
  132. }
  133. return r;
  134. }
  135. // maps to cf->checkpoint_userdata
  136. // Write checkpoint-in-progress versions of header and translation to disk (really to OS internal buffer).
  137. // Copy current header's version of checkpoint_staging stat64info to checkpoint header.
  138. // Must have access to fd (protected).
  139. // Requires: all pending bits are clear. This implies that no thread will modify the checkpoint_staging
  140. // version of the stat64info.
  141. static int
  142. brtheader_checkpoint (CACHEFILE cf, int fd, void *header_v) {
  143. struct brt_header *h = header_v;
  144. struct brt_header *ch = h->checkpoint_header;
  145. int r = 0;
  146. if (h->panic!=0) goto handle_error;
  147. //printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__,
  148. // block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize);
  149. assert(ch);
  150. if (ch->panic!=0) goto handle_error;
  151. assert(ch->type == BRTHEADER_CHECKPOINT_INPROGRESS);
  152. if (ch->dirty) { // this is only place this bit is tested (in checkpoint_header)
  153. TOKULOGGER logger = toku_cachefile_logger(cf);
  154. if (logger) {
  155. r = toku_logger_fsync_if_lsn_not_fsynced(logger, ch->checkpoint_lsn);
  156. if (r!=0) goto handle_error;
  157. }
  158. uint64_t now = (uint64_t) time(NULL); // 4018;
  159. h->time_of_last_modification = now;
  160. ch->time_of_last_modification = now;
  161. ch->checkpoint_count++;
  162. // Threadsafety of checkpoint_staging_stats here depends on there being no pending bits set,
  163. // so that all callers to flush callback should have the for_checkpoint argument false,
  164. // and therefore will not modify the checkpoint_staging_stats.
  165. // TODO 4184: If the flush callback is called with the for_checkpoint argument true even when all the pending bits
  166. // are clear, then this is a problem.
  167. ch->checkpoint_staging_stats = h->checkpoint_staging_stats;
  168. // The in_memory_stats and on_disk_stats in the checkpoint header should be ignored, but we set them
  169. // here just in case the serializer looks in the wrong place.
  170. ch->in_memory_stats = ch->checkpoint_staging_stats;
  171. ch->on_disk_stats = ch->checkpoint_staging_stats;
  172. // write translation and header to disk (or at least to OS internal buffer)
  173. r = toku_serialize_brt_header_to(fd, ch);
  174. if (r!=0) goto handle_error;
  175. ch->dirty = 0; // this is only place this bit is cleared (in checkpoint_header)
  176. // fsync the cachefile
  177. r = toku_cachefile_fsync(cf);
  178. if (r!=0) {
  179. goto handle_error;
  180. }
  181. h->checkpoint_count++; // checkpoint succeeded, next checkpoint will save to alternate header location
  182. h->checkpoint_lsn = ch->checkpoint_lsn; //Header updated.
  183. }
  184. else {
  185. toku_block_translation_note_skipped_checkpoint(ch->blocktable);
  186. }
  187. if (0) {
  188. handle_error:
  189. if (h->panic) r = h->panic;
  190. else if (ch->panic) {
  191. r = ch->panic;
  192. //Steal panic string. Cannot afford to malloc.
  193. h->panic = ch->panic;
  194. h->panic_string = ch->panic_string;
  195. }
  196. else toku_block_translation_note_failed_checkpoint(ch->blocktable);
  197. }
  198. return r;
  199. }
  200. // maps to cf->end_checkpoint_userdata
  201. // free unused disk space
  202. // (i.e. tell BlockAllocator to liberate blocks used by previous checkpoint).
  203. // Must have access to fd (protected)
  204. static int
  205. brtheader_end_checkpoint (CACHEFILE UU(cachefile), int fd, void *header_v) {
  206. struct brt_header *h = header_v;
  207. int r = h->panic;
  208. if (r==0) {
  209. assert(h->type == BRTHEADER_CURRENT);
  210. toku_block_translation_note_end_checkpoint(h->blocktable, fd, h);
  211. }
  212. if (h->checkpoint_header) { // could be NULL only if panic was true at begin_checkpoint
  213. brtheader_free(h->checkpoint_header);
  214. h->checkpoint_header = NULL;
  215. }
  216. return r;
  217. }
  218. // maps to cf->close_userdata
  219. // Has access to fd (it is protected).
  220. static int
  221. brtheader_close (CACHEFILE cachefile, int fd, void *header_v, char **malloced_error_string, BOOL oplsn_valid, LSN oplsn) {
  222. struct brt_header *h = header_v;
  223. assert(h->type == BRTHEADER_CURRENT);
  224. toku_brtheader_lock(h);
  225. assert(!toku_brt_header_needed(h));
  226. toku_brtheader_unlock(h);
  227. int r = 0;
  228. if (h->panic) {
  229. r = h->panic;
  230. } else if (h->dictionary_opened) { //Otherwise header has never fully been created.
  231. assert(h->cf == cachefile);
  232. TOKULOGGER logger = toku_cachefile_logger(cachefile);
  233. LSN lsn = ZERO_LSN;
  234. //Get LSN
  235. if (oplsn_valid) {
  236. //Use recovery-specified lsn
  237. lsn = oplsn;
  238. //Recovery cannot reduce lsn of a header.
  239. if (lsn.lsn < h->checkpoint_lsn.lsn)
  240. lsn = h->checkpoint_lsn;
  241. }
  242. else {
  243. //Get LSN from logger
  244. lsn = ZERO_LSN; // if there is no logger, we use zero for the lsn
  245. if (logger) {
  246. char* fname_in_env = toku_cachefile_fname_in_env(cachefile);
  247. assert(fname_in_env);
  248. BYTESTRING bs = {.len=strlen(fname_in_env), .data=fname_in_env};
  249. r = toku_log_fclose(logger, &lsn, h->dirty, bs, toku_cachefile_filenum(cachefile)); // flush the log on close (if new header is being written), otherwise it might not make it out.
  250. if (r!=0) return r;
  251. }
  252. }
  253. if (h->dirty) { // this is the only place this bit is tested (in currentheader)
  254. if (logger) { //Rollback cachefile MUST NOT BE CLOSED DIRTY
  255. //It can be checkpointed only via 'checkpoint'
  256. assert(logger->rollback_cachefile != cachefile);
  257. }
  258. int r2;
  259. //assert(lsn.lsn!=0);
  260. r2 = brtheader_begin_checkpoint(lsn, header_v);
  261. if (r==0) r = r2;
  262. r2 = brtheader_checkpoint(cachefile, fd, h);
  263. if (r==0) r = r2;
  264. r2 = brtheader_end_checkpoint(cachefile, fd, header_v);
  265. if (r==0) r = r2;
  266. if (!h->panic) assert(!h->dirty); // dirty bit should be cleared by begin_checkpoint and never set again (because we're closing the dictionary)
  267. }
  268. }
  269. if (malloced_error_string) *malloced_error_string = h->panic_string;
  270. if (r == 0) {
  271. r = h->panic;
  272. }
  273. toku_brtheader_free(h);
  274. return r;
  275. }
  276. // maps to cf->note_pin_by_checkpoint
  277. //Must be protected by ydb lock.
  278. //Is only called by checkpoint begin, which holds it
  279. static int
  280. brtheader_note_pin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v)
  281. {
  282. //Set arbitrary brt (for given header) as pinned by checkpoint.
  283. //Only one can be pinned (only one checkpoint at a time), but not worth verifying.
  284. struct brt_header *h = header_v;
  285. assert(!h->pinned_by_checkpoint);
  286. h->pinned_by_checkpoint = true;
  287. return 0;
  288. }
  289. // maps to cf->note_unpin_by_checkpoint
  290. //Must be protected by ydb lock.
  291. //Called by end_checkpoint, which grabs ydb lock around note_unpin
  292. static int
  293. brtheader_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v)
  294. {
  295. struct brt_header *h = header_v;
  296. assert(h->pinned_by_checkpoint);
  297. h->pinned_by_checkpoint = false; //Unpin
  298. int r = 0;
  299. //Close if necessary
  300. if (!toku_brt_header_needed(h)) {
  301. //Close immediately.
  302. char *error_string = NULL;
  303. r = toku_remove_brtheader(h, &error_string, false, ZERO_LSN);
  304. lazy_assert_zero(r);
  305. }
  306. return r;
  307. }
  308. //
  309. // End of Functions that are callbacks to the cachefile
  310. /////////////////////////////////////////////////////////////////////////
  311. static int setup_initial_brtheader_root_node (struct brt_header* h, BLOCKNUM blocknum) {
  312. BRTNODE XMALLOC(node);
  313. toku_initialize_empty_brtnode(node, blocknum, 0, 1, h->layout_version, h->nodesize, h->flags, h);
  314. BP_STATE(node,0) = PT_AVAIL;
  315. u_int32_t fullhash = toku_cachetable_hash(h->cf, blocknum);
  316. node->fullhash = fullhash;
  317. int r = toku_cachetable_put(h->cf, blocknum, fullhash,
  318. node, make_brtnode_pair_attr(node),
  319. get_write_callbacks_for_node(h));
  320. if (r != 0)
  321. toku_free(node);
  322. else
  323. toku_unpin_brtnode(h, node);
  324. return r;
  325. }
  326. // TODO: (Zardosht) move this functionality to brt_init_header
  327. // No need in having brt_init_header call this function
  328. static int
  329. brt_init_header_partial (BRT t, CACHEFILE cf, TOKUTXN txn) {
  330. int r;
  331. t->h->flags = t->flags;
  332. if (t->h->cf!=NULL) assert(t->h->cf == cf);
  333. t->h->cf = cf;
  334. t->h->nodesize = t->nodesize;
  335. t->h->basementnodesize = t->basementnodesize;
  336. t->h->root_xid_that_created = txn ? txn->ancestor_txnid64 : TXNID_NONE;
  337. t->h->compare_fun = t->compare_fun;
  338. t->h->update_fun = t->update_fun;
  339. t->h->in_memory_stats = ZEROSTATS;
  340. t->h->on_disk_stats = ZEROSTATS;
  341. t->h->checkpoint_staging_stats = ZEROSTATS;
  342. t->h->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1;
  343. BLOCKNUM root = t->h->root_blocknum;
  344. r = setup_initial_brtheader_root_node(t->h, root);
  345. if (r != 0) {
  346. goto exit;
  347. }
  348. //printf("%s:%d putting %p (%d)\n", __FILE__, __LINE__, t->h, 0);
  349. toku_cachefile_set_userdata(t->h->cf,
  350. t->h,
  351. brtheader_log_fassociate_during_checkpoint,
  352. brtheader_log_suppress_rollback_during_checkpoint,
  353. brtheader_close,
  354. brtheader_checkpoint,
  355. brtheader_begin_checkpoint,
  356. brtheader_end_checkpoint,
  357. brtheader_note_pin_by_checkpoint,
  358. brtheader_note_unpin_by_checkpoint);
  359. exit:
  360. return r;
  361. }
  362. static int
  363. brt_init_header (BRT t, CACHEFILE cf, TOKUTXN txn) {
  364. t->h->type = BRTHEADER_CURRENT;
  365. t->h->checkpoint_header = NULL;
  366. toku_brtheader_init_treelock(t->h);
  367. toku_blocktable_create_new(&t->h->blocktable);
  368. BLOCKNUM root;
  369. //Assign blocknum for root block, also dirty the header
  370. toku_allocate_blocknum(t->h->blocktable, &root, t->h);
  371. t->h->root_blocknum = root;
  372. t->h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
  373. toku_list_init(&t->h->live_brts);
  374. int r = toku_omt_create(&t->h->txns);
  375. assert_zero(r);
  376. r = brt_init_header_partial(t, cf, txn);
  377. if (r==0) toku_block_verify_no_free_blocknums(t->h->blocktable);
  378. return r;
  379. }
  380. // allocate and initialize a brt header.
  381. // t->h->cf is not set to anything.
  382. // TODO: (Zardosht) make this function return a header and set
  383. // it to t->h in the caller
  384. int
  385. toku_create_new_brtheader(BRT t, CACHEFILE cf, TOKUTXN txn) {
  386. int r;
  387. assert (!t->h);
  388. t->h = toku_xmalloc(sizeof(struct brt_header));
  389. if (!t->h) {
  390. assert(errno==ENOMEM);
  391. r = ENOMEM;
  392. goto exit;
  393. }
  394. memset(t->h, 0, sizeof(struct brt_header));
  395. t->h->layout_version = BRT_LAYOUT_VERSION;
  396. t->h->layout_version_original = BRT_LAYOUT_VERSION;
  397. t->h->layout_version_read_from_disk = BRT_LAYOUT_VERSION; // fake, prevent unnecessary upgrade logic
  398. t->h->build_id = BUILD_ID;
  399. t->h->build_id_original = BUILD_ID;
  400. uint64_t now = (uint64_t) time(NULL);
  401. t->h->time_of_creation = now;
  402. t->h->time_of_last_modification = now;
  403. t->h->time_of_last_verification = 0;
  404. memset(&t->h->descriptor, 0, sizeof(t->h->descriptor));
  405. memset(&t->h->cmp_descriptor, 0, sizeof(t->h->cmp_descriptor));
  406. r = brt_init_header(t, cf, txn);
  407. if (r != 0) {
  408. goto exit;
  409. }
  410. r = 0;
  411. exit:
  412. if (r != 0) {
  413. if (t->h) {
  414. toku_free(t->h);
  415. t->h = NULL;
  416. }
  417. return r;
  418. }
  419. return r;
  420. }
  421. // TODO: (Zardosht) get rid of brt parameter
  422. int toku_read_brt_header_and_store_in_cachefile (BRT brt, CACHEFILE cf, LSN max_acceptable_lsn, struct brt_header **header, BOOL* was_open)
  423. // If the cachefile already has the header, then just get it.
  424. // If the cachefile has not been initialized, then don't modify anything.
  425. // max_acceptable_lsn is the latest acceptable checkpointed version of the file.
  426. {
  427. {
  428. struct brt_header *h;
  429. if ((h=toku_cachefile_get_userdata(cf))!=0) {
  430. *header = h;
  431. *was_open = TRUE;
  432. assert(brt->update_fun == h->update_fun);
  433. assert(brt->compare_fun == h->compare_fun);
  434. return 0;
  435. }
  436. }
  437. *was_open = FALSE;
  438. struct brt_header *h;
  439. int r;
  440. {
  441. int fd = toku_cachefile_get_and_pin_fd (cf);
  442. enum deserialize_error_code e = toku_deserialize_brtheader_from(fd, max_acceptable_lsn, &h);
  443. if (e == DS_XSUM_FAIL) {
  444. fprintf(stderr, "Checksum failure while reading header in file %s.\n", toku_cachefile_fname_in_env(cf));
  445. assert(false); // make absolutely sure we crash before doing anything else
  446. } else if (e == DS_ERRNO) {
  447. r = errno;
  448. } else if (e == DS_OK) {
  449. r = 0;
  450. } else {
  451. assert(false);
  452. }
  453. toku_cachefile_unpin_fd(cf);
  454. }
  455. if (r!=0) return r;
  456. h->cf = cf;
  457. h->compare_fun = brt->compare_fun;
  458. h->update_fun = brt->update_fun;
  459. toku_cachefile_set_userdata(cf,
  460. (void*)h,
  461. brtheader_log_fassociate_during_checkpoint,
  462. brtheader_log_suppress_rollback_during_checkpoint,
  463. brtheader_close,
  464. brtheader_checkpoint,
  465. brtheader_begin_checkpoint,
  466. brtheader_end_checkpoint,
  467. brtheader_note_pin_by_checkpoint,
  468. brtheader_note_unpin_by_checkpoint);
  469. *header = h;
  470. return 0;
  471. }
  472. void
  473. toku_brtheader_note_brt_open(BRT live) {
  474. struct brt_header *h = live->h;
  475. toku_brtheader_lock(h);
  476. toku_list_push(&h->live_brts, &live->live_brt_link);
  477. h->dictionary_opened = TRUE;
  478. toku_brtheader_unlock(h);
  479. }
  480. int
  481. toku_brt_header_needed(struct brt_header* h) {
  482. return !toku_list_empty(&h->live_brts) || toku_omt_size(h->txns) != 0 || h->pinned_by_checkpoint;
  483. }
  484. // Close brt. If opsln_valid, use given oplsn as lsn in brt header instead of logging
  485. // the close and using the lsn provided by logging the close. (Subject to constraint
  486. // that if a newer lsn is already in the dictionary, don't overwrite the dictionary.)
  487. int toku_remove_brtheader (struct brt_header* h, char **error_string, BOOL oplsn_valid, LSN oplsn) {
  488. assert(!h->pinned_by_checkpoint);
  489. int r = 0;
  490. // Must do this work before closing the cf
  491. if (h->cf) {
  492. if (error_string) assert(*error_string == 0);
  493. r = toku_cachefile_close(&h->cf, error_string, oplsn_valid, oplsn);
  494. if (r==0 && error_string) assert(*error_string == 0);
  495. }
  496. return r;
  497. }
  498. // gets the first existing BRT handle, if it exists. If no BRT handle exists
  499. // for this header, returns NULL
  500. BRT toku_brtheader_get_some_existing_brt(struct brt_header* h) {
  501. BRT brt_ret = NULL;
  502. toku_brtheader_lock(h);
  503. if (!toku_list_empty(&h->live_brts)) {
  504. brt_ret = toku_list_struct(toku_list_head(&h->live_brts), struct brt, live_brt_link);
  505. }
  506. toku_brtheader_unlock(h);
  507. return brt_ret;
  508. }
  509. // Purpose: set fields in brt_header to capture accountability info for start of HOT optimize.
  510. // Note: HOT accountability variables in header are modified only while holding header lock.
  511. // (Header lock is really needed for touching the dirty bit, but it's useful and
  512. // convenient here for keeping the HOT variables threadsafe.)
  513. void
  514. toku_brt_header_note_hot_begin(BRT brt) {
  515. struct brt_header *h = brt->h;
  516. time_t now = time(NULL);
  517. // hold lock around setting and clearing of dirty bit
  518. // (see cooperative use of dirty bit in brtheader_begin_checkpoint())
  519. toku_brtheader_lock(h);
  520. h->time_of_last_optimize_begin = now;
  521. h->count_of_optimize_in_progress++;
  522. h->dirty = 1;
  523. toku_brtheader_unlock(h);
  524. }
  525. // Purpose: set fields in brt_header to capture accountability info for end of HOT optimize.
  526. // Note: See note for toku_brt_header_note_hot_begin().
  527. void
  528. toku_brt_header_note_hot_complete(BRT brt, BOOL success, MSN msn_at_start_of_hot) {
  529. struct brt_header *h = brt->h;
  530. time_t now = time(NULL);
  531. toku_brtheader_lock(h);
  532. h->count_of_optimize_in_progress--;
  533. if (success) {
  534. h->time_of_last_optimize_end = now;
  535. h->msn_at_start_of_last_completed_optimize = msn_at_start_of_hot;
  536. // If we just successfully completed an optimization and no other thread is performing
  537. // an optimization, then the number of optimizations in progress is zero.
  538. // If there was a crash during a HOT optimization, this is how count_of_optimize_in_progress
  539. // would be reset to zero on the disk after recovery from that crash.
  540. if (h->count_of_optimize_in_progress == h->count_of_optimize_in_progress_read_from_disk)
  541. h->count_of_optimize_in_progress = 0;
  542. }
  543. h->dirty = 1;
  544. toku_brtheader_unlock(h);
  545. }
  546. void
  547. toku_brt_header_init(struct brt_header *h,
  548. BLOCKNUM root_blocknum_on_disk, LSN checkpoint_lsn, TXNID root_xid_that_created, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method compression_method) {
  549. memset(h, 0, sizeof *h);
  550. h->layout_version = BRT_LAYOUT_VERSION;
  551. h->layout_version_original = BRT_LAYOUT_VERSION;
  552. h->build_id = BUILD_ID;
  553. h->build_id_original = BUILD_ID;
  554. uint64_t now = (uint64_t) time(NULL);
  555. h->time_of_creation = now;
  556. h->time_of_last_modification = now;
  557. h->time_of_last_verification = 0;
  558. h->checkpoint_count = 1;
  559. h->checkpoint_lsn = checkpoint_lsn;
  560. h->nodesize = target_nodesize;
  561. h->basementnodesize = target_basementnodesize;
  562. h->root_blocknum = root_blocknum_on_disk;
  563. h->flags = 0;
  564. h->root_xid_that_created = root_xid_that_created;
  565. h->compression_method = compression_method;
  566. h->highest_unused_msn_for_upgrade.msn = MIN_MSN.msn - 1;
  567. }
  568. // Open a brt for use by redirect. The new brt must have the same dict_id as the old_brt passed in. (FILENUM is assigned by the brt_open() function.)
  569. static int
  570. brt_open_for_redirect(BRT *new_brtp, const char *fname_in_env, TOKUTXN txn, struct brt_header* old_h) {
  571. int r;
  572. BRT t;
  573. assert(old_h->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
  574. r = toku_brt_create(&t);
  575. assert_zero(r);
  576. r = toku_brt_set_bt_compare(t, old_h->compare_fun);
  577. assert_zero(r);
  578. r = toku_brt_set_update(t, old_h->update_fun);
  579. assert_zero(r);
  580. r = toku_brt_set_nodesize(t, old_h->nodesize);
  581. assert_zero(r);
  582. r = toku_brt_set_basementnodesize(t, old_h->basementnodesize);
  583. assert_zero(r);
  584. CACHETABLE ct = toku_cachefile_get_cachetable(old_h->cf);
  585. r = toku_brt_open_with_dict_id(t, fname_in_env, 0, 0, ct, txn, old_h->dict_id);
  586. assert_zero(r);
  587. assert(t->h->dict_id.dictid == old_h->dict_id.dictid);
  588. *new_brtp = t;
  589. return r;
  590. }
  591. // This function performs most of the work to redirect a dictionary to different file.
  592. // It is called for redirect and to abort a redirect. (This function is almost its own inverse.)
  593. static int
  594. dictionary_redirect_internal(const char *dst_fname_in_env, struct brt_header *src_h, TOKUTXN txn, struct brt_header **dst_hp) {
  595. int r;
  596. FILENUM src_filenum = toku_cachefile_filenum(src_h->cf);
  597. FILENUM dst_filenum = FILENUM_NONE;
  598. struct brt_header *dst_h = NULL;
  599. struct toku_list *list;
  600. // open a dummy brt based off of
  601. // dst_fname_in_env to get the header
  602. // then we will change all the brt's to have
  603. // their headers point to dst_h instead of src_h
  604. BRT tmp_dst_brt = NULL;
  605. r = brt_open_for_redirect(&tmp_dst_brt, dst_fname_in_env, txn, src_h);
  606. assert_zero(r);
  607. dst_h = tmp_dst_brt->h;
  608. // some sanity checks on dst_filenum
  609. dst_filenum = toku_cachefile_filenum(dst_h->cf);
  610. assert(dst_filenum.fileid!=FILENUM_NONE.fileid);
  611. assert(dst_filenum.fileid!=src_filenum.fileid); //Cannot be same file.
  612. // for each live brt, brt->h is currently src_h
  613. // we want to change it to dummy_dst
  614. while (!toku_list_empty(&src_h->live_brts)) {
  615. list = src_h->live_brts.next;
  616. BRT src_brt = NULL;
  617. src_brt = toku_list_struct(list, struct brt, live_brt_link);
  618. toku_brtheader_lock(src_h);
  619. toku_list_remove(&src_brt->live_brt_link);
  620. toku_brtheader_unlock(src_h);
  621. src_brt->h = dst_h;
  622. toku_brtheader_note_brt_open(src_brt);
  623. if (src_brt->redirect_callback) {
  624. src_brt->redirect_callback(src_brt, src_brt->redirect_callback_extra);
  625. }
  626. }
  627. assert(dst_h);
  628. r = toku_brt_close(tmp_dst_brt, FALSE, ZERO_LSN);
  629. assert_zero(r);
  630. *dst_hp = dst_h;
  631. return r;
  632. }
  633. //This is the 'abort redirect' function. The redirect of old_h to new_h was done
  634. //and now must be undone, so here we redirect new_h back to old_h.
  635. int
  636. toku_dictionary_redirect_abort(struct brt_header *old_h, struct brt_header *new_h, TOKUTXN txn) {
  637. char *old_fname_in_env = toku_cachefile_fname_in_env(old_h->cf);
  638. int r;
  639. {
  640. FILENUM old_filenum = toku_cachefile_filenum(old_h->cf);
  641. FILENUM new_filenum = toku_cachefile_filenum(new_h->cf);
  642. assert(old_filenum.fileid!=new_filenum.fileid); //Cannot be same file.
  643. //No living brts in old header.
  644. assert(toku_list_empty(&old_h->live_brts));
  645. }
  646. // If application did not close all DBs using the new file, then there should
  647. // be no zombies and we need to redirect the DBs back to the original file.
  648. if (!toku_list_empty(&new_h->live_brts)) {
  649. struct brt_header *dst_h;
  650. // redirect back from new_h to old_h
  651. r = dictionary_redirect_internal(old_fname_in_env, new_h, txn, &dst_h);
  652. assert_zero(r);
  653. assert(dst_h == old_h);
  654. }
  655. else {
  656. //No live brts.
  657. //No need to redirect back.
  658. r = 0;
  659. }
  660. return r;
  661. }
  662. /****
  663. * on redirect or abort:
  664. * if redirect txn_note_doing_work(txn)
  665. * if redirect connect src brt to txn (txn modified this brt)
  666. * for each src brt
  667. * open brt to dst file (create new brt struct)
  668. * if redirect connect dst brt to txn
  669. * redirect db to new brt
  670. * redirect cursors to new brt
  671. * close all src brts
  672. * if redirect make rollback log entry
  673. *
  674. * on commit:
  675. * nothing to do
  676. *
  677. *****/
  678. int
  679. toku_dictionary_redirect (const char *dst_fname_in_env, BRT old_brt, TOKUTXN txn) {
  680. // Input args:
  681. // new file name for dictionary (relative to env)
  682. // old_brt is a live brt of open handle ({DB, BRT} pair) that currently refers to old dictionary file.
  683. // (old_brt may be one of many handles to the dictionary.)
  684. // txn that created the loader
  685. // Requires:
  686. // ydb_lock is held.
  687. // The brt is open. (which implies there can be no zombies.)
  688. // The new file must be a valid dictionary.
  689. // The block size and flags in the new file must match the existing BRT.
  690. // The new file must already have its descriptor in it (and it must match the existing descriptor).
  691. // Effect:
  692. // Open new BRTs (and related header and cachefile) to the new dictionary file with a new FILENUM.
  693. // Redirect all DBs that point to brts that point to the old file to point to brts that point to the new file.
  694. // Copy the dictionary id (dict_id) from the header of the original file to the header of the new file.
  695. // Create a rollback log entry.
  696. // The original BRT, header, cachefile and file remain unchanged. They will be cleaned up on commmit.
  697. // If the txn aborts, then this operation will be undone
  698. int r;
  699. struct brt_header * old_h = old_brt->h;
  700. // dst file should not be open. (implies that dst and src are different because src must be open.)
  701. {
  702. CACHETABLE ct = toku_cachefile_get_cachetable(old_h->cf);
  703. CACHEFILE cf;
  704. r = toku_cachefile_of_iname_in_env(ct, dst_fname_in_env, &cf);
  705. if (r==0) {
  706. r = EINVAL;
  707. goto cleanup;
  708. }
  709. assert(r==ENOENT);
  710. r = 0;
  711. }
  712. if (txn) {
  713. r = toku_txn_note_brt(txn, old_h); // mark old brt as touched by this txn
  714. assert_zero(r);
  715. }
  716. struct brt_header *new_h;
  717. r = dictionary_redirect_internal(dst_fname_in_env, old_h, txn, &new_h);
  718. assert_zero(r);
  719. // make rollback log entry
  720. if (txn) {
  721. assert(!toku_list_empty(&new_h->live_brts));
  722. r = toku_txn_note_brt(txn, new_h); // mark new brt as touched by this txn
  723. FILENUM old_filenum = toku_cachefile_filenum(old_h->cf);
  724. FILENUM new_filenum = toku_cachefile_filenum(new_h->cf);
  725. r = toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
  726. assert_zero(r);
  727. TXNID xid = toku_txn_get_txnid(txn);
  728. toku_brt_header_suppress_rollbacks(new_h, txn);
  729. r = toku_log_suppress_rollback(txn->logger, NULL, 0, new_filenum, xid);
  730. assert_zero(r);
  731. }
  732. cleanup:
  733. return r;
  734. }
  735. //Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
  736. static int find_xid (OMTVALUE v, void *txnv) {
  737. TOKUTXN txn = v;
  738. TOKUTXN txnfind = txnv;
  739. if (txn->txnid64<txnfind->txnid64) return -1;
  740. if (txn->txnid64>txnfind->txnid64) return +1;
  741. return 0;
  742. }
  743. // returns if ref was added
  744. BOOL
  745. toku_brtheader_maybe_add_txn_ref(struct brt_header* h, TOKUTXN txn) {
  746. BOOL ref_added = FALSE;
  747. OMTVALUE txnv;
  748. u_int32_t index;
  749. toku_brtheader_lock(h);
  750. // Does brt already know about transaction txn?
  751. int r = toku_omt_find_zero(h->txns, find_xid, txn, &txnv, &index);
  752. if (r==0) {
  753. // It's already there.
  754. assert((TOKUTXN)txnv==txn);
  755. ref_added = FALSE;
  756. goto exit;
  757. }
  758. // Otherwise it's not there.
  759. // Insert reference to transaction into brt
  760. r = toku_omt_insert_at(h->txns, txn, index);
  761. assert(r==0);
  762. ref_added = TRUE;
  763. exit:
  764. toku_brtheader_unlock(h);
  765. return ref_added;
  766. }
  767. void
  768. toku_brtheader_remove_txn_ref(struct brt_header* h, TOKUTXN txn) {
  769. OMTVALUE txnv_again=NULL;
  770. u_int32_t index;
  771. toku_brtheader_lock(h);
  772. int r = toku_omt_find_zero(h->txns, find_xid, txn, &txnv_again, &index);
  773. assert(r==0);
  774. assert(txnv_again == txn);
  775. r = toku_omt_delete_at(h->txns, index);
  776. assert(r==0);
  777. // TODO: (Zardosht) figure out how to properly do this
  778. // below this unlock, are depending on ydb lock
  779. toku_brtheader_unlock(h);
  780. if (!toku_brt_header_needed(h)) {
  781. //Close immediately.
  782. // I have no idea how this error string business works
  783. char *error_string = NULL;
  784. r = toku_remove_brtheader(h, &error_string, false, ZERO_LSN);
  785. lazy_assert_zero(r);
  786. }
  787. }
  788. void toku_calculate_root_offset_pointer (
  789. struct brt_header* h,
  790. CACHEKEY* root_key,
  791. u_int32_t *roothash
  792. )
  793. {
  794. *roothash = toku_cachetable_hash(h->cf, h->root_blocknum);
  795. *root_key = h->root_blocknum;
  796. }
  797. void toku_brtheader_set_new_root_blocknum(
  798. struct brt_header* h,
  799. CACHEKEY new_root_key
  800. )
  801. {
  802. h->root_blocknum = new_root_key;
  803. }
  804. LSN toku_brt_checkpoint_lsn(struct brt_header* h) {
  805. return h->checkpoint_lsn;
  806. }
  807. int toku_brtheader_set_panic(struct brt_header *h, int panic, char *panic_string) {
  808. if (h->panic == 0) {
  809. h->panic = panic;
  810. if (h->panic_string) {
  811. toku_free(h->panic_string);
  812. }
  813. h->panic_string = toku_strdup(panic_string);
  814. }
  815. return 0;
  816. }