A threading example implementation of the producer - consumer paradigm implemented in Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
class Mgr | |
# This initiates the queue and the threads | |
def self.run | |
@mutex = Mutex.new | |
@cv = ConditionVariable.new | |
@working = true | |
@clue_bank = 0 | |
10.times{|i| | |
puts "#{i} started from (#{Thread.current.inspect}" | |
Wrk.new.init(i) | |
} | |
end | |
# this methos services the threads to propogate a stop signal | |
def self.working? | |
@working | |
end | |
# this method is used by the workers to grab a task from the queue | |
# if no talk is available the worker is set to sleep untill awakened | |
# the next time new items are pushed into the queue | |
def self.get_a_clue | |
@mutex.synchronize { | |
if @clue_bank>1 | |
@clue_bank -=1 | |
@clue_bank | |
else | |
@cv.wait(@mutex) | |
false | |
end | |
} | |
end | |
#this method emulates adding new tasks to the queue | |
# note: it is not thread safe because its just a simple example. | |
# if you need this to be thread safe, then wrap it in a mutex.sync | |
# just like in the get_a_clue method | |
def self.pop(c) | |
@clue_bank += c | |
@cv.broadcast | |
end | |
#this method is used to announce all the threds to stop and terminate | |
def self.stop | |
@working = false | |
@cv.broadcast | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'open-uri' | |
class Wrk | |
#the main loop around for a particular thread | |
def run | |
while Mgr.working? | |
clue = Mgr.get_a_clue | |
puts "runner #{@i} got #{clue} (#{Thread.current.inspect})" | |
if clue | |
puts "runner #{@i} working..." | |
open("http://giladmanor.blogspot.co.il/") | |
puts "runner #{@i} done." | |
end | |
end | |
end | |
# an initialization method for a worker | |
# note that the thread is browken off at this point | |
def init(i) | |
@i = i | |
Thread.new { | |
self.run | |
} | |
end | |
end | |