Class: BigRecord::ConnectionAdapters::HbaseRestAdapter

Inherits:
AbstractAdapter show all
Defined in:
lib/big_record/connection_adapters/hbase_rest_adapter.rb

Constant Summary

LOST_CONNECTION_ERROR_MESSAGES =
[
  "Server shutdown in progress",
  "Broken pipe",
  "Lost connection to HBase server during query",
  "HBase server has gone away"
]
TYPE_NULL =

data types

0x00
TYPE_STRING =
0x01
TYPE_BOOLEAN =

utf-8 strings

0x04
TYPE_BINARY =

delegate to YAML

0x07
CHARSET =

string charset

"utf-8"
NULL =

utility constants

"\000"
@@emulate_booleans =
true

Instance Method Summary

Methods inherited from AbstractAdapter

#delete_all, #disable_referential_integrity, #prefetch_primary_key?, #quote_table_name, #raw_connection, #requires_reloading?, #reset_runtime, #supports_count_distinct?, #supports_ddl_transactions?, #verify!

Methods included from Quoting

#quote, #quote_column_name, #quote_string, #quoted_date, #quoted_false, #quoted_true

Methods included from DatabaseStatements

#insert_fixture

Constructor Details

- (HbaseRestAdapter) initialize(connection, logger, connection_options, config)

Returns a new instance of HbaseRestAdapter.



47
48
49
50
51
52
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 47

# Builds the adapter and immediately opens the connection.
#
# The connection and logger are handed to the superclass; the connection
# options and configuration are kept on the instance before +connect+ runs.
#
# @param connection the underlying connection object, passed to +super+
# @param logger the logger, passed to +super+
# @param connection_options stored in +@connection_options+
# @param config stored in +@config+
def initialize(connection, logger, connection_options, config)
  super(connection, logger)
  @connection_options = connection_options
  @config = config

  connect
end

Instance Method Details

- (Boolean) active?

CONNECTION MANAGEMENT ====================================

Returns:

  • (Boolean)


68
69
70
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 68

# Reports whether the connection is usable.
#
# No liveness probe is performed: this adapter always answers +true+.
#
# @return [Boolean] always +true+
def active?
  true
end

- (Object) adapter_name

:nodoc:



58
59
60
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 58

# Human-readable name identifying this adapter.
#
# @return [String] the literal "HBase-Rest"
def adapter_name
  "HBase-Rest"
end

- (Object) add_column_family(table_name, column_name, options = {}) Also known as: add_family



255
256
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 255

# Placeholder for adding a column family to a table.
#
# The method body is intentionally empty: nothing is sent to the connection
# and the arguments are ignored.
#
# @param table_name ignored
# @param column_name ignored
# @param options [Hash] ignored
# @return [nil]
def add_column_family(table_name, column_name, options = {})
  nil
end

- (Object) build_serialized_value(type, value)

Serialize a value by prefixing it with the single character for the given type code.



280
281
282
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 280

# Prefixes a value with its one-character type header.
#
# @param type [Integer] type code, converted to a single character via +chr+
# @param value [String] payload appended after the header
# @return [String] header character followed by +value+
def build_serialized_value(type, value)
  header = type.chr
  header + value
end

- (Object) columns_to_hbase_format(data = {})

DATABASE STATEMENTS ======================================



81
82
83
84
85
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 81

# Converts a column => content mapping into the list-of-hashes layout
# expected by hbase-ruby.
#
# Each entry becomes <tt>{:name => column.to_s, :value => content}</tt>.
# Every mapped element is a Hash, so there are never nil entries to strip.
#
# TODO: Add this function to hbase-ruby instead.
#
# @param data [Hash] column names mapped to their content
# @return [Array<Hash>] one +:name+/+:value+ hash per column
def columns_to_hbase_format(data = {})
  data.map do |column, content|
    { :name => column.to_s, :value => content }
  end
end

- (Object) configuration



54
55
56
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 54

