Class: BigRecord::ConnectionAdapters::CassandraAdapter

Inherits:
AbstractAdapter show all
Defined in:
lib/big_record/connection_adapters/cassandra_adapter.rb

Constant Summary

LOST_CONNECTION_ERROR_MESSAGES =
[
  "Server shutdown in progress",
  "Broken pipe",
  "Lost connection to HBase server during query",
  "HBase server has gone away"
]
TYPE_NULL =

data types

0x00
TYPE_STRING =
0x01
TYPE_BOOLEAN =

utf-8 strings

0x04
TYPE_BINARY =

delegate to YAML

0x07
CHARSET =

string charset

"utf-8"
NULL =

utility constants

"\000"
@@emulate_booleans =
true

Instance Method Summary

Methods inherited from AbstractAdapter

#active?, #create_table, #disable_referential_integrity, #drop_table, #prefetch_primary_key?, #quote_table_name, #raw_connection, #reconnect!, #requires_reloading?, #reset_runtime, #supports_count_distinct?, #supports_ddl_transactions?, #table_exists?, #verify!

Methods included from Quoting

#quote, #quote_column_name, #quote_string, #quoted_date, #quoted_false, #quoted_true

Methods included from DatabaseStatements

#insert_fixture

Constructor Details

- (CassandraAdapter) initialize(connection, logger, connection_options, config)

A new instance of CassandraAdapter



42
43
44
45
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 42

# Builds the adapter around an already-created connection object.
#
# @param connection passed through to the superclass constructor
# @param logger passed through to the superclass constructor
# @param connection_options kept as-is in @connection_options
# @param config kept as-is in @config
def initialize(connection, logger, connection_options, config)
  super(connection, logger)
  @connection_options = connection_options
  @config = config
end

Instance Method Details

- (Object) adapter_name

:nodoc:



51
52
53
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 51

# Name under which this adapter identifies itself.
#
# @return [String] always 'Cassandra'
def adapter_name
  'Cassandra'
end

- (Object) build_serialized_value(type, value)

Serialize a value by prepending a one-byte header that encodes the given type



227
228
229
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 227

# Prefixes +value+ with a single header character derived from +type+.
#
# @param type an object responding to #chr (the TYPE_* integers are used by #serialize)
# @param value [String] the payload appended after the header
# @return [String] header character followed by the payload
def build_serialized_value(type, value)
  header = type.chr
  header + value
end

- (Object) configuration



47
48
49
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 47

# Returns a shallow copy (via #clone) of the config object given to the
# constructor, so callers do not receive the adapter's own instance.
def configuration
  settings = @config
  settings.clone
end

- (Object) delete(table_name, row)



203
204
205
206
207
208
209
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 203

# Removes one row through the connection's #remove call, requesting QUORUM
# consistency, and wraps the call in #log with a SQL-like description.
#
# @param table_name converted with #to_s before being handed to the connection
# @param row the row key
# @return whatever @connection.remove returned
def delete(table_name, row)
  outcome = nil
  log("DELETE FROM #{table_name} WHERE ROW=#{row};") do
    quorum = {:consistency => Cassandra::Consistency::QUORUM}
    outcome = @connection.remove(table_name.to_s, row, quorum)
  end
  outcome
end

- (Object) delete_all(table_name)

Raises:

  • (NotImplementedError)


211
212
213
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 211

# Bulk deletion is not provided by this adapter.
#
# @param table_name ignored
# @raise [NotImplementedError] always
def delete_all(table_name)
  raise(NotImplementedError)
end

- (Object) deserialize(str)

Deserialize the given string. This method supports both the pure YAML format and the type header format.



233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 233

# Deserialize the given string. Both storage formats are supported: a plain
# YAML document (legacy format, also what #serialize emits for non-String
# values) and the type-header format.
#
# @param str [String, nil] the raw stored value
# @return the decoded value, or nil when +str+ is nil
def deserialize(str)
  return unless str

  # A YAML document starts with the "---" marker followed by a space (scalars,
  # and everything the old Syck emitter produced) or directly by a newline
  # (collections emitted by Psych). Header-format values never start this way
  # because their first character is the type byte.
  if str.start_with?("--- ", "---\n")
    # NOTE(review): YAML.load may instantiate arbitrary objects on older YAML
    # engines -- confirm stored values are trusted, or consider a safe loader.
    YAML.load(str)
  else
    deserialize_with_header(str)
  end
end

- (Object) deserialize_with_header(data)

Deserialize the given string assumed to be in the type header format.



247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 247

