Skip to content
Draft
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
3976b58
Merge pull request #2 from roqua/master
frbl May 26, 2016
a29a6aa
Basic Cassandra framework
Babbie Jun 5, 2016
2789501
Add cassandra username / password to physiqual engine
frbl Jun 6, 2016
8bdb368
Updated dummy initializer to have cassandra config
frbl Jun 6, 2016
eb19fb3
Merge pull request #3 from compsy/fb-add-cassandra-config
Babbie Jun 6, 2016
22421ee
Commit for IDE transfer
Babbie Jun 9, 2016
a87d466
Commit for Frank check
Babbie Jun 9, 2016
a6b82e9
CassandraConnection creation and exporter prep work
Babbie Jun 11, 2016
b88334c
Add comments explaining the properties
frbl Jun 13, 2016
1e12dac
typo
frbl Jun 13, 2016
64f5488
Merge pull request #5 from compsy/fb-add-comments
Babbie Jun 13, 2016
ace8180
remove trailing whitespacing
frbl Jun 13, 2016
e566fd9
Caching first implementation
Babbie Jun 13, 2016
3465384
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jun 13, 2016
c8dd572
Some refactoring
Babbie Jun 16, 2016
50f0de4
Some refactoring
Babbie Jun 16, 2016
e7e9c82
Some refactoring
Babbie Jun 16, 2016
4e2d81a
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jun 16, 2016
18f4f49
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jun 16, 2016
61f7447
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jun 16, 2016
848ffeb
Fix bugs, pull candidate
Babbie Jun 16, 2016
e99e3a4
Rubocop
Babbie Jun 16, 2016
e913523
Entirety of caching moved to Sidekiq.
Babbie Jul 6, 2016
1249f5b
Rubocop
Babbie Jul 6, 2016
899b204
Rubocop
Babbie Jul 6, 2016
47e7db2
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jul 6, 2016
e526a64
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jul 6, 2016
dfabe4b
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jul 6, 2016
fdd981c
Rubocop
Babbie Jul 6, 2016
c826f51
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jul 6, 2016
de0f43a
Cleaned up the code a bit
frbl Jul 19, 2016
4dba7d7
Added some specs
frbl Jul 19, 2016
3b4ac05
Removed the time splitting construction, refactored the code to use e…
frbl Jul 19, 2016
20a5165
WIP
frbl Jul 19, 2016
7c21ba3
More refactoring, added some specs
frbl Jul 19, 2016
cfb95f0
Updated specs to expect a user instead of just a token
frbl Jul 19, 2016
e6f9b5c
remove failing specs and rubocop'
frbl Jul 19, 2016
6da8226
Added some more specs
frbl Jul 20, 2016
2febdcd
Remove focus
frbl Jul 20, 2016
3aa34d1
Reduced complexity of some methods
frbl Jul 20, 2016
dee8a79
Fixed some specs
frbl Jul 20, 2016
b131062
Hoped to fix the codeclimate issues
frbl Jul 20, 2016
8c54436
Moved function in class
frbl Jul 20, 2016
7686025
Include information required since caching update
Babbie Aug 28, 2016
2fb84e8
Add redis url config
Babbie Aug 28, 2016
056541f
Add redis url config
Babbie Aug 28, 2016
5019e3a
Add redis url config
Babbie Aug 28, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions app/models/physiqual/data_entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ class DataEntry

DATE_FORMAT = '%Y-%m-%d'.freeze

# The start date property is the start of this dataentry object, meaning it
# denotes the actual start of the data in this measurement
attribute :start_date, DateTime
# The end date property denotes the end of this measurement (exclusing the time).
# Meaning, data in this data entry is upto, but not including this time point
attribute :end_date, DateTime
attribute :values, Array, default: []
# Measurement moment is used in the physiqual process. It is used to determine
# the actual 'measurement time' of the current dataentry object. Often it's
# precisely in the middle of the start and end date of this data entry object.
attribute :measurement_moment, DateTime, default: :default_measurement_moment

def default_measurement_moment
Expand Down
93 changes: 93 additions & 0 deletions app/workers/physiqual/cache_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
require 'sidekiq'