# Returns a copy of the adapter configuration.
#
# The copy is made with +clone+, so the returned object is distinct from the
# one stored in +@config+ (nested values are not copied).
#
# @return a clone of +@config+
def configuration
  copy = @config.clone
  copy
end

- (Object) create_table(table_name, options = {}) {|table_definition| ... }

Yields:

  • (table_definition)


231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 231

# Creates a table, optionally dropping an existing one first.
#
# A fresh HbaseRestAdapterTable is yielded to the block (when given) so the
# caller can declare column families. With <tt>options[:force]</tt> set, an
# existing table of the same name is dropped before creation.
#
# @param table_name name of the table to create
# @param options [Hash] only +:force+ is read here
# @yield [table_definition] the definition object to populate
# @return whatever the connection's +create_table+ returns
def create_table(table_name, options = {})
  definition = HbaseRestAdapterTable.new
  yield definition if block_given?

  drop_table(table_name) if options[:force] && table_exists?(table_name)

  created = nil
  log("CREATE TABLE #{table_name} (#{definition.column_families_list});") do
    created = @connection.create_table(table_name, *definition.to_adapter_format)
  end
  created
end

- (Object) delete(table_name, row, timestamp = nil)



194
195
196
197
198
199
200
201
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 194

# Deletes a row from a table.
#
# When no timestamp is supplied, the current time converted with
# +to_bigrecord_timestamp+ is used.
#
# @param table_name table holding the row
# @param row key of the row to delete
# @param timestamp optional timestamp passed to the connection
# @return whatever the connection's +delete_row+ returns
def delete(table_name, row, timestamp = nil)
  timestamp = Time.now.to_bigrecord_timestamp unless timestamp

  outcome = nil
  log("DELETE FROM #{table_name} WHERE ROW=#{row};") do
    outcome = @connection.delete_row(table_name, row, timestamp)
  end
  outcome
end

- (Object) deserialize(str)

Deserialize the given string. This method supports both the pure YAML format and the type header format.



286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 286

# Deserialize the given string. This method supports both the pure YAML
# format and the type header format.
#
# YAML documents are recognised by their leading document marker. Both
# "--- " (scalars, and the form written by older YAML emitters) and "---\n"
# (the form Psych writes for arrays and hashes) are accepted; checking only
# for "--- " sends YAML-encoded collections down the type-header path.
#
# @param str [String, nil] serialized value
# @return the deserialized value, or nil when +str+ is nil
def deserialize(str)
  return unless str

  # Plain slice comparison (no regexp) so binary payloads are handled too.
  marker = str[0, 4]
  if marker == "--- " || marker == "---\n"
    # NOTE(review): depending on the YAML library version, YAML.load may
    # instantiate arbitrary objects — confirm stored values are trusted.
    YAML.load(str)
  else
    # Not YAML: new format with the one-byte type header.
    deserialize_with_header(str)
  end
end

- (Object) deserialize_with_header(data)

Deserialize the given string assumed to be in the type header format.



300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 300

# Deserialize the given string assumed to be in the type header format.
#
# The first byte is the type code and the rest is the payload. Strings
# shorter than two bytes, TYPE_NULL and unrecognised type codes all yield
# nil; TYPE_STRING and TYPE_BINARY return the payload unchanged.
#
# @param data [String, nil] header byte followed by the payload
# @return [String, nil] the payload, or nil
def deserialize_with_header(data)
  return unless data && data.size >= 2

  # The type is encoded in the first byte. String#[] with an Integer index
  # returns a one-character String on Ruby 1.9+ (it returned the byte value
  # on 1.8), which never equals the Integer TYPE_* constants, so the byte is
  # read explicitly.
  type = data.unpack('C').first

  case type
  when TYPE_NULL then nil
  when TYPE_STRING, TYPE_BINARY then data[1..-1]
  else nil
  end
end

- (Object) disconnect!



75
76
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 75

# Disconnect hook.
#
# The body is empty: nothing is done to the connection.
#
# @return [nil]
def disconnect!
  nil
