|
|
|
@ -113,7 +113,7 @@ static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG, |
|
|
|
NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0); |
|
|
|
|
|
|
|
/* These match values in enum_cassandra_consistency_level */ |
|
|
|
const char *cassandra_consistency_level[] = |
|
|
|
const char *cassandra_consistency_level[] = |
|
|
|
{ |
|
|
|
"ONE", |
|
|
|
"QUORUM", |
|
|
|
@ -145,19 +145,19 @@ mysql_mutex_t cassandra_default_host_lock; |
|
|
|
static char* cassandra_default_thrift_host = NULL; |
|
|
|
static char cassandra_default_host_buf[256]=""; |
|
|
|
|
|
|
|
static void |
|
|
|
cassandra_default_thrift_host_update(THD *thd, |
|
|
|
static void |
|
|
|
cassandra_default_thrift_host_update(THD *thd, |
|
|
|
struct st_mysql_sys_var* var, |
|
|
|
void* var_ptr, /*!< out: where the
|
|
|
|
formal string goes */ |
|
|
|
const void* save) /*!< in: immediate result
|
|
|
|
const void* save) /*!< in: immediate result
|
|
|
|
from check function */ |
|
|
|
{ |
|
|
|
const char *new_host= *((char**)save); |
|
|
|
const size_t max_len= sizeof(cassandra_default_host_buf); |
|
|
|
|
|
|
|
mysql_mutex_lock(&cassandra_default_host_lock); |
|
|
|
|
|
|
|
|
|
|
|
if (new_host) |
|
|
|
{ |
|
|
|
strncpy(cassandra_default_host_buf, new_host, max_len); |
|
|
|
@ -169,7 +169,7 @@ cassandra_default_thrift_host_update(THD *thd, |
|
|
|
cassandra_default_host_buf[0]= 0; |
|
|
|
cassandra_default_thrift_host= NULL; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
*((const char**)var_ptr)= cassandra_default_thrift_host; |
|
|
|
|
|
|
|
mysql_mutex_unlock(&cassandra_default_host_lock); |
|
|
|
@ -177,10 +177,10 @@ cassandra_default_thrift_host_update(THD *thd, |
|
|
|
|
|
|
|
|
|
|
|
static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host, |
|
|
|
PLUGIN_VAR_RQCMDARG, |
|
|
|
"Default host for Cassandra thrift connections", |
|
|
|
PLUGIN_VAR_RQCMDARG, |
|
|
|
"Default host for Cassandra thrift connections", |
|
|
|
/*check*/NULL, |
|
|
|
cassandra_default_thrift_host_update, |
|
|
|
cassandra_default_thrift_host_update, |
|
|
|
/*default*/NULL); |
|
|
|
|
|
|
|
static struct st_mysql_sys_var* cassandra_system_variables[]= { |
|
|
|
@ -465,7 +465,7 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked) |
|
|
|
if (!(share = get_share(name, table))) |
|
|
|
DBUG_RETURN(1); |
|
|
|
thr_lock_data_init(&share->lock,&lock,NULL); |
|
|
|
|
|
|
|
|
|
|
|
DBUG_ASSERT(!se); |
|
|
|
/*
|
|
|
|
Don't do the following on open: it prevents SHOW CREATE TABLE when the server |
|
|
|
@ -501,7 +501,7 @@ int ha_cassandra::check_table_options(ha_table_option_struct *options) |
|
|
|
if (!options->thrift_host && (!cassandra_default_thrift_host || |
|
|
|
!cassandra_default_thrift_host[0])) |
|
|
|
{ |
|
|
|
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), |
|
|
|
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), |
|
|
|
"thrift_host table option must be specified, or " |
|
|
|
"@@cassandra_default_thrift_host must be set"); |
|
|
|
return HA_WRONG_CREATE_OPTION; |
|
|
|
@ -509,7 +509,7 @@ int ha_cassandra::check_table_options(ha_table_option_struct *options) |
|
|
|
|
|
|
|
if (!options->keyspace || !options->column_family) |
|
|
|
{ |
|
|
|
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), |
|
|
|
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), |
|
|
|
"keyspace and column_family table options must be specified"); |
|
|
|
return HA_WRONG_CREATE_OPTION; |
|
|
|
} |
|
|
|
@ -543,7 +543,7 @@ int ha_cassandra::create(const char *name, TABLE *table_arg, |
|
|
|
int res; |
|
|
|
DBUG_ENTER("ha_cassandra::create"); |
|
|
|
DBUG_ASSERT(options); |
|
|
|
|
|
|
|
|
|
|
|
Field **pfield= table_arg->s->field; |
|
|
|
if (!((*pfield)->flags & NOT_NULL_FLAG)) |
|
|
|
{ |
|
|
|
@ -555,7 +555,7 @@ int ha_cassandra::create(const char *name, TABLE *table_arg, |
|
|
|
table_arg->key_info[0].key_parts != 1 || |
|
|
|
table_arg->key_info[0].key_part[0].fieldnr != 1) |
|
|
|
{ |
|
|
|
my_error(ER_WRONG_COLUMN_NAME, MYF(0), |
|
|
|
my_error(ER_WRONG_COLUMN_NAME, MYF(0), |
|
|
|
"Table must have PRIMARY KEY defined over the first column"); |
|
|
|
DBUG_RETURN(HA_WRONG_CREATE_OPTION); |
|
|
|
} |
|
|
|
@ -582,7 +582,7 @@ public: |
|
|
|
Field *field; |
|
|
|
|
|
|
|
/* This will save Cassandra's data in the Field */ |
|
|
|
virtual int cassandra_to_mariadb(const char *cass_data, |
|
|
|
virtual int cassandra_to_mariadb(const char *cass_data, |
|
|
|
int cass_data_len)=0; |
|
|
|
|
|
|
|
/*
|
|
|
|
@ -610,7 +610,7 @@ public: |
|
|
|
field->store(*pdata); |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) |
|
|
|
{ |
|
|
|
buf= field->val_real(); |
|
|
|
@ -672,7 +672,7 @@ public: |
|
|
|
field->store(tmp); |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) |
|
|
|
{ |
|
|
|
longlong tmp= field->val_int(); |
|
|
|
@ -731,7 +731,7 @@ public: |
|
|
|
field->store(tmp); |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) |
|
|
|
{ |
|
|
|
int32_t tmp= field->val_int(); |
|
|
|
@ -756,7 +756,7 @@ public: |
|
|
|
field->store(cass_data, cass_data_len,field->charset()); |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
bool mariadb_to_cassandra(char **cass_data, int *cass_data_len) |
|
|
|
{ |
|
|
|
String *pstr= field->val_str(&buf); |
|
|
|
@ -780,7 +780,7 @@ public: |
|
|
|
DBUG_ASSERT(cass_data_len==8); |
|
|
|
flip64(cass_data, (char*)&tmp); |
|
|
|
/*
|
|
|
|
store_TIME's arguments: |
|
|
|
store_TIME's arguments: |
|
|
|
- seconds since epoch |
|
|
|
- microsecond fraction of a second. |
|
|
|
*/ |
|
|
|
@ -794,7 +794,7 @@ public: |
|
|
|
ulong ts_microsec; |
|
|
|
int64_t tmp; |
|
|
|
ts_time= ((Field_timestamp*)field)->get_timestamp(&ts_microsec); |
|
|
|
|
|
|
|
|
|
|
|
/* Cassandra needs milliseconds-since-epoch */ |
|
|
|
tmp= ((int64_t)ts_time) * 1000 + ts_microsec/1000; |
|
|
|
flip64((const char*)&tmp, (char*)&buf); |
|
|
|
@ -1383,7 +1383,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ |
|
|
|
{ |
|
|
|
/*
|
|
|
|
Cassandra's "varint" type is a binary-encoded arbitary-length |
|
|
|
big-endian number. |
|
|
|
big-endian number. |
|
|
|
- It can be mapped to VARBINARY(N), with sufficiently big N. |
|
|
|
- If the value does not fit into N bytes, it is an error. We should not |
|
|
|
truncate it, because that is just as good as returning garbage. |
|
|
|
@ -1391,7 +1391,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ |
|
|
|
are zero-padded, which will work as multiplying the value by |
|
|
|
2^k for some value of k. |
|
|
|
*/ |
|
|
|
if (field->type() == MYSQL_TYPE_VARCHAR && |
|
|
|
if (field->type() == MYSQL_TYPE_VARCHAR && |
|
|
|
field->binary() && |
|
|
|
(!strcmp(validator_name, validator_varint) || |
|
|
|
!strcmp(validator_name, validator_decimal))) |
|
|
|
@ -1675,7 +1675,7 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void ha_cassandra::print_conversion_error(const char *field_name, |
|
|
|
void ha_cassandra::print_conversion_error(const char *field_name, |
|
|
|
char *cass_value, |
|
|
|
int cass_value_len) |
|
|
|
{ |
|
|
|
@ -1691,7 +1691,7 @@ void ha_cassandra::print_conversion_error(const char *field_name, |
|
|
|
|
|
|
|
se->print_error("Unable to convert value for field `%s` from Cassandra's data" |
|
|
|
" format. Source data is %d bytes, 0x%s%s", |
|
|
|
field_name, cass_value_len, buf, |
|
|
|
field_name, cass_value_len, buf, |
|
|
|
(i == sizeof(buf) - 1)? "..." : ""); |
|
|
|
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); |
|
|
|
} |
|
|
|
@ -1759,7 +1759,7 @@ int ha_cassandra::read_cassandra_columns(bool unpack_pk) |
|
|
|
if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value, |
|
|
|
cass_value_len)) |
|
|
|
{ |
|
|
|
print_conversion_error((*field)->field_name, cass_value, |
|
|
|
print_conversion_error((*field)->field_name, cass_value, |
|
|
|
cass_value_len); |
|
|
|
res=1; |
|
|
|
goto err; |
|
|
|
@ -1948,7 +1948,7 @@ int ha_cassandra::write_row(uchar *buf) |
|
|
|
my_bitmap_map *old_map; |
|
|
|
int ires; |
|
|
|
DBUG_ENTER("ha_cassandra::write_row"); |
|
|
|
|
|
|
|
|
|
|
|
if (!se && (ires= connect_and_check_options(table))) |
|
|
|
DBUG_RETURN(ires); |
|
|
|
|
|
|
|
@ -1956,7 +1956,7 @@ int ha_cassandra::write_row(uchar *buf) |
|
|
|
se->clear_insert_buffer(); |
|
|
|
|
|
|
|
old_map= dbug_tmp_use_all_columns(table, table->read_set); |
|
|
|
|
|
|
|
|
|
|
|
insert_lineno++; |
|
|
|
|
|
|
|
/* Convert the key */ |
|
|
|
@ -2008,9 +2008,9 @@ int ha_cassandra::write_row(uchar *buf) |
|
|
|
} |
|
|
|
|
|
|
|
dbug_tmp_restore_column_map(table->read_set, old_map); |
|
|
|
|
|
|
|
|
|
|
|
bool res; |
|
|
|
|
|
|
|
|
|
|
|
if (doing_insert_batch) |
|
|
|
{ |
|
|
|
res= 0; |
|
|
|
@ -2025,7 +2025,7 @@ int ha_cassandra::write_row(uchar *buf) |
|
|
|
|
|
|
|
if (res) |
|
|
|
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); |
|
|
|
|
|
|
|
|
|
|
|
DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0); |
|
|
|
} |
|
|
|
|
|
|
|
@ -2046,7 +2046,7 @@ void ha_cassandra::start_bulk_insert(ha_rows rows) |
|
|
|
int ha_cassandra::end_bulk_insert() |
|
|
|
{ |
|
|
|
DBUG_ENTER("ha_cassandra::end_bulk_insert"); |
|
|
|
|
|
|
|
|
|
|
|
/* Flush out the insert buffer */ |
|
|
|
doing_insert_batch= false; |
|
|
|
bool bres= se->do_insert(); |
|
|
|
@ -2133,7 +2133,7 @@ int ha_cassandra::delete_all_rows() |
|
|
|
DBUG_RETURN(ires); |
|
|
|
|
|
|
|
bres= se->truncate(); |
|
|
|
|
|
|
|
|
|
|
|
if (bres) |
|
|
|
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); |
|
|
|
|
|
|
|
@ -2145,12 +2145,12 @@ int ha_cassandra::delete_row(const uchar *buf) |
|
|
|
{ |
|
|
|
bool bres; |
|
|
|
DBUG_ENTER("ha_cassandra::delete_row"); |
|
|
|
|
|
|
|
|
|
|
|
bres= se->remove_row(); |
|
|
|
|
|
|
|
|
|
|
|
if (bres) |
|
|
|
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); |
|
|
|
|
|
|
|
|
|
|
|
DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0); |
|
|
|
} |
|
|
|
|
|
|
|
@ -2158,7 +2158,7 @@ int ha_cassandra::delete_row(const uchar *buf) |
|
|
|
int ha_cassandra::info(uint flag) |
|
|
|
{ |
|
|
|
DBUG_ENTER("ha_cassandra::info"); |
|
|
|
|
|
|
|
|
|
|
|
if (!table) |
|
|
|
return 1; |
|
|
|
|
|
|
|
@ -2183,7 +2183,7 @@ void key_copy(uchar *to_key, uchar *from_record, KEY *key_info, |
|
|
|
void ha_cassandra::position(const uchar *record) |
|
|
|
{ |
|
|
|
DBUG_ENTER("ha_cassandra::position"); |
|
|
|
|
|
|
|
|
|
|
|
/* Copy the primary key to rowid */ |
|
|
|
key_copy(ref, (uchar*)record, &table->key_info[0], |
|
|
|
table->field[0]->key_length(), true); |
|
|
|
@ -2196,7 +2196,7 @@ int ha_cassandra::rnd_pos(uchar *buf, uchar *pos) |
|
|
|
{ |
|
|
|
int rc; |
|
|
|
DBUG_ENTER("ha_cassandra::rnd_pos"); |
|
|
|
|
|
|
|
|
|
|
|
int save_active_index= active_index; |
|
|
|
active_index= 0; /* The primary key */ |
|
|
|
rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT); |
|
|
|
@ -2230,7 +2230,7 @@ int ha_cassandra::reset() |
|
|
|
- anything else? |
|
|
|
*/ |
|
|
|
ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, |
|
|
|
void *seq_init_param, |
|
|
|
void *seq_init_param, |
|
|
|
uint n_ranges, uint *bufsz, |
|
|
|
uint *flags, COST_VECT *cost) |
|
|
|
{ |
|
|
|
@ -2240,7 +2240,7 @@ ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq, |
|
|
|
|
|
|
|
|
|
|
|
ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys, |
|
|
|
uint key_parts, uint *bufsz, |
|
|
|
uint key_parts, uint *bufsz, |
|
|
|
uint *flags, COST_VECT *cost) |
|
|
|
{ |
|
|
|
/* Can only be equality lookups on the primary key... */ |
|
|
|
@ -2269,14 +2269,14 @@ bool ha_cassandra::mrr_start_read() |
|
|
|
|
|
|
|
my_bitmap_map *old_map; |
|
|
|
old_map= dbug_tmp_use_all_columns(table, table->read_set); |
|
|
|
|
|
|
|
|
|
|
|
se->new_lookup_keys(); |
|
|
|
|
|
|
|
while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range))) |
|
|
|
{ |
|
|
|
char *cass_key; |
|
|
|
int cass_key_len; |
|
|
|
|
|
|
|
|
|
|
|
DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE); |
|
|
|
|
|
|
|
uchar *key= (uchar*)mrr_cur_range.start_key.key; |
|
|
|
@ -2285,9 +2285,9 @@ bool ha_cassandra::mrr_start_read() |
|
|
|
store_key_image_to_rec(table->field[0], (uchar*)key, key_len); |
|
|
|
|
|
|
|
rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len); |
|
|
|
|
|
|
|
|
|
|
|
// Primitive buffer control
|
|
|
|
if (se->add_lookup_key(cass_key, cass_key_len) > |
|
|
|
if (se->add_lookup_key(cass_key, cass_key_len) > |
|
|
|
THDVAR(table->in_use, multiget_batch_size)) |
|
|
|
break; |
|
|
|
} |
|
|
|
@ -2308,7 +2308,7 @@ int ha_cassandra::multi_range_read_next(range_id_t *range_info) |
|
|
|
res= read_cassandra_columns(true); |
|
|
|
break; |
|
|
|
} |
|
|
|
else |
|
|
|
else |
|
|
|
{ |
|
|
|
if (source_exhausted) |
|
|
|
{ |
|
|
|
@ -2324,7 +2324,7 @@ int ha_cassandra::multi_range_read_next(range_id_t *range_info) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
We get here if we've refilled the buffer and done another read. Try |
|
|
|
reading from results again |
|
|
|
*/ |
|
|
|
@ -2444,7 +2444,7 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data) |
|
|
|
} |
|
|
|
|
|
|
|
/*
|
|
|
|
Compare it to the key we've read. For all types that Cassandra supports, |
|
|
|
Compare it to the key we've read. For all types that Cassandra supports, |
|
|
|
binary byte-wise comparison can be used |
|
|
|
*/ |
|
|
|
bool new_primary_key; |
|
|
|
@ -2472,8 +2472,8 @@ int ha_cassandra::update_row(const uchar *old_data, uchar *new_data) |
|
|
|
|
|
|
|
if (new_primary_key) |
|
|
|
{ |
|
|
|
/*
|
|
|
|
Primary key value changed. This is essentially a DELETE + INSERT. |
|
|
|
/*
|
|
|
|
Primary key value changed. This is essentially a DELETE + INSERT. |
|
|
|
Add a DELETE operation into the batch |
|
|
|
*/ |
|
|
|
Column_name_enumerator_impl name_enumerator(this); |
|
|
|
@ -2606,7 +2606,7 @@ int ha_cassandra::external_lock(THD *thd, int lock_type) |
|
|
|
int ha_cassandra::delete_table(const char *name) |
|
|
|
{ |
|
|
|
DBUG_ENTER("ha_cassandra::delete_table"); |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
Cassandra table is just a view. Dropping it doesn't affect the underlying |
|
|
|
column family. |
|
|
|
*/ |
|
|
|
@ -2640,7 +2640,7 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info, |
|
|
|
|
|
|
|
static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff) |
|
|
|
{ |
|
|
|
cassandra_counters_copy= cassandra_counters; |
|
|
|
cassandra_counters_copy= cassandra_counters; |
|
|
|
|
|
|
|
var->type= SHOW_ARRAY; |
|
|
|
var->value= (char *) &cassandra_status_variables; |
|
|
|
|