DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world
Producer/Consumer Test
Simple threading test using producer/consumer model.
A single producer running in its own thread generates
a bunch of random numbers, writes them to a globally
accessible problem queue, and sleeps for a while before
repeating.
A set of twenty consumer objects, each operating in its
own thread, repeatedly checks the problem queue for new values.
When a consumer finds one, it sleeps for a brief, random period to
simulate a large amount of calculation taking place, and then writes
the problem / 2 to the solutions queue.
Meanwhile, the main method spins in an infinite loop, printing
the status of all of the consumers, the total number of problems
generated, and the total number of solutions calculated.
Monitors are used when read/write activity takes place on the
problem and solution queues to prevent race conditions.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace ThreadTest {
/// <summary>
/// Entry point of the producer/consumer demo. Owns the shared problem and
/// solution queues, runs the single producer thread, starts the consumers,
/// and prints a status report once per second.
/// </summary>
/// <remarks>
/// The queue instances double as their own lock objects: consumer code locks
/// on <c>Program.problems</c> / <c>Program.solutions</c>, so every access made
/// here takes the lock on the same instance.
/// </remarks>
class Program {
    // Number of consumer objects (and therefore consumer threads) to create.
    private const int ConsumerCount = 20;
    // How many problems the producer enqueues per batch.
    private const int ProblemsPerBatch = 500;
    // Pause between producer batches, in milliseconds.
    private const int ProducerPauseMilliseconds = 20000;
    // Pause between status refreshes, in milliseconds.
    private const int StatusRefreshMilliseconds = 1000;

    // Running total of generated problems. Written by the producer thread and
    // read by the main thread, so it is only touched through Interlocked.
    private static int totalProblems;
    // Random source for problem values; used only by the producer thread.
    private static Random r;
    // Pending problems. Lock on this instance before reading or writing it.
    public static Queue<int> problems;
    // Computed solutions. Lock on this instance before reading or writing it.
    public static Queue<int> solutions;

    /// <summary>
    /// Creates the queues, starts the producer and the consumers, then loops
    /// forever printing the totals and each consumer's status.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args) {
        problems = new Queue<int>();
        solutions = new Queue<int>();
        r = new Random();

        //Initialize threads
        Thread producerThread = new Thread(producer);
        List<Consumer> consumers = new List<Consumer>();
        for (int x = 1; x <= ConsumerCount; x++) {
            consumers.Add(new Consumer(x.ToString()));
        }

        //Activate threads
        producerThread.Start();
        foreach (Consumer c in consumers) { c.startConsuming(); }

        //Idle & display status
        while (true) {
            // Queue<T> is not thread-safe, so Count must be read under the
            // same lock the consumers hold while enqueueing solutions.
            int solved;
            lock (solutions) {
                solved = solutions.Count;
            }
            // CompareExchange(ref x, 0, 0) never changes the value; it is used
            // here purely as an atomic, fenced read of the counter.
            int generated = Interlocked.CompareExchange(ref totalProblems, 0, 0);

            StringBuilder output = new StringBuilder();
            output.Append("Problems: ").Append(generated)
                  .Append(" | Solutions: ").Append(solved).Append("\n");
            foreach (Consumer c in consumers) {
                output.Append(c.ToString()).Append("\n");
            }

            Console.Clear();
            Console.Write(output.ToString());
            Thread.Sleep(StatusRefreshMilliseconds);
        }
    }

    /// <summary>
    /// Producer loop: enqueues a batch of random problems in the range
    /// 1..999 under the problem-queue lock, bumps the running total, then
    /// sleeps before producing the next batch. Never returns.
    /// </summary>
    static void producer() {
        while (true) {
            // lock (rather than bare Monitor.Enter/Exit) guarantees the
            // monitor is released even if Enqueue throws.
            lock (problems) {
                for (int x = 0; x < ProblemsPerBatch; x++) {
                    problems.Enqueue(r.Next(1, 1000));
                }
            }
            Interlocked.Add(ref totalProblems, ProblemsPerBatch);
            Thread.Sleep(ProducerPauseMilliseconds);
        }
    }
}
/// <summary>
/// Activity state a consumer reports for the status display.
/// </summary>
enum Status {
    /// <summary>The consumer has picked up a problem and is processing it.</summary>
    WORKING,

    /// <summary>The consumer is idle; this is also its initial state.</summary>
    SLEEPING
}
/// <summary>
/// A worker that runs on its own thread, repeatedly taking a problem from
/// <c>Program.problems</c>, pausing to simulate work, and enqueueing
/// <c>problem / 2</c> onto <c>Program.solutions</c>.
/// </summary>
class Consumer {
    // How long an idle consumer waits before polling the problem queue again.
    // Without this pause every idle consumer spins on the queue lock.
    private const int IdlePollMilliseconds = 50;

    // Thread that executes consume(); its Name is the consumer's display name.
    public Thread thread;
    // Written by the consumer thread and read by whichever thread calls
    // ToString(); volatile so the reader observes the latest value.
    public volatile Status status;
    // Per-consumer random source for the simulated work duration.
    private Random r;

    /// <summary>
    /// Creates a consumer whose (not yet started) thread carries the given name.
    /// </summary>
    /// <param name="name">Name assigned to the worker thread and shown by <see cref="ToString"/>.</param>
    public Consumer(string name) {
        thread = new Thread(this.consume);
        thread.Name = name;
        // Consumers are constructed back-to-back; a clock-derived default seed
        // can give them all the same sequence, so seed each one distinctly.
        r = new Random(Guid.NewGuid().GetHashCode());
        status = Status.SLEEPING;
    }

    /// <summary>Starts the worker thread.</summary>
    public void startConsuming() {
        thread.Start();
    }

    /// <summary>
    /// Returns "&lt;thread name&gt;: sleeping" or "&lt;thread name&gt;: working"
    /// according to the current status.
    /// </summary>
    public override string ToString() {
        string statusName = (status == Status.SLEEPING) ? "sleeping" : "working";
        return thread.Name + ": " + statusName;
    }

    /// <summary>
    /// Atomically checks the problem queue and removes one item if present.
    /// The emptiness check and the dequeue happen under a single lock so that
    /// another consumer cannot drain the queue in between (which would make
    /// Dequeue throw InvalidOperationException).
    /// </summary>
    /// <param name="problem">The dequeued problem, or 0 when the queue was empty.</param>
    /// <returns>true if a problem was taken; false if the queue was empty.</returns>
    private static bool tryTakeProblem(out int problem) {
        lock (Program.problems) {
            if (Program.problems.Count == 0) {
                problem = 0;
                return false;
            }
            problem = Program.problems.Dequeue();
            return true;
        }
    }

    /// <summary>
    /// Worker loop: take a problem, pretend to work on it, publish the
    /// solution; when no problem is available, sleep briefly and retry.
    /// Never returns.
    /// </summary>
    private void consume() {
        while (true) {
            int problem;
            if (!tryTakeProblem(out problem)) {
                // Nothing to do: back off instead of busy-spinning on the lock.
                Thread.Sleep(IdlePollMilliseconds);
                continue;
            }

            //Got an item: pretend to work, and queue up the "solution."
            status = Status.WORKING;
            Thread.Sleep(r.Next(200, 300)); //Pretend to work
            // lock releases the monitor even if Enqueue throws.
            lock (Program.solutions) {
                Program.solutions.Enqueue(problem / 2); //Sweet! A divide-by-two machine!
            }
            status = Status.SLEEPING;
        }
    }
}
}