# Deserialize the given string assumed to be in the type header format: one
# leading type byte followed by the payload.
#
# @param data [String, nil] the raw stored value
# @return [String, nil] nil for nil input, for input shorter than two
#   characters (this includes the bare one-character NULL marker) and for
#   TYPE_NULL; the payload without its header for TYPE_STRING / TYPE_BINARY;
#   the input unchanged when the type byte is not recognised
def deserialize_with_header(data)
  return unless data && data.size >= 2

  # The type is encoded in the first byte. Read it as an Integer: indexing a
  # String with [0] yields a one-character String on Ruby 1.9+, which would
  # never match the Integer TYPE_* constants below.
  type = data.unpack('C').first

  case type
  when TYPE_NULL then nil
  when TYPE_STRING, TYPE_BINARY then data[1..-1]
  else data
  end
end

- (Object) disconnect!

CONNECTION MANAGEMENT ====================================



61
62
63
64
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 61

# Closes the wrapped connection first, then lets the superclass run its own
# disconnect logic.
def disconnect!
  @connection.disconnect!
  super()
end

- (Object) get(table_name, row, column, options = {})



92
93
94
95
96
97
98
99
100
101
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 92

# Reads a value through #get_raw and decodes it with #deserialize.
#
# @param table_name forwarded to #get_raw
# @param row forwarded to #get_raw
# @param column forwarded to #get_raw
# @param options forwarded to #get_raw
# @return an Array of decoded elements when #get_raw returned an Array,
#   otherwise the single decoded value
def get(table_name, row, column, options={})
  raw = get_raw(table_name, row, column, options)
  return raw.map { |item| deserialize(item) } if raw.is_a?(Array)

  deserialize(raw)
end

- (Object) get_columns(table_name, row, columns, options = {})



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 138

# Fetches several columns of one row via #get_columns_raw and decodes every
# value except the synthetic 'id' entry with #deserialize.
#
# A column whose value cannot be decoded is reported on stdout and left out
# of the result; the remaining columns are still returned.
#
# @param table_name forwarded to #get_columns_raw
# @param row the row key; also used in the failure message
# @param columns forwarded to #get_columns_raw
# @param options forwarded to #get_columns_raw
# @return [Hash, nil] column name => decoded value, or nil when the raw
#   lookup returned nil or an empty collection
def get_columns(table_name, row, columns, options={})
  row_cols = get_columns_raw(table_name, row, columns, options)
  return nil unless row_cols && !row_cols.empty?

  result = {}
  row_cols.each do |key, value|
    begin
      result[key] = key == 'id' ? value : deserialize(value)
    rescue StandardError
      # +row+ is the row key itself, so it is interpolated directly; calling
      # a method such as #name on it would raise from inside this handler.
      puts "Could not load column value #{key} for row=#{row}"
    end
  end
  result
end

- (Object) get_columns_raw(table_name, row, columns, options = {})



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 103

# Fetches the still-serialized values of the requested columns of one row.
#
# Column names are split on ":" into a prefix and a name. Depending on the
# outcome (see the note on prefix_mode below) the row is read either with one
# ranged @connection.get or with @connection.get_columns.
#
# @param table_name passed to the connection and used in the log message
# @param row the row key; stored under "id" in the result when data came back
# @param columns list of column names; must respond to #join, #each and #each_index
# @param options not used by this method
# @return [Hash] column name => raw value, plus "id" => row when something
#   was found; blank values are left out
def get_columns_raw(table_name, row, columns, options={})
  result = {}

  log "SELECT (#{columns.join(", ")}) FROM #{table_name} WHERE ROW=#{row};" do
    prefix_mode = false
    prefixes = []

    columns.each do |name|
      prefix, name = name.split(":")
      # collect each distinct "prefix:" once
      prefixes << prefix+":" unless prefixes.include?(prefix+":")
      # NOTE(review): this is reassigned on every iteration, so only the LAST
      # column decides the mode -- confirm mixed lists are not expected.
      prefix_mode = name.blank?
    end

    if prefix_mode
      prefixes.sort!
      # One ranged read from the smallest prefix up to the largest prefix
      # followed by "~" (presumably chosen because it sorts after the
      # characters used in column names -- TODO confirm).
      values = @connection.get(table_name, row, {:start => prefixes.first, :finish => prefixes.last + "~"})

      result["id"] = row if values && values.size > 0

      # NOTE(review): values is nil-checked on the line above but not here.
      values.each do |key,value|
        result[key] = value unless value.blank?
      end
    else
      values = @connection.get_columns(table_name, row, columns)

      result["id"] = row if values && values.compact.size > 0

      # values is read positionally: values[i] is taken as the value of columns[i]
      # NOTE(review): values is nil-checked on the line above but not here.
      columns.each_index do |id|
        result[columns[id].to_s] = values[id] unless values[id].blank?
      end
    end
  end
  result
end

- (Object) get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 186