module Physiqual
class CacheWorker
include Sidekiq::Worker
def perform(table, user_id, from, to)
token = User.find_by_user_id(user_id).physiqual_token
return [] unless token.complete?
session = Sessions::TokenAuthorizedSession.new(token)
@data_service = DataServices::DataServiceFactory.fabricate!(token.class.csrf_token, session)
@user_id = user_id
connection = DataServices::CassandraConnection.instance
from = Time.zone.parse(from)
to = Time.zone.parse(to)
store_data(connection, table, from, to)
end

private

def store_data(connection, table, from, to)
entries = DataServices::CassandraDataService.get_data(connection, @user_id, table, from, to)
data_service_function = get_data_function(table)
new_entries = []
if entries.blank?
new_entries = data_service_function.call(from, to)
else
if entries.first.start_date > from
new_entries += data_service_function.call(from, entries.first.start_date)
end
find_gaps(entries) do |from_gap, to_gap|
new_entries += data_service_function.call(from_gap, to_gap)
end
if entries.last.end_date < to
new_entries += data_service_function.call(entries.last.end_date, to)
end
end
cache(connection, table, @user_id, new_entries) if new_entries.present?
rescue Errors::NotSupportedError => e
Rails.logger.warn e.message
end

def get_data_function(table)
case table
when 'heart_rate'
@data_service.method(:heart_rate)
when 'sleep'
@data_service.method(:sleep)
when 'calories'
@data_service.method(:calories)
when 'distance'
@data_service.method(:distance)
when 'steps'
@data_service.method(:steps)
when 'activities'
@data_service.method(:activities)
end
end

def find_gaps(entries)
entries.each_with_index do |entry, i|
break if i == entries.length - 1
if entry.end_date != entries[i + 1].start_date
yield(entry.end_date, entries[i + 1].start_date)
end
end
end

def cache(connection, table, user_id, new_entries)
year = new_entries.first.measurement_moment.strftime('%Y').to_i
times = []
start_dates = []
end_dates = []
values = []
new_entries.each do |entry|
if year != entry.measurement_moment.strftime('%Y').to_i
connection.insert(table, user_id, year, times, start_dates, end_dates, values)
year = entry.measurement_moment.strftime('%Y').to_i
times = []
start_dates = []
end_dates = []
values = []
end
times << entry.measurement_moment
start_dates << entry.start_date
end_dates << entry.end_date
values << entry.values.first
if entry == new_entries.last
connection.insert(table, user_id, year, times, start_dates, end_dates, values)
end
end
end
end
end
3 changes: 3 additions & 0 deletions db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ module Physiqual
User.find_or_create_by(user_id: 'b')
User.find_or_create_by(user_id: 'c')
end
puts 'Initializing Cassandra'
connection = DataServices::CassandraConnection.instance
connection.init_db
end
2 changes: 2 additions & 0 deletions lib/physiqual/data_services.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
require 'physiqual/data_services/mock_service'
require 'physiqual/data_services/summarized_data_service'
require 'physiqual/data_services/bucketeer_data_service'
require 'physiqual/data_services/cassandra_data_service'
require 'physiqual/data_services/cassandra_connection'

module Physiqual
module DataServices
Expand Down
2 changes: 1 addition & 1 deletion lib/physiqual/data_services/cached_data_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def distance(from, to)
private

def from_cache(type)
Rails.logger.warn("#{type} of #{service_name} not from cache.. ") unless @cache.include? type
Rails.logger.warn("#{type} of #{service_name} not duplicate...") unless @cache.include? type
@cache[type] = yield unless @cache.include? type
@cache[type]
end
Expand Down
152 changes: 152 additions & 0 deletions lib/physiqual/data_services/cassandra_connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
require 'singleton'
require 'cassandra'

module Physiqual
module DataServices
class CassandraConnection
include Singleton
SLICE_SIZE = 100

