|
|
|
@ -65,7 +65,10 @@ int toku_logger_create (TOKULOGGER *resultp) { |
|
|
|
r = ml_init(&result->input_lock); if (r!=0) goto panic; |
|
|
|
r = toku_pthread_mutex_init(&result->output_condition_lock, NULL); if (r!=0) goto panic; |
|
|
|
r = toku_pthread_cond_init(&result->output_condition, NULL); if (r!=0) goto panic; |
|
|
|
result->input_lock_ctr = 0; |
|
|
|
result->output_condition_lock_ctr = 0; |
|
|
|
result->output_is_available = TRUE; |
|
|
|
result->swap_ctr = 0; |
|
|
|
return 0; |
|
|
|
|
|
|
|
panic: |
|
|
|
@ -142,6 +145,7 @@ int toku_logger_close(TOKULOGGER *loggerp) { |
|
|
|
int r = 0; |
|
|
|
if (!logger->is_open) goto is_closed; |
|
|
|
ml_lock(&logger->input_lock); |
|
|
|
logger->input_lock_ctr++; |
|
|
|
LSN fsynced_lsn; |
|
|
|
grab_output(logger, &fsynced_lsn); |
|
|
|
r = toku_logger_write_buffer(logger, &fsynced_lsn); if (r!=0) goto panic; //Releases the input lock |
|
|
|
@ -233,11 +237,13 @@ grab_output(TOKULOGGER logger, LSN *fsynced_lsn) |
|
|
|
{ |
|
|
|
int r; |
|
|
|
r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert(r==0); |
|
|
|
logger->output_condition_lock_ctr++; |
|
|
|
wait_till_output_available(logger); |
|
|
|
logger->output_is_available = FALSE; |
|
|
|
if (fsynced_lsn) { |
|
|
|
*fsynced_lsn = logger->fsynced_lsn; |
|
|
|
} |
|
|
|
logger->output_condition_lock_ctr++; |
|
|
|
r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0); |
|
|
|
} |
|
|
|
|
|
|
|
@ -251,7 +257,7 @@ wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger, |
|
|
|
// Exit: Hold the output permission if returns false. |
|
|
|
{ |
|
|
|
BOOL result; |
|
|
|
{ int r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert(r==0); } |
|
|
|
{ int r = toku_pthread_mutex_lock(&logger->output_condition_lock); logger->output_condition_lock_ctr++; assert(r==0); } |
|
|
|
while (1) { |
|
|
|
if (logger->fsynced_lsn.lsn >= lsn.lsn) { // we can look at the fsynced lsn since we have the lock. |
|
|
|
result = TRUE; |
|
|
|
@ -267,7 +273,7 @@ wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger, |
|
|
|
assert(r==0); |
|
|
|
} |
|
|
|
*fsynced_lsn = logger->fsynced_lsn; |
|
|
|
{ int r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0); } |
|
|
|
{ int r = toku_pthread_mutex_unlock(&logger->output_condition_lock); logger->output_condition_lock_ctr++; assert(r==0); } |
|
|
|
return result; |
|
|
|
} |
|
|
|
|
|
|
|
@ -279,11 +285,13 @@ release_output (TOKULOGGER logger, LSN fsynced_lsn) |
|
|
|
{ |
|
|
|
int r; |
|
|
|
r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert(r==0); |
|
|
|
logger->output_condition_lock_ctr++; |
|
|
|
logger->output_is_available = TRUE; |
|
|
|
if (logger->fsynced_lsn.lsn < fsynced_lsn.lsn) { |
|
|
|
logger->fsynced_lsn = fsynced_lsn; |
|
|
|
} |
|
|
|
r = toku_pthread_cond_broadcast(&logger->output_condition); assert(r==0); |
|
|
|
logger->output_condition_lock_ctr++; |
|
|
|
r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0); |
|
|
|
} |
|
|
|
|
|
|
|
@ -296,6 +304,7 @@ swap_inbuf_outbuf (TOKULOGGER logger) |
|
|
|
logger->inbuf = logger->outbuf; |
|
|
|
logger->outbuf = tmp; |
|
|
|
assert(logger->inbuf.n_in_buf == 0); |
|
|
|
logger->swap_ctr++; |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
@ -333,11 +342,13 @@ toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed) |
|
|
|
{ |
|
|
|
int r; |
|
|
|
if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) return 0; |
|
|
|
logger->input_lock_ctr++; |
|
|
|
r = ml_unlock(&logger->input_lock); if (r!=0) goto panic; |
|
|
|
LSN fsynced_lsn; |
|
|
|
grab_output(logger, &fsynced_lsn); |
|
|
|
|
|
|
|
r = ml_lock(&logger->input_lock); if (r!=0) goto panic; |
|
|
|
logger->input_lock_ctr++; |
|
|
|
// Some other thread may have written the log out while we didn't have the lock. If we have space now, then be happy. |
|
|
|
if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) { |
|
|
|
release_output(logger, fsynced_lsn); |
|
|
|
@ -376,6 +387,7 @@ int toku_logger_fsync (TOKULOGGER logger) |
|
|
|
int r; |
|
|
|
if (logger->is_panicked) return EINVAL; |
|
|
|
r = ml_lock(&logger->input_lock); assert(r==0); |
|
|
|
logger->input_lock_ctr++; |
|
|
|
r = toku_logger_maybe_fsync(logger, logger->inbuf.max_lsn_in_buf, TRUE); |
|
|
|
if (r!=0) { |
|
|
|
toku_logger_panic(logger, r); |
|
|
|
@ -591,6 +603,7 @@ int toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync) |
|
|
|
int r; |
|
|
|
if (do_fsync) { |
|
|
|
// reacquire the locks (acquire output permission first) |
|
|
|
logger->input_lock_ctr++; |
|
|
|
r = ml_unlock(&logger->input_lock); assert(r==0); |
|
|
|
LSN fsynced_lsn; |
|
|
|
BOOL already_done = wait_till_output_already_written_or_output_buffer_available(logger, lsn, &fsynced_lsn); |
|
|
|
@ -599,9 +612,11 @@ int toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync) |
|
|
|
// otherwise we now own the output permission, and our lsn isn't outputed. |
|
|
|
|
|
|
|
r = ml_lock(&logger->input_lock); assert(r==0); |
|
|
|
logger->input_lock_ctr++; |
|
|
|
|
|
|
|
swap_inbuf_outbuf(logger); |
|
|
|
|
|
|
|
logger->input_lock_ctr++; |
|
|
|
r = ml_unlock(&logger->input_lock); // release the input lock now, so other threads can fill the inbuf. (Thus enabling group commit.) |
|
|
|
assert(r==0); |
|
|
|
|
|
|
|
@ -621,6 +636,7 @@ int toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync) |
|
|
|
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); |
|
|
|
release_output(logger, fsynced_lsn); |
|
|
|
} else { |
|
|
|
logger->input_lock_ctr++; |
|
|
|
r = ml_unlock(&logger->input_lock); |
|
|
|
assert(r==0); |
|
|
|
} |
|
|
|
@ -635,7 +651,7 @@ toku_logger_write_buffer (TOKULOGGER logger, LSN *fsynced_lsn) |
|
|
|
// Note: Only called during single-threaded activity from toku_logger_restart, so locks aren't really needed. |
|
|
|
{ |
|
|
|
swap_inbuf_outbuf(logger); |
|
|
|
{ int r = ml_unlock(&logger->input_lock); assert(r==0); } |
|
|
|
{ logger->input_lock_ctr++; int r = ml_unlock(&logger->input_lock); assert(r==0); } |
|
|
|
write_outbuf_to_logfile(logger, fsynced_lsn); |
|
|
|
if (logger->write_log_files) { |
|
|
|
int r = toku_file_fsync_without_accounting(logger->fd); |
|
|
|
@ -657,6 +673,7 @@ int toku_logger_restart(TOKULOGGER logger, LSN lastlsn) |
|
|
|
LSN fsynced_lsn; |
|
|
|
grab_output(logger, &fsynced_lsn); |
|
|
|
r = ml_lock(&logger->input_lock); assert(r == 0); |
|
|
|
logger->input_lock_ctr++; |
|
|
|
r = toku_logger_write_buffer(logger, &fsynced_lsn); assert(r == 0); |
|
|
|
|
|
|
|
// close the log file |
|
|
|
@ -1172,3 +1189,12 @@ toku_logger_call_remove_finalize_callback(TOKULOGGER logger, DICTIONARY_ID dict_ |
|
|
|
if (logger->remove_finalize_callback) |
|
|
|
logger->remove_finalize_callback(dict_id, logger->remove_finalize_callback_extra); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void |
|
|
|
toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS s) { |
|
|
|
s->ilock_ctr = logger->input_lock_ctr; |
|
|
|
s->olock_ctr = logger->output_condition_lock_ctr; |
|
|
|
s->swap_ctr = logger->swap_ctr; |
|
|
|
} |
|
|
|
|