Class: BigRecord::ConnectionAdapters::HbaseRestAdapter
- Inherits:
- AbstractAdapter
  - Object
  - AbstractAdapter
  - BigRecord::ConnectionAdapters::HbaseRestAdapter
- Defined in:
- lib/big_record/connection_adapters/hbase_rest_adapter.rb
Constant Summary
- LOST_CONNECTION_ERROR_MESSAGES =
[ "Server shutdown in progress", "Broken pipe", "Lost connection to HBase server during query", "HBase server has gone away" ]
- TYPE_NULL =
data types
0x00
- TYPE_STRING =
utf-8 strings
0x01
- TYPE_BOOLEAN =
delegate to YAML
0x04
- TYPE_BINARY =
0x07
- CHARSET =
string charset
"utf-8"
- NULL =
utility constants
"\000"
- @@emulate_booleans =
true
Instance Method Summary
-
- (Boolean) active?
Connection management. Always returns true.
-
- (Object) adapter_name
Returns the adapter name, 'HBase-Rest'.
- - (Object) add_column_family(table_name, column_name, options = {}) (also: #add_family)
-
- (Object) build_serialized_value(type, value)
Serialize an object in a given type.
-
- (Object) columns_to_hbase_format(data = {})
Database statements. Converts a hash of column names and values to the hbase-ruby format.
- - (Object) configuration
- - (Object) create_table(table_name, options = {}) {|table_definition| ... }
- - (Object) delete(table_name, row, timestamp = nil)
-
- (Object) deserialize(str)
Deserialize the given string.
-
- (Object) deserialize_with_header(data)
Deserialize the given string assumed to be in the type header format.
- - (Object) disconnect!
- - (Object) drop_table(table_name)
- - (Object) get(table_name, row, column, options = {})
- - (Object) get_all_schema_versions
- - (Object) get_columns(table_name, row, columns, options = {})
- - (Object) get_columns_raw(table_name, row, columns = nil, options = {})
- - (Object) get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
- - (Object) get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
- - (Object) get_raw(table_name, row, column, options = {})
-
- (HbaseRestAdapter) initialize(connection, logger, connection_options, config)
constructor
A new instance of HbaseRestAdapter.
-
- (Object) initialize_schema_migrations_table
Schema statements. Creates the schema migrations table unless it already exists.
- - (Object) modify_column_family(table_name, column_name, options = {}) (also: #modify_family)
- - (Object) reconnect!
- - (Object) remove_column_family(table_name, column_name) (also: #remove_family)
-
- (Object) serialize(value)
Serialize the given value.
-
- (Boolean) supports_migrations?
Always returns true.
- - (Boolean) table_exists?(table_name)
- - (Object) truncate_table(table_name)
- - (Object) update(table_name, row, values, timestamp)
- - (Object) update_raw(table_name, row, values, timestamp)
Methods inherited from AbstractAdapter
#delete_all, #disable_referential_integrity, #prefetch_primary_key?, #quote_table_name, #raw_connection, #requires_reloading?, #reset_runtime, #supports_count_distinct?, #supports_ddl_transactions?, #verify!
Methods included from Quoting
#quote, #quote_column_name, #quote_string, #quoted_date, #quoted_false, #quoted_true
Methods included from DatabaseStatements
Constructor Details
- (HbaseRestAdapter) initialize(connection, logger, connection_options, config)
A new instance of HbaseRestAdapter
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 47
def initialize(connection, logger, connection_options, config)
  super(connection, logger)
  @connection_options, @config = connection_options, config
  connect
end
Instance Method Details
- (Boolean) active?
Connection management. Always returns true.
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 68
def active?
  true
end
- (Object) adapter_name
Returns the adapter name, 'HBase-Rest'.
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 58
def adapter_name
  'HBase-Rest'
end
- (Object) add_column_family(table_name, column_name, options = {}) Also known as: add_family
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 255
def add_column_family(table_name, column_name, options = {})
end
- (Object) build_serialized_value(type, value)
Serialize an object in a given type
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 280
def build_serialized_value(type, value)
  type.chr + value
end
- (Object) columns_to_hbase_format(data = {})
Database statements. Converts a hash of column names and values to the hbase-ruby format.
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 81
def columns_to_hbase_format(data = {})
  # Convert it to the hbase-ruby format
  # TODO: Add this function to hbase-ruby instead.
  data.map{|col, content| {:name => col.to_s, :value => content}}.compact
end
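To make the conversion concrete, here is a minimal standalone sketch of the same mapping outside the adapter. The column names and values are made up for illustration; only the method body comes from the listing above.

```ruby
# Standalone copy of the mapping done by columns_to_hbase_format.
def columns_to_hbase_format(data = {})
  # Each (column, content) pair becomes a {:name, :value} hash;
  # symbol keys are converted to strings.
  data.map { |col, content| { :name => col.to_s, :value => content } }.compact
end

# Hypothetical input: one symbol key and one string key.
columns = columns_to_hbase_format({ :"attribute:title" => "Dune", "attribute:year" => "1965" })
p columns
```

Because the block always returns a hash, the trailing `compact` has nothing to remove here; it only matters if the mapping is later changed to return nil for some entries.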
- (Object) configuration
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 54
def configuration
  @config.clone
end
- (Object) create_table(table_name, options = {}) {|table_definition| ... }
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 231
def create_table(table_name, options = {})
  table_definition = HbaseRestAdapterTable.new

  yield table_definition if block_given?

  if options[:force] && table_exists?(table_name)
    drop_table(table_name)
  end

  result = nil
  log "CREATE TABLE #{table_name} (#{table_definition.column_families_list});" do
    result = @connection.create_table(table_name, *table_definition.to_adapter_format)
  end
  result
end
- (Object) delete(table_name, row, timestamp = nil)
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 194
def delete(table_name, row, timestamp = nil)
  timestamp ||= Time.now. # the method called on Time.now is missing from the extracted source
  result = nil
  log "DELETE FROM #{table_name} WHERE ROW=#{row};" do
    result = @connection.delete_row(table_name, row, timestamp)
  end
  result
end
- (Object) deserialize(str)
Deserialize the given string. This method supports both the pure YAML format and the type header format.
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 286
def deserialize(str)
  return unless str

  # stay compatible with the old serialization code
  # YAML documents start with "--- " so if we find that sequence at the beginning we
  # consider it as a serialized YAML value, else it's the new format with the type header
  if str[0..3] == "--- "
    YAML::load(str) if str
  else
    deserialize_with_header(str)
  end
end
- (Object) deserialize_with_header(data)
Deserialize the given string assumed to be in the type header format.
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 300
def deserialize_with_header(data)
  return unless data and data.size >= 2

  # the type of the data is encoded in the first byte
  type = data[0];

  case type
  when TYPE_NULL then nil
  when TYPE_STRING then data[1..-1]
  when TYPE_BINARY then data[1..-1]
  else nil
  end
end
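The listing compares `data[0]` against integer constants, which only matches on Ruby versions where indexing a string returns a byte value. The following is a hedged sketch, not the library's code, of the same two-step decoding written so it also works on current Ruby. Using `getbyte` and `byteslice` is an assumption made for this illustration, and the constants are redefined locally so the block is self-contained.

```ruby
require "yaml"

# Local copies of the adapter's type constants, for illustration only.
TYPE_NULL   = 0x00
TYPE_STRING = 0x01
TYPE_BINARY = 0x07

# Decode a value in the type header format: one type byte, then the payload.
def deserialize_with_header(data)
  return unless data && data.bytesize >= 2

  case data.getbyte(0) # always an Integer, unlike data[0] on modern Ruby
  when TYPE_NULL then nil
  when TYPE_STRING, TYPE_BINARY then data.byteslice(1..-1)
  else nil
  end
end

# Dispatch between the legacy YAML format and the type header format.
def deserialize(str)
  return unless str

  if str[0..3] == "--- "
    YAML.load(str)
  else
    deserialize_with_header(str)
  end
end

from_header = deserialize("\x01hello") # type byte 0x01, payload "hello"
from_yaml   = deserialize("--- 42\n")  # legacy YAML document
```

As in the original, a value shorter than two bytes decodes to nil, so the one-byte NULL marker and an empty string both come back as nil.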
- (Object) disconnect!
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 75
def disconnect!
end
- (Object) drop_table(table_name)
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 247
def drop_table(table_name)
  result = nil
  log "DROP TABLE #{table_name};" do
    result = @connection.destroy_table(table_name)
  end
  result
end
- (Object) get(table_name, row, column, options = {})
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 118
def get(table_name, row, column, options={})
  serialized_result = get_raw(table_name, row, column, options)
  result = nil
  if serialized_result.is_a?(Array)
    result = serialized_result.collect{|e| deserialize(e)}
  else
    result = deserialize(serialized_result)
  end
  result
end
- (Object) get_all_schema_versions
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 219
def get_all_schema_versions
  sm_table = BigRecord::Migrator.schema_migrations_table_name

  get_consecutive_rows(sm_table, nil, nil, ["attribute:version"]).map{|version| version["attribute:version"]}
end
- (Object) get_columns(table_name, row, columns, options = {})
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 143
def get_columns(table_name, row, columns, options={})
  row_cols = get_columns_raw(table_name, row, columns, options)
  return nil unless row_cols

  result = {}
  row_cols.each do |key,value|
    begin
      result[key] =
      if key == 'id'
        value
      else
        deserialize(value)
      end
    rescue Exception => e
      puts "Could not load column value #{key} for row=#{row.name}"
    end
  end
  result
end
- (Object) get_columns_raw(table_name, row, columns = nil, options = {})
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 129
def get_columns_raw(table_name, row, columns = nil, options={})
  result = {}

  timestamp = options[:timestamp] || nil

  log "SELECT (#{columns.join(", ")}) FROM #{table_name} WHERE ROW=#{row};" do
    row = @connection.show_row(table_name, row, timestamp, columns, options)
    result.merge!({'id' => row.name})
    columns = row.columns
    columns.each{ |col| result.merge!({col.name => col.value}) }
  end
  result
end
- (Object) get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 174
def get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
  rows = get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row)

  result = rows.collect do |row|
    cols = {}
    cols['id'] = row.name
    row.columns.each do |col|
      key = col.name
      value = col.value
      begin
        cols[key] = deserialize(value)
      rescue Exception => e
        puts "Could not load column value #{key} for row=#{row.name}"
      end
    end
    cols
  end
  result
end
- (Object) get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 163
def get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
  result = nil
  log "SCAN (#{columns.join(", ")}) FROM #{table_name} WHERE START_ROW=#{start_row} AND STOP_ROW=#{stop_row} LIMIT=#{limit};" do
    options = {:start_row => start_row, :end_row => stop_row, :columns => columns, :batch => 200}
    scanner = @connection.open_scanner(table_name, options)
    result = @connection.get_rows(scanner, limit)
    @connection.close_scanner(scanner)
  end
  result
end
- (Object) get_raw(table_name, row, column, options = {})
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 107
def get_raw(table_name, row, column, options={})
  result = nil
  timestamp = options[:timestamp] || nil
  log "SELECT (#{column}) FROM #{table_name} WHERE ROW=#{row};" do
    columns = @connection.show_row(table_name, row, timestamp, column, options).columns
    result = (columns.size == 1) ? columns.first.value : columns.map(&:value)
  end
  result
end
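Note that `get_raw` returns a bare value when exactly one column comes back and an array otherwise, which is why `get` checks `is_a?(Array)` before deserializing. A small sketch of that unwrapping rule follows. The `Column` struct and the `unwrap` helper are hypothetical stand-ins introduced for this example; they are not part of hbase-ruby or BigRecord.

```ruby
# Hypothetical stand-in for the column objects returned by the REST client.
Column = Struct.new(:name, :value)

# Mirrors the last step of get_raw: a single column is unwrapped to its value,
# any other count (including zero) is returned as an array of values.
def unwrap(columns)
  (columns.size == 1) ? columns.first.value : columns.map(&:value)
end

single  = unwrap([Column.new("attribute:title", "Dune")])
several = unwrap([Column.new("attribute:title", "Dune"),
                  Column.new("attribute:year", "1965")])
none    = unwrap([])
```

Callers that always want a list can wrap the result with `Array(...)`, at the cost of no longer distinguishing the single-column case.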
- (Object) initialize_schema_migrations_table
Schema statements. Creates the schema migrations table unless it already exists.
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 209
def initialize_schema_migrations_table
  sm_table = BigRecord::Migrator.schema_migrations_table_name

  unless table_exists?(sm_table)
    create_table(sm_table) do |t|
      t.family :attribute, :versions => 1
    end
  end
end
- (Object) modify_column_family(table_name, column_name, options = {}) Also known as: modify_family
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 265
def modify_column_family(table_name, column_name, options = {})
end
- (Object) reconnect!
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 72
def reconnect!
end
- (Object) remove_column_family(table_name, column_name) Also known as: remove_family
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 260
def remove_column_family(table_name, column_name)
end
- (Object) serialize(value)
Serialize the given value
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 271
def serialize(value)
  case value
  when NilClass then NULL
  when String then build_serialized_value(TYPE_STRING, value)
  else value.to_yaml
  end
end
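The three branches of `serialize` produce three different wire formats. The standalone sketch below reproduces them so the resulting bytes can be inspected directly. The constants are copied locally for illustration and the sample values are invented.

```ruby
require "yaml"

# Local copies of the adapter's constants, for illustration only.
TYPE_STRING = 0x01
NULL        = "\000"

# Prefix the payload with a single type byte.
def build_serialized_value(type, value)
  type.chr + value
end

# nil becomes the NULL marker, strings get a type header, and
# everything else is delegated to YAML.
def serialize(value)
  case value
  when NilClass then NULL
  when String   then build_serialized_value(TYPE_STRING, value)
  else value.to_yaml
  end
end

as_null   = serialize(nil)     # one NUL byte
as_string = serialize("hello") # 0x01 followed by "hello"
as_yaml   = serialize(42)      # YAML document starting with "--- "
```

The "--- " prefix of the YAML branch is what `deserialize` looks for to tell legacy YAML values apart from header-prefixed ones.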
- (Boolean) supports_migrations?
Always returns true.
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 62
def supports_migrations?
  true
end
- (Boolean) table_exists?(table_name)
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 225
def table_exists?(table_name)
  log "TABLE EXISTS? #{table_name};" do
    @connection.list_tables.map(&:name).include?(table_name)
  end
end
- (Object) truncate_table(table_name)
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 203
def truncate_table(table_name)
end
- (Object) update(table_name, row, values, timestamp)
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 99
def update(table_name, row, values, timestamp)
  serialized_collection = {}
  values.each do |column, value|
    serialized_collection[column] = serialize(value)
  end
  update_raw(table_name, row, serialized_collection, timestamp)
end
- (Object) update_raw(table_name, row, values, timestamp)
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 87
def update_raw(table_name, row, values, timestamp)
  result = nil

  columns = columns_to_hbase_format(values)
  timestamp ||= Time.now. # the method called on Time.now is missing from the extracted source

  log "UPDATE #{table_name} SET #{values.inspect if values} WHERE ROW=#{row};" do
    @connection.create_row(table_name, row, timestamp, columns)
  end
  result
end