DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world
Ruby Thread Pool
This is useful for a variety of scenarios. You create a thread pool, give it a maximum size, and pass a block to it everytime you need something processed. If all threads are busy, you block until a thread becomes free.
For example:
pool = ThreadPool.new(10) # up to 10 threads
email_addresses.each do |addr|
pool.process {send_mail_to addr}
end
require 'thread'
class ThreadPool
class Worker
def initialize
@mutex = Mutex.new
@thread = Thread.new do
while true
sleep 0.001
block = get_block
if block
block.call
reset_block
end
end
end
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
end
attr_accessor :max_size
attr_reader :workers
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def join
sleep 0.01 while busy?
end
def process(&block)
wait_for_worker.set_block(block)
end
def wait_for_worker
while true
worker = find_available_worker
return worker if worker
sleep 0.01
end
end
def find_available_worker
@mutex.synchronize {free_worker || create_worker}
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new
@workers << worker
worker
end
end






Comments
Snippets Manager replied on Mon, 2012/03/19 - 12:29am
require 'thread' class ThreadPool def initialize size=4 @size = size @queue = SizedQueue.new 1 @threads = @size.times.map { Thread.new &method(:worker) } end def enqueue *arguments, &work @queue << [work, arguments] end def join @queue << nil @threads.each do |thread| thread[:mutex].synchronize do thread.kill end end end private def worker mutex = Thread.current[:mutex] = Mutex.new loop do mutex.synchronize do if work = @queue.shift work.shift.call *work end end end end end module Enumerable def in_thread_pool size=4, &block to_enum.in_thread_pool size, &block end def in_threads &block to_enum.in_threads &block end end class Enumerator def in_thread_pool size=4, &block size = size[:of] if size.is_a? Hash pool = ThreadPool.new size each do |*args| pool.enqueue { block.call *args } end pool.join end def in_threads &block map do |*args| Thread.new *args, &block end.each(&:join) end endExample:irb> 2.times { (1..10).in_thread_pool(of: 2) { |number| print "#{number}\n" } } 1 2 3 4 5 6 7 8 9 10 1 2 3 4 5 6 8 7 9 10Snippets Manager replied on Sat, 2009/05/09 - 2:35am
Snippets Manager replied on Tue, 2007/03/20 - 1:58pm
pool = ThreadPool.new(10) # up to 10 threads email_addresses.each do |addr| pool.process {send_mail_to addr} endShould really readpool = ThreadPool.new(10) # up to 10 threads email_addresses.each do |addr| pool.process {send_mail_to addr} end pool.join()Otherwise your program will terminate before your pool has finished runningSnippets Manager replied on Fri, 2007/01/26 - 10:34pm
def process(&block) while true @mutex.synchronize do worker = find_available_worker if worker return worker.set_block(block) end end sleep 0.01 end end def find_available_worker free_worker || create_worker end