Browse Source

MWL#182: Explain running statements

First code
- "Asynchronous procedure call" system
- new THD::check_killed() that serves APC request is called from within most important loops
- EXPLAIN code is now able to generate EXPLAIN output on-the-fly [incomplete]

Parts that are still missing:
- put THD::check_killed() call into every loop where we could spend significant amount of time
- Make sure EXPLAIN code works for group-by queries that replace JOIN::join_tab with make_simple_join() 
  and other such cases.
- User interface: what error code to use, where to get timeout settings from, etc.
pull/57/head
Sergey Petrunya 14 years ago
parent
commit
7e66213444
  1. 2
      libmysqld/Makefile.am
  2. 7
      sql/Makefile.am
  3. 355
      sql/my_apc.cc
  4. 98
      sql/my_apc.h
  5. 2
      sql/mysql_priv.h
  6. 1
      sql/mysqld.cc
  7. 19
      sql/protocol.h
  8. 1
      sql/sp_head.cc
  9. 119
      sql/sql_class.cc
  10. 94
      sql/sql_class.h
  11. 34
      sql/sql_lex.cc
  12. 6
      sql/sql_lex.h
  13. 27
      sql/sql_parse.cc
  14. 1
      sql/sql_prepare.cc
  15. 81
      sql/sql_select.cc
  16. 7
      sql/sql_select.h
  17. 91
      sql/sql_show.cc
  18. 6
      sql/sql_yacc.yy

2
libmysqld/Makefile.am

@ -81,7 +81,7 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \
rpl_injector.cc my_user.c partition_info.cc \
sql_servers.cc event_parse_data.cc opt_table_elimination.cc \
multi_range_read.cc opt_index_cond_pushdown.cc \
sql_expression_cache.cc
sql_expression_cache.cc my_apc.cc
# automake misses these
sql_yacc.cc sql_yacc.h: $(top_srcdir)/sql/sql_yacc.yy

7
sql/Makefile.am

@ -84,7 +84,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
multi_range_read.h sql_handler.h \
sql_join_cache.h \
create_options.h \
sql_expression_cache.h
sql_expression_cache.h \
my_apc.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
@ -134,7 +135,9 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
sql_servers.cc event_parse_data.cc \
opt_table_elimination.cc create_options.cc \
multi_range_read.cc \
opt_index_cond_pushdown.cc sql_expression_cache.cc
opt_index_cond_pushdown.cc sql_expression_cache.cc \
my_apc.cc
nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c client_plugin.c

355
sql/my_apc.cc

