Development

Multi-threading with Celluloid


Last month, a coworker and I were working with complex tasks that required a lot of time to process, so he suggested we use Celluloid to reduce the amount of time by making our tasks multi-thread. So we decided to give it a a try, but first we had to do a lot of reading to learn how to use it.

They say the best way to learn something is by teaching others, so I will give you a few examples on how to implement Celluloid and I hope you find them useful.

Setting up

First you want to make sure you have the gem installed by adding Celluloid to the Gemfile gem 'celluloid', then just bundle install and don't forget to require 'celluloid/autostart' wherever you need to use it and you should be all set and ready.

Basic Usage

Now let's get you through which I believe is the most basic example.

require 'celluloid/autostart'

class Test
  include Celluloid

  def foo
    sleep 1
    puts 'Foo'
  end

end

Now, you should be able to use this class both sync and async way, if you wish to use the foo method as usual you should do the following.

Test.new.foo
=> 'Foo'

This will sleep for 1 second and then print the result. But if you want to do this async, well that's pretty simple.

Test.new.async.foo
=> nil
> 'Foo'

This code will free the current thread so you can keep working and, one second after, it will print the result. It's that easy, just chain the async method before any other instance method.

  • But how is this improved if it always waits for 1 second?
  • Isn't this the same as calling foo normally?

Well, kind off, yeah, but imagine we have the following:

10.times{ Test.new.foo }

This will take 10 seconds to finish, and if we do this:

10.times{ Test.new.async.foo }

How long do you think this will take? If you thought of 1 second more or less, well, you guessed right.

Pools

Using Celluloid as in the following example could cause your computer to freeze.

1000000000.times{ #some complex async celluloid operation }

So, instead, we should use pools, as simply as this.

Test.pool.async.foo

This would create a pool which size is equal to the number of cores in your computer, but if you want to specify the size of the pool you can just pass an argument to the pool method, being 2 the minimum.

Test.pool(size: 2)

And if your class requires an argument for the initializer you can do the following:

Test.pool(size: 2, args: [:foo])

So the first example would end up like this:

pool = Test.pool(4)
100000000.times{ pool.complex_operation }

This will assign the operation to a pool if it is free and would use only those.

Futures

What the hell is a future you may ask, well, futures in Celluloid are ways to request a computation and get the result later in the future. That's why they are called futures. Let's take a look into the following class.

require 'celluloid/autostart'

class FutureTest

  include Celluloid

  def foo
    sleep 5
    'Foo'
  end

end

The way we use futures is the same as we use the async method, you can just write:

FutureTest.new.future.foo

Now the line above would return a future object, something that looks like this #<Celluloid::Future:0x007fef5eb01698> if you want get the result of the foo method you should call value on the future object.

future = FutureTest.new.future.foo
sleep 2
future.value

According to the class, the foo method takes 5 seconds to complete, so if our other operations take 2 seconds to complete, by the time we call value it should take only 3 seconds to return the result. But, if our other computation takes 5 seconds then the value should return the result immediately.

But what if you want to know when the thread has finished the execution? Well for those cases we have a way to do it, and that is through notifications.

Notifications

Notifications are a Celluloid submodule that allow the actors to send and receive notifications. Look at the following class:

require 'celluloid/autostart'

class NotificationTest

  include Celluloid
  include Celluloid::Notifications

  def foo
    sleep 10
    publish 'done', 'Slept for 10 secs I\'m awake now'
  end
end

As you can see, there are two different things in this class

  • The Notifications module inclusion
  • The publish method

The publish method receives two arguments, the first one is the channel name and the second is the argument to pass to the receiving method. Now we need a class to listen to that channel.

class Observer

  include Celluloid
  include Celluloid::Notifications

  def initialize
    subscribe 'done', :on_completion
  end

  def on_completion(*args)
    puts args.inspect
  end
end

In the Observer class you can see how we are subscribing to the same channel as the NotificationTest class is publishing to, the subscribe method receives two arguments: the channel and the method which will be called. This method will be called with multiple arguments, being the first one the channel and the others would be whatever you passed into the publish method.

Now let's create an example, if you put the following into the irb console:

Observer.new

NotificationTest.new.async.foo
=> nil

As you can see, we created an instance of the observer class to subscribe to the 'done' channel before calling foo that would actually publish to it. There is no need to save the instance of the observer, it would work anytime as long as that channel is the one being published to.

So after the observer gets instantiated, we just need to asynchronously call foo and wait 10 seconds until the work is done to trigger the observer method.

And that would be all from me, these were basic examples that would be enough for many projects, if you need something more advanced or complex you can read Celluloid's wiki.

Thanks for reading and if you have any questions don't hesitate in contacting me.

AEM
How to use internal redirects in AEM?
Community
De Código, Café y Cervezas 06 – ActiveModel::Serializer
Best Practices
De Código, Café y Cervezas 04 – ReactJS + AngularJS (Parte 2)