From 4405d494e458b4dc6085e2cf9518579531fc7066 Mon Sep 17 00:00:00 2001 From: Brian Durand Date: Tue, 7 Dec 2010 16:11:26 -0600 Subject: [PATCH] Add new ActiveSupport::WeakReference implementation that performs consistently across all runtimes. This is then usd to reduce memory bloat in large transactions by only holding onto strong references to objects that implement the transaction callbacks. --- .../abstract/database_statements.rb | 23 ++- .../test/cases/transaction_callbacks_test.rb | 69 ++++++ activesupport/lib/active_support.rb | 2 + activesupport/lib/active_support/weak_hash.rb | 118 ++++++++++ activesupport/lib/active_support/weak_reference.rb | 239 ++++++++++++++++++++ activesupport/test/weak_hash_test.rb | 102 +++++++++ activesupport/test/weak_reference_test.rb | 55 +++++ 7 files changed, 605 insertions(+), 3 deletions(-) create mode 100644 activesupport/lib/active_support/weak_hash.rb create mode 100644 activesupport/lib/active_support/weak_reference.rb create mode 100644 activesupport/test/weak_hash_test.rb create mode 100644 activesupport/test/weak_reference_test.rb diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index ee9a0af..f697cc2 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -207,6 +207,9 @@ module ActiveRecord # Register a record with the current transaction so that its after_commit and after_rollback callbacks # can be called. def add_transaction_record(record) + # Use a weak reference unless transaction based callbacks are defined to prevent large transactions + # from balooning the heap. + record = ActiveSupport::WeakReference.new(record) if record._commit_callbacks.empty? && record._rollback_callbacks.empty? last_batch = @_current_transaction_records.last last_batch << record if last_batch end @@ -313,10 +316,10 @@ module ActiveRecord # is false, only rollback records since the last save point. def rollback_transaction_records(rollback) #:nodoc if rollback - records = @_current_transaction_records.flatten + records = _dereference_transaction_records(@_current_transaction_records) @_current_transaction_records.clear else - records = @_current_transaction_records.pop + records = _dereference_transaction_records(@_current_transaction_records.pop) end unless records.blank? @@ -332,7 +335,7 @@ module ActiveRecord # Send a commit message to all records after they have been committed. def commit_transaction_records #:nodoc - records = @_current_transaction_records.flatten + records = _dereference_transaction_records(@_current_transaction_records) @_current_transaction_records.clear unless records.blank? records.uniq.each do |record| @@ -344,6 +347,20 @@ module ActiveRecord end end end + + # Flatten and dereference weak references to transaction records. Any garbage collected + # weak references will be removed. + def _dereference_transaction_records(records) #:nodoc + records.collect do |ref| + if ref.is_a?(ActiveSupport::WeakReference) + ref.object + elsif ref.is_a?(Array) + _dereference_transaction_records(ref) + else + ref + end + end.flatten.compact + end end end end diff --git a/activerecord/test/cases/transaction_callbacks_test.rb b/activerecord/test/cases/transaction_callbacks_test.rb index 85f222b..941b34f 100644 --- a/activerecord/test/cases/transaction_callbacks_test.rb +++ b/activerecord/test/cases/transaction_callbacks_test.rb @@ -244,6 +244,75 @@ class TransactionCallbacksTest < ActiveRecord::TestCase assert_equal :rollback, @first.last_after_transaction_error assert_equal [:after_rollback], @second.history end + + def test_transaction_records_only_held_if_referenced_or_callbacks_defined_on_rollback + save_implementation = ActiveSupport::WeakReference.implementation + ActiveSupport::WeakReference.implementation = ActiveSupport::WeakReference::TestImpl + begin + topic_1 = TopicWithCallbacks.new + topic_2 = Topic.new + topic_3 = TopicWithCallbacks.new + topic_4 = Topic.new + Topic.transaction do + [topic_1, topic_2, topic_3, topic_4].each_with_index do |topic, index| + topic.id = index + 1 + def topic.rolledback!(*args) + @rolledback_flag = true + end + def topic.rolledback? + @rolledback_flag if instance_variable_defined?(:@rolledback_flag) + end + topic.add_to_transaction + end + # Fake garbage collection to release weak references to topic_3 and topic_4 + ActiveSupport::WeakReference::TestImpl.gc(topic_3, topic_4) + raise ActiveRecord::Rollback + end + # Expected behavior is that the object without a hard reference and that doesn't implement + # after transaction callbacks will not be retained by the garbage collector and won't get + # the rolledback! message. + assert topic_1.rolledback? + assert topic_2.rolledback? + assert topic_3.rolledback? + assert !topic_4.rolledback? + ensure + ActiveSupport::WeakReference.implementation = save_implementation + end + end + + def test_transaction_records_only_held_if_referenced_or_callbacks_defined_on_commit + save_implementation = ActiveSupport::WeakReference.implementation + ActiveSupport::WeakReference.implementation = ActiveSupport::WeakReference::TestImpl + begin + topic_1 = TopicWithCallbacks.new + topic_2 = Topic.new + topic_3 = TopicWithCallbacks.new + topic_4 = Topic.new + Topic.transaction do + [topic_1, topic_2, topic_3, topic_4].each_with_index do |topic, index| + topic.id = index + 1 + def topic.committed!(*args) + @commit_flag = true + end + def topic.committed? + @commit_flag if instance_variable_defined?(:@commit_flag) + end + topic.add_to_transaction + end + # Fake garbage collection to release weak references to topic_3 and topic_4 + ActiveSupport::WeakReference::TestImpl.gc(topic_3, topic_4) + end + # Expected behavior is that the object without a hard reference and that doesn't implement + # after transaction callbacks will not be retained by the garbage collector and won't get + # the rolledback! message. + assert topic_1.committed? + assert topic_2.committed? + assert topic_3.committed? + assert !topic_4.committed? + ensure + ActiveSupport::WeakReference.implementation = save_implementation + end + end end diff --git a/activesupport/lib/active_support.rb b/activesupport/lib/active_support.rb index 6b87774..f30f8ad 100644 --- a/activesupport/lib/active_support.rb +++ b/activesupport/lib/active_support.rb @@ -75,6 +75,8 @@ module ActiveSupport autoload :SafeBuffer, "active_support/core_ext/string/output_safety" autoload :TestCase + autoload :WeakHash + autoload :WeakReference end autoload :I18n, "active_support/i18n" diff --git a/activesupport/lib/active_support/weak_hash.rb b/activesupport/lib/active_support/weak_hash.rb new file mode 100644 index 0000000..58f12a4 --- /dev/null +++ b/activesupport/lib/active_support/weak_hash.rb @@ -0,0 +1,118 @@ +require 'thread' + +module ActiveSupport + # Implementation of a map in which only weak references are kept to the map values. + # This allows the garbage collector to reclaim these objects if the + # only reference to them is in the weak hash map. This is often useful for cache + # implementations since the map can be allowed to grow without bound and the + # garbage collector can be relied on to clean it up as necessary. One must be careful, + # though, when accessing values since they can be collected at any time until their + # is a strong reference to them. + # + # Example usage: + # + # cache = WeakHash.new + # foo = "foo" + # cache["strong"] = foo # add a value with a strong reference + # cache["weak"] = "bar" # add a value without a strong reference + # cache["strong"] # "foo" + # cache["weak"] # "bar" + # GC.start + # cache["strong"] # "foo" + # cache["weak"] # nil + class WeakHash + # Create a new WeakHash. Values added to the hash will be cleaned up by the garbage + # collector if there are no other reference except in the WeakHash. + def initialize + @weak_references = {} + @weak_references_to_keys_map = {} + @weak_reference_cleanup = lambda{|object_id| remove_weak_reference_to(object_id)} + @mutex = Mutex.new + end + + # Get a value from the map by key. If the value has been reclaimed by the garbage + # collector, this will return nil. + def [](key) + ref = @weak_references[key] + value = ref.object if ref + value + end + + # Add a key/value to the map. + def []=(key, value) + ObjectSpace.define_finalizer(value, @weak_reference_cleanup) + key = key.dup if key.is_a?(String) + @mutex.synchronize do + @weak_references[key] = WeakReference.new(value) + keys_for_id = @weak_references_to_keys_map[value.__id__] + unless keys_for_id + keys_for_id = [] + @weak_references_to_keys_map[value.__id__] = keys_for_id + end + keys_for_id << key + end + value + end + + # Remove the value associated the the key from the map. + def delete(key) + ref = @weak_references.delete(key) + if ref + keys_to_id = @weak_references_to_keys_map[ref.referenced_object_id] + if keys_to_id + keys_to_id.delete(key) + @weak_references_to_keys_map.delete(ref.referenced_object_id) if keys_to_id.empty? + end + ref.object + else + nil + end + end + + # Iterate through all the key/value pairs in the map that have not been reclaimed + # by the garbage collector. + def each + @weak_references.each do |key, ref| + value = ref.object + yield(key, value) if value + end + end + + # Clear the map of all key/value pairs. + def clear + @mutex.synchronize do + @weak_references.clear + @weak_references_to_keys_map.clear + end + end + + # Merge the values from another hash into this map. + def merge!(other_hash) + other_hash.each do |key, value| + self[key] = value + end + end + + def inspect + live_entries = {} + each do |key, value| + live_entries[key] = value + end + live_entries.inspect + end + + private + + def remove_weak_reference_to(object_id) + @mutex.synchronize do + keys = @weak_references_to_keys_map[object_id] + if keys + keys.each do |key| + @weak_references.delete(key) + end + @weak_references_to_keys_map.delete(object_id) + end + end + end + end +end \ No newline at end of file diff --git a/activesupport/lib/active_support/weak_reference.rb b/activesupport/lib/active_support/weak_reference.rb new file mode 100644 index 0000000..6610aa9 --- /dev/null +++ b/activesupport/lib/active_support/weak_reference.rb @@ -0,0 +1,239 @@ +require 'thread' + +module ActiveSupport + # WeakReference is a class to represent a reference to an object that is not seen by + # the tracing phase of the garbage collector. This allows the referenced + # object to be garbage collected as if nothing is referring to it. + # + # This class provides several compatibility and performance benefits over using + # the WeakRef implementation that comes with Ruby and is compatible with Jruby. + # + # Usage: + # + # foo = Object.new + # ref = ActiveSupport::WeakReference.new(foo) + # ref.alive? # should be true + # ref.object # should be foo + # ObjectSpace.garbage_collect + # ref.alive? # should be false + # ref.object # should be nil + class WeakReference + # The object id of the object being referenced. + attr_reader :referenced_object_id + + class << self + # Create a new WeakReference. The actual implementation class will be + # optimized for the runtime is being used. If the default implementation + # isn't appropriate, it can be changed by setting a different one. + def new(obj) + ref = implementation.allocate + ref.instance_variable_set(:@referenced_object_id, obj.__id__) + ref.send(:initialize, obj) + ref + end + + # Get the implementation class for weak references. + def implementation + @implementation + end + + # Set the implementation class for weak references. + def implementation=(klass) + @implementation = klass + end + end + + # Get the referenced object. If the object has been reclaimed by the + # garbage collector, then this will return nil. + def object + raise NotImplementedError + end + + def inspect + obj = object + "<##{self.class.name}: #{obj ? obj.inspect : "##{referenced_object_id} (not accessible)"}>" + end + + # This is a pure ruby implementation of a weak reference. It is much more + # efficient than the bundled WeakRef implementation because it does not + # subclass Delegator which is very heavy to instantiate and utilizes a + # fair amount of memory. + # + # This implementation cannot be used by Jruby if ObjectSpace has been + # disabled. + class StandardImpl < WeakReference + # Map of references to the object_id's they refer to. + @@referenced_object_ids = {} + + # Map of object_ids to references to them. + @@object_id_references = {} + + @@mutex = Mutex.new + + # Finalizer that cleans up weak references when an object is destroyed. + @@object_finalizer = lambda do |object_id| + @@mutex.synchronize do + reference_ids = @@object_id_references[object_id] + if reference_ids + reference_ids.each do |reference_object_id| + @@referenced_object_ids.delete(reference_object_id) + end + @@object_id_references.delete(object_id) + end + end + end + + # Finalizer that cleans up weak references when references are destroyed. + @@reference_finalizer = lambda do |object_id| + @@mutex.synchronize do + referenced_id = @@referenced_object_ids.delete(object_id) + if referenced_id + obj = ObjectSpace._id2ref(referenced_object_id) rescue nil + if obj + backreferences = obj.instance_variable_get(:@__weak_backreferences__) if obj.instance_variable_defined?(:@__weak_backreferences__) + if backreferences + backreferences.delete(object_id) + obj.remove_instance_variable(:@__weak_backreferences__) if backreferences.empty? + end + end + references = @@object_id_references[referenced_id] + if references + references.delete(object_id) + @@object_id_references.delete(referenced_id) if references.empty? + end + end + end + end + + # Create a new weak reference to an object. The existence of the weak reference + # will not prevent the garbage collector from reclaiming the referenced object. + def initialize(obj) + ObjectSpace.define_finalizer(obj, @@object_finalizer) + ObjectSpace.define_finalizer(self, @@reference_finalizer) + @@mutex.synchronize do + @@referenced_object_ids[self.__id__] = obj.__id__ + add_backreference(obj) + references = @@object_id_references[obj.__id__] + unless references + references = [] + @@object_id_references[obj.__id__] = references + end + references.push(self.__id__) + end + end + + # Get the reference object. If the object has already been garbage collected, + # then this method will return nil. + def object + obj = nil + begin + if referenced_object_id == @@referenced_object_ids[self.object_id] + obj = ObjectSpace._id2ref(referenced_object_id) + obj = nil unless verify_backreferences(obj) + end + rescue RangeError + # Object has been garbage collected. + end + obj + end + + private + + def add_backreference(obj) + backreferences = obj.instance_variable_get(:@__weak_backreferences__) if obj.instance_variable_defined?(:@__weak_backreferences__) + unless backreferences + backreferences = [] + obj.instance_variable_set(:@__weak_backreferences__, backreferences) + end + backreferences << object_id + end + + def verify_backreferences(obj) + backreferences = obj.instance_variable_get(:@__weak_backreferences__) if obj.instance_variable_defined?(:@__weak_backreferences__) + backreferences && backreferences.include?(object_id) + end + end + + # This implementation of a weak reference utilizes the weakling library which will + # use native Java weak references when running under Jruby. If the weakling gem + # has been required this will automatically be used. + class WeaklingImpl < WeakReference + def initialize(obj) + @ref = ::Weakling::WeakRef.new(obj) + end + + def object + @ref.get + rescue RefError + nil + end + end + + # This implementation of a weak reference simply wraps the standard WeakRef implementation + # that comes with Ruby. It is used as a fallback for Jruby if case the weakling gem + # is not available. + class WeakRefImpl < WeakReference + def initialize(obj) + @ref = ::WeakRef.new(obj) + end + + def object + @ref.__getobj__ + rescue => e + # Jruby implementation uses RefError while MRI uses WeakRef::RefError + if (defined?(RefError) && e.is_a?(RefError)) || (defined?(::WeakRef::RefError) && e.is_a?(::WeakRef::RefError)) + nil + else + raise e + end + end + end + + # This implementation can be used for testing. It implements the proper interface, + # but allows for mimicking garbage collection on demand. + class TestImpl < WeakReference + @@object_list = {} + + def initialize(obj) + @object = obj + @@object_list[obj.__id__] = true + ObjectSpace.define_finalizer(self, lambda{@@object_list.delete(obj.__id__)}) + end + + def object + if @@object_list.include?(@object.__id__) + @object + else + @object = nil + end + end + + # Simulate garbage collection of the objects passed in as arguments. If no objects + # are specified, all objects will be reclaimed. + def self.gc(*objects) + if objects.empty? + @@object_list = {} + else + objects.each{|obj| @@object_list.delete(obj.__id__)} + end + end + end + + if defined?(RUBY_ENGINE) && RUBY_ENGINE == 'jruby' + # If using Jruby set the implementation depending on whether or not the weakling gem is installed. + begin + require 'weakling' + self.implementation = WeaklingImpl + rescue LoadError + require 'weakref' + self.implementation = WeakRefImpl + end + elsif defined?(RUBY_ENGINE) && RUBY_ENGINE == 'rbx' + # If using Rubinius set the implementation to use WeakRef since it is very efficient. + require 'weakref' + self.implementation = WeakRefImpl + else + self.implementation = StandardImpl + end + end +end diff --git a/activesupport/test/weak_hash_test.rb b/activesupport/test/weak_hash_test.rb new file mode 100644 index 0000000..ffd3dce --- /dev/null +++ b/activesupport/test/weak_hash_test.rb @@ -0,0 +1,102 @@ +require 'abstract_unit' +require 'active_support/weak_reference' +require 'active_support/weak_hash' + +class WeakHashTest < ActiveSupport::TestCase + + def setup + # Use an implementation of WeakReference that is meant for testing since it allows + # for simulating garbage collection. + @weak_reference_implementation = ActiveSupport::WeakReference.implementation + ActiveSupport::WeakReference.implementation = ActiveSupport::WeakReference::TestImpl + end + + def teardown + ActiveSupport::WeakReference.implementation = @weak_reference_implementation + ActiveSupport::WeakReference::TestImpl.gc + end + + def test_keeps_entries_with_strong_references + hash = ActiveSupport::WeakHash.new + value_1 = "value 1" + value_2 = "value 2" + hash["key 1"] = value_1 + hash["key 2"] = value_2 + assert_equal value_1, hash["key 1"] + assert_equal value_2, hash["key 2"] + end + + def test_removes_entries_that_have_been_garbage_collected + hash = ActiveSupport::WeakHash.new + value_1 = "value 1" + value_2 = "value 2" + hash["key 1"] = value_1 + hash["key 2"] = value_2 + assert_equal "value 2", hash["key 2"] + assert_equal "value 1", hash["key 1"] + ActiveSupport::WeakReference::TestImpl.gc(value_2) + assert_nil hash["key 2"] + assert_equal value_1, hash["key 1"] + end + + def test_can_clear_the_map + hash = ActiveSupport::WeakHash.new + value_1 = "value 1" + value_2 = "value 2" + hash["key 1"] = value_1 + hash["key 2"] = value_2 + hash.clear + assert_nil hash["key 1"] + assert_nil hash["key 2"] + end + + def test_can_delete_entries + hash = ActiveSupport::WeakHash.new + value_1 = "value 1" + value_2 = "value 2" + hash["key 1"] = value_1 + hash["key 2"] = value_2 + ActiveSupport::WeakReference::TestImpl.gc(value_2) + assert_nil hash.delete("key 2") + assert_equal value_1, hash.delete("key 1") + assert_nil hash["key 1"] + end + + def test_can_merge_in_another_hash + hash = ActiveSupport::WeakHash.new + value_1 = "value 1" + value_2 = "value 2" + value_3 = "value 3" + hash["key 1"] = value_1 + hash["key 2"] = value_2 + hash.merge!("key 3" => value_3) + assert_equal "value 2", hash["key 2"] + assert_equal value_1, hash["key 1"] + ActiveSupport::WeakReference::TestImpl.gc(value_2) + assert_nil hash["key 2"] + assert_equal value_1, hash["key 1"] + assert_equal value_3, hash["key 3"] + end + + def test_can_iterate_over_all_entries + hash = ActiveSupport::WeakHash.new + value_1 = "value 1" + value_2 = "value 2" + value_3 = "value 3" + hash["key 1"] = value_1 + hash["key 2"] = value_2 + hash["key 3"] = value_3 + ActiveSupport::WeakReference::TestImpl.gc(value_2) + keys = [] + values = [] + hash.each{|k,v| keys << k; values << v} + assert_equal ["key 1", "key 3"], keys.sort + assert_equal ["value 1", "value 3"], values.sort + end + + def test_inspect + hash = ActiveSupport::WeakHash.new + hash["key 1"] = "value 1" + assert hash.inspect + end +end diff --git a/activesupport/test/weak_reference_test.rb b/activesupport/test/weak_reference_test.rb new file mode 100644 index 0000000..5d48f94 --- /dev/null +++ b/activesupport/test/weak_reference_test.rb @@ -0,0 +1,55 @@ +require 'abstract_unit' +require 'active_support/weak_reference' + +module WeakReferenceTestBehaviors + def test_references_can_get_non_garbage_collected_objects + obj = Object.new + ref = ActiveSupport::WeakReference.new(obj) + assert_equal obj, ref.object + assert_equal obj.object_id, ref.referenced_object_id + end + + def test_references_get_the_correct_object + # Since we can't reliably control the garbage collector, this is a brute force test. + id_to_ref = {} + 10000.times do |i| + obj = Object.new + if id_to_ref.key?(obj.object_id) + ref = id_to_ref[obj.object_id] + if ref.object + flunk "weak reference found with a live reference to an object that was not the one it was created with" + break + end + end + id_to_ref[obj.object_id] = ActiveSupport::WeakReference.new(obj) + GC.start if i % 1000 == 0 + end + end + + def test_inspect + ref = ActiveSupport::WeakReference.new(Object.new) + assert ref.inspect + end +end + +class WeakReferenceTest < ActiveSupport::TestCase + def self.weak_reference_implementation + ActiveSupport::WeakReference + end + include WeakReferenceTestBehaviors +end + +# If Jruby is using the weakling backed implementation, test the WeakRef fallback implementation as well. +if defined?(RUBY_ENGINE) && RUBY_ENGINE == 'jruby' + if defined?(::Weakling::WeakRef) + class WeakReferenceWeakRefImplTest < ActiveSupport::TestCase + def self.weak_reference_implementation + ActiveSupport::WeakReference::WeaklingImpl + end + include WeakReferenceTestBehaviors + end + else + $stderr.puts "Skipping ActiveSupport::WeakReference::WeakingImpl tests. Install the weakling gem to run them." + end +end + -- 1.6.4.1