end

- (Object) drop_table(table_name)



247
248
249
250
251
252
253
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 247

# Drops a table through the connection's +destroy_table+.
#
# @param table_name name of the table to drop
# @return whatever the connection's +destroy_table+ returns
def drop_table(table_name)
  outcome = nil
  log("DROP TABLE #{table_name};") { outcome = @connection.destroy_table(table_name) }
  outcome
end

- (Object) get(table_name, row, column, options = {})



118
119
120
121
122
123
124
125
126
127
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 118

# Reads one column of a row and deserializes the result.
#
# +get_raw+ may return a single serialized value or an Array of them; an
# Array is deserialized element by element.
#
# @param table_name table to read from
# @param row row key
# @param column column to fetch
# @param options [Hash] forwarded to +get_raw+
# @return the deserialized value, or an Array of deserialized values
def get(table_name, row, column, options={})
  raw = get_raw(table_name, row, column, options)
  return raw.map { |item| deserialize(item) } if raw.is_a?(Array)

  deserialize(raw)
end

- (Object) get_all_schema_versions



219
220
221
222
223
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 219

# Lists every version recorded in the schema migrations table.
#
# Scans the table named by BigRecord::Migrator.schema_migrations_table_name
# (no start row, no limit) and collects the "attribute:version" value of
# each returned row.
#
# @return [Array] the "attribute:version" value of each row
def get_all_schema_versions
  migrations_table = BigRecord::Migrator.schema_migrations_table_name
  rows = get_consecutive_rows(migrations_table, nil, nil, ["attribute:version"])

  rows.map { |row| row["attribute:version"] }
end

- (Object) get_columns(table_name, row, columns, options = {})



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 143

# Reads several columns of a row and deserializes their values.
#
# The 'id' entry returned by +get_columns_raw+ is copied as is; every other
# value goes through +deserialize+. A column whose value cannot be
# deserialized is reported on standard output and left out of the result.
#
# @param table_name table to read from
# @param row row key
# @param columns columns to fetch
# @param options [Hash] forwarded to +get_columns_raw+
# @return [Hash, nil] column name => value, or nil when the raw lookup
#   returns nothing
def get_columns(table_name, row, columns, options={})
  row_cols = get_columns_raw(table_name, row, columns, options)
  return nil unless row_cols

  result = {}
  row_cols.each do |key, value|
    begin
      result[key] = (key == 'id') ? value : deserialize(value)
    rescue StandardError
      # +row+ is the row key given by the caller, not a row object, so it is
      # interpolated directly; calling +row.name+ here raised from inside
      # the handler. Only StandardError is rescued so that interrupts and
      # exit requests are not swallowed.
      puts "Could not load column value #{key} for row=#{row}"
    end
  end
  result
end

- (Object) get_columns_raw(table_name, row, columns = nil, options = {})



129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 129

# Reads several columns of a row without deserializing the values.
#
# The row is fetched with the connection's +show_row+; the result maps 'id'
# to the fetched row's name and each column name to its raw value.
#
# @param table_name table to read from
# @param row row key
# @param columns columns to fetch; may be nil (the declared default), in
#   which case nil is passed through to +show_row+
# @param options [Hash] +:timestamp+ is read here; the whole hash is also
#   forwarded to +show_row+
# @return [Hash] 'id' plus column name => raw value
def get_columns_raw(table_name, row, columns = nil, options={})
  result = {}

  timestamp = options[:timestamp] || nil

  # Array() keeps the log message from raising when +columns+ is nil, which
  # is the declared default.
  log "SELECT (#{Array(columns).join(", ")}) FROM #{table_name} WHERE ROW=#{row};" do
    # Separate locals are used so the method parameters are not overwritten.
    fetched_row = @connection.show_row(table_name, row, timestamp, columns, options)
    result['id'] = fetched_row.name
    fetched_row.columns.each { |col| result[col.name] = col.value }
  end
  result
end

