DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world.

Snippets has posted 5883 posts at DZone. View Full User Profile

A CountingSort Parallelized Using OpenMP.

07.28.2007
| 6723 views |
  • submit to reddit
        CountingSort in C++ parallelized with OpenMP.

/***************************************************************************
 *   Copyright (C) 2007 by Manuel Holtgrewe                                *
 *   holtgrewe@ira.uka.de                                                  *
 *   Distributed under the Boost Software License, Version 1.0.            *
 *   (See accompanying file LICENSE_1_0.txt or copy at                     *
 *   http://www.boost.org/LICENSE_1_0.txt)                                 *
 ***************************************************************************/

#include <algorithm>
#include <cassert>
#include <meta/mcstl_timing.h>
#include <omp.h>

// Global timer. counting_sort() calls timer.tic(<label>) after each of its
// phases whenever `benchmark` is set.
mcstl::Timing<mcstl::active_tag> timer;

// Benchmark switch, tested as a truth value by counting_sort(): a non-zero
// value enables the timer.tic() calls. Declared as int but initialized from
// `false`, i.e. 0, so timing is disabled by default.
int benchmark = false;

/* OpenMP Parallel Counting Sort
 *
 * A simple implementation of a parallel counting sort algorithm.
 *
 * The bucket filling and final result sorting have been parallelized. Copying
 * back the sorted elements into the input range is done using std::copy,
 * which is already parallelized in the MCSTL.
 *
 * The input is cut into num_threads contiguous chunks and every chunk owns
 * one counter array. The parallel loops iterate over *chunks* (not over
 * omp_get_thread_num()), so each chunk is processed exactly once even if the
 * OpenMP runtime hands out fewer threads than requested.
 *
 * See:
 *  - http://de.wikipedia.org/wiki/Countingsort
 *  - http://www.umiacs.umd.edu/research/EXPAR/papers/3548/node8.html
 *
 * Measurements on a dual CPU, dual core Intel Xeon with 2.4 GHz yielded no
 * speedup: with more than 2 threads, speed broke down, possibly because the
 * very simple algorithm becomes memory bound (GCC 4.2). Results were the same
 * on a 4 core AMD Opteron system. These results hold for array sizes of up to
 * 2**28 (filling 2 GB of RAM on a 64bit machine leaving enough room for the
 * buffer and the OS).
 *
 * See ExperimentalResults.txt.
 *
 * Parameters
 *
 *  begin, end  Pointers delimiting the elements [begin, end) to sort. Note
 *              that, despite its name, the template parameter RandomIterator
 *              is the *element type*. Elements must be default constructible
 *              and assignable because a temporary buffer of (end - begin)
 *              elements is allocated. The number of elements must fit into
 *              an int.
 *  max_key     Largest possible key, must be >= 0.
 *  hash        Functor mapping an element to its integer key, which must lie
 *              in [0, max_key]. It is called concurrently from several
 *              threads and twice per element (once per pass), so it has to
 *              return the same key both times.
 *
 * The range is sorted "in place" by ascending key (the result is built in the
 * temporary buffer and copied back). Elements with equal keys keep their
 * relative input order.
 */
template <typename RandomIterator, typename HashFunctor>
void counting_sort(RandomIterator *begin, RandomIterator *end, int max_key, HashFunctor hash)
{
	if (benchmark) timer.tic("start");

	assert(max_key >= 0);

	// number of chunks (= requested threads) and number of elements to use
	const int num_threads = mcstl::HEURISTIC::num_threads;
	assert(num_threads > 0);
	const int n = static_cast<int>(end - begin);
	assert(n >= 0);

	// *********** STEP 1: COUNTING ***********
	// result of this step: local rank bases

	// create one counter array for each chunk
	int **thread_counters = new int* [num_threads];
	for (int t = 0; t < num_threads; t++)
		thread_counters[t] = new int[max_key + 1];
	if (benchmark) timer.tic("counting initialization           ");

	// count occurrences; chunk t covers [split_positions[t], split_positions[t + 1])
	int *split_positions = mcstl::equal_splitting(n, num_threads);
	#pragma omp parallel for schedule(static) num_threads(num_threads)
	for (int t = 0; t < num_threads; t++)
	{
		int *c = thread_counters[t];

		// reset counters (done here so the zeroing is parallelized, too)
		for (int k = 0; k <= max_key; k++)
			c[k] = 0;

		// count occurrences; validate the key *before* it is used as an index
		for (int i = split_positions[t]; i < split_positions[t + 1]; i++)
		{
			const int key = hash(begin[i]);
			assert(0 <= key && key <= max_key);
			c[key]++;
		}
	}
	if (benchmark) timer.tic("counting & local prefix sums      ");

	// Compute global prefix sums / ranks from local ones. We *could*
	// make this parallel, too, but there are only num_threads * (max_key + 1)
	// entries in total.
	// Afterwards thread_counters[t][k] is the first output position for the
	// elements with key k found in chunk t. Iterating keys in the outer loop
	// and chunks in the inner loop is what keeps equal keys in input order.
	int sum = 0;
	for (int k = 0; k <= max_key; k++)
	{
		for (int t = 0; t < num_threads; t++)
		{
			const int count = thread_counters[t][k];
			thread_counters[t][k] = sum;
			sum += count;
		}
	}
	if (benchmark) timer.tic("global ranks (prefix sums)        ");

	// *********** STEP 2: SORTING ***********

	// backbuffer, copied back to input later; it holds elements of the input's
	// own type so that nothing is converted or truncated on the way
	RandomIterator *buffer = new RandomIterator[n];

	// write sorted result to backbuffer; every chunk only advances its own
	// rank counters, so the writes of different chunks never collide
	#pragma omp parallel for schedule(static) num_threads(num_threads)
	for (int t = 0; t < num_threads; t++)
	{
		int *c = thread_counters[t];

		for (int i = split_positions[t]; i < split_positions[t + 1]; i++)
		{
			const int key = hash(begin[i]);
			assert(0 <= key && key <= max_key);
			buffer[c[key]++] = begin[i];
		}
	}
	if (benchmark) timer.tic("sorting into backbuffer           ");

	// write result from buffer back into input
	std::copy(buffer, buffer + n, begin);

	if (benchmark) timer.tic("copying back into input           ");

	// cleanup
	delete [] buffer;

	for (int t = 0; t < num_threads; t++)
		delete [] thread_counters[t];
	delete [] thread_counters;

	delete [] split_positions;
	if (benchmark) timer.tic("cleanup                           ");
}