diff --git a/mysql-test/r/cassandra.result b/mysql-test/r/cassandra.result index 6dbf08651d8..651adf40c7b 100644 --- a/mysql-test/r/cassandra.result +++ b/mysql-test/r/cassandra.result @@ -14,7 +14,12 @@ ERROR HY000: Unable to connect to foreign data source: Default TException. [Keys create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra thrift_host='localhost' keyspace='no_such_keyspace'; ERROR HY000: Can't create table 'test.t1' (errno: 140) -create table t1 (rowkey char(36) primary key, column1 char(60)) engine=cassandra -thrift_host='localhost' keyspace='mariadbtest' column_family='cf1'; -insert into t1 values ('key0', 'data1'); +create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra +thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1'; +insert into t1 values ('rowkey10', 'data1-value', 123456); +insert into t1 values ('rowkey11', 'data1-value2', 34543); +select * from t1; +rowkey data1 data2 + data1-value 123456 + data1-value2 34543 drop table t1; diff --git a/mysql-test/t/cassandra.test b/mysql-test/t/cassandra.test index 6acb5f769ab..e316e23d626 100644 --- a/mysql-test/t/cassandra.test +++ b/mysql-test/t/cassandra.test @@ -36,12 +36,12 @@ create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra ./cqlsh --cql3 -CREATE KEYSPACE mariadbtest +CREATE KEYSPACE mariadbtest2 WITH strategy_class = 'org.apache.cassandra.locator.SimpleStrategy' AND strategy_options:replication_factor='1'; -USE mariadbtest; -create columnfamily cf1 ( pk varchar primary key, data1 varchar); +USE mariadbtest2; +create columnfamily cf1 ( pk varchar primary key, data1 varchar, data2 bigint); --enable_parsing ############################################################################ @@ -49,11 +49,12 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar); ############################################################################ # Now, create a table for real and insert data -create table t1 (rowkey char(36) primary key, data1 varchar(60)) engine=cassandra - thrift_host='localhost' keyspace='mariadbtest' column_family='cf1'; - -insert into t1 values ('key0', 'data1'); +create table t1 (rowkey char(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra + thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1'; +insert into t1 values ('rowkey10', 'data1-value', 123456); +insert into t1 values ('rowkey11', 'data1-value2', 34543); +select * from t1; drop table t1; ############################################################################ diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc index e3d731815f9..f8891a43351 100644 --- a/storage/cassandra/cassandra_se.cc +++ b/storage/cassandra/cassandra_se.cc @@ -60,7 +60,12 @@ class Cassandra_se_impl: public Cassandra_se_interface std::string key_to_insert; int64_t insert_timestamp; std::vector* insert_list; + + /* Resultset we're reading */ + std::vector key_slice_vec; + std::vector::iterator key_slice_it; + SlicePredicate slice_pred; public: Cassandra_se_impl() : cass(NULL) {} virtual ~Cassandra_se_impl(){ delete cass; } @@ -78,10 +83,18 @@ public: void add_insert_column(const char *name, const char *value, int value_len); bool do_insert(); - /* Reads */ + /* Reads, point lookups */ bool get_slice(char *key, size_t key_len, bool *found); bool get_next_read_column(char **name, char **value, int *value_len); + /* Reads, multi-row scans */ + bool get_range_slices(); + void finish_reading_range_slices(); + bool get_next_range_slice_row(); + + /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */ + void clear_read_columns(); + void add_read_column(const char *name); }; @@ -265,7 +278,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) try { cass->get_slice(column_data_vec, rowkey_str, cparent, slice_pred, - ConsistencyLevel::ONE); + cur_consistency_level); if (column_data_vec.size() == 0) { @@ -318,3 +331,70 @@ bool Cassandra_se_impl::get_next_read_column(char **name, char **value, } +bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as parameters +{ + bool res= true; + + ColumnParent cparent; + cparent.column_family= column_family; + + /* SlicePredicate can be used to limit columns we will retrieve */ + // Try passing nothing... + + KeyRange key_range; // Try passing nothing, too. + key_range.__isset.start_key=true; + key_range.__isset.end_key=true; + key_range.start_key.assign("", 0); + key_range.end_key.assign("", 0); + + try { + + cass->get_range_slices(key_slice_vec, + cparent, slice_pred, key_range, + cur_consistency_level); + res= false; + + } catch (InvalidRequestException ire) { + print_error("%s [%s]", ire.what(), ire.why.c_str()); + } catch (UnavailableException ue) { + print_error("UnavailableException: %s", ue.what()); + } catch (TimedOutException te) { + print_error("TimedOutException: %s", te.what()); + } + + key_slice_it= key_slice_vec.begin(); + return res; +} + + +bool Cassandra_se_impl::get_next_range_slice_row() +{ + if (key_slice_it == key_slice_vec.end()) + return true; + + column_data_vec= key_slice_it->columns; + column_data_it= column_data_vec.begin(); + key_slice_it++; + return false; +} + + +void Cassandra_se_impl::finish_reading_range_slices() +{ + key_slice_vec.clear(); +} + + +void Cassandra_se_impl::clear_read_columns() +{ + slice_pred.column_names.clear(); +} + + +void Cassandra_se_impl::add_read_column(const char *name_arg) +{ + std::string name(name_arg); + slice_pred.__isset.column_names= true; + slice_pred.column_names.push_back(name); +} + diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h index dea4f0f51ea..d35a6e6a003 100644 --- a/storage/cassandra/cassandra_se.h +++ b/storage/cassandra/cassandra_se.h @@ -7,22 +7,6 @@ */ -/* - Storage for (name,value) pairs. name==NULL means 'non-object'. - - This should be used for - - shipping data from sql to cassandra for INSERTs - - shipping data from cassandra to SQL for record reads. - -*/ -class NameAndValue -{ -public: - char *name; - char *value; - size_t value_len; -}; - /* Interface to one cassandra column family, i.e. one 'table' */ @@ -52,6 +36,15 @@ public: virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ; virtual bool get_next_read_column(char **name, char **value, int *value_len)=0; + /* Reads, multi-row scans */ + virtual bool get_range_slices()=0; + virtual void finish_reading_range_slices()=0; + virtual bool get_next_range_slice_row()=0; + + /* read_set setup */ + virtual void clear_read_columns()=0; + virtual void add_read_column(const char *name)=0; + /* Passing error messages up to ha_cassandra */ char err_buffer[512]; const char *error_str() { return err_buffer; } diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc index 757c0fd5e10..02eddcaf5cd 100644 --- a/storage/cassandra/ha_cassandra.cc +++ b/storage/cassandra/ha_cassandra.cc @@ -212,8 +212,7 @@ static handler* cassandra_create_handler(handlerton *hton, ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg) :handler(hton, table_arg), - se(NULL), names_and_vals(NULL), - field_converters(NULL) + se(NULL), field_converters(NULL) {} @@ -265,11 +264,7 @@ int ha_cassandra::close(void) DBUG_ENTER("ha_cassandra::close"); delete se; se= NULL; - if (names_and_vals) - { - my_free(names_and_vals); - names_and_vals= NULL; - } + free_field_converters(); DBUG_RETURN(free_share(share)); } @@ -589,6 +584,7 @@ void ha_cassandra::free_field_converters() } } + void store_key_image_to_rec(Field *field, uchar *ptr, uint len); int ha_cassandra::index_read_map(uchar *buf, const uchar *key, @@ -622,34 +618,49 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, } else { - char *cass_name; - char *cass_value; - int cass_value_len; - Field **field; + read_cassandra_columns(); + } - /* Start with all fields being NULL */ - for (field= table->field + 1; *field; field++) - (*field)->set_null(); + DBUG_RETURN(rc); +} - while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len)) + +void ha_cassandra::read_cassandra_columns() +{ + char *cass_name; + char *cass_value; + int cass_value_len; + Field **field; + + /* + cassandra_to_mariadb() calls will use field->store(...) methods, which + require that the column is in the table->write_set + */ + my_bitmap_map *old_map; + old_map= dbug_tmp_use_all_columns(table, table->write_set); + + /* Start with all fields being NULL */ + for (field= table->field + 1; *field; field++) + (*field)->set_null(); + + while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len)) + { + // map to our column. todo: use hash or something.. + int idx=1; + for (field= table->field + 1; *field; field++) { - // map to our column. todo: use hash or something.. - int idx=1; - for (field= table->field + 1; *field; field++) + idx++; + if (!strcmp((*field)->field_name, cass_name)) { - idx++; - if (!strcmp((*field)->field_name, cass_name)) - { - int fieldnr= (*field)->field_index; - (*field)->set_notnull(); - field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len); - break; - } + int fieldnr= (*field)->field_index; + (*field)->set_notnull(); + field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len); + break; } } } - DBUG_RETURN(rc); + dbug_tmp_restore_column_map(table->write_set, old_map); } @@ -690,19 +701,50 @@ int ha_cassandra::write_row(uchar *buf) } -NameAndValue *ha_cassandra::get_names_and_vals() +int ha_cassandra::rnd_init(bool scan) +{ + bool bres; + DBUG_ENTER("ha_cassandra::rnd_init"); + if (!scan) + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + + se->clear_read_columns(); + for (uint i= 1; i < table->s->fields; i++) + se->add_read_column(table->field[i]->field_name); + + bres= se->get_range_slices(); + + DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0); +} + + +int ha_cassandra::rnd_end() +{ + DBUG_ENTER("ha_cassandra::rnd_end"); + + se->finish_reading_range_slices(); + DBUG_RETURN(0); +} + + +int ha_cassandra::rnd_next(uchar *buf) { - if (names_and_vals) - return names_and_vals; + int rc; + DBUG_ENTER("ha_cassandra::rnd_next"); + + // Unpack and return the next record. + if (se->get_next_range_slice_row()) + { + rc= HA_ERR_END_OF_FILE; + } else { - size_t size= sizeof(NameAndValue) * (table->s->fields + 1); - names_and_vals= (NameAndValue*)my_malloc(size ,0); - memset(names_and_vals, 0, size); - return names_and_vals; + read_cassandra_columns(); + rc= 0; } -} + DBUG_RETURN(rc); +} ///////////////////////////////////////////////////////////////////////////// // Dummy implementations start @@ -743,27 +785,6 @@ int ha_cassandra::index_last(uchar *buf) DBUG_RETURN(rc); } -int ha_cassandra::rnd_init(bool scan) -{ - DBUG_ENTER("ha_cassandra::rnd_init"); - DBUG_RETURN(0); -} - -int ha_cassandra::rnd_end() -{ - DBUG_ENTER("ha_cassandra::rnd_end"); - DBUG_RETURN(0); -} - - -int ha_cassandra::rnd_next(uchar *buf) -{ - int rc; - DBUG_ENTER("ha_cassandra::rnd_next"); - rc= HA_ERR_END_OF_FILE; - DBUG_RETURN(rc); -} - void ha_cassandra::position(const uchar *record) { DBUG_ENTER("ha_cassandra::position"); diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h index 595b840d360..30958e4e17c 100644 --- a/storage/cassandra/ha_cassandra.h +++ b/storage/cassandra/ha_cassandra.h @@ -36,19 +36,17 @@ class ha_cassandra: public handler Cassandra_se_interface *se; - /* pre-allocated array of #fields elements */ - NameAndValue *names_and_vals; - NameAndValue *get_names_and_vals(); - - ColumnDataConverter **field_converters; uint n_field_converters; bool setup_field_converters(Field **field, uint n_fields); void free_field_converters(); + + void read_cassandra_columns(); public: ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg); ~ha_cassandra() { + free_field_converters(); delete se; }