
Snippets has posted 5883 posts at DZone.

Ruby Thread Pool

01.12.2007
| 26747 views |
This is useful for a variety of scenarios. You create a thread pool, give it a maximum size, and pass it a block every time you need something processed. If all threads are busy, the call blocks until a thread becomes free.

For example:

pool = ThreadPool.new(10) # up to 10 threads
email_addresses.each do |addr|
  pool.process {send_mail_to addr}
end

require 'thread'

class ThreadPool
  class Worker
    def initialize
      @mutex = Mutex.new
      @thread = Thread.new do
        while true
          sleep 0.001
          block = get_block
          if block
            block.call
            reset_block
          end
        end
      end
    end
    
    def get_block
      @mutex.synchronize {@block}
    end
    
    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
      end
    end
    
    def reset_block
      @mutex.synchronize {@block = nil}
    end
    
    def busy?
      @mutex.synchronize {!@block.nil?}
    end
  end
  
  attr_accessor :max_size
  attr_reader :workers

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
  end
  
  def size
    @mutex.synchronize {@workers.size}
  end
  
  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end
  
  def join
    sleep 0.01 while busy?
  end
  
  def process(&block)
    wait_for_worker.set_block(block)
  end
  
  def wait_for_worker
    while true
      worker = find_available_worker
      return worker if worker
      sleep 0.01
    end
  end
  
  def find_available_worker
    @mutex.synchronize {free_worker || create_worker}
  end
  
  def free_worker
    @workers.find {|w| !w.busy?}
  end
  
  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new
    @workers << worker
    worker
  end
end
    

Comments

Snippets Manager replied on Mon, 2012/03/19 - 12:29am

A good thread pool doesn't need any sleep statements. Here's a thread pool implemented using Ruby's native SizedQueue class, and a couple helper methods to make enumerable processing easier:

require 'thread'

class ThreadPool
  def initialize size=4
    @size = size
    @queue = SizedQueue.new 1
    @threads = @size.times.map { Thread.new &method(:worker) }
  end

  def enqueue *arguments, &work
    @queue << [work, arguments]
  end

  def join
    @queue << nil
    @threads.each do |thread|
      thread[:mutex].synchronize do
        thread.kill
      end
    end
  end

  private

  def worker
    mutex = Thread.current[:mutex] = Mutex.new
    loop do
      mutex.synchronize do
        if work = @queue.shift
          work.shift.call *work
        end
      end
    end
  end
end

module Enumerable
  def in_thread_pool size=4, &block
    to_enum.in_thread_pool size, &block
  end

  def in_threads &block
    to_enum.in_threads &block
  end
end

class Enumerator
  def in_thread_pool size=4, &block
    size = size[:of] if size.is_a? Hash
    pool = ThreadPool.new size
    each do |*args|
      pool.enqueue { block.call *args }
    end
    pool.join
  end

  def in_threads &block
    map do |*args|
      Thread.new *args, &block
    end.each(&:join)
  end
end

Example:

irb> 2.times { (1..10).in_thread_pool(of: 2) { |number| print "#{number}\n" } }
1 2 3 4 5 6 7 8 9 10 1 2 3 4 5 6 8 7 9 10

Snippets Manager replied on Sat, 2009/05/09 - 2:35am

why the sleep 0.01? I believe it's possible to use sleep(0) and #wakeup or something to avoid it :) -r
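
One way to drop the polling sleeps this comment asks about is to have the worker wait on a condition variable until a block arrives. The sketch below swaps in Mutex plus ConditionVariable rather than sleep and #wakeup; the class name SignalWorker and the methods assign and wait_until_idle are hypothetical and not part of the original snippet.

```ruby
require 'thread'

# Hypothetical worker that waits on a ConditionVariable instead of polling.
# SignalWorker, #assign and #wait_until_idle are illustrative names only.
class SignalWorker
  def initialize
    @mutex = Mutex.new
    @work_ready = ConditionVariable.new # signalled when a block is assigned
    @idle = ConditionVariable.new       # signalled when a block has finished
    @block = nil
    @thread = Thread.new { run }
  end

  def busy?
    @mutex.synchronize { !@block.nil? }
  end

  # Hand a block to the worker; raises if one is already assigned.
  def assign(&block)
    @mutex.synchronize do
      raise "Thread already busy." if @block
      @block = block
      @work_ready.signal
    end
  end

  # Block the caller until the worker has no assigned block.
  def wait_until_idle
    @mutex.synchronize do
      @idle.wait(@mutex) while @block
    end
  end

  private

  def run
    loop do
      # Sleep (without polling) until a block has been assigned.
      block = @mutex.synchronize do
        @work_ready.wait(@mutex) until @block
        @block
      end
      begin
        block.call
      rescue StandardError => e
        warn "job failed: #{e.message}" # keep the worker alive after a failed job
      ensure
        @mutex.synchronize do
          @block = nil
          @idle.broadcast
        end
      end
    end
  end
end
```

A pool built on such a worker could wait on the idle signal in its own join and wait_for_worker methods, so neither side needs a timed sleep.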

Snippets Manager replied on Tue, 2007/03/20 - 1:58pm

One other thing is that the example of usage provided

pool = ThreadPool.new(10) # up to 10 threads
email_addresses.each do |addr|
  pool.process {send_mail_to addr}
end

Should really read

pool = ThreadPool.new(10) # up to 10 threads
email_addresses.each do |addr|
  pool.process {send_mail_to addr}
end
pool.join()

Otherwise your program will terminate before your pool has finished running.

Snippets Manager replied on Fri, 2007/01/26 - 10:34pm

There seems to be a race condition when executing a block: two jobs may try to grab the same thread. Changing find_available_worker() and wait_for_worker() like this solves the problem:

def process(&block)
  while true
    @mutex.synchronize do
      worker = find_available_worker
      if worker
        return worker.set_block(block)
      end
    end
    sleep 0.01
  end
end

def find_available_worker
  free_worker || create_worker
end
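
The idea behind this fix, finding a free worker and claiming it inside one critical section, can be shown in isolation. The following is a minimal sketch with hypothetical stand-ins (Slot and SlotPool are not part of the original snippet): because the check and the claim happen under the same lock, two callers cannot both see the same slot as free.

```ruby
require 'thread'

# Hypothetical stand-in for a worker: a slot that can hold one job at a time.
class Slot
  def initialize
    @job = nil
  end

  def busy?
    !@job.nil?
  end

  def claim(job)
    raise "Thread already busy." if @job
    @job = job
  end

  def release
    @job = nil
  end
end

# Hypothetical pool that finds and claims a slot under a single lock.
class SlotPool
  def initialize(size)
    @slots = Array.new(size) { Slot.new }
    @mutex = Mutex.new
  end

  # Returns a slot that has been claimed for the given job.
  def acquire(job)
    loop do
      slot = @mutex.synchronize do
        free = @slots.find { |s| !s.busy? }
        free.claim(job) if free # claim before releasing the lock
        free
      end
      return slot if slot
      sleep 0.01 # every slot is taken; retry shortly, as the original does
    end
  end

  def release(slot)
    @mutex.synchronize { slot.release }
  end
end
```

If the claim were moved outside the synchronize block, two threads could both pick the same free slot and the second claim would raise "Thread already busy.", which matches the race described in this comment.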