Inside ActiveSupport Notifications
- Preparation
- ActiveSupport::Notifications
- ActiveSupport::Notifications::Fanout
- ActiveSupport::Notifications::Instrumenter
- ActiveSupport::Notifications::Event
- Main Working Flow
- MISC
- Reference
Preparation
- Read API doc first, ActiveSupport::Notifications.
- About hooks inside Rails for instrumentation, check the edge doc on Active Support Instrumentation.
Then let’s read through the ActiveSupport::Notifications source code.
ActiveSupport::Notifications
File name
activesupport/lib/active_support/notifications.rb
Dependency
active_support/notifications/instrumenteractive_support/notifications/fanoutactive_support/per_thread_registry
Brief
- A class attribute accessor
notifier, which is initialized byFanout.new. - Several class methods as the interfaces exposed, which encapsulate
notiferto do the real work. - A class named InstrumentationRegistry.
Specification
Class Variables
self.notifier = Fanout.new
Class Methods
-
subscribe,unsubscribeandpublishall delegate tonotifier, likeruby def subscribe(*args, &block) notifier.subscribe(*args, &block) end -
subscribed(callback, *args, &block),subscribewhileblockis running, andunsubscribewhile running is over.ruby def subscribed(callback, *args, &block) subscriber = subscribe(*args, &callback) yield ensure unsubscribe(subscriber) end -
instrument(name, payload = {})ruby def instrument(name, payload = {}) if notifier.listening?(name) instrumenter.instrument(name, payload) { yield payload if block_given? } else yield payload if block_given? end end
notifier.listening?(name) is checking if there are subscribers listening on the event name.
And what’s an instrumenter?
Check out the ActiveSupport::Notifications::Instrumenter below or skip this part temporally.
Let’s talk about InstrumentationRegistry first, it is a sub-class defined in ActiveSupport::Notifications. It extends ActiveSupport::PerThreadRegistry to keep thread safe, and defines #instrumenter_for used for recording Instrumenter instance for specific notifier.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Then look back in ActiveSupport::Notifications for its usage:
1 2 3 | |
Extending ActiveSupport::PerThreadRegistry gives InstrumentationRegistry the instance class methods, returns a thread local InstrumentationRegistry instance. Check the ActiveSupport::PerThreadRegistry api for details.
ActiveSupport::Notifications::Fanout
File name
activesupport/lib/active_support/notifications/fanout.rb
Dependency
mutex_m, a Ruby Std-lib module. apithread_safe, a collection of thread-safe versions of common core Ruby classes. api
Brief
This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.
This class is thread safe. All methods are reentrant.
Specification
Instance Variables
@subscribers, an array, records the subscribers.
@listeners_for, a reverse map(hash). It maps the event name to the subscribers. Initialized by ThreadSafe::Cache.new, thread_safe gem says:
ThreadSafe::Cachealso exists, as a hash-like object, and should have much better performance characteristics esp. under high concurrency thanThreadSafe::Hash. However,ThreadSafe::Cacheis not strictly semantically equivalent to a rubyHash– for instance, it does not necessarily retain ordering by insertion time asHashdoes. For most uses it should do fine though, and we recommend you considerThreadSafe::Cacheinstead ofThreadSafe::Hashfor your concurrency-safe hash needs. It understands some options when created (depending on your ruby platform) that control some of the internals.
Instance Methods
-
subscribe(pattern = nil, block = Proc.new)ActiveSupport::Notifications.subscribeuse this method onnotifierto subscribe event name based onpatternand ablockto do the instrumentation callback.It initialize a subscriber with
Subscribers.new pattern, block, usesynchronize(for thread safe) to record subscriber into@subscribers, and clear the@listeners_for, then returns the subscriber.ruby def subscribe(pattern = nil, block = Proc.new) subscriber = Subscribers.new pattern, block synchronize do @subscribers << subscriber @listeners_for.clear end subscriber end -
unsubscribe(subscriber)ruby synchronize do @subscribers.reject! { |s| s.matches?(subscriber) } @listeners_for.clear end endNote the
matches?method on subscriber. Every subscriber object defines this method, Subscribers::Evented and Subscribers::Timed defines it like this:ruby def matches?(subscriber_or_name) self === subscriber_or_name || @pattern && @pattern === subscriber_or_name endUnsubscribe a subscriber object or unsubscribe based on the
@patternmatching.Subscribers::AllMessages alias
matches?to===, just do the type matching. -
start,finish,publish(name, id, payload)just delegates to subscribers based on the event name.
ruby def start(name, id, payload) listeners_for(name).each { |s| s.start(name, id, payload) } end -
listeners_for(name)a helper method, equals fetch or set on
@listeners_for, returns the subscribers based on the event name.ruby def listeners_for(name) # this is correctly done double-checked locking (ThreadSafe::Cache's lookups have volatile semantics) @listeners_for[name] || synchronize do # use synchronisation when accessing @subscribers @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } end end -
listening?(name)a helper method, checks if
listeners_for(name).any?. -
waitas this is a sync queue, this method is left blank.
Modules
-
Subscribers
Subscribers defines a class method
newand three sub-classes: Evented, Timed, and AllMessages. Timed inheritates Evented, and AllMessages encapsulates an Evented object.About
self.new(pattern, listener), remember where doesSubscribers.newget called?It’s in
Fanout#subscribe(pattern = nil, block = Proc.new). If theblockcan duck-typing:startand:finish, it’ll initialize a subscriber by Evented withpatternandblockrecorded, otherwise by Timed. And ifpatternis nil, which means callingActiveSupport::Notifications.subscribewithout event name,Subscribers.newreturns an AllMessages object which initialized with a subscriber defined above. Otherwise(ifpatternpresents) returns a subscriber directly.Normally we use
ActiveSupport::Notifications.subscribein two ways:- subscribe all events, which means no
patternpassed, and a Subscribers::AllMessages instance saved. - the
blockwe pass toActiveSupport::Notifications.subscribewon’t respond tostartandfinish, which means a Subscribers::Timed instance saved.
These two ways have no conflicts: if passing a nil pattern and a block, it just returns an Subscribers::AllMessages instance which has a Subscribers::Timed instance wrapped.
So let’s check it out what does a Subscribers::Timed instance respond?
- subscribe all events, which means no
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | |
ActiveSupport::Notifications::Instrumenter
File name
activesupport/lib/active_support/notifications/instrumenter.rb
Dependency
securerandom, api
Brief
Specification
Instance Variables
@id, with an attr_reader, generated by SecureRandom.hex(10).
@notifier, records the Fanout instance.
Instance Methods
start(name, payload),finish(name, payload)
1 2 3 4 5 6 7 | |
instrument(name, payload={})
1 2 3 4 5 6 7 8 9 10 11 | |
Where does this method get called? in `ActiveSupport::Notifications.instrument`:
1
| |
The processing begins with `start`, ends with `finish`, instrumenter delegates it to `@notifier`, and `notifier` turns to `@subscribers` which are listening for the event name. And what does `@subscribers` do with `start` and `finish`? Normally we use subscriber objects defined by `Subscribers::Timed`, which use `start` to save a beginning timestamp, and `finish` to save an ending timestamp, and calling the block user passed.
Attention on the control flow, the events get sent even if an error occurs in the passed-in block.
ActiveSupport::Notifications::Event
File name
activesupport/lib/active_support/notifications/instrumenter.rb
Dependency
Specification
Note that this class has a @children instance variable recording associations between events, and these two methods:
1 2 3 4 5 6 7 | |
Main Working Flow
ActiveSupport::Notifications.subscribe
ActiveSupport::Notifications.subscribe(name) {|*args| }.- in Notficiations.subscribe,
notifier.subscribe(*args, &block). - in Fanout#subscribe,
subscriber = Subscribers.new pattern, block, then records subscriber into@subscribers.
ActiveSupport::Notifications.instrument
ActiveSupport::Notifications.instrument(name, payload) { }.- in Notficiations.instrument,
instrumenter.instrument(name, payload) { yield payload if block_given? }. - in Instrumenter#instrument,
@notifier.start,yield payloadand then@notifier.finish. - in Fanout::Subscribers::Timed#start, records beginning time.
- in Fanout::Subscribers::Timed#finish, records ending time, and passing
name, start_time, end_time, id, payloadto the subscribers callbacks.
MISC
How does ActiveSupport::Notifications keep thread safe?
extend ActiveSupport::PerThreadRegistry in InstrumentationRegistry.
Reference
Unit test:
- activesupport/test/notifications_test.rb
- activesupport/test/notifications/evented_notification_test.rb
- activesupport/test/notifications/instrumenter_test.rb
On Notifications, Log Subscribers, and Bringing Sanity to Rails’ Logging
#249 Notifications in Rails 3
Digging Deep with ActiveSupport::Notifications