Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable autotrimming #11

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,18 @@ production:
The options are:

- `connects_to` - set the Active Record database configuration for the Solid Cable models. All options available in Active Record can be used here.
- `polling_interval` - sets the frequency of the polling interval.
- `message_retention` - sets the retention time for messages kept in the database. Used as the cut-off when trimming is performed.
- `polling_interval` - sets the frequency of the polling interval. (Defaults to
0.1.seconds)
- `message_retention` - sets the retention time for messages kept in the database. Used as the cut-off when trimming is performed. (Defaults to 1.day)
- `autotrim` - sets wether you want Solid Cable to handle autotrimming messages. (Defaults to true)
- `silence_polling` - whether to silence Active Record logs emitted when polling (Defaults to true)

## Trimming

Currently, messages are kept around forever, and have to be manually pruned via the `SolidCable::PruneJob.perform_later` job.
This job uses the `message_retention` setting to determine how long messages are to be kept around.
Messages are autotrimmed based upon the `message_retention` setting to determine how long messages are to be kept around. If no `message_retention` is given or parsing fails, it defaults to `1.day`. Messages are trimmed when a subscriber unsubscribes.

Autotrimming can negatively impact performance depending on your workload because it is doing a delete on ubsubscribe. If
you would prefer, you can disable autotrimming by setting `autotrim: false` and you can manually enqueue the job later, `SolidCable::PruneJob.perform_later`, or run it on a recurring interval out of band.

## License

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# frozen_string_literal: true

module SolidCable
class PruneJob < ActiveJob::Base
class TrimJob < ActiveJob::Base
def perform
::SolidCable::Message.prunable.delete_all
::SolidCable::Message.trimmable.delete_all
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this should work pretty well with SQLite, I have some worries about how this would behave on MySQL or PostgreSQL.

It's deleting an unbounded number of messages so could lock for a fair amount of time. If the database is being replicated, that could also trigger replication lag as those deletes are processed.

Also there could generally be locking issues with concurrent jobs attempting to run the query.

The approach solid_cache takes is to delete small amounts of data but do it often.

  • Every N / 2 writes we trigger an expiration task (a job or just in a thread).
  • The task will try to expire up to N records.

We expire N records, but trigger the expiration after N/2 inserts so we have downward pressure on the cache size when it is too large. But we don't try to clear everything out at once as that could be millions and millions of records.

Solid Cache then has a slightly complicated process for deleting records in a concurrent safe manner, but I think we could maybe just rely on SKIP LOCKED here instead. That means you need MySQL 8.0 at least, but Solid Queue already requires that so I don't think it would be an issue to have Solid Cable do the same.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put up #15 which should address this. Let me know what you think!

end
end
end
2 changes: 1 addition & 1 deletion app/models/solid_cable/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module SolidCable
class Message < SolidCable::Record
scope :prunable, lambda {
scope :trimmable, lambda {
where(created_at: ..::SolidCable.message_retention.ago)
}
scope :broadcastable, lambda { |channels, last_id|
Expand Down
2 changes: 2 additions & 0 deletions lib/action_cable/subscription_adapter/solid_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def add_channel(channel, on_success)

def remove_channel(channel)
channels.delete(channel)

::SolidCable::TrimJob.perform_now if ::SolidCable.autotrim?
end

def invoke_callback(*)
Expand Down
4 changes: 4 additions & 0 deletions lib/solid_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def message_retention
parse_duration(cable_config.message_retention, default: 1.day)
end

def autotrim?
cable_config.autotrim != false
end

private

def cable_config
Expand Down
20 changes: 20 additions & 0 deletions test/config_stubs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true

module ConfigStubs
extend ActiveSupport::Concern

class ConfigStub
def initialize(**)
@config = ActiveSupport::OrderedOptions.new.
update({ adapter: :test }.merge(**))
end

def config_for(_file)
@config
end
end

def with_cable_config(**)
Rails.stub(:application, ConfigStub.new(**)) { yield }
end
end
23 changes: 22 additions & 1 deletion test/lib/action_cable/subscription_adapter/solid_cable_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

require "active_support/core_ext/hash/indifferent_access"
require "pathname"
require "config_stubs"

class ActionCable::SubscriptionAdapter::SolidCableTest < ActionCable::TestCase
include ConfigStubs

WAIT_WHEN_EXPECTING_EVENT = 1
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
Expand All @@ -28,7 +30,8 @@ def setup
end

def cable_config
{ adapter: "solid_cable", polling_interval: "0.01.seconds" }
{ adapter: "solid_cable", message_retention: "1.second",
polling_interval: "0.01.seconds" }
end

def teardown
Expand Down Expand Up @@ -82,6 +85,24 @@ def test_broadcast_after_unsubscribe
assert_empty keep_queue
end

def test_trims_after_unsubscribe
with_cable_config message_retention: "1.second" do
keep_queue = nil
subscribe_as_queue("channel") do |queue|
keep_queue = queue

@tx_adapter.broadcast("channel", "hello world")

assert_equal 1, SolidCable::Message.where(channel: "channel").count
assert_equal "hello world", queue.pop
sleep 2
end
sleep 3

assert_equal 0, SolidCable::Message.where(channel: "channel").count
end
end

def test_multiple_broadcast
subscribe_as_queue("channel") do |queue|
@tx_adapter.broadcast("channel", "bananas")
Expand Down
20 changes: 20 additions & 0 deletions test/solid_cable_test.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
# frozen_string_literal: true

require "test_helper"
require "config_stubs"

class SolidCableTest < ActiveSupport::TestCase
include ConfigStubs

test "it has a version number" do
assert SolidCable::VERSION
end

test "autotrimming when nothing is set" do
assert_not Rails.application.config_for("cable").key?(:autotrim)
assert SolidCable.autotrim?
end

test "autotrimming when set to false" do
with_cable_config autotrim: false do
assert_not SolidCable.autotrim?
end
end

test "autotrimming when set to true" do
with_cable_config autotrim: true do
assert SolidCable.autotrim?
end
end
end
1 change: 1 addition & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"../test/dummy/db/migrate", __dir__
)]
require "rails/test_help"
require "minitest/autorun"

# Load fixtures from the engine
if ActiveSupport::TestCase.respond_to?(:fixture_paths=)
Expand Down