@ -0,0 +1,355 @@
/*
TODO: MP AB Copyright
*/
#ifdef MY_APC_STANDALONE
#include <my_global.h>
#include <my_pthread.h>
#include <my_sys.h>
#else
#include "mysql_priv.h"
#endif
//#include "my_apc.h"
/*
Standalone testing:
g++ -c -DMY_APC_STANDALONE -g -I.. -I../include -o my_apc.o my_apc.cc
g++ -L../mysys -L../dbug -L../strings my_apc.o -lmysys -ldbug -lmystrings -lpthread -lrt
*/
void Apc_target::init()
{
// todo: should use my_pthread_... functions instead?
DBUG_ASSERT(!enabled);
(void)pthread_mutex_init(&LOCK_apc_queue, MY_MUTEX_INIT_SLOW);
}
void Apc_target::destroy()
{
DBUG_ASSERT(!enabled);
pthread_mutex_destroy(&LOCK_apc_queue);
}
void Apc_target::enable()
{
pthread_mutex_lock(&LOCK_apc_queue);
enabled++;
pthread_mutex_unlock(&LOCK_apc_queue);
}
void Apc_target::disable()
{
bool process= FALSE;
pthread_mutex_lock(&LOCK_apc_queue);
if (!(--enabled))
process= TRUE;
pthread_mutex_unlock(&LOCK_apc_queue);
if (process)
process_apc_requests();
}
void Apc_target::enqueue_request(Call_request *qe)
{
//call_queue_size++;
if (apc_calls)
{
Call_request *after= apc_calls->prev;
qe->next= apc_calls;
apc_calls->prev= qe;
qe->prev= after;
after->next= qe;
}
else
{
apc_calls= qe;
qe->next= qe->prev= qe;
}
}
void Apc_target::dequeue_request(Call_request *qe)
{
//call_queue_size--;
if (apc_calls == qe)
{
if ((apc_calls= apc_calls->next) == qe)
{
//DBUG_ASSERT(!call_queue_size);
apc_calls= NULL;
}
}
qe->prev->next= qe->next;
qe->next->prev= qe->prev;
}
/*
Make an apc call in another thread. The caller is responsible so
that we're not calling to ourselves.
*/
bool Apc_target::make_apc_call(apc_func_t func, void *func_arg,
int timeout_sec, bool *timed_out)
{
bool res= TRUE;
*timed_out= FALSE;
pthread_mutex_lock(&LOCK_apc_queue);
if (enabled)
{
/* Create and post the request */
Call_request apc_request;
apc_request.func= func;
apc_request.func_arg= func_arg;
apc_request.done= FALSE;
(void)pthread_cond_init(&apc_request.COND_request, NULL);
(void)pthread_mutex_init(&apc_request.LOCK_request, MY_MUTEX_INIT_SLOW);
pthread_mutex_lock(&apc_request.LOCK_request);
enqueue_request(&apc_request);
apc_request.what="enqueued by make_apc_call";
pthread_mutex_unlock(&LOCK_apc_queue);
struct timespec abstime;
const int timeout= timeout_sec;
set_timespec(abstime, timeout);
int wait_res= 0;
/* todo: how about processing other errors here? */
while (!apc_request.done && (wait_res != ETIMEDOUT))
{
wait_res= pthread_cond_timedwait(&apc_request.COND_request,
&apc_request.LOCK_request, &abstime);
}
if (!apc_request.done)
{
/* We timed out */
apc_request.done= TRUE;
*timed_out= TRUE;
pthread_mutex_unlock(&apc_request.LOCK_request);
pthread_mutex_lock(&LOCK_apc_queue);
dequeue_request(&apc_request);
pthread_mutex_unlock(&LOCK_apc_queue);
res= TRUE;
}
else
{
/* Request was successfully executed and dequeued by the target thread */
pthread_mutex_unlock(&apc_request.LOCK_request);
res= FALSE;
}
/* Destroy all APC request data */
pthread_mutex_destroy(&apc_request.LOCK_request);
pthread_cond_destroy(&apc_request.COND_request);
}
else
{
pthread_mutex_unlock(&LOCK_apc_queue);
}
return res;
}
/*
Process all APC requests
*/
void Apc_target::process_apc_requests()
{
while (1)
{
Call_request *request;
pthread_mutex_lock(&LOCK_apc_queue);
if (!(request= get_first_in_queue()))
{
pthread_mutex_unlock(&LOCK_apc_queue);
break;
}
request->what="seen by process_apc_requests";
pthread_mutex_lock(&request->LOCK_request);
if (request->done)
{
/*
We can get here when
- the requestor thread has been waiting for this request
- the wait has timed out
- it has set request->done=TRUE
- it has released LOCK_request, because its next action
will be to remove the request from the queue, however,
it could not attempt to lock the queue while holding the lock on
request, because that would deadlock with this function
(we here first lock the queue and then lock the request)
*/
pthread_mutex_unlock(&request->LOCK_request);
pthread_mutex_unlock(&LOCK_apc_queue);
fprintf(stderr, "Whoa rare event #1!\n");
continue;
}
/*
Remove the request from the queue (we're holding its lock so we can be
sure that request owner won't try to remove it)
*/
request->what="dequeued by process_apc_requests";
dequeue_request(request);
request->done= TRUE;
pthread_mutex_unlock(&LOCK_apc_queue);
request->func(request->func_arg);
request->what="func called by process_apc_requests";
pthread_cond_signal(&request->COND_request);
pthread_mutex_unlock(&request->LOCK_request);
}
}
/*****************************************************************************
* Testing
*****************************************************************************/
#ifdef MY_APC_STANDALONE
volatile bool started= FALSE;
volatile bool service_should_exit= FALSE;
volatile bool requestors_should_exit=FALSE;
volatile int apcs_served= 0;
volatile int apcs_missed=0;
volatile int apcs_timed_out=0;
Apc_target apc_target;
int int_rand(int size)
{
return round (((double)rand() / RAND_MAX) * size);
}
/* An APC-serving thread */
void *test_apc_service_thread(void *ptr)
{
my_thread_init();
apc_target.init();
apc_target.enable();
started= TRUE;
fprintf(stderr, "# test_apc_service_thread started\n");
while (!service_should_exit)
{
//apc_target.disable();
usleep(10000);
//apc_target.enable();
for (int i = 0; i < 10 && !service_should_exit; i++)
{
apc_target.process_apc_requests();
usleep(int_rand(30));
}
}
apc_target.disable();
apc_target.destroy();
my_thread_end();
pthread_exit(0);
}
class Apc_order
{
public:
int value; // The value
int *where_to; // Where to write it
Apc_order(int a, int *b) : value(a), where_to(b) {}
};
void test_apc_func(void *arg)
{
Apc_order *order=(Apc_order*)arg;
usleep(int_rand(1000));
*(order->where_to) = order->value;
__sync_fetch_and_add(&apcs_served, 1);
}
void *test_apc_requestor_thread(void *ptr)
{
my_thread_init();
fprintf(stderr, "# test_apc_requestor_thread started\n");
while (!requestors_should_exit)
{
int dst_value= 0;
int src_value= int_rand(4*1000*100);
/* Create APC to do dst_value= src_value */
Apc_order apc_order(src_value, &dst_value);
bool timed_out;
bool res= apc_target.make_apc_call(test_apc_func, (void*)&apc_order, 60, &timed_out);
if (res)
{
if (timed_out)
__sync_fetch_and_add(&apcs_timed_out, 1);
else
__sync_fetch_and_add(&apcs_missed, 1);
if (dst_value != 0)
fprintf(stderr, "APC was done even though return value says it wasnt!\n");
}
else
{
if (dst_value != src_value)
fprintf(stderr, "APC was not done even though return value says it was!\n");
}
//usleep(300);
}
fprintf(stderr, "# test_apc_requestor_thread exiting\n");
my_thread_end();
}
const int N_THREADS=23;
int main(int args, char **argv)
{
pthread_t service_thr;
pthread_t request_thr[N_THREADS];
int i, j;
my_thread_global_init();
pthread_create(&service_thr, NULL, test_apc_service_thread, (void*)NULL);
while (!started)
usleep(1000);
for (i = 0; i < N_THREADS; i++)
pthread_create(&request_thr[i], NULL, test_apc_requestor_thread, (void*)NULL);
for (i = 0; i < 15; i++)
{
usleep(500*1000);
fprintf(stderr, "# %d APCs served %d missed\n", apcs_served, apcs_missed);
}
fprintf(stderr, "# Shutting down requestors\n");
requestors_should_exit= TRUE;
for (i = 0; i < N_THREADS; i++)
pthread_join(request_thr[i], NULL);
fprintf(stderr, "# Shutting down service\n");
service_should_exit= TRUE;
pthread_join(service_thr, NULL);
fprintf(stderr, "# Done.\n");
my_thread_end();
my_thread_global_end();
return 0;
}
#endif // MY_APC_STANDALONE