# Scans a range of rows via #get_consecutive_rows_raw and decodes every
# column value except the "id" entry with #deserialize.
#
# A column whose value cannot be decoded is reported on stdout and left out
# of that row's hash; the scan itself continues.
#
# @param table_name forwarded to #get_consecutive_rows_raw
# @param start_row forwarded to #get_consecutive_rows_raw
# @param limit forwarded to #get_consecutive_rows_raw
# @param columns forwarded to #get_consecutive_rows_raw
# @param stop_row forwarded to #get_consecutive_rows_raw
# @return [Array<Hash>] one hash of column name => decoded value per row
def get_consecutive_rows(table_name, start_row, limit, columns, stop_row = nil)
  rows = get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row)

  rows.map do |row|
    cols = {}
    row.each do |key, value|
      begin
        cols[key] = key == "id" ? value : deserialize(value)
      rescue StandardError
        # Each raw row is a Hash whose key is stored under "id"; a Hash has
        # no #name, so the key is looked up instead of calling row.name.
        puts "Could not load column value #{key} for row=#{row["id"]}"
      end
    end
    cols
  end
end

- (Object) get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 158

# Runs a range query through @connection.get_range and returns the rows with
# their values still serialized.
#
# The :start, :finish and :count options are only sent when the matching
# argument is not blank. Rows that carry no column besides "id" are dropped.
#
# @param table_name passed to the connection and used in the log message
# @param start_row first row key of the range (optional)
# @param limit maximum number of rows (optional)
# @param columns only used in the log message
# @param stop_row last row key of the range (optional)
# @return [Array<Hash>] one hash per row: "id" => key plus column name => raw value
def get_consecutive_rows_raw(table_name, start_row, limit, columns, stop_row = nil)
  collected = []
  log("SCAN (#{columns.join(", ")}) FROM #{table_name} WHERE START_ROW=#{start_row} AND STOP_ROW=#{stop_row} LIMIT=#{limit};") do
    range_options = {}
    range_options[:start] = start_row unless start_row.blank?
    range_options[:finish] = stop_row unless stop_row.blank?
    range_options[:count] = limit unless limit.blank?

    slices = @connection.get_range(table_name, range_options)

    unless slices.empty?
      slices.each do |slice|
        record = {"id" => slice.key}

        slice.columns.each do |wrapper|
          cell = wrapper.column
          record[cell.name] = cell.value
        end

        # keep the row only when it has at least one real column
        collected << record if record.size > 1
      end
    end
  end
  collected
end

- (Object) get_raw(table_name, row, column, options = {})



84
85
86
87
88
89
90
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 84

# Reads one column of one row through @connection.get without decoding it;
# the call is wrapped in #log with a SQL-like description.
#
# @param table_name passed to the connection
# @param row the row key
# @param column passed to the connection
# @param options not used by this method
# @return whatever @connection.get returned
def get_raw(table_name, row, column, options={})
  fetched = nil
  log("SELECT (#{column}) FROM #{table_name} WHERE ROW=#{row};") do
    fetched = @connection.get(table_name, row, column)
  end
  fetched
end

- (Object) serialize(value)

Serialize the given value



218
219
220
221
222
223
224
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 218

# Turns a Ruby value into its stored string form.
#
# @param value the value to store
# @return [String] NULL for nil, a TYPE_STRING-headed payload for Strings
#   (built by #build_serialized_value), and the value's YAML dump otherwise
def serialize(value)
  return NULL if value.nil?
  return build_serialized_value(TYPE_STRING, value) if value.is_a?(String)

  value.to_yaml
end

- (Boolean) supports_migrations?

:nodoc:

Returns:

  • (Boolean)


55
56
57
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 55

# Reports whether the adapter supports migrations.
#
# @return [Boolean] always false
def supports_migrations?
  false
end

- (Object) update(table_name, row, values, timestamp)



76
77
78
79
80
81
82
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 76

# Serializes every value with #serialize and writes the result through
# #update_raw.
#
# @param table_name forwarded to #update_raw
# @param row forwarded to #update_raw
# @param values enumerable of column => value pairs
# @param timestamp forwarded to #update_raw
# @return whatever #update_raw returned
def update(table_name, row, values, timestamp)
  encoded = values.inject({}) do |acc, (column, value)|
    acc[column] = serialize(value)
    acc
  end
  update_raw(table_name, row, encoded, timestamp)
end

- (Object) update_raw(table_name, row, values, timestamp)

DATABASE STATEMENTS ======================================



68
69
70
71
72
73
74
# File 'lib/big_record/connection_adapters/cassandra_adapter.rb', line 68

# Writes already-serialized column values for one row through
# @connection.insert with QUORUM consistency; the call is wrapped in #log.
#
# @param table_name passed to the connection
# @param row the row key
# @param values column => serialized value pairs, handed to the connection unchanged
# @param timestamp accepted but not forwarded to the connection by this method
# @return whatever @connection.insert returned
def update_raw(table_name, row, values, timestamp)
  outcome = nil
  log("UPDATE #{table_name} SET #{values.inspect if values} WHERE ROW=#{row};") do
    write_options = {:consistency => Cassandra::Consistency::QUORUM}
    outcome = @connection.insert(table_name, row, values, write_options)
  end
  outcome
end