- (Object) get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 174

# Scans consecutive rows and deserializes every column value.
#
# Each row from +get_consecutive_rows_raw+ becomes a Hash holding 'id' (the
# row name) plus one entry per column. A column whose value cannot be
# deserialized is reported on standard output and left out of that row.
#
# @param table_name table to scan
# @param start_row first row key
# @param limit maximum number of rows
# @param columns columns to fetch
# @param stop_row optional last row key
# @return [Array<Hash>] one Hash per scanned row
def get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
  rows = get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row)

  rows.collect do |row|
    cols = { 'id' => row.name }
    row.columns.each do |col|
      key = col.name
      value = col.value
      begin
        cols[key] = deserialize(value)
      rescue StandardError
        # Only StandardError is rescued so that interrupts, exit requests and
        # other non-standard exceptions still propagate.
        puts "Could not load column value #{key} for row=#{row.name}"
      end
    end
    cols
  end
end

- (Object) get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)



163
164
165
166
167
168
169
170
171
172
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 163

# Scans consecutive rows without deserializing them.
#
# Opens a scanner on the connection, reads up to +limit+ rows and closes the
# scanner again, even when reading fails.
#
# @param table_name table to scan
# @param start_row first row key, sent as +:start_row+
# @param limit passed to the connection's +get_rows+
# @param columns columns to fetch, sent as +:columns+
# @param stop_row optional last row key, sent as +:end_row+
# @return whatever the connection's +get_rows+ returns
def get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
  result = nil
  # Array() keeps the log message from raising if +columns+ is nil.
  log "SCAN (#{Array(columns).join(", ")}) FROM #{table_name} WHERE START_ROW=#{start_row} AND STOP_ROW=#{stop_row} LIMIT=#{limit};" do
    options = {:start_row => start_row, :end_row => stop_row, :columns => columns, :batch => 200}
    scanner = @connection.open_scanner(table_name, options)
    begin
      result = @connection.get_rows(scanner, limit)
    ensure
      # Release the scanner even when get_rows raises.
      @connection.close_scanner(scanner)
    end
  end
  result
end

- (Object) get_raw(table_name, row, column, options = {})



107
108
109
110
111
112
113
114
115
116
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 107

# Reads one column of a row without deserializing it.
#
# The row is fetched with the connection's +show_row+. When exactly one
# column comes back its value is returned on its own; otherwise an Array
# with the value of every returned column is returned.
#
# @param table_name table to read from
# @param row row key
# @param column column to fetch
# @param options [Hash] +:timestamp+ is read here; the whole hash is also
#   forwarded to +show_row+
# @return a single raw value, or an Array of raw values
def get_raw(table_name, row, column, options={})
  timestamp = options[:timestamp] || nil
  raw = nil

  log("SELECT (#{column}) FROM #{table_name} WHERE ROW=#{row};") do
    cells = @connection.show_row(table_name, row, timestamp, column, options).columns

    raw = if cells.size == 1
      cells.first.value
    else
      cells.map { |cell| cell.value }
    end
  end
  raw
end

- (Object) initialize_schema_migrations_table

SCHEMA STATEMENTS ========================================



209
210
211
212
213
214
215
216
217
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 209

# Creates the schema migrations table unless it already exists.
#
# The table name comes from BigRecord::Migrator.schema_migrations_table_name
# and the table is declared with one +:attribute+ family using
# <tt>:versions => 1</tt>.
#
# @return nil when the table already exists, otherwise the result of
#   +create_table+
def initialize_schema_migrations_table
  migrations_table = BigRecord::Migrator.schema_migrations_table_name
  return if table_exists?(migrations_table)

  create_table(migrations_table) do |definition|
    definition.family(:attribute, :versions => 1)
  end
end

- (Object) modify_column_family(table_name, column_name, options = {}) Also known as: modify_family



265
266
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 265