98
sql/my_apc.h

@ -0,0 +1,98 @@
/*
TODO: MP AB Copyright
*/
/*
Design
- Mutex-guarded request queue (it belongs to the target), which can be enabled/
disabled (when empty).
- After the request has been put into queue, the requestor waits for request
to be satisfied. The worker satisifes the request and signals the
requestor.
*/
/*
Target for asynchronous calls.
*/
class Apc_target
{
public:
Apc_target() : enabled(0), apc_calls(NULL) /*, call_queue_size(0)*/ {}
~Apc_target() { DBUG_ASSERT(!enabled && !apc_calls);}
/*
Initialize the target. This must be called before anything else. Right
after initialization, the target is disabled.
*/
void init();
/*
Destroy the target. The target must be disabled when this call is made.
*/
void destroy();
/*
Enter into state where this target will be serving APC requests
*/
void enable();
/*
Leave the state where we could serve APC requests (will serve all already
enqueued requests)
*/
void disable();
/*
This should be called periodically to serve observation requests.
*/
void process_apc_requests();
typedef void (*apc_func_t)(void *arg);
/*
Make an APC call: schedule it for execution and wait until the target
thread has executed it. This function must not be called from a thread
that's different from the target thread.
@retval FALSE - Ok, the call has been made
@retval TRUE - Call wasnt made (either the target is in disabled state or
timeout occured)
*/
bool make_apc_call(apc_func_t func, void *func_arg,
int timeout_sec, bool *timed_out);
private:
class Call_request;
int enabled;
Call_request *apc_calls;
pthread_mutex_t LOCK_apc_queue;
//int call_queue_size;
class Call_request
{
public:
apc_func_t func;
void *func_arg;
bool done;
pthread_mutex_t LOCK_request;
pthread_cond_t COND_request;
Call_request *next;
Call_request *prev;
const char *what;
};
void enqueue_request(Call_request *qe);
void dequeue_request(Call_request *qe);
Call_request *get_first_in_queue()
{
return apc_calls;
}
};
///////////////////////////////////////////////////////////////////////

2
sql/mysql_priv.h

