DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world

Snippets has posted 5883 posts at DZone. View Full User Profile

Producer/Consumer Test

04.19.2007
| 1963 views |
  • submit to reddit
        
Simple threading test using producer/consumer model.
A single producer running in its own thread generates 
a bunch of random numbers, writes them to a globally 
accessible problem queue, and sleeps for a while before
repeating.

A set of twenty consumer objects, each operating in its own
thread, repeatedly checks the problem queue for new values.

When a consumer finds a value, it sleeps for a brief, random period to
simulate crazy amounts of calculations taking place, and then writes
the problem / 2 to the solutions queue.
 
Meanwhile, the main method spins in an infinite loop, printing
the status of all of the consumers, the total number of problems
generated, and the total number of solutions calculated.

Monitors are used when r/w activity takes place on the
problem and solution queues to prevent the nasties. 

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace ThreadTest {
    /// <summary>
    /// Entry point of the producer/consumer demo. Starts one producer thread and a
    /// fixed number of <see cref="Consumer"/> threads, then loops forever printing
    /// the number of problems generated, the number of solutions queued, and the
    /// status of every consumer.
    /// </summary>
    class Program {
        // Number of consumer objects (and therefore consumer threads) to start.
        private const int ConsumerCount = 20;
        // Number of random problems the producer enqueues per batch.
        private const int ProblemsPerBatch = 500;
        // Pause between producer batches, in milliseconds.
        private const int ProducerPauseMilliseconds = 20000;
        // Pause between status refreshes on the main thread, in milliseconds.
        private const int StatusRefreshMilliseconds = 1000;

        // Running total of problems ever enqueued. Written by the producer thread and
        // read by the main thread; both accesses happen while holding the lock on
        // `problems`, so the displayed total always matches a fully enqueued batch.
        private static int totalProblems;
        // Only used by the producer thread (Random is not thread-safe).
        private static Random r;

        // Shared work queue. Every access must hold a lock on this object.
        public static Queue<int> problems;
        // Shared result queue. Every access must hold a lock on this object.
        public static Queue<int> solutions;

        /// <summary>
        /// Creates the shared queues, starts the producer and consumer threads, and
        /// then refreshes the console status display once per refresh interval.
        /// This method never returns.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args) {
            problems = new Queue<int>();
            solutions = new Queue<int>();
            r = new Random();

            //Initialize threads
            Thread producerThread = new Thread(producer);
            List<Consumer> consumers = new List<Consumer>();
            for (int x = 1; x <= ConsumerCount; x++) {
                consumers.Add(new Consumer(x.ToString()));
            }

            //Activate threads
            producerThread.Start();
            foreach (Consumer c in consumers) { c.startConsuming(); }

            //Idle & display status
            while (true) {
                // Snapshot the counters under the same locks the worker threads use;
                // Queue<T> is not safe to read while another thread is mutating it.
                int problemCount;
                int solutionCount;
                lock (problems) { problemCount = totalProblems; }
                lock (solutions) { solutionCount = solutions.Count; }

                StringBuilder output = new StringBuilder();
                output.Append("Problems: ").Append(problemCount)
                      .Append(" | Solutions: ").Append(solutionCount).Append("\n");
                foreach (Consumer c in consumers) {
                    output.Append(c.ToString()).Append("\n");
                }

                // Build the text first so the screen stays blank for as short a time as possible.
                Console.Clear();
                Console.Write(output.ToString());

                Thread.Sleep(StatusRefreshMilliseconds);
            }
        }

        /// <summary>
        /// Producer thread body: repeatedly enqueues a batch of random problems
        /// (values from 1 to 999; the upper bound of Random.Next is exclusive) and
        /// then sleeps until the next batch. This method never returns.
        /// </summary>
        static void producer() {
            while (true) {
                // `lock` releases the monitor even if Enqueue throws, unlike a bare
                // Monitor.Enter/Monitor.Exit pair.
                lock (problems) {
                    for (int x = 0; x < ProblemsPerBatch; x++) {
                        problems.Enqueue(r.Next(1, 1000));
                    }
                    // Count the batch only once it is actually in the queue.
                    totalProblems += ProblemsPerBatch;
                }
                Thread.Sleep(ProducerPauseMilliseconds);
            }
        }
    }

    // Activity state a Consumer reports in its ToString() output: WORKING from the
    // moment it takes a problem until the solution is queued, SLEEPING otherwise.
    enum Status { WORKING, SLEEPING };
    /// <summary>
    /// A worker that runs on its own thread, takes problems from
    /// <c>Program.problems</c>, pretends to work on each for a short random time,
    /// and enqueues <c>problem / 2</c> on <c>Program.solutions</c>.
    /// </summary>
    class Consumer {
        // How long to back off when the problem queue is empty, in milliseconds.
        // Without a pause, idle consumers spin on the queue lock at full CPU.
        private const int IdlePollMilliseconds = 10;

        // The thread that runs consume(); its Name is the consumer's display name.
        public Thread thread;
        // Current activity, written by the consumer thread and read by whoever calls
        // ToString() for display purposes.
        public Status status;
        // Per-consumer generator for the simulated work duration; only used on this
        // consumer's own thread.
        private Random r;

        /// <summary>
        /// Creates a consumer whose thread is named <paramref name="name"/>.
        /// The thread is not started until <see cref="startConsuming"/> is called.
        /// </summary>
        /// <param name="name">Display name assigned to the consumer's thread.</param>
        public Consumer(string name) {
            thread = new Thread(this.consume);
            thread.Name = name;
            // Seed explicitly: on .NET Framework the parameterless Random constructor
            // seeds from the system clock, so consumers created back-to-back could
            // otherwise all produce the same sequence of "work" durations.
            r = new Random(Guid.NewGuid().GetHashCode());
            status = Status.SLEEPING;
        }

        /// <summary>Starts the consumer's thread.</summary>
        public void startConsuming() {
            thread.Start();
        }

        /// <summary>
        /// Returns "<c>name: sleeping</c>" or "<c>name: working</c>" according to the
        /// current <see cref="status"/>.
        /// </summary>
        public override string ToString() {
            string statusName = (status == Status.SLEEPING) ? "sleeping" : "working";
            return thread.Name + ": " + statusName;
        }

        /// <summary>
        /// Thread body: loops forever, taking one problem at a time from the shared
        /// queue and publishing its solution. This method never returns.
        /// </summary>
        private void consume() {
            while (true) {
                int problem = 0;
                bool gotProblem = false;

                // The emptiness check and the dequeue must happen under a single lock
                // acquisition. Checking Count, releasing the lock, and dequeuing later
                // lets another consumer drain the queue in between, which makes
                // Dequeue throw InvalidOperationException.
                lock (Program.problems) {
                    if (Program.problems.Count != 0) {
                        problem = Program.problems.Dequeue();
                        gotProblem = true;
                    }
                }

                if (!gotProblem) {
                    // Nothing to do; back off briefly instead of hammering the lock.
                    Thread.Sleep(IdlePollMilliseconds);
                    continue;
                }

                status = Status.WORKING;

                Thread.Sleep(r.Next(200, 300)); //Pretend to work

                lock (Program.solutions) {
                    Program.solutions.Enqueue(problem / 2); //Sweet!  A divide-by-two machine!
                }

                status = Status.SLEEPING;
            }
        }
    }
}