Class: BigRecord::ConnectionAdapters::HbaseAdapter
- Inherits:
-
AbstractAdapter
- Object
- AbstractAdapter
- BigRecord::ConnectionAdapters::HbaseAdapter
- Defined in:
- lib/big_record/connection_adapters/hbase_adapter.rb
Constant Summary
- LOST_CONNECTION_ERROR_MESSAGES =
[ "Server shutdown in progress", "Broken pipe", "Lost connection to HBase server during query", "HBase server has gone away" ]
- TYPE_NULL =
data types
0x00
- TYPE_STRING =
0x01
- TYPE_BOOLEAN =
utf-8 strings
TYPE_INTEGER = 0x02; # delegate to YAML TYPE_FLOAT = 0x03; # fixed 1 byte
0x04
- TYPE_BINARY =
delegate to YAML
TYPE_MAP = 0x05; # delegate to YAML TYPE_DATETIME = 0x06; # delegate to YAML
0x07
- CHARSET =
string charset
"utf-8"
- NULL =
utility constants
"\000"
- @@emulate_booleans =
true
Instance Method Summary
-
- (Boolean) active?
CONNECTION MANAGEMENT ====================================.
-
- (Object) adapter_name
:nodoc:.
- - (Object) add_column_family(table_name, column_name, options = {}) (also: #add_family)
-
- (Object) build_serialized_value(type, value)
Serialize an object in a given type.
- - (Object) configuration
- - (Object) create_table(table_name, options = {}) {|table_definition| ... }
- - (Object) delete(table_name, row, timestamp = nil)
-
- (Object) deserialize(str)
Deserialize the given string.
-
- (Object) deserialize_with_header(data)
Deserialize the given string assumed to be in the type header format.
- - (Object) disconnect!
- - (Object) drop_table(table_name)
- - (Object) get(table_name, row, column, options = {})
- - (Object) get_all_schema_versions
- - (Object) get_columns(table_name, row, columns, options = {})
- - (Object) get_columns_raw(table_name, row, columns, options = {})
- - (Object) get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
- - (Object) get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
- - (Object) get_raw(table_name, row, column, options = {})
-
- (HbaseAdapter) initialize(connection, logger, connection_options, config)
constructor
TRUE = "\001".
-
- (Object) initialize_schema_migrations_table
SCHEMA STATEMENTS ========================================.
- - (Object) modify_column_family(table_name, column_name, options = {}) (also: #modify_family)
- - (Object) reconnect!
- - (Object) remove_column_family(table_name, column_name) (also: #remove_family)
-
- (Object) serialize(value)
Serialize the given value.
-
- (Boolean) supports_migrations?
:nodoc:.
- - (Boolean) table_exists?(table_name)
- - (Object) truncate_table(table_name)
- - (Object) update(table_name, row, values, timestamp)
-
- (Object) update_raw(table_name, row, values, timestamp)
DATABASE STATEMENTS ======================================.
Methods inherited from AbstractAdapter
#delete_all, #disable_referential_integrity, #prefetch_primary_key?, #quote_table_name, #raw_connection, #requires_reloading?, #reset_runtime, #supports_count_distinct?, #supports_ddl_transactions?, #verify!
Methods included from Quoting
#quote, #quote_column_name, #quote_string, #quoted_date, #quoted_false, #quoted_true
Methods included from DatabaseStatements
Constructor Details
- (HbaseAdapter) initialize(connection, logger, connection_options, config)
TRUE = "\001"
FALSE = "\000"
58 59 60 61 62 63 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 58 def initialize(connection, logger, , config) super(connection, logger) , @config = , config connect end |
Instance Method Details
- (Boolean) active?
CONNECTION MANAGEMENT ====================================
79 80 81 82 83 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 79 def active? @connection.ping rescue BigRecord::Driver::DriverError false end |
- (Object) adapter_name
:nodoc:
69 70 71 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 69 def adapter_name 'HBase' end |
- (Object) add_column_family(table_name, column_name, options = {}) Also known as: add_family
251 252 253 254 255 256 257 258 259 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 251 def add_column_family(table_name, column_name, = {}) column = BigRecord::Driver::ColumnDescriptor.new(column_name.to_s, ) result = nil log "ADD COLUMN TABLE #{table_name} COLUMN #{column_name} (#{options.inspect});" do result = @connection.add_column(table_name, column) end result end |
- (Object) build_serialized_value(type, value)
Serialize an object in a given type
295 296 297 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 295 def build_serialized_value(type, value) type.chr + value end |
- (Object) configuration
65 66 67 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 65 def configuration @config.clone end |
- (Object) create_table(table_name, options = {}) {|table_definition| ... }
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 227 def create_table(table_name, = {}) table_definition = HbaseAdapterTable.new yield table_definition if block_given? if [:force] && table_exists?(table_name) drop_table(table_name) end result = nil log "CREATE TABLE #{table_name} (#{table_definition.column_families_list});" do result = @connection.create_table(table_name, table_definition.to_adapter_format) end result end |
- (Object) delete(table_name, row, timestamp = nil)
185 186 187 188 189 190 191 192 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 185 def delete(table_name, row, = nil) ||= Time.now. result = nil log "DELETE FROM #{table_name} WHERE ROW=#{row};" do result = @connection.delete(table_name, row, ) end result end |
- (Object) deserialize(str)
Deserialize the given string. This method supports both the pure YAML format and the type header format.
301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 301 def deserialize(str) return unless str # stay compatible with the old serialization code # YAML documents start with "--- " so if we find that sequence at the beginning we # consider it as a serialized YAML value, else it's the new format with the type header if str[0..3] == "--- " YAML::load(str) if str else deserialize_with_header(str) end end |
- (Object) deserialize_with_header(data)
Deserialize the given string assumed to be in the type header format.
315 316 317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 315 def deserialize_with_header(data) return unless data and data.size >= 2 # the type of the data is encoded in the first byte type = data[0]; case type when TYPE_NULL then nil when TYPE_STRING then data[1..-1] when TYPE_BINARY then data[1..-1] else data end end |
- (Object) disconnect!
90 91 92 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 90 def disconnect! @connection.close rescue nil end |
- (Object) drop_table(table_name)
243 244 245 246 247 248 249 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 243 def drop_table(table_name) result = nil log "DROP TABLE #{table_name};" do result = @connection.drop_table(table_name) end result end |
- (Object) get(table_name, row, column, options = {})
121 122 123 124 125 126 127 128 129 130 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 121 def get(table_name, row, column, ={}) serialized_result = get_raw(table_name, row, column, ) result = nil if serialized_result.is_a?(Array) result = serialized_result.collect{|e| deserialize(e)} else result = deserialize(serialized_result) end result end |
- (Object) get_all_schema_versions
215 216 217 218 219 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 215 def get_all_schema_versions sm_table = BigRecord::Migrator.schema_migrations_table_name get_consecutive_rows(sm_table, nil, nil, ["attribute:version"]).map{|version| version["attribute:version"]} end |
- (Object) get_columns(table_name, row, columns, options = {})
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 140 def get_columns(table_name, row, columns, ={}) row_cols = get_columns_raw(table_name, row, columns, ) result = {} return nil unless row_cols row_cols.each do |key, col| result[key] = if key == 'id' col else deserialize(col) end end result end |
- (Object) get_columns_raw(table_name, row, columns, options = {})
132 133 134 135 136 137 138 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 132 def get_columns_raw(table_name, row, columns, ={}) result = {} log "SELECT (#{columns.join(", ")}) FROM #{table_name} WHERE ROW=#{row};" do result = @connection.get_columns(table_name, row, columns, ) end result end |
- (Object) get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 164 def get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil) rows = get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row) result = rows.collect do |row_cols| cols = {} row_cols.each do |key, col| begin cols[key] = if key == 'id' col else deserialize(col) end rescue Exception => e puts "Could not load column value #{key} for row=#{row_cols['id']}" end end cols end result end |
- (Object) get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
156 157 158 159 160 161 162 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 156 def get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil) result = nil log "SCAN (#{columns.join(", ")}) FROM #{table_name} WHERE START_ROW=#{start_row} AND STOP_ROW=#{stop_row} LIMIT=#{limit};" do result = @connection.get_consecutive_rows(table_name, start_row, limit, columns, stop_row) end result end |
- (Object) get_raw(table_name, row, column, options = {})
113 114 115 116 117 118 119 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 113 def get_raw(table_name, row, column, ={}) result = nil log "SELECT (#{column}) FROM #{table_name} WHERE ROW=#{row};" do result = @connection.get(table_name, row, column, ) end result end |
- (Object) initialize_schema_migrations_table
SCHEMA STATEMENTS ========================================
205 206 207 208 209 210 211 212 213 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 205 def initialize_schema_migrations_table sm_table = BigRecord::Migrator.schema_migrations_table_name unless table_exists?(sm_table) create_table(sm_table) do |t| t.family :attribute, :versions => 1 end end end |
- (Object) modify_column_family(table_name, column_name, options = {}) Also known as: modify_family
273 274 275 276 277 278 279 280 281 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 273 def modify_column_family(table_name, column_name, = {}) column = BigRecord::Driver::ColumnDescriptor.new(column_name.to_s, ) result = nil log "MODIFY COLUMN TABLE #{table_name} COLUMN #{column_name} (#{options.inspect});" do result = @connection.modify_column(table_name, column) end result end |
- (Object) reconnect!
85 86 87 88 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 85 def reconnect! disconnect! connect end |
- (Object) remove_column_family(table_name, column_name) Also known as: remove_family
263 264 265 266 267 268 269 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 263 def remove_column_family(table_name, column_name) result = nil log "REMOVE COLUMN TABLE #{table_name} COLUMN #{column_name};" do result = @connection.remove_column(table_name, column_name) end result end |
- (Object) serialize(value)
Serialize the given value
286 287 288 289 290 291 292 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 286 def serialize(value) case value when NilClass then NULL when String then build_serialized_value(TYPE_STRING, value) else value.to_yaml end end |
- (Boolean) supports_migrations?
:nodoc:
73 74 75 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 73 def supports_migrations? true end |
- (Boolean) table_exists?(table_name)
221 222 223 224 225 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 221 def table_exists?(table_name) log "TABLE EXISTS? #{table_name};" do @connection.table_exists?(table_name) end end |
- (Object) truncate_table(table_name)
194 195 196 197 198 199 200 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 194 def truncate_table(table_name) result = nil log "TRUNCATE TABLE #{table_name}" do result = @connection.truncate_table(table_name) end result end |
- (Object) update(table_name, row, values, timestamp)
105 106 107 108 109 110 111 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 105 def update(table_name, row, values, ) serialized_collection = {} values.each do |column, value| serialized_collection[column] = serialize(value) end update_raw(table_name, row, serialized_collection, ) end |
- (Object) update_raw(table_name, row, values, timestamp)
DATABASE STATEMENTS ======================================
97 98 99 100 101 102 103 |
# File 'lib/big_record/connection_adapters/hbase_adapter.rb', line 97 def update_raw(table_name, row, values, ) result = nil log "UPDATE #{table_name} SET #{values.inspect if values} WHERE ROW=#{row};" do result = @connection.update(table_name, row, values, ) end result end |