@ -803,6 +803,7 @@ typedef my_bool (*qc_engine_callback)(THD *thd, char *table_key,
ulonglong *engine_data);
#include "sql_string.h"
#include "my_decimal.h"
#include "my_apc.h"
/*
to unify the code that differs only in the argument passed to the
@ -1544,6 +1545,7 @@ bool mysqld_show_create(THD *thd, TABLE_LIST *table_list);
bool mysqld_show_create_db(THD *thd, char *dbname, HA_CREATE_INFO *create);
void mysqld_list_processes(THD *thd,const char *user,bool verbose);
void mysqld_show_explain(THD *thd, ulong thread_id);
int mysqld_show_status(THD *thd);
int mysqld_show_variables(THD *thd,const char *wild);
bool mysqld_show_storage_engines(THD *thd);

1
sql/mysqld.cc

@ -3430,6 +3430,7 @@ SHOW_VAR com_status_vars[]= {
{"show_engine_status", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_ENGINE_STATUS]), SHOW_LONG_STATUS},
{"show_events", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_EVENTS]), SHOW_LONG_STATUS},
{"show_errors", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_ERRORS]), SHOW_LONG_STATUS},
{"show_explain", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_EXPLAIN]), SHOW_LONG_STATUS},
{"show_fields", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_FIELDS]), SHOW_LONG_STATUS},
#ifndef DBUG_OFF
{"show_function_code", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_FUNC_CODE]), SHOW_LONG_STATUS},

19
sql/protocol.h

@ -28,6 +28,7 @@ class Protocol
protected:
THD *thd;
String *packet;
/* Used by net_store_data() for charset conversions */
String *convert;
uint field_pos;
#ifndef DBUG_OFF
@ -42,6 +43,10 @@ protected:
MYSQL_FIELD *next_mysql_field;
MEM_ROOT *alloc;
#endif
/*
The following two are low-level functions that are invoked from
higher-level store_xxx() funcs. The data is stored into this->packet.
*/
bool net_store_data(const uchar *from, size_t length,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs);
bool store_string_aux(const char *from, size_t length,
@ -55,6 +60,20 @@ public:
enum { SEND_NUM_ROWS= 1, SEND_DEFAULTS= 2, SEND_EOF= 4 };
virtual bool send_fields(List<Item> *list, uint flags);
void get_packet(const char **start, size_t *length)
{
*start= packet->ptr();
*length= packet->length();
}
void set_packet(const char *start, size_t len)
{
packet->length(0);
packet->append(start, len);
#ifndef DBUG_OFF
field_pos= field_count - 1;
#endif
}
bool store(I_List<i_string> *str_list);
bool store(const char *from, CHARSET_INFO *cs);
String *storage_packet() { return packet; }

1
sql/sp_head.cc

@ -204,6 +204,7 @@ sp_get_flags_for_command(LEX *lex)
case SQLCOM_SHOW_CREATE_TRIGGER:
case SQLCOM_SHOW_DATABASES:
case SQLCOM_SHOW_ERRORS:
case SQLCOM_SHOW_EXPLAIN:
case SQLCOM_SHOW_FIELDS:
case SQLCOM_SHOW_FUNC_CODE:
case SQLCOM_SHOW_GRANTS:

119
sql/sql_class.cc

@ -964,6 +964,7 @@ void THD::init(void)
/* Initialize the Debug Sync Facility. See debug_sync.cc. */
debug_sync_init_thread(this);
#endif /* defined(ENABLED_DEBUG_SYNC) */
apc_target.init();
}
@ -1122,7 +1123,8 @@ void THD::cleanup(void)
pthread_mutex_unlock(&LOCK_user_locks);
ull= NULL;
}
apc_target.destroy();
cleanup_done=1;
DBUG_VOID_RETURN;
}
@ -1666,6 +1668,14 @@ CHANGED_TABLE_LIST* THD::changed_table_dup(const char *key, long key_length)
int THD::send_explain_fields(select_result *result)
{
List<Item> field_list;
make_explain_field_list(field_list);
return (result->send_fields(field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
}
void THD::make_explain_field_list(List<Item> &field_list)
{
Item *item;
CHARSET_INFO *cs= system_charset_info;
field_list.push_back(new Item_return_int("id",3, MYSQL_TYPE_LONGLONG));
@ -1703,10 +1713,9 @@ int THD::send_explain_fields(select_result *result)
}
item->maybe_null= 1;
field_list.push_back(new Item_empty_string("Extra", 255, cs));
return (result->send_fields(field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
}
#ifdef SIGNAL_WITH_VIO_CLOSE
void THD::close_active_vio()
{
@ -1810,6 +1819,21 @@ void THD::rollback_item_tree_changes()
}
/*
Check if the thread has been killed, and also process "APC requests"
@retval true The thread is killed, execution should be interrupted
@retval false Not killed, continue execution
*/
bool THD::check_killed()
{
if (killed)
return TRUE;
apc_target.process_apc_requests();
return FALSE;
}
/*****************************************************************************
** Functions to provide a interface to select results
*****************************************************************************/
@ -1950,6 +1974,68 @@ int select_send::send_data(List<Item> &items)
DBUG_RETURN(0);
}
//////////////////////////////////////////////////////////////////////////////
int select_result_explain_buffer::send_data(List<Item> &items)
{
List_iterator_fast<Item> li(items);
char buff[MAX_FIELD_WIDTH];
String buffer(buff, sizeof(buff), &my_charset_bin);
DBUG_ENTER("select_send::send_data");
protocol->prepare_for_resend();
Item *item;
while ((item=li++))
{
if (item->send(protocol, &buffer))
{
protocol->free(); // Free used buffer
my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
break;
}
/*
Reset buffer to its original state, as it may have been altered in
Item::send().
*/
buffer.set(buff, sizeof(buff), &my_charset_bin);
}
//TODO: do we need the following:
if (thd->is_error())
{
protocol->remove_last_row();
DBUG_RETURN(1);
}
/* psergey-TODO: instead of protocol->write(), steal the packet here */
const char *packet_data;
size_t len;
protocol->get_packet(&packet_data, &len);
String *s= new (thd->mem_root) String;
s->append(packet_data, len);
data_rows.push_back(s);
protocol->remove_last_row(); // <-- this does nothing. Do we need it?
// prepare_for_resend() will wipe out the packet
DBUG_RETURN(0);
}
void select_result_explain_buffer::flush_data()
{
List_iterator<String> it(data_rows);
String *str;
while ((str= it++))
{
/* TODO: write out the lines. */
protocol->set_packet(str->ptr(), str->length());
protocol->write();
delete str;
}
data_rows.empty();
}
//////////////////////////////////////////////////////////////////////////////
bool select_send::send_eof()
{
/*
@ -2810,6 +2896,10 @@ void THD::end_statement()
}
/*
Start using arena specified by @set. Current arena data will be saved to
*backup.
*/
void THD::set_n_backup_active_arena(Query_arena *set, Query_arena *backup)
{
DBUG_ENTER("THD::set_n_backup_active_arena");
@ -2824,6 +2914,12 @@ void THD::set_n_backup_active_arena(Query_arena *set, Query_arena *backup)
}
/*
Stop using the temporary arena, and start again using the arena that is
specified in *backup.
The temporary arena is returned back into *set.
*/
void THD::restore_active_arena(Query_arena *set, Query_arena *backup)
{
DBUG_ENTER("THD::restore_active_arena");
@ -2836,6 +2932,23 @@ void THD::restore_active_arena(Query_arena *set, Query_arena *backup)
DBUG_VOID_RETURN;
}
// psergey
void Show_explain_request::get_explain_data(void *arg)
{
Show_explain_request *req= (Show_explain_request*)arg;
//TODO: change mem_root to point to request_thd->mem_root.
// Actually, change the ARENA, because we're going to allocate items!
Query_arena backup_arena;
req->target_thd->set_n_backup_active_arena((Query_arena*)req->request_thd,
&backup_arena);
req->target_thd->lex->unit.print_explain(req->explain_buf);
req->target_thd->restore_active_arena((Query_arena*)req->request_thd,
&backup_arena);
}
Statement::~Statement()
{
}

94
sql/sql_class.h

@ -1420,6 +1420,19 @@ struct Ha_data
};
class select_result_explain_buffer;
class Show_explain_request
{
public:
THD *target_thd;
THD *request_thd;
select_result_explain_buffer *explain_buf;
static void get_explain_data(void *arg);
};
/**
@class THD
For each client connection we create a separate thread with THD serving as
@ -1990,6 +2003,8 @@ public:
};
killed_state volatile killed;
bool check_killed();
/* scramble - random string sent to client on handshake */
char scramble[SCRAMBLE_LENGTH+1];
@ -2171,6 +2186,16 @@ public:
void close_active_vio();
#endif
void awake(THD::killed_state state_to_set);
/*
This is what allows this thread to serve as a target for others to
schedule Async Procedure Calls on.
It's possible to schedule arbitrary C function call but currently this
facility is used only by SHOW EXPLAIN code (See Show_explain_request)
*/
Apc_target apc_target;
#ifndef MYSQL_CLIENT
enum enum_binlog_query_type {
@ -2302,6 +2327,7 @@ public:
void add_changed_table(const char *key, long key_length);
CHANGED_TABLE_LIST * changed_table_dup(const char *key, long key_length);
int send_explain_fields(select_result *result);
void make_explain_field_list(List<Item> &field_list);
#ifndef EMBEDDED_LIBRARY
/**
Clear the current error, if any.
@ -2750,10 +2776,42 @@ public:
class JOIN;
class select_result :public Sql_alloc {
/* Pure interface for sending tabular data */
class select_result_sink: public Sql_alloc
{
public:
/*
send_data returns 0 on ok, 1 on error and -1 if data was ignored, for
example for a duplicate row entry written to a temp table.
*/
virtual int send_data(List<Item> &items)=0;
virtual ~select_result_sink() {};
};
/*
Interface for sending tabular data, together with some other stuff:
- Primary purpose seems to be seding typed tabular data:
= the DDL is sent with send_fields()
= the rows are sent with send_data()
Besides that,
- there seems to be an assumption that the sent data is a result of
SELECT_LEX_UNIT *unit,
- nest_level is used by SQL parser
*/
class select_result :public select_result_sink
{
protected:
THD *thd;
/*
All descendant classes have their send_data() skip the first
unit->offset_limit_cnt rows sent. Select_materialize
also uses unit->get_unit_column_types().
*/
SELECT_LEX_UNIT *unit;
/* Something used only by the parser: */
int nest_level;
public:
select_result();
@ -2772,11 +2830,6 @@ public:
virtual uint field_count(List<Item> &fields) const
{ return fields.elements; }
virtual bool send_fields(List<Item> &list, uint flags)=0;
/*
send_data returns 0 on ok, 1 on error and -1 if data was ignored, for
example for a duplicate row entry written to a temp table.
*/
virtual int send_data(List<Item> &items)=0;
virtual bool initialize_tables (JOIN *join=0) { return 0; }
virtual void send_error(uint errcode,const char *err);
virtual bool send_eof()=0;
@ -2809,6 +2862,35 @@ public:
};
/*
A select result sink that collects the sent data and then can flush it to
network when requested.
This class is targeted at collecting EXPLAIN output:
- Unoptimized data storage (can't handle big datasets)
- Unlike select_result class, we don't assume that the sent data is an
output of a SELECT_LEX_UNIT (and so we dont apply "LIMIT x,y" from the
unit)
*/
class select_result_explain_buffer : public select_result_sink
{
public:
THD *thd;
Protocol *protocol;
select_result_explain_buffer(){};
/* The following is called in the child thread: */
int send_data(List<Item> &items);
/* this will be called in the parent thread: */
void flush_data();
List<String> data_rows;
};
/*
Base class for select_result descendands which intercept and
transform result set rows. As the rows are not sent to the client,

34
sql/sql_lex.cc

@ -3623,6 +3623,40 @@ bool st_select_lex::save_prep_leaf_tables(THD *thd)
}
int st_select_lex::print_explain(select_result_sink *output)
{
if (join && join->optimized == 2)
{
//psergey-TODO: any?
return join->print_explain(output, TRUE,
FALSE, // need_tmp_table,
FALSE, // bool need_order,
FALSE, // bool distinct,
NULL); //const char *message
}
else
{
DBUG_ASSERT(0);
/* produce "not yet optimized" line */
}
return 0;
}
int st_select_lex_unit::print_explain(select_result_sink *output)
{
int res= 0;
SELECT_LEX *first= first_select();
for (SELECT_LEX *sl= first; sl; sl= sl->next_select())
{
if ((res= sl->print_explain(output)))
break;
}
return res;
}
/**
A routine used by the parser to decide whether we are specifying a full
partitioning or if only partitions to add or to split.

6
sql/sql_lex.h

@ -120,6 +120,7 @@ enum enum_sql_command {
SQLCOM_SHOW_PROFILE, SQLCOM_SHOW_PROFILES,
SQLCOM_SHOW_USER_STATS, SQLCOM_SHOW_TABLE_STATS, SQLCOM_SHOW_INDEX_STATS,
SQLCOM_SHOW_CLIENT_STATS,
SQLCOM_SHOW_EXPLAIN,
/*
When a command is added here, be sure it's also added in mysqld.cc
@ -253,6 +254,8 @@ typedef uchar index_clause_map;
#define INDEX_HINT_MASK_ALL (INDEX_HINT_MASK_JOIN | INDEX_HINT_MASK_GROUP | \
INDEX_HINT_MASK_ORDER)
class select_result_sink;
/* Single element of an USE/FORCE/IGNORE INDEX list specified as a SQL hint */
class Index_hint : public Sql_alloc
{
@ -591,6 +594,7 @@ public:
friend int subselect_union_engine::exec();
List<Item> *get_unit_column_types();
int print_explain(select_result_sink *output);
};
typedef class st_select_lex_unit SELECT_LEX_UNIT;
@ -908,6 +912,8 @@ public:
bool save_leaf_tables(THD *thd);
bool save_prep_leaf_tables(THD *thd);
int print_explain(select_result_sink *output);
private:
/* current index hint kind. used in filling up index_hints */
enum index_hint_type current_index_hint_type;

27
sql/sql_parse.cc

@ -328,6 +328,7 @@ void init_update_queries(void)
sql_command_flags[SQLCOM_SHOW_ENGINE_STATUS]= CF_STATUS_COMMAND;
sql_command_flags[SQLCOM_SHOW_ENGINE_MUTEX]= CF_STATUS_COMMAND;
sql_command_flags[SQLCOM_SHOW_ENGINE_LOGS]= CF_STATUS_COMMAND;
sql_command_flags[SQLCOM_SHOW_EXPLAIN]= CF_STATUS_COMMAND;
sql_command_flags[SQLCOM_SHOW_PROCESSLIST]= CF_STATUS_COMMAND;
sql_command_flags[SQLCOM_SHOW_GRANTS]= CF_STATUS_COMMAND;
sql_command_flags[SQLCOM_SHOW_CREATE_DB]= CF_STATUS_COMMAND;
@ -3402,6 +3403,32 @@ end_with_restore_list:
thd->security_ctx->priv_user),
lex->verbose);
break;
case SQLCOM_SHOW_EXPLAIN:
{
/* Same security as SHOW PROCESSLIST (TODO check this) */
if (!thd->security_ctx->priv_user[0] &&
check_global_access(thd,PROCESS_ACL))
break;
Item *it= (Item *)lex->value_list.head();
if (lex->table_or_sp_used())
{
my_error(ER_NOT_SUPPORTED_YET, MYF(0), "Usage of subqueries or stored "
"function calls as part of this statement");
break;
}
if ((!it->fixed && it->fix_fields(lex->thd, &it)) || it->check_cols(1))
{
my_message(ER_SET_CONSTANTS_ONLY, ER(ER_SET_CONSTANTS_ONLY),
MYF(0));
goto error;
}
mysqld_show_explain(thd, (ulong)it->val_int());
break;
}
case SQLCOM_SHOW_AUTHORS:
res= mysqld_show_authors(thd);
break;

