Draft
47 commits
3976b58
Merge pull request #2 from roqua/master
frbl May 26, 2016
a29a6aa
Basic Cassandra framework
Babbie Jun 5, 2016
2789501
Add cassandra username / password to physiqual engine
frbl Jun 6, 2016
8bdb368
Updated dummy initializer to have cassandra config
frbl Jun 6, 2016
eb19fb3
Merge pull request #3 from compsy/fb-add-cassandra-config
Babbie Jun 6, 2016
22421ee
Commit for IDE transfer
Babbie Jun 9, 2016
a87d466
Commit for Frank check
Babbie Jun 9, 2016
a6b82e9
CassandraConnection creation and exporter prep work
Babbie Jun 11, 2016
b88334c
Add comments explaining the properties
frbl Jun 13, 2016
1e12dac
typo
frbl Jun 13, 2016
64f5488
Merge pull request #5 from compsy/fb-add-comments
Babbie Jun 13, 2016
ace8180
remove trailing whitespacing
frbl Jun 13, 2016
e566fd9
Caching first implementation
Babbie Jun 13, 2016
3465384
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jun 13, 2016
c8dd572
Some refactoring
Babbie Jun 16, 2016
50f0de4
Some refactoring
Babbie Jun 16, 2016
e7e9c82
Some refactoring
Babbie Jun 16, 2016
4e2d81a
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jun 16, 2016
18f4f49
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jun 16, 2016
61f7447
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jun 16, 2016
848ffeb
Fix bugs, pull candidate
Babbie Jun 16, 2016
e99e3a4
Rubocop
Babbie Jun 16, 2016
e913523
Entirety of caching moved to Sidekiq.
Babbie Jul 6, 2016
1249f5b
Rubocop
Babbie Jul 6, 2016
899b204
Rubocop
Babbie Jul 6, 2016
47e7db2
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jul 6, 2016
e526a64
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jul 6, 2016
dfabe4b
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jul 6, 2016
fdd981c
Rubocop
Babbie Jul 6, 2016
c826f51
Merge remote-tracking branch 'origin/mbsw-add-caching' into mbsw-add-…
Babbie Jul 6, 2016
de0f43a
Cleaned up the code a bit
frbl Jul 19, 2016
4dba7d7
Added some specs
frbl Jul 19, 2016
3b4ac05
Removed the time splitting construction, refactored the code to use e…
frbl Jul 19, 2016
20a5165
WIP
frbl Jul 19, 2016
7c21ba3
More refactoring, added some specs
frbl Jul 19, 2016
cfb95f0
Updated specs to expect a user instead of just a token
frbl Jul 19, 2016
e6f9b5c
remove failing specs and rubocop'
frbl Jul 19, 2016
6da8226
Added some more specs
frbl Jul 20, 2016
2febdcd
Remove focus
frbl Jul 20, 2016
3aa34d1
Reduced complexity of some methods
frbl Jul 20, 2016
dee8a79
Fixed some specs
frbl Jul 20, 2016
b131062
Hoped to fix the codeclimate issues
frbl Jul 20, 2016
8c54436
Moved function in class
frbl Jul 20, 2016
7686025
Include information required since caching update
Babbie Aug 28, 2016
2fb84e8
Add redis url config
Babbie Aug 28, 2016
056541f
Add redis url config
Babbie Aug 28, 2016
5019e3a
Add redis url config
Babbie Aug 28, 2016
3 changes: 3 additions & 0 deletions .rubocop.yml
@@ -30,6 +30,9 @@ MethodLength:
# Limit modules to have a length of 200
Metrics/ModuleLength:
  Max: 200
  Exclude:
    - 'spec/**/*'


# Limit classes to have a length of 200
Metrics/ClassLength:
21 changes: 21 additions & 0 deletions README.md
@@ -2,6 +2,12 @@
Ruby Engine for merging multiple data sources with diary questionnaire data

[![Code Climate](https://codeclimate.com/github/roqua/physiqual/badges/gpa.svg)](https://codeclimate.com/github/roqua/physiqual) [![Test Coverage](https://codeclimate.com/github/roqua/physiqual/badges/coverage.svg)](https://codeclimate.com/github/roqua/physiqual/coverage) [![Dependency Status](https://gemnasium.com/roqua/physiqual.svg)](https://gemnasium.com/roqua/physiqual) [![Circle CI](https://circleci.com/gh/roqua/physiqual/tree/master.svg?style=svg)](https://circleci.com/gh/roqua/physiqual/tree/master)
## Requirements
Physiqual requires the following additional software for caching:
* A Redis database
* A Cassandra database

Make sure the Cassandra database has a keyspace set up for use in Physiqual.

## Installation
Add Physiqual to your Gemfile. Physiqual is not yet on RubyGems; it will be published there once it reaches a more stable beta state.
@@ -42,6 +48,15 @@ Physiqual.configure do |config|
config.host_url = ENV['HOST_URL'] || 'physiqual.dev'
config.host_protocol = ENV['HOST_PROTOCOL'] || 'http'

# Cassandra settings
config.cassandra_username = ENV['CASSANDRA_USERNAME'] || ''
config.cassandra_password = ENV['CASSANDRA_PASSWORD'] || ''
config.cassandra_host_urls = (ENV['CASSANDRA_HOST_URLS'] || 'physiqual.dev').split(' ')
config.cassandra_keyspace = ENV['CASSANDRA_KEYSPACE']

# Redis settings
config.redis_url = ENV['REDIS_URL']

# EMA Settings
config.measurements_per_day = 3 # Number of measurements per day, from the end of day downwards
config.interval = 6 # Number of hours between measurements
@@ -59,6 +74,12 @@ Physiqual.configure do |config|
end
```

On the machine(s) that will handle caching, install Physiqual as well. Then run the following:
```bash
bundle exec sidekiq
```
This will allow Physiqual to cache data asynchronously to your Cassandra database.

Now you should be able to start your server.

## Dummy
7 changes: 7 additions & 0 deletions app/models/physiqual/data_entry.rb
@@ -4,9 +4,16 @@ class DataEntry

DATE_FORMAT = '%Y-%m-%d'.freeze

# The start date property is the start of this data entry object, meaning it
# denotes the actual start of the data in this measurement.
attribute :start_date, DateTime
# The end date property denotes the end of this measurement (exclusive).
# That is, data in this data entry runs up to, but does not include, this time point.
attribute :end_date, DateTime
attribute :values, Array, default: []
# Measurement moment is used in the Physiqual process. It is used to determine
# the actual 'measurement time' of the current data entry object. Often it is
# precisely in the middle of the start and end date of this data entry object.
attribute :measurement_moment, DateTime, default: :default_measurement_moment

def default_measurement_moment
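The comments above describe `measurement_moment` as often lying precisely in the middle of the start and end date. The body of `default_measurement_moment` is not shown in this hunk, so the following is only a minimal sketch of that midpoint rule. The helper name `midpoint_between` is hypothetical and not part of Physiqual; it assumes plain `DateTime` values.

```ruby
require 'date'

# Hypothetical helper (not part of Physiqual): returns the moment halfway
# between two DateTime values. Subtracting two DateTimes yields a Rational
# number of days, so halving it and adding it back stays exact.
def midpoint_between(start_date, end_date)
  start_date + (end_date - start_date) / 2
end

start_date = DateTime.new(2016, 6, 1, 0, 0, 0)
end_date   = DateTime.new(2016, 6, 1, 6, 0, 0)
midpoint   = midpoint_between(start_date, end_date)
```

For a six-hour entry starting at midnight, the sketch places the measurement moment three hours in.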
5 changes: 5 additions & 0 deletions cassandra.sh
@@ -0,0 +1,5 @@
#!/bin/bash
line="CREATE KEYSPACE physiqual WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};"
# Copy the statement to the clipboard (pbcopy is macOS-only) so it can be pasted into cqlsh
echo "$line" | pbcopy

docker run -it --rm --name cassmgmt --link cassandra1:cassmgmt cassandra cqlsh cassandra1
3 changes: 3 additions & 0 deletions db/seeds.rb
@@ -12,4 +12,7 @@ module Physiqual
User.find_or_create_by(user_id: 'b')
User.find_or_create_by(user_id: 'c')
end
puts 'Initializing Cassandra'
# Instantiating the singleton creates the tables and prepares the queries
CassandraConnection.instance
end
2 changes: 2 additions & 0 deletions lib/physiqual.rb
@@ -9,6 +9,8 @@
require 'physiqual/sessions'
require 'physiqual/users'
require 'physiqual/version'
require 'physiqual/cassandra_connection'
require 'physiqual/workers'

module Physiqual
end
126 changes: 126 additions & 0 deletions lib/physiqual/cassandra_connection.rb
@@ -0,0 +1,126 @@
require 'singleton'
require 'cassandra'

module Physiqual
class CassandraConnection
include Singleton
SLICE_SIZE = 100

def initialize
# Setup the connection to the cluster
cluster = initialize_cassandra_cluster
@session = cluster.connect(Physiqual.cassandra_keyspace)

variables = { 'heart_rate' => 'decimal',
'sleep' => 'decimal',
'calories' => 'decimal',
'distance' => 'decimal',
'steps' => 'decimal',
'activities' => 'varchar' }

initialize_database(variables)
end

def insert(variable, user_id, year, entries)
# Slice the entries into chunks of SLICE_SIZE
entries = entries.each_slice(SLICE_SIZE).to_a

entries.each do |entry_slice|
current_batch = create_batches(entry_slice, variable, user_id, year)
@session.execute(current_batch)
end
end

def create_batches(entry_slice, variable, user_id, year)
@session.batch do |batch|
entry_slice.each do |entry|
value = entry.values.first
# Values in the 'activities' table are strings and must not be converted to a BigDecimal
value = BigDecimal(value, Float::DIG + 1) if variable != 'activities'

# Retrieve the prepared statement
insert_type = @insert_queries[variable]
batch.add(insert_type,
arguments: [user_id, year,
entry.measurement_moment.to_time,
entry.start_date.to_time,
entry.end_date.to_time, value])
end
end
end

def query_heart_rate(user_id, year, from, to)
@session.execute(@queries['heart_rate'], arguments: [user_id, year, from, to])
end

def query_sleep(user_id, year, from, to)
@session.execute(@queries['sleep'], arguments: [user_id, year, from, to])
end

def query_calories(user_id, year, from, to)
@session.execute(@queries['calories'], arguments: [user_id, year, from, to])
end

def query_distance(user_id, year, from, to)
@session.execute(@queries['distance'], arguments: [user_id, year, from, to])
end

def query_steps(user_id, year, from, to)
@session.execute(@queries['steps'], arguments: [user_id, year, from, to])
end

def query_activities(user_id, year, from, to)
@session.execute(@queries['activities'], arguments: [user_id, year, from, to])
end

private

def initialize_cassandra_cluster
if Physiqual.cassandra_username.blank? || Physiqual.cassandra_password.blank?
return Cassandra.cluster(hosts: Physiqual.cassandra_urls)
end

Cassandra.cluster(
username: Physiqual.cassandra_username,
password: Physiqual.cassandra_password,
hosts: Physiqual.cassandra_urls
)
end

def initialize_database(variable_names)
@insert_queries = {}
@queries = {}
variable_names.each do |variable, type|
create_table(variable, type)
@insert_queries[variable] = prepare_insert(variable)
@queries[variable] = prepare_query(variable)
end
end

def create_keyspace(keyspace_name)
query = "CREATE KEYSPACE #{keyspace_name}
WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};"
@session.execute(query)
end

def prepare_insert(table_name)
query = "INSERT INTO #{table_name} (user_id, year, time, start_date, end_date, value) VALUES (?, ?, ?, ?, ?, ?)"
@session.prepare(query)
end

def prepare_query(table_name)
# Each fragment ends with a space; adjacent literals are joined without a separator
query = 'SELECT time, start_date, end_date, value '\
"FROM #{table_name} "\
'WHERE user_id = ? AND year = ? AND time >= ? AND time <= ? '\
'ORDER BY time ASC'
@session.prepare(query)
end

def create_table(name, value_type)
query = "CREATE TABLE IF NOT EXISTS #{name} ("\
"user_id text, year int, time timestamp, start_date timestamp, end_date timestamp, value #{value_type},"\
'PRIMARY KEY ((user_id, year), time))'
@session.execute(query)
end
end
end
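Two details of the connection class can be tried in isolation, without a Cassandra cluster: how the multi-line query string is assembled, and how entries are grouped before batching. The sketch below uses two hypothetical helpers, `select_query_for` and `slices_for`, which are not part of the pull request. It only builds a string and slices an array; it does not touch the `cassandra` driver.

```ruby
SLICE_SIZE = 100

# Hypothetical helper: builds the SELECT statement for one variable table.
# Each fragment ends with a space, because adjacent string literals joined
# with a trailing backslash are concatenated without any separator.
def select_query_for(table_name)
  'SELECT time, start_date, end_date, value '\
  "FROM #{table_name} "\
  'WHERE user_id = ? AND year = ? AND time >= ? AND time <= ? '\
  'ORDER BY time ASC'
end

# Hypothetical helper: groups entries into slices of at most slice_size
# elements, mirroring the each_slice call in #insert.
def slices_for(entries, slice_size = SLICE_SIZE)
  entries.each_slice(slice_size).to_a
end

query = select_query_for('heart_rate')
slice_sizes = slices_for((1..250).to_a).map(&:size)
```

With 250 entries the slices hold 100, 100 and 50 elements, so `#insert` would execute three batches.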
1 change: 1 addition & 0 deletions lib/physiqual/data_services.rb
@@ -7,6 +7,7 @@
require 'physiqual/data_services/mock_service'
require 'physiqual/data_services/summarized_data_service'
require 'physiqual/data_services/bucketeer_data_service'
require 'physiqual/data_services/cassandra_data_service'

module Physiqual
module DataServices
2 changes: 1 addition & 1 deletion lib/physiqual/data_services/bucketeer_data_service.rb
@@ -67,7 +67,7 @@ def create_buckets(buckets, data)
next unless entry.measurement_moment > buckets[current_bucket].start_date

values = entry.values
buckets[current_bucket].values.push(*values)
buckets[current_bucket].values.concat(values)
end
buckets
end
2 changes: 1 addition & 1 deletion lib/physiqual/data_services/cached_data_service.rb
@@ -37,7 +37,7 @@ def distance(from, to)
private

def from_cache(type)
Rails.logger.warn("#{type} of #{service_name} not from cache.. ") unless @cache.include? type
Rails.logger.warn("#{type} of #{service_name} not duplicate...") unless @cache.include? type
@cache[type] = yield unless @cache.include? type
@cache[type]
end
105 changes: 105 additions & 0 deletions lib/physiqual/data_services/cassandra_data_service.rb
@@ -0,0 +1,105 @@
require 'sidekiq-status'

module Physiqual
module DataServices
class CassandraDataService < DataServiceDecorator
def initialize(data_service, user_id, cassandra_connection)
super(data_service)
@user_id = user_id
@cassandra_connection ||= cassandra_connection
end

def service_name
"cassandra_#{data_service.service_name}"
end

def heart_rate(from, to)
cache_data('heart_rate', @user_id, from, to)
get_data(@cassandra_connection, @user_id, 'heart_rate', from, to)
end

def sleep(from, to)
cache_data('sleep', @user_id, from, to)
get_data(@cassandra_connection, @user_id, 'sleep', from, to)
end

def calories(from, to)
cache_data('calories', @user_id, from, to)
get_data(@cassandra_connection, @user_id, 'calories', from, to)
end

def distance(from, to)
cache_data('distance', @user_id, from, to)
get_data(@cassandra_connection, @user_id, 'distance', from, to)
end

def steps(from, to)
cache_data('steps', @user_id, from, to)
get_data(@cassandra_connection, @user_id, 'steps', from, to)
end

def activities(from, to)
cache_data('activities', @user_id, from, to)
get_data(@cassandra_connection, @user_id, 'activities', from, to)
end

def get_data(connection, user_id, variable, from, to)
entries = []
years(from, to) do |year, from_per_year, to_per_year|
case variable
when 'heart_rate'
query_result = connection.query_heart_rate(user_id, year, from_per_year, to_per_year)
when 'sleep'
query_result = connection.query_sleep(user_id, year, from_per_year, to_per_year)
when 'calories'
query_result = connection.query_calories(user_id, year, from_per_year, to_per_year)
when 'distance'
query_result = connection.query_distance(user_id, year, from_per_year, to_per_year)
when 'steps'
query_result = connection.query_steps(user_id, year, from_per_year, to_per_year)
when 'activities'
query_result = connection.query_activities(user_id, year, from_per_year, to_per_year)
end

entries += make_data_entries(variable, query_result)
end
entries
end

private

def cache_data(variable, user_id, from, to)
job = Physiqual::Workers::CacheWorker.perform_async(data_service, self, variable, user_id, from, to)
# job = Physiqual::Workers::CacheWorker.new.perform(data_service, self, variable, user_id, from, to)
while Sidekiq::Status.queued?(job) || Sidekiq::Status.working?(job)
Rails.logger.info('Sleeping!')
Kernel.sleep(1)
end
end

def years(from, to)
from_year = from.strftime('%Y').to_i
to_year = to.strftime('%Y').to_i
(from_year..to_year).each do |year|
from_per_year = from_year < year ? Time.zone.local(year, 1, 1, 0, 0, 0) : from
to_per_year = to_year > year ? Time.zone.local(year, 12, 31, 23, 59, 59) : to
yield(year, from_per_year, to_per_year)
end
end

def make_data_entries(table, results)
return [] if results.blank?
entries = []
results.each do |result|
# TODO: Should this be to_i or to_f
value = table == 'activities' ? result['value'] : result['value'].to_i
entries << DataEntry.new(start_date: result['start_date'].in_time_zone,
end_date: result['end_date'].in_time_zone,
values: [value],
measurement_moment: result['time'].in_time_zone)
end
entries
end
end
end
end
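The tables are partitioned on `(user_id, year)`, so a request that spans a year boundary has to be split into one query per calendar year, which is what the private `years` method does. The following is a minimal sketch of that splitting under two assumptions: it uses plain `Time.utc` instead of ActiveSupport's `Time.zone`, and the method name `each_year_range` is hypothetical.

```ruby
# Hypothetical stand-in for the private #years method: yields one
# (year, range_start, range_end) triple per calendar year touched by the
# interval from..to. Inner boundaries are clamped to Jan 1 00:00:00 and
# Dec 31 23:59:59 of the respective year.
def each_year_range(from, to)
  return enum_for(:each_year_range, from, to) unless block_given?

  (from.year..to.year).each do |year|
    range_start = from.year < year ? Time.utc(year, 1, 1, 0, 0, 0) : from
    range_end   = to.year > year ? Time.utc(year, 12, 31, 23, 59, 59) : to
    yield(year, range_start, range_end)
  end
end

from   = Time.utc(2015, 12, 30, 12, 0, 0)
to     = Time.utc(2016, 1, 2, 8, 0, 0)
ranges = each_year_range(from, to).to_a
```

Here `ranges` holds two triples: one for 2015 ending at the last second of the year, and one for 2016 starting at midnight on January 1.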
17 changes: 17 additions & 0 deletions lib/physiqual/engine.rb
@@ -9,6 +9,18 @@ class << self
mattr_accessor :google_client_secret
mattr_accessor :fitbit_client_id
mattr_accessor :fitbit_client_secret

# Cassandra settings
mattr_accessor :enable_cassandra
mattr_accessor :cassandra_username
mattr_accessor :cassandra_password
mattr_accessor :cassandra_host_urls
mattr_accessor :cassandra_keyspace

# Redis settings
mattr_accessor :redis_url

mattr_accessor :host_url
mattr_accessor :host_protocol
mattr_accessor :measurements_per_day
@@ -17,6 +29,11 @@ class << self
mattr_accessor :imputers
end

def self.cassandra_urls
# Accept either a space-separated String or an Array of hosts; returns [] when unset
Array(cassandra_host_urls).flat_map { |url| url.split(' ') }
end

def self.configure(&_block)
yield self
end
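The README example assigns an Array to `cassandra_host_urls` (the result of `split(' ')`), whereas a raw environment value would be a single space-separated String. A small normalizer that tolerates both shapes, and an unset value, could look like the sketch below. The name `normalize_hosts` is hypothetical and not part of the engine.

```ruby
# Hypothetical helper: turns a host setting into a flat Array of host names.
# Accepts nil, a space-separated String, or an Array of Strings.
def normalize_hosts(hosts)
  Array(hosts).flat_map { |host| host.to_s.split(' ') }
end

from_string = normalize_hosts('cass1.example cass2.example')
from_array  = normalize_hosts(['cass1.example', 'cass2.example'])
from_nil    = normalize_hosts(nil)
```

Returning an empty Array for an unset value lets the caller decide how to handle a missing host list instead of passing `nil` on to the driver.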