|
|
|
@ -348,6 +348,11 @@ pthread_mutex_t federated_mutex; // This is the mutex we use to |
|
|
|
// init the hash
|
|
|
|
static int federated_init= FALSE; // Variable for checking the
|
|
|
|
// init state of hash
|
|
|
|
static char ident_quote_char= '`'; // Character for quoting
|
|
|
|
// identifiers
|
|
|
|
static char value_quote_char= '\''; // Character for quoting
|
|
|
|
// literals
|
|
|
|
static const int bulk_padding= 64; // bytes "overhead" in packet
|
|
|
|
|
|
|
|
/* Federated storage engine handlerton */ |
|
|
|
|
|
|
|
@ -440,6 +445,58 @@ bool federated_db_end() |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@brief Append identifiers to the string. |
|
|
|
|
|
|
|
@param[in,out] string The target string. |
|
|
|
@param[in] name Identifier name |
|
|
|
@param[in] length Length of identifier name in bytes |
|
|
|
@param[in] quote_char Quote char to use for quoting identifier. |
|
|
|
|
|
|
|
@return Operation Status |
|
|
|
@retval FALSE OK |
|
|
|
@retval TRUE There was an error appending to the string. |
|
|
|
|
|
|
|
@note This function is based upon the append_identifier() function |
|
|
|
in sql_show.cc except that quoting always occurs. |
|
|
|
*/ |
|
|
|
|
|
|
|
static bool append_ident(String *string, const char *name, uint length, |
|
|
|
const char quote_char) |
|
|
|
{ |
|
|
|
bool result; |
|
|
|
uint clen; |
|
|
|
const char *name_end; |
|
|
|
DBUG_ENTER("append_ident"); |
|
|
|
|
|
|
|
if (quote_char) |
|
|
|
{ |
|
|
|
string->reserve(length * 2 + 2); |
|
|
|
if ((result= string->append("e_char, 1, system_charset_info))) |
|
|
|
goto err; |
|
|
|
|
|
|
|
for (name_end= name+length; name < name_end; name+= clen) |
|
|
|
{ |
|
|
|
uchar c= *(uchar *) name; |
|
|
|
if (!(clen= my_mbcharlen(system_charset_info, c))) |
|
|
|
clen= 1; |
|
|
|
if (clen == 1 && c == (uchar) quote_char && |
|
|
|
(result= string->append("e_char, 1, system_charset_info))) |
|
|
|
goto err; |
|
|
|
if ((result= string->append(name, clen, string->charset()))) |
|
|
|
goto err; |
|
|
|
} |
|
|
|
result= string->append("e_char, 1, system_charset_info); |
|
|
|
} |
|
|
|
else |
|
|
|
result= string->append(name, length, system_charset_info); |
|
|
|
|
|
|
|
err: |
|
|
|
DBUG_RETURN(result); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
Check (in create) whether the tables exists, and that it can be connected to |
|
|
|
|
|
|
|
@ -458,7 +515,6 @@ bool federated_db_end() |
|
|
|
static int check_foreign_data_source(FEDERATED_SHARE *share, |
|
|
|
bool table_create_flag) |
|
|
|
{ |
|
|
|
char escaped_table_name[NAME_LEN*2]; |
|
|
|
char query_buffer[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
uint error_code; |
|
|
|
@ -499,7 +555,6 @@ static int check_foreign_data_source(FEDERATED_SHARE *share, |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
int escaped_table_name_length= 0; |
|
|
|
/*
|
|
|
|
Since we do not support transactions at this version, we can let the |
|
|
|
client API silently reconnect. For future versions, we will need more |
|
|
|
@ -517,14 +572,8 @@ static int check_foreign_data_source(FEDERATED_SHARE *share, |
|
|
|
query.append(FEDERATED_SELECT); |
|
|
|
query.append(FEDERATED_STAR); |
|
|
|
query.append(FEDERATED_FROM); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
escaped_table_name_length= |
|
|
|
escape_string_for_mysql(&my_charset_bin, (char*)escaped_table_name, |
|
|
|
sizeof(escaped_table_name), |
|
|
|
share->table_name, |
|
|
|
share->table_name_length); |
|
|
|
query.append(escaped_table_name, escaped_table_name_length); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
append_ident(&query, share->table_name, share->table_name_length, |
|
|
|
ident_quote_char); |
|
|
|
query.append(FEDERATED_WHERE); |
|
|
|
query.append(FEDERATED_FALSE); |
|
|
|
|
|
|
|
@ -725,7 +774,9 @@ error: |
|
|
|
ha_federated::ha_federated(TABLE *table_arg) |
|
|
|
:handler(&federated_hton, table_arg), |
|
|
|
mysql(0), stored_result(0) |
|
|
|
{} |
|
|
|
{ |
|
|
|
bzero(&bulk_insert, sizeof(bulk_insert)); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -784,9 +835,8 @@ uint ha_federated::convert_row_to_internal_format(byte *record, |
|
|
|
static bool emit_key_part_name(String *to, KEY_PART_INFO *part) |
|
|
|
{ |
|
|
|
DBUG_ENTER("emit_key_part_name"); |
|
|
|
if (to->append(FEDERATED_BTICK) || |
|
|
|
to->append(part->field->field_name) || |
|
|
|
to->append(FEDERATED_BTICK)) |
|
|
|
if (append_ident(to, part->field->field_name, |
|
|
|
strlen(part->field->field_name), ident_quote_char)) |
|
|
|
DBUG_RETURN(1); // Out of memory
|
|
|
|
DBUG_RETURN(0); |
|
|
|
} |
|
|
|
@ -1309,31 +1359,28 @@ static FEDERATED_SHARE *get_share(const char *table_name, TABLE *table) |
|
|
|
query.append(FEDERATED_SELECT); |
|
|
|
for (field= table->field; *field; field++) |
|
|
|
{ |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
query.append((*field)->field_name); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
append_ident(&query, (*field)->field_name, |
|
|
|
strlen((*field)->field_name), ident_quote_char); |
|
|
|
query.append(FEDERATED_COMMA); |
|
|
|
} |
|
|
|
query.length(query.length()- strlen(FEDERATED_COMMA)); |
|
|
|
query.append(FEDERATED_FROM); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
|
|
|
|
tmp_share.table_name_length= strlen(tmp_share.table_name); |
|
|
|
append_ident(&query, tmp_share.table_name, |
|
|
|
tmp_share.table_name_length, ident_quote_char); |
|
|
|
|
|
|
|
if (!(share= (FEDERATED_SHARE *) |
|
|
|
my_multi_malloc(MYF(MY_WME), |
|
|
|
&share, sizeof(*share), |
|
|
|
&select_query, |
|
|
|
query.length()+table->s->connect_string.length+1, |
|
|
|
&select_query, query.length()+1, |
|
|
|
NullS))) |
|
|
|
goto error; |
|
|
|
|
|
|
|
memcpy(share, &tmp_share, sizeof(tmp_share)); |
|
|
|
memcpy(select_query, query.ptr(), query.length()+1); |
|
|
|
|
|
|
|
share->table_name_length= strlen(share->table_name); |
|
|
|
/* TODO: share->table_name to LEX_STRING object */ |
|
|
|
query.append(share->table_name, share->table_name_length); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
share->select_query= select_query; |
|
|
|
strmov(share->select_query, query.ptr()); |
|
|
|
share->use_count= 0; |
|
|
|
DBUG_PRINT("info", |
|
|
|
("share->select_query %s", share->select_query)); |
|
|
|
@ -1467,6 +1514,8 @@ int ha_federated::open(const char *name, int mode, uint test_if_locked) |
|
|
|
table->s->reclength); |
|
|
|
DBUG_PRINT("info", ("ref_length: %u", ref_length)); |
|
|
|
|
|
|
|
reset(); |
|
|
|
|
|
|
|
DBUG_RETURN(0); |
|
|
|
} |
|
|
|
|
|
|
|
@ -1538,6 +1587,83 @@ inline uint field_in_record_is_null(TABLE *table, |
|
|
|
DBUG_RETURN(0); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@brief Construct the INSERT statement. |
|
|
|
|
|
|
|
@details This method will construct the INSERT statement and appends it to |
|
|
|
the supplied query string buffer. |
|
|
|
|
|
|
|
@return |
|
|
|
@retval FALSE No error |
|
|
|
@retval TRUE Failure |
|
|
|
*/ |
|
|
|
|
|
|
|
bool ha_federated::append_stmt_insert(String *query) |
|
|
|
{ |
|
|
|
char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
Field **field; |
|
|
|
uint tmp_length; |
|
|
|
|
|
|
|
/* The main insert query string */ |
|
|
|
String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); |
|
|
|
DBUG_ENTER("ha_federated::append_stmt_insert"); |
|
|
|
|
|
|
|
insert_string.length(0); |
|
|
|
|
|
|
|
if (replace_duplicates) |
|
|
|
insert_string.append(STRING_WITH_LEN("REPLACE INTO ")); |
|
|
|
else if (ignore_duplicates && !insert_dup_update) |
|
|
|
insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO ")); |
|
|
|
else |
|
|
|
insert_string.append(STRING_WITH_LEN("INSERT INTO ")); |
|
|
|
append_ident(&insert_string, share->table_name, share->table_name_length, |
|
|
|
ident_quote_char); |
|
|
|
insert_string.append(FEDERATED_OPENPAREN); |
|
|
|
tmp_length= insert_string.length() - strlen(FEDERATED_COMMA); |
|
|
|
|
|
|
|
/*
|
|
|
|
loop through the field pointer array, add any fields to both the values |
|
|
|
list and the fields list that match the current query id |
|
|
|
*/ |
|
|
|
for (field= table->field; *field; field++) |
|
|
|
{ |
|
|
|
/* append the field name */ |
|
|
|
append_ident(&insert_string, (*field)->field_name, |
|
|
|
strlen((*field)->field_name), ident_quote_char); |
|
|
|
|
|
|
|
/* append commas between both fields and fieldnames */ |
|
|
|
/*
|
|
|
|
unfortunately, we can't use the logic |
|
|
|
if *(fields + 1) to make the following |
|
|
|
appends conditional because we may not append |
|
|
|
if the next field doesn't match the condition: |
|
|
|
(((*field)->query_id && (*field)->query_id == current_query_id) |
|
|
|
*/ |
|
|
|
insert_string.append(FEDERATED_COMMA); |
|
|
|
} |
|
|
|
|
|
|
|
/*
|
|
|
|
remove trailing comma |
|
|
|
*/ |
|
|
|
insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA)); |
|
|
|
|
|
|
|
/*
|
|
|
|
if there were no fields, we don't want to add a closing paren |
|
|
|
AND, we don't want to chop off the last char '(' |
|
|
|
insert will be "INSERT INTO t1 VALUES ();" |
|
|
|
*/ |
|
|
|
if (insert_string.length() > tmp_length) |
|
|
|
{ |
|
|
|
insert_string.append(FEDERATED_CLOSEPAREN); |
|
|
|
} |
|
|
|
|
|
|
|
insert_string.append(FEDERATED_VALUES); |
|
|
|
|
|
|
|
DBUG_RETURN(query->append(insert_string)); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
write_row() inserts a row. No extra() hint is given currently if a bulk load |
|
|
|
is happeneding. buf() is a byte array of data. You can use the field |
|
|
|
@ -1554,13 +1680,14 @@ inline uint field_in_record_is_null(TABLE *table, |
|
|
|
|
|
|
|
int ha_federated::write_row(byte *buf) |
|
|
|
{ |
|
|
|
char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
char values_buffer[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE]; |
|
|
|
Field **field; |
|
|
|
uint tmp_length; |
|
|
|
int error= 0; |
|
|
|
bool use_bulk_insert; |
|
|
|
bool auto_increment_update_required= (table->next_number_field != NULL); |
|
|
|
|
|
|
|
/* The main insert query string */ |
|
|
|
String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin); |
|
|
|
/* The string containing the values to be added to the insert */ |
|
|
|
String values_string(values_buffer, sizeof(values_buffer), &my_charset_bin); |
|
|
|
/* The actual value of the field, to be added to the values_string */ |
|
|
|
@ -1568,7 +1695,6 @@ int ha_federated::write_row(byte *buf) |
|
|
|
sizeof(insert_field_value_buffer), |
|
|
|
&my_charset_bin); |
|
|
|
values_string.length(0); |
|
|
|
insert_string.length(0); |
|
|
|
insert_field_value_string.length(0); |
|
|
|
DBUG_ENTER("ha_federated::write_row"); |
|
|
|
|
|
|
|
@ -1578,15 +1704,19 @@ int ha_federated::write_row(byte *buf) |
|
|
|
|
|
|
|
/*
|
|
|
|
start both our field and field values strings |
|
|
|
We must disable multi-row insert for "INSERT...ON DUPLICATE KEY UPDATE" |
|
|
|
Ignore duplicates is always true when insert_dup_update is true. |
|
|
|
When replace_duplicates == TRUE, we can safely enable multi-row insert. |
|
|
|
When performing multi-row insert, we only collect the columns values for |
|
|
|
the row. The start of the statement is only created when the first |
|
|
|
row is copied in to the bulk_insert string. |
|
|
|
*/ |
|
|
|
insert_string.append(FEDERATED_INSERT); |
|
|
|
insert_string.append(FEDERATED_BTICK); |
|
|
|
insert_string.append(share->table_name, share->table_name_length); |
|
|
|
insert_string.append(FEDERATED_BTICK); |
|
|
|
insert_string.append(FEDERATED_OPENPAREN); |
|
|
|
if (!(use_bulk_insert= bulk_insert.str && |
|
|
|
(!insert_dup_update || replace_duplicates))) |
|
|
|
append_stmt_insert(&values_string); |
|
|
|
|
|
|
|
values_string.append(FEDERATED_VALUES); |
|
|
|
values_string.append(FEDERATED_OPENPAREN); |
|
|
|
tmp_length= values_string.length(); |
|
|
|
|
|
|
|
/*
|
|
|
|
loop through the field pointer array, add any fields to both the values |
|
|
|
@ -1599,14 +1729,12 @@ int ha_federated::write_row(byte *buf) |
|
|
|
else |
|
|
|
{ |
|
|
|
(*field)->val_str(&insert_field_value_string); |
|
|
|
values_string.append('\''); |
|
|
|
values_string.append(value_quote_char); |
|
|
|
insert_field_value_string.print(&values_string); |
|
|
|
values_string.append('\''); |
|
|
|
values_string.append(value_quote_char); |
|
|
|
|
|
|
|
insert_field_value_string.length(0); |
|
|
|
} |
|
|
|
/* append the field name */ |
|
|
|
insert_string.append((*field)->field_name); |
|
|
|
|
|
|
|
/* append the value */ |
|
|
|
values_string.append(insert_field_value_string); |
|
|
|
@ -1620,32 +1748,61 @@ int ha_federated::write_row(byte *buf) |
|
|
|
if the next field doesn't match the condition: |
|
|
|
(((*field)->query_id && (*field)->query_id == current_query_id) |
|
|
|
*/ |
|
|
|
insert_string.append(FEDERATED_COMMA); |
|
|
|
values_string.append(FEDERATED_COMMA); |
|
|
|
} |
|
|
|
|
|
|
|
/*
|
|
|
|
remove trailing comma |
|
|
|
*/ |
|
|
|
insert_string.length(insert_string.length() - strlen(FEDERATED_COMMA)); |
|
|
|
/*
|
|
|
|
if there were no fields, we don't want to add a closing paren |
|
|
|
AND, we don't want to chop off the last char '(' |
|
|
|
insert will be "INSERT INTO t1 VALUES ();" |
|
|
|
*/ |
|
|
|
if (table->s->fields) |
|
|
|
if (values_string.length() > tmp_length) |
|
|
|
{ |
|
|
|
/* chops off leading commas */ |
|
|
|
values_string.length(values_string.length() - strlen(FEDERATED_COMMA)); |
|
|
|
insert_string.append(FEDERATED_CLOSEPAREN); |
|
|
|
} |
|
|
|
/* we always want to append this, even if there aren't any fields */ |
|
|
|
values_string.append(FEDERATED_CLOSEPAREN); |
|
|
|
|
|
|
|
/* add the values */ |
|
|
|
insert_string.append(values_string); |
|
|
|
if (use_bulk_insert) |
|
|
|
{ |
|
|
|
/*
|
|
|
|
Send the current bulk insert out if appending the current row would |
|
|
|
cause the statement to overflow the packet size, otherwise set |
|
|
|
auto_increment_update_required to FALSE as no query was executed. |
|
|
|
*/ |
|
|
|
if (bulk_insert.length + values_string.length() + bulk_padding > |
|
|
|
mysql->net.max_packet_size && bulk_insert.length) |
|
|
|
{ |
|
|
|
error= mysql_real_query(mysql, bulk_insert.str, bulk_insert.length); |
|
|
|
bulk_insert.length= 0; |
|
|
|
} |
|
|
|
else |
|
|
|
auto_increment_update_required= FALSE; |
|
|
|
|
|
|
|
if (bulk_insert.length == 0) |
|
|
|
{ |
|
|
|
char insert_buffer[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
String insert_string(insert_buffer, sizeof(insert_buffer), |
|
|
|
&my_charset_bin); |
|
|
|
insert_string.length(0); |
|
|
|
append_stmt_insert(&insert_string); |
|
|
|
dynstr_append_mem(&bulk_insert, insert_string.ptr(), |
|
|
|
insert_string.length()); |
|
|
|
} |
|
|
|
else |
|
|
|
dynstr_append_mem(&bulk_insert, ",", 1); |
|
|
|
|
|
|
|
if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length())) |
|
|
|
dynstr_append_mem(&bulk_insert, values_string.ptr(), |
|
|
|
values_string.length()); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
error= mysql_real_query(mysql, values_string.ptr(), |
|
|
|
values_string.length()); |
|
|
|
} |
|
|
|
|
|
|
|
if (error) |
|
|
|
{ |
|
|
|
DBUG_RETURN(stash_remote_error()); |
|
|
|
} |
|
|
|
@ -1653,12 +1810,79 @@ int ha_federated::write_row(byte *buf) |
|
|
|
If the table we've just written a record to contains an auto_increment |
|
|
|
field, then store the last_insert_id() value from the foreign server |
|
|
|
*/ |
|
|
|
if (table->next_number_field) |
|
|
|
if (auto_increment_update_required) |
|
|
|
update_auto_increment(); |
|
|
|
|
|
|
|
DBUG_RETURN(0); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@brief Prepares the storage engine for bulk inserts. |
|
|
|
|
|
|
|
@param[in] rows estimated number of rows in bulk insert |
|
|
|
or 0 if unknown. |
|
|
|
|
|
|
|
@details Initializes memory structures required for bulk insert. |
|
|
|
*/ |
|
|
|
|
|
|
|
void ha_federated::start_bulk_insert(ha_rows rows) |
|
|
|
{ |
|
|
|
uint page_size; |
|
|
|
DBUG_ENTER("ha_federated::start_bulk_insert"); |
|
|
|
|
|
|
|
dynstr_free(&bulk_insert); |
|
|
|
|
|
|
|
/**
|
|
|
|
We don't bother with bulk-insert semantics when the estimated rows == 1 |
|
|
|
The rows value will be 0 if the server does not know how many rows |
|
|
|
would be inserted. This can occur when performing INSERT...SELECT |
|
|
|
*/ |
|
|
|
|
|
|
|
if (rows == 1) |
|
|
|
DBUG_VOID_RETURN; |
|
|
|
|
|
|
|
page_size= (uint) my_getpagesize(); |
|
|
|
|
|
|
|
if (init_dynamic_string(&bulk_insert, NULL, page_size, page_size)) |
|
|
|
DBUG_VOID_RETURN; |
|
|
|
|
|
|
|
bulk_insert.length= 0; |
|
|
|
DBUG_VOID_RETURN; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@brief End bulk insert. |
|
|
|
|
|
|
|
@details This method will send any remaining rows to the remote server. |
|
|
|
Finally, it will deinitialize the bulk insert data structure. |
|
|
|
|
|
|
|
@return Operation status |
|
|
|
@retval 0 No error |
|
|
|
@retval != 0 Error occured at remote server. Also sets my_errno. |
|
|
|
*/ |
|
|
|
|
|
|
|
int ha_federated::end_bulk_insert() |
|
|
|
{ |
|
|
|
int error= 0; |
|
|
|
DBUG_ENTER("ha_federated::end_bulk_insert"); |
|
|
|
|
|
|
|
if (bulk_insert.str && bulk_insert.length) |
|
|
|
{ |
|
|
|
if (mysql_real_query(mysql, bulk_insert.str, bulk_insert.length)) |
|
|
|
error= stash_remote_error(); |
|
|
|
else |
|
|
|
if (table->next_number_field) |
|
|
|
update_auto_increment(); |
|
|
|
} |
|
|
|
|
|
|
|
dynstr_free(&bulk_insert); |
|
|
|
|
|
|
|
DBUG_RETURN(my_errno= error); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
ha_federated::update_auto_increment |
|
|
|
|
|
|
|
@ -1688,9 +1912,8 @@ int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt) |
|
|
|
|
|
|
|
query.set_charset(system_charset_info); |
|
|
|
query.append(FEDERATED_OPTIMIZE); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
query.append(share->table_name, share->table_name_length); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
append_ident(&query, share->table_name, share->table_name_length, |
|
|
|
ident_quote_char); |
|
|
|
|
|
|
|
if (mysql_real_query(mysql, query.ptr(), query.length())) |
|
|
|
{ |
|
|
|
@ -1711,9 +1934,8 @@ int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt) |
|
|
|
|
|
|
|
query.set_charset(system_charset_info); |
|
|
|
query.append(FEDERATED_REPAIR); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
query.append(share->table_name, share->table_name_length); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
append_ident(&query, share->table_name, share->table_name_length, |
|
|
|
ident_quote_char); |
|
|
|
if (check_opt->flags & T_QUICK) |
|
|
|
query.append(FEDERATED_QUICK); |
|
|
|
if (check_opt->flags & T_EXTEND) |
|
|
|
@ -1788,10 +2010,12 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) |
|
|
|
update_string.length(0); |
|
|
|
where_string.length(0); |
|
|
|
|
|
|
|
update_string.append(FEDERATED_UPDATE); |
|
|
|
update_string.append(FEDERATED_BTICK); |
|
|
|
update_string.append(share->table_name); |
|
|
|
update_string.append(FEDERATED_BTICK); |
|
|
|
if (ignore_duplicates) |
|
|
|
update_string.append(STRING_WITH_LEN("UPDATE IGNORE ")); |
|
|
|
else |
|
|
|
update_string.append(STRING_WITH_LEN("UPDATE ")); |
|
|
|
append_ident(&update_string, share->table_name, |
|
|
|
share->table_name_length, ident_quote_char); |
|
|
|
update_string.append(FEDERATED_SET); |
|
|
|
|
|
|
|
/*
|
|
|
|
@ -1806,8 +2030,11 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) |
|
|
|
|
|
|
|
for (Field **field= table->field; *field; field++) |
|
|
|
{ |
|
|
|
where_string.append((*field)->field_name); |
|
|
|
update_string.append((*field)->field_name); |
|
|
|
uint field_name_length= strlen((*field)->field_name); |
|
|
|
append_ident(&where_string, (*field)->field_name, field_name_length, |
|
|
|
ident_quote_char); |
|
|
|
append_ident(&update_string, (*field)->field_name, field_name_length, |
|
|
|
ident_quote_char); |
|
|
|
update_string.append(FEDERATED_EQ); |
|
|
|
|
|
|
|
if ((*field)->is_null()) |
|
|
|
@ -1816,9 +2043,9 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) |
|
|
|
{ |
|
|
|
/* otherwise = */ |
|
|
|
(*field)->val_str(&field_value); |
|
|
|
update_string.append('\''); |
|
|
|
update_string.append(value_quote_char); |
|
|
|
field_value.print(&update_string); |
|
|
|
update_string.append('\''); |
|
|
|
update_string.append(value_quote_char); |
|
|
|
field_value.length(0); |
|
|
|
} |
|
|
|
|
|
|
|
@ -1829,9 +2056,9 @@ int ha_federated::update_row(const byte *old_data, byte *new_data) |
|
|
|
where_string.append(FEDERATED_EQ); |
|
|
|
(*field)->val_str(&field_value, |
|
|
|
(char*) (old_data + (*field)->offset())); |
|
|
|
where_string.append('\''); |
|
|
|
where_string.append(value_quote_char); |
|
|
|
field_value.print(&where_string); |
|
|
|
where_string.append('\''); |
|
|
|
where_string.append(value_quote_char); |
|
|
|
field_value.length(0); |
|
|
|
} |
|
|
|
|
|
|
|
@ -1888,16 +2115,16 @@ int ha_federated::delete_row(const byte *buf) |
|
|
|
delete_string.length(0); |
|
|
|
delete_string.append(FEDERATED_DELETE); |
|
|
|
delete_string.append(FEDERATED_FROM); |
|
|
|
delete_string.append(FEDERATED_BTICK); |
|
|
|
delete_string.append(share->table_name); |
|
|
|
delete_string.append(FEDERATED_BTICK); |
|
|
|
append_ident(&delete_string, share->table_name, |
|
|
|
share->table_name_length, ident_quote_char); |
|
|
|
delete_string.append(FEDERATED_WHERE); |
|
|
|
|
|
|
|
for (Field **field= table->field; *field; field++) |
|
|
|
{ |
|
|
|
Field *cur_field= *field; |
|
|
|
data_string.length(0); |
|
|
|
delete_string.append(cur_field->field_name); |
|
|
|
append_ident(&delete_string, (*field)->field_name, |
|
|
|
strlen((*field)->field_name), ident_quote_char); |
|
|
|
|
|
|
|
if (cur_field->is_null()) |
|
|
|
{ |
|
|
|
@ -1907,9 +2134,9 @@ int ha_federated::delete_row(const byte *buf) |
|
|
|
{ |
|
|
|
delete_string.append(FEDERATED_EQ); |
|
|
|
cur_field->val_str(&data_string); |
|
|
|
delete_string.append('\''); |
|
|
|
delete_string.append(value_quote_char); |
|
|
|
data_string.print(&delete_string); |
|
|
|
delete_string.append('\''); |
|
|
|
delete_string.append(value_quote_char); |
|
|
|
} |
|
|
|
|
|
|
|
delete_string.append(FEDERATED_AND); |
|
|
|
@ -2397,7 +2624,6 @@ int ha_federated::info(uint flag) |
|
|
|
{ |
|
|
|
char error_buffer[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
char status_buf[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
char escaped_table_name[FEDERATED_QUERY_BUFFER_SIZE]; |
|
|
|
int error; |
|
|
|
uint error_code; |
|
|
|
MYSQL_RES *result= 0; |
|
|
|
@ -2411,14 +2637,8 @@ int ha_federated::info(uint flag) |
|
|
|
{ |
|
|
|
status_query_string.length(0); |
|
|
|
status_query_string.append(FEDERATED_INFO); |
|
|
|
status_query_string.append(FEDERATED_SQUOTE); |
|
|
|
|
|
|
|
escape_string_for_mysql(&my_charset_bin, (char *)escaped_table_name, |
|
|
|
sizeof(escaped_table_name), |
|
|
|
share->table_name, |
|
|
|
share->table_name_length); |
|
|
|
status_query_string.append(escaped_table_name); |
|
|
|
status_query_string.append(FEDERATED_SQUOTE); |
|
|
|
append_ident(&status_query_string, share->table_name, |
|
|
|
share->table_name_length, value_quote_char); |
|
|
|
|
|
|
|
if (mysql_real_query(mysql, status_query_string.ptr(), |
|
|
|
status_query_string.length())) |
|
|
|
@ -2484,6 +2704,51 @@ error: |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@brief Handles extra signals from MySQL server |
|
|
|
|
|
|
|
@param[in] operation Hint for storage engine |
|
|
|
|
|
|
|
@return Operation Status |
|
|
|
@retval 0 OK |
|
|
|
*/ |
|
|
|
int ha_federated::extra(ha_extra_function operation) |
|
|
|
{ |
|
|
|
DBUG_ENTER("ha_federated::extra"); |
|
|
|
switch (operation) { |
|
|
|
case HA_EXTRA_IGNORE_DUP_KEY: |
|
|
|
ignore_duplicates= TRUE; |
|
|
|
break; |
|
|
|
case HA_EXTRA_NO_IGNORE_DUP_KEY: |
|
|
|
insert_dup_update= FALSE; |
|
|
|
ignore_duplicates= FALSE; |
|
|
|
break; |
|
|
|
case HA_EXTRA_WRITE_CAN_REPLACE: |
|
|
|
replace_duplicates= TRUE; |
|
|
|
break; |
|
|
|
case HA_EXTRA_WRITE_CANNOT_REPLACE: |
|
|
|
/*
|
|
|
|
We use this flag to ensure that we do not create an "INSERT IGNORE" |
|
|
|
statement when inserting new rows into the remote table. |
|
|
|
*/ |
|
|
|
replace_duplicates= FALSE; |
|
|
|
break; |
|
|
|
case HA_EXTRA_INSERT_WITH_UPDATE: |
|
|
|
insert_dup_update= TRUE; |
|
|
|
break; |
|
|
|
case HA_EXTRA_RESET: |
|
|
|
insert_dup_update= FALSE; |
|
|
|
ignore_duplicates= FALSE; |
|
|
|
replace_duplicates= FALSE; |
|
|
|
break; |
|
|
|
default: |
|
|
|
/* do nothing */ |
|
|
|
DBUG_PRINT("info",("unhandled operation: %d", (uint) operation)); |
|
|
|
} |
|
|
|
DBUG_RETURN(0); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
Used to delete all rows in a table. Both for cases of truncate and |
|
|
|
for cases where the optimizer realizes that all rows will be |
|
|
|
@ -2506,9 +2771,8 @@ int ha_federated::delete_all_rows() |
|
|
|
|
|
|
|
query.set_charset(system_charset_info); |
|
|
|
query.append(FEDERATED_TRUNCATE); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
query.append(share->table_name); |
|
|
|
query.append(FEDERATED_BTICK); |
|
|
|
append_ident(&query, share->table_name, share->table_name_length, |
|
|
|
ident_quote_char); |
|
|
|
|
|
|
|
/*
|
|
|
|
TRUNCATE won't return anything in mysql_affected_rows |
|
|
|
@ -2616,6 +2880,9 @@ int ha_federated::stash_remote_error() |
|
|
|
DBUG_ENTER("ha_federated::stash_remote_error()"); |
|
|
|
remote_error_number= mysql_errno(mysql); |
|
|
|
strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1); |
|
|
|
if (remote_error_number == ER_DUP_ENTRY || |
|
|
|
remote_error_number == ER_DUP_KEY) |
|
|
|
DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); |
|
|
|
DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM); |
|
|
|
} |
|
|
|
|
|
|
|
|