1
sql/sql_prepare.cc

@ -2049,6 +2049,7 @@ static bool check_prepared_statement(Prepared_statement *stmt)
case SQLCOM_SHOW_ENGINE_LOGS:
case SQLCOM_SHOW_ENGINE_STATUS:
case SQLCOM_SHOW_ENGINE_MUTEX:
case SQLCOM_SHOW_EXPLAIN:
case SQLCOM_SHOW_CREATE_DB:
case SQLCOM_SHOW_GRANTS:
case SQLCOM_SHOW_BINLOG_EVENTS:

81
sql/sql_select.cc

@ -846,6 +846,12 @@ inject_jtbm_conds(JOIN *join, List<TABLE_LIST> *join_list, Item **join_where)
DBUG_VOID_RETURN;
}
int JOIN::optimize()
{
int res= optimize_inner();
optimized= 2;
return res;
}
/**
global select optimisation.
@ -859,7 +865,7 @@ inject_jtbm_conds(JOIN *join, List<TABLE_LIST> *join_list, Item **join_where)
*/
int
JOIN::optimize()
JOIN::optimize_inner()
{
ulonglong select_opts_for_readinfo;
uint no_jbuf_after;
@ -2888,7 +2894,9 @@ mysql_select(THD *thd, Item ***rref_pointer_array,
if (thd->is_error())
goto err;
thd->apc_target.enable();
join->exec();
thd->apc_target.disable();
if (thd->cursor && thd->cursor->is_open())
{
@ -3529,7 +3537,7 @@ make_join_statistics(JOIN *join, List<TABLE_LIST> &tables_list,
goto error;
/* Generate an execution plan from the found optimal join order. */
DBUG_RETURN(join->thd->killed || get_best_combination(join));
DBUG_RETURN(join->thd->check_killed() || get_best_combination(join));
error:
/*
@ -6276,7 +6284,7 @@ best_extension_by_limited_search(JOIN *join,
DBUG_ENTER("best_extension_by_limited_search");
THD *thd= join->thd;
if (thd->killed) // Abort
if (thd->check_killed()) // Abort
DBUG_RETURN(TRUE);
DBUG_EXECUTE("opt", print_plan(join, idx, read_time, record_count, idx,
@ -6436,7 +6444,7 @@ find_best(JOIN *join,table_map rest_tables,uint idx,double record_count,
{
DBUG_ENTER("find_best");
THD *thd= join->thd;
if (thd->killed)
if (thd->check_killed())
DBUG_RETURN(TRUE);
if (!rest_tables)
{
@ -14452,7 +14460,7 @@ create_internal_tmp_table_from_heap2(THD *thd, TABLE *table,
DBUG_EXECUTE_IF("raise_error", write_err= HA_ERR_FOUND_DUPP_KEY ;);
if (write_err)
goto err;
if (thd->killed)
if (thd->check_killed())
{
thd->send_kill_message();
goto err_killed;
@ -14822,7 +14830,7 @@ sub_select_cache(JOIN *join, JOIN_TAB *join_tab, bool end_of_records)
rc= sub_select(join, join_tab, end_of_records);
DBUG_RETURN(rc);
}
if (join->thd->killed)
if (join->thd->check_killed())
{
/* The user has aborted the execution of the query */
join->thd->send_kill_message();
@ -15121,7 +15129,7 @@ evaluate_join_record(JOIN *join, JOIN_TAB *join_tab,
DBUG_RETURN(NESTED_LOOP_ERROR);
if (error < 0)
DBUG_RETURN(NESTED_LOOP_NO_MORE_ROWS);
if (join->thd->killed) // Aborted by user
if (join->thd->check_killed()) // Aborted by user
{
join->thd->send_kill_message();
DBUG_RETURN(NESTED_LOOP_KILLED); /* purecov: inspected */
@ -16250,7 +16258,7 @@ end_write(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
TABLE *table=join->tmp_table;
DBUG_ENTER("end_write");
if (join->thd->killed) // Aborted by user
if (join->thd->check_killed()) // Aborted by user
{
join->thd->send_kill_message();
DBUG_RETURN(NESTED_LOOP_KILLED); /* purecov: inspected */
@ -16321,7 +16329,7 @@ end_update(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
if (end_of_records)
DBUG_RETURN(NESTED_LOOP_OK);
if (join->thd->killed) // Aborted by user
if (join->thd->check_killed()) // Aborted by user
{
join->thd->send_kill_message();
DBUG_RETURN(NESTED_LOOP_KILLED); /* purecov: inspected */
@ -16402,7 +16410,7 @@ end_unique_update(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
if (end_of_records)
DBUG_RETURN(NESTED_LOOP_OK);
if (join->thd->killed) // Aborted by user
if (join->thd->check_killed()) // Aborted by user
{
join->thd->send_kill_message();
DBUG_RETURN(NESTED_LOOP_KILLED); /* purecov: inspected */
@ -16449,7 +16457,7 @@ end_write_group(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
int idx= -1;
DBUG_ENTER("end_write_group");
if (join->thd->killed)
if (join->thd->check_killed())
{ // Aborted by user
join->thd->send_kill_message();
DBUG_RETURN(NESTED_LOOP_KILLED); /* purecov: inspected */
@ -18226,7 +18234,7 @@ static int remove_dup_with_compare(THD *thd, TABLE *table, Field **first_field,
error= file->ha_rnd_next(record);
for (;;)
{
if (thd->killed)
if (thd->check_killed())
{
thd->send_kill_message();
error=0;
@ -18358,7 +18366,7 @@ static int remove_dup_with_hash_index(THD *thd, TABLE *table,
for (;;)
{
uchar *org_key_pos;
if (thd->killed)
if (thd->check_killed())
{
thd->send_kill_message();
error=0;
@ -20355,29 +20363,40 @@ void JOIN::clear()
}
}
/**
EXPLAIN handling.
Send a description about what how the select will be done to stdout.
@param on_the_fly TRUE <=> we're being executed on-the-fly, so don't make
modifications to any select's data structures
*/
static void select_describe(JOIN *join, bool need_tmp_table, bool need_order,
bool distinct,const char *message)
int JOIN::print_explain(select_result_sink *result, bool on_the_fly,
bool need_tmp_table, bool need_order,
bool distinct, const char *message)
{
List<Item> field_list;
List<Item> item_list;
JOIN *join= this; /* Legacy: this code used to be a non-member function */
THD *thd=join->thd;
select_result *result=join->result;
Item *item_null= new Item_null();
CHARSET_INFO *cs= system_charset_info;
int quick_type;
DBUG_ENTER("select_describe");
int error= 0;
DBUG_ENTER("JOIN::print_explain");
DBUG_PRINT("info", ("Select 0x%lx, type %s, message %s",
(ulong)join->select_lex, join->select_lex->type,
message ? message : "NULL"));
DBUG_ASSERT(this->optimized == 2);
/* Don't log this into the slow query log */
thd->server_status&= ~(SERVER_QUERY_NO_INDEX_USED | SERVER_QUERY_NO_GOOD_INDEX_USED);
join->unit->offset_limit_cnt= 0;
if (!on_the_fly)
{
thd->server_status&= ~(SERVER_QUERY_NO_INDEX_USED | SERVER_QUERY_NO_GOOD_INDEX_USED);
join->unit->offset_limit_cnt= 0;
}
/*
NOTE: the number/types of items pushed into item_list must be in sync with
@ -20398,10 +20417,11 @@ static void select_describe(JOIN *join, bool need_tmp_table, bool need_order,
item_list.push_back(new Item_string(message,strlen(message),cs));
if (result->send_data(item_list))
join->error= 1;
error= 1;
}
else if (join->select_lex == join->unit->fake_select_lex)
{
join->select_lex->set_explain_type(); //psergey
/*
here we assume that the query will return at least two rows, so we
show "filesort" in EXPLAIN. Of course, sometimes we'll be wrong
@ -20468,12 +20488,13 @@ static void select_describe(JOIN *join, bool need_tmp_table, bool need_order,
item_list.push_back(new Item_string("", 0, cs));
if (result->send_data(item_list))
join->error= 1;
error= 1;
}
else if (!join->select_lex->master_unit()->derived ||
join->select_lex->master_unit()->derived->is_materialized_derived())
{
table_map used_tables=0;
join->select_lex->set_explain_type(); //psergey
bool printing_materialize_nest= FALSE;
uint select_id= join->select_lex->select_number;
@ -20965,9 +20986,23 @@ static void select_describe(JOIN *join, bool need_tmp_table, bool need_order,
// For next iteration
used_tables|=table->map;
if (result->send_data(item_list))
join->error= 1;
error= 1;
}
}
DBUG_RETURN(error);
}
static void select_describe(JOIN *join, bool need_tmp_table, bool need_order,
bool distinct,const char *message)
{
THD *thd=join->thd;
select_result *result=join->result;
DBUG_ENTER("select_describe");
join->error= join->print_explain(result, FALSE, /* Not on-the-fly */
need_tmp_table, need_order, distinct,
message);
for (SELECT_LEX_UNIT *unit= join->select_lex->first_inner_unit();
unit;
unit= unit->next_unit())
@ -21010,7 +21045,7 @@ bool mysql_explain_union(THD *thd, SELECT_LEX_UNIT *unit, select_result *result)
for (SELECT_LEX *sl= first; sl; sl= sl->next_select())
{
sl->set_explain_type();
sl->set_explain_type(); //psergey-todo: maybe remove this from here?
sl->options|= SELECT_DESCRIBE;
}

7
sql/sql_select.h

@ -963,7 +963,7 @@ public:
const char *zero_result_cause; ///< not 0 if exec must return zero result
bool union_part; ///< this subselect is part of union
bool optimized; ///< flag to avoid double optimization in EXPLAIN
int optimized; ///< flag to avoid double optimization in EXPLAIN
bool initialized; ///< flag to avoid double init_execution calls
/*
@ -1074,6 +1074,7 @@ public:
SELECT_LEX_UNIT *unit);
bool prepare_stage2();
int optimize();
int optimize_inner();
int reinit();
int init_execution();
void exec();
@ -1167,6 +1168,10 @@ public:
{
return (unit->item && unit->item->is_in_predicate());
}
int print_explain(select_result_sink *result, bool on_the_fly,
bool need_tmp_table, bool need_order,
bool distinct,const char *message);
private:
/**
TRUE if the query contains an aggregate function but has no GROUP

91
sql/sql_show.cc

@ -2024,6 +2024,97 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
DBUG_VOID_RETURN;
}
/*
SHOW EXPLAIN FOR command handler
@param thd Current thread's thd
@param thread_id Thread whose explain we need
@notes
- Attempt to do "SHOW EXPLAIN FOR <myself>" will properly produce "target not
running EXPLAINable command".
- todo: check how all this can/will work when using thread pools
*/
void mysqld_show_explain(THD *thd, ulong thread_id)
{
THD *tmp;
Protocol *protocol= thd->protocol;
List<Item> field_list;
DBUG_ENTER("mysqld_show_explain");
thd->make_explain_field_list(field_list);
if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF))
DBUG_VOID_RETURN;
/*
Find the thread we need EXPLAIN for. Thread search code was copied from
kill_one_thread()
*/
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
if (tmp->command == COM_DAEMON)
continue;
if (tmp->thread_id == thread_id)
{
pthread_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
break;
}
}
VOID(pthread_mutex_unlock(&LOCK_thread_count));
if (tmp)
{
bool bres;
/*
Ok we've found the thread of interest and it won't go away because
we're holding its LOCK_thd data.
Post it an EXPLAIN request.
todo: where to get timeout from?
*/
bool timed_out;
int timeout_sec= 30;
Show_explain_request explain_req;
select_result_explain_buffer *explain_buf;
explain_buf= new select_result_explain_buffer;
explain_buf->thd=thd;
explain_buf->protocol= thd->protocol;
explain_req.explain_buf= explain_buf;
explain_req.target_thd= tmp;
explain_req.request_thd= thd;
bres= tmp->apc_target.make_apc_call(Show_explain_request::get_explain_data,
(void*)&explain_req,
timeout_sec, &timed_out);
if (bres)
{
/* TODO not enabled or time out */
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
"SHOW EXPLAIN",
"Target is not running EXPLAINable command");
}
pthread_mutex_unlock(&tmp->LOCK_thd_data);
if (!bres)
{
explain_buf->flush_data();
my_eof(thd);
}
}
else
{
my_error(ER_NO_SUCH_THREAD, MYF(0), thread_id);
}
DBUG_VOID_RETURN;
}
int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
{
TABLE *table= tables->table;

6
sql/sql_yacc.yy

@ -10852,6 +10852,12 @@ show_param:
Lex->spname= $3;
Lex->sql_command = SQLCOM_SHOW_CREATE_EVENT;
}
| describe_command FOR_SYM expr
{
Lex->sql_command= SQLCOM_SHOW_EXPLAIN;
Lex->value_list.empty();
Lex->value_list.push_front($3);
}
;
show_engine_param:

Loading…
Cancel
Save