# Placeholder for modifying a column family of a table.
#
# The method body is intentionally empty: nothing is sent to the connection
# and the arguments are ignored.
#
# @param table_name ignored
# @param column_name ignored
# @param options [Hash] ignored
# @return [nil]
def modify_column_family(table_name, column_name, options = {})
  nil
end

- (Object) reconnect!



72
73
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 72

# Reconnect hook.
#
# The body is empty: nothing is done to the connection.
#
# @return [nil]
def reconnect!
  nil
end

- (Object) remove_column_family(table_name, column_name) Also known as: remove_family



260
261
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 260

# Placeholder for removing a column family from a table.
#
# The method body is intentionally empty: nothing is sent to the connection
# and the arguments are ignored.
#
# @param table_name ignored
# @param column_name ignored
# @return [nil]
def remove_column_family(table_name, column_name)
  nil
end

- (Object) serialize(value)

Serialize the given value



271
272
273
274
275
276
277
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 271

# Serialize the given value.
#
# nil becomes the NULL constant, Strings get the TYPE_STRING header via
# +build_serialized_value+, and every other value is dumped with +to_yaml+.
#
# @param value the value to serialize
# @return [String] the serialized form
def serialize(value)
  if NilClass === value
    NULL
  elsif String === value
    build_serialized_value(TYPE_STRING, value)
  else
    value.to_yaml
  end
end

- (Boolean) supports_migrations?

:nodoc:

Returns:

  • (Boolean)


62
63
64
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 62

# Reports whether this adapter supports migrations.
#
# @return [Boolean] always +true+
def supports_migrations?
  true
end

- (Boolean) table_exists?(table_name)

Returns:

  • (Boolean)


225
226
227
228
229
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 225

# Checks whether a table with the given name exists.
#
# The names of the tables listed by the connection are compared with
# +table_name+. The answer is the value of the logged block, so this relies
# on +log+ returning its block's result.
#
# @param table_name name to look for
# @return [Boolean] true when a listed table has that name
def table_exists?(table_name)
  log("TABLE EXISTS? #{table_name};") do
    known_names = @connection.list_tables.map { |table| table.name }
    known_names.include?(table_name)
  end
end

- (Object) truncate_table(table_name)



203
204
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 203

# Placeholder for truncating a table.
#
# The method body is intentionally empty: nothing is sent to the connection
# and the argument is ignored.
#
# @param table_name ignored
# @return [nil]
def truncate_table(table_name)
  nil
end

- (Object) update(table_name, row, values, timestamp)



99
100
101
102
103
104
105
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 99

# Serializes every value and writes the row through +update_raw+.
#
# @param table_name table to write to
# @param row row key
# @param values column => value pairs; each value goes through +serialize+
# @param timestamp forwarded to +update_raw+
# @return whatever +update_raw+ returns
def update(table_name, row, values, timestamp)
  serialized = values.each_with_object({}) do |(column, value), collected|
    collected[column] = serialize(value)
  end

  update_raw(table_name, row, serialized, timestamp)
end

- (Object) update_raw(table_name, row, values, timestamp)



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/big_record/connection_adapters/hbase_rest_adapter.rb', line 87

# Writes already-serialized column values to a row.
#
# The values are converted with +columns_to_hbase_format+ and written with
# the connection's +create_row+. When no timestamp is supplied, the current
# time converted with +to_bigrecord_timestamp+ is used.
#
# @param table_name table to write to
# @param row row key
# @param values column => serialized value pairs
# @param timestamp timestamp for the write; may be nil
# @return whatever the connection's +create_row+ returns
def update_raw(table_name, row, values, timestamp)
  result = nil

  columns = columns_to_hbase_format(values)
  timestamp ||= Time.now.to_bigrecord_timestamp

  log "UPDATE #{table_name} SET #{values.inspect if values} WHERE ROW=#{row};" do
    # Capture the connection's answer: +result+ was previously never
    # assigned, so this method always returned nil, unlike the other
    # statement methods of this adapter.
    result = @connection.create_row(table_name, row, timestamp, columns)
  end
  result
end