def initialize
@cluster = nil
if !ENV['CASSANDRA_USERNAME'] || !ENV['CASSANDRA_PASSWORD'] || ENV['CASSANDRA_USERNAME'] == ''
@cluster = Cassandra.cluster(
hosts: (ENV['CASSANDRA_HOST_URLS'] || 'physiqual.dev').split(' ')
)
else
@cluster = Cassandra.cluster(
username: ENV['CASSANDRA_USERNAME'],
password: ENV['CASSANDRA_PASSWORD'],
hosts: (ENV['CASSANDRA_HOST_URLS'] || 'physiqual.dev').split(' ')
)
end
@session = @cluster.connect('physiqual')

init_db
init_insert
init_query
end

def insert(table, user_id, year, times, start_dates, end_dates, values)
times_slices, start_dates_slices, end_dates_slices, values_slices =
slice(times, start_dates, end_dates, values)
times_slices.each_with_index do |times_slice, i|
start_dates_slice = start_dates_slices[i]
end_dates_slice = end_dates_slices[i]
values_slice = values_slices[i]
batch = @session.batch do |b|
times_slice.each_with_index do |time, j|
value = BigDecimal(values_slice[j], Float::DIG + 1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waarom is die conversie naar bigdecimal nodig?

case table
when 'heart_rate'
insert_type = @insert_heart_rate
when 'sleep'
insert_type = @insert_sleep
when 'calories'
insert_type = @insert_calories
when 'distance'
insert_type = @insert_distance
when 'steps'
insert_type = @insert_steps
when 'activities'
insert_type = @insert_activities
value = values_slice[j]
end
b.add(insert_type, arguments: [user_id, year, time.to_time, start_dates_slice[j].to_time,
end_dates_slice[j].to_time, value])
end
end
@session.execute(batch)
end
end

def slice(times, start_dates, end_dates, values)
return times.each_slice(SLICE_SIZE).to_a,
start_dates.each_slice(SLICE_SIZE).to_a,
end_dates.each_slice(SLICE_SIZE).to_a,
values.each_slice(SLICE_SIZE).to_a
end

def query_heart_rate(user_id, year, from, to)
@session.execute(@query_heart_rate, arguments: [user_id, year, from, to])
end

def query_sleep(user_id, year, from, to)
@session.execute(@query_sleep, arguments: [user_id, year, from, to])
end

def query_calories(user_id, year, from, to)
@session.execute(@query_calories, arguments: [user_id, year, from, to])
end

def query_distance(user_id, year, from, to)
@session.execute(@query_distance, arguments: [user_id, year, from, to])
end

def query_steps(user_id, year, from, to)
@session.execute(@query_steps, arguments: [user_id, year, from, to])
end

def query_activities(user_id, year, from, to)
@session.execute(@query_activities, arguments: [user_id, year, from, to])
end

private

def init_db
create_table('heart_rate', 'decimal')
create_table('sleep', 'decimal')
create_table('calories', 'decimal')
create_table('distance', 'decimal')
create_table('steps', 'decimal')
create_table('activities', 'varchar')
end

def create_table(name, value_type)
@session.execute("
CREATE TABLE IF NOT EXISTS #{name} (
user_id text, year int, time timestamp, start_date timestamp, end_date timestamp, value #{value_type},
PRIMARY KEY ((user_id, year), time)
)
")
end

def init_insert
@insert_heart_rate = prepare_insert('heart_rate')
@insert_sleep = prepare_insert('sleep')
@insert_calories = prepare_insert('calories')
@insert_distance = prepare_insert('distance')
@insert_steps = prepare_insert('steps')
@insert_activities = prepare_insert('activities')
end

def prepare_insert(table_name)
@session.prepare("
INSERT INTO #{table_name} (
user_id, year, time, start_date, end_date, value
) VALUES (
?, ?, ?, ?, ?, ?
)
")
end

def init_query
@query_heart_rate = prepare_query('heart_rate')
@query_sleep = prepare_query('sleep')
@query_calories = prepare_query('calories')
@query_distance = prepare_query('distance')
@query_steps = prepare_query('steps')
@query_activities = prepare_query('activities')
end

def prepare_query(table_name)
@session.prepare("
SELECT time, start_date, end_date, value
FROM #{table_name}
WHERE user_id = ? AND year = ? AND time >= ? AND time <= ?
ORDER BY time ASC
")
end
end
end
end
Loading