/*
 * sort.c - Parallel sort
 *
 * Author   Cesar Fuguet Tortolero (2013)
 *          Alain Greiner (2019)
 *
 * Copyright (c) UPMC Sorbonne Universites
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2.0 of the License.
 *
 * It is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ALMOS-MKH; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

///////////////////////////////////////////////////////////////////////////////
// This multi-threaded application implements a multi-stage sort.
// It was written by Cesar Fuguet Tortolero in 2013.
// It was ported to ALMOS-MKH by Alain Greiner in 2019.
//
// There is one thread per physical core.
// Computation is organised as a binary tree:
// - During the first stage, all threads execute in parallel a bubble sort
//   on their own sub-array.
// - At each following stage, the number of participating threads is divided
//   by 2, and each remaining thread merges two subsets sorted during the
//   previous stage.
//
// Number_of_stages = log2(Number_of_threads) + 1
// (the initial bubble-sort stage, followed by log2(Number_of_threads) merge
// stages).
//
// The various stages are separated by synchronisation barriers. The main
// thread relies on pthread_parallel_create() and its termination barriers to
// detect that all threads have completed, before printing the computation
// times (sequential & parallel). These results can optionally be registered
// in an instrumentation file.
//
// Constraints :
// - It supports up to 1024 cores: x_size, y_size, and ncores must be
//   powers of 2 (max 16*16 clusters / max 4 cores per cluster).
// - The number of values to be sorted (ARRAY_LENGTH) must be a power of 2,
//   larger than the number of cores.
/////////////////////////////////////////////////////////////////////////////// #include #include #include #include #include #include #define ARRAY_LENGTH 2048 // number of items #define MAX_THREADS 1024 // 16 * 16 * 4 #define X_MAX 16 // max number of clusters in a row #define Y_MAX 16 // max number of clusters in a column #define CORES_MAX 4 // max number of cores in a cluster #define CLUSTERS_MAX X_MAX * Y_MAX #define USE_DQT_BARRIER 1 // use DQT barrier if non zero #define DISPLAY_ARRAY 0 // display items values before and after #define DEBUG_MAIN 0 // trace main function #define DEBUG_SORT 0 // trace sort function #define CHECK_RESULT 0 // for debug #define INSTRUMENTATION 1 // register computation times on file /////////////////////////////////////////////////////////////////////////////////// // Arguments for the sort() function /////////////////////////////////////////////////////////////////////////////////// typedef struct { unsigned int tid; // continuous thread index unsigned int threads; // total number of threads pthread_barrier_t * parent_barrier; // pointer on termination barrier } sort_args_t; //////////////////////////////////////////////////////////////////////////////////// // Sort specific global variables //////////////////////////////////////////////////////////////////////////////////// int array0[ARRAY_LENGTH]; // values to sort int array1[ARRAY_LENGTH]; pthread_barrier_t barrier; // synchronisation variables ///////////////////////////////////////////////////////////////////////////////////// // Global variables required by parallel_pthread_create() ///////////////////////////////////////////////////////////////////////////////////// // 2D arrays of input arguments for the threads // These arrays are initialised by the application main thread sort_args_t sort_args[CLUSTERS_MAX][CORES_MAX]; // sort function arguments sort_args_t * sort_ptrs[CLUSTERS_MAX][CORES_MAX]; // pointers on arguments // 1D array of barriers to allow the threads to 
signal termination // this array is initialised by the pthread_parallel_create() function pthread_barrier_t parent_barriers[CLUSTERS_MAX]; // termination barrier //////////////////////////////////// static void bubbleSort( int * array, unsigned int length, unsigned int init_pos ) { unsigned int i; unsigned int j; int aux; for(i = 0; i < length; i++) { for(j = init_pos; j < (init_pos + length - i - 1); j++) { if(array[j] > array[j + 1]) { aux = array[j + 1]; array[j + 1] = array[j]; array[j] = aux; } } } } // end bubbleSort() /////////////////////////////////// static void merge( const int * src, // source array int * dst, // destination array int length, // number of items in a subset int init_pos_src_a, // index first item in src subset A int init_pos_src_b, // index first item in src subset B int init_pos_dst ) // index first item in destination { int i; int j; int k; i = 0; j = 0; k = init_pos_dst; while((i < length) || (j < length)) { if((i < length) && (j < length)) { if(src[init_pos_src_a + i] < src[init_pos_src_b + j]) { dst[k++] = src[init_pos_src_a + i]; i++; } else { dst[k++] = src[init_pos_src_b + j]; j++; } } else if(i < length) { dst[k++] = src[init_pos_src_a + i]; i++; } else { dst[k++] = src[init_pos_src_b + j]; j++; } } } // end merge() ////////////////////////////// void sort( sort_args_t * ptr ) { unsigned int i; int * src_array = NULL; int * dst_array = NULL; // get arguments unsigned int tid = ptr->tid; unsigned int threads = ptr->threads; pthread_barrier_t * parent_barrier = ptr->parent_barrier; unsigned int items = ARRAY_LENGTH / threads; unsigned int stages = __builtin_ctz( threads ) + 1; #if DEBUG_SORT printf("\n[sort] start : ptr %x / tid %d / threads %d / barrier %x\n", ptr, tid, threads, parent_barrier ); #endif bubbleSort( array0, items, items * tid ); #if DEBUG_SORT printf("\n[sort] thread[%d] : stage 0 completed\n", tid ); #endif ///////////////////////////////// pthread_barrier_wait( &barrier ); #if DEBUG_SORT printf("\n[sort] 
thread[%d] exit barrier 0\n", tid ); #endif // the number of threads contributing to sort is divided by 2 // and the number of items is multiplied by 2 at each next stage for ( i = 1 ; i < stages ; i++ ) { if((i % 2) == 1) // odd stage { src_array = array0; dst_array = array1; } else // even stage { src_array = array1; dst_array = array0; } if( (tid & ((1< y_size) ? x_size : y_size; unsigned int root_level = (z == 1) ? 0 : (z == 2) ? 1 : (z == 4) ? 2 : (z == 8) ? 3 : 4; // checks number of threads if ( (total_threads != 1) && (total_threads != 2) && (total_threads != 4) && (total_threads != 8) && (total_threads != 16 ) && (total_threads != 32) && (total_threads != 64) && (total_threads != 128) && (total_threads != 256) && (total_threads != 512) && (total_threads != 1024) ) { printf("\n[sort] ERROR : number of cores must be power of 2\n"); exit( 0 ); } // check array size if ( ARRAY_LENGTH % total_threads) { printf("\n[sort] ERROR : array size must be multiple of number of threads\n"); exit( 0 ); } printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n", total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle ); // initialize barrier if( USE_DQT_BARRIER ) { barrier_attr.x_size = x_size; barrier_attr.y_size = y_size; barrier_attr.nthreads = ncores; error = pthread_barrier_init( &barrier, &barrier_attr , total_threads ); } else // use SIMPLE_BARRIER { error = pthread_barrier_init( &barrier, NULL , total_threads ); } if( error ) { printf("\n[sort] ERROR : cannot initialise barrier\n" ); exit( 0 ); } #if DEBUG_MAIN if( USE_DQT_BARRIER ) printf("\n[sort] main completes DQT barrier init\n"); else printf("\n[sort] main completes simple barrier init\n"); #endif // Array to sort initialization for ( n = 0 ; n < ARRAY_LENGTH ; n++ ) { array0[n] = ARRAY_LENGTH - n - 1; } #if DISPLAY_ARRAY printf("\n*** array before sort\n"); for( n=0; n threads for (x = 0 ; x < x_size ; x++) { for (y = 0 ; y < y_size ; y++) { // compute cluster identifier cxy = 
HAL_CXY_FROM_XY( x , y ); for ( lid = 0 ; lid < ncores ; lid++ ) { // compute thread continuous index tid = (((x * y_size) + y) * ncores) + lid; // initialize 2D array of arguments sort_args[cxy][lid].tid = tid; sort_args[cxy][lid].threads = total_threads; sort_args[cxy][lid].parent_barrier = &parent_barriers[cxy]; // initialize 2D array of pointers sort_ptrs[cxy][lid] = &sort_args[cxy][lid]; } } } /////////////////////////// get_cycle( &seq_end_cycle ); #if DEBUG_MAIN printf("\n[sort] main completes sequencial init at cycle %d\n", (unsigned int)seq_end_cycle ); #endif // create and execute the working threads if( pthread_parallel_create( root_level, &sort, &sort_ptrs[0][0], &parent_barriers[0] ) ) { printf("\n[sort] ERROR : cannot create threads\n"); exit( 0 ); } //////////////////////////// get_cycle( ¶_end_cycle ); #if DEBUG_main printf("\n[sort] main completes parallel sort at cycle %d\n", (unsigned int)para_end_cycle ); #endif // destroy barrier pthread_barrier_destroy( &barrier ); #if DISPLAY_ARRAY printf("\n*** array after merge %d\n", i ); for( n=0; n res_array[n+1] ) { printf("\n[sort] array[%d] = %d > array[%d] = %d\n", n , res_array[n] , n+1 , res_array[n+1] ); success = 0; break; } } if ( success ) printf("\n[sort] success\n"); else printf("\n[sort] failure\n"); #endif #if INSTRUMENTATION char name[64]; char path[128]; unsigned long long instru_cycle; // build file name if( USE_DQT_BARRIER ) snprintf( name , 64 , "p_sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores ); else snprintf( name , 64 , "p_sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores ); // build file pathname snprintf( path , 128 , "home/%s" , name ); // compute results unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle); unsigned int parallel = (unsigned int)(para_end_cycle - seq_end_cycle); // display results on process terminal printf("\n----- %s -----\n" " - sequencial : %d cycles\n" " - parallel : %d cycles\n", name, sequencial, parallel ); // open 
file get_cycle( &instru_cycle ); FILE * stream = fopen( path , NULL ); if( stream == NULL ) { printf("\n[sort] ERROR : cannot open instrumentation file <%s>\n", path ); exit(0); } printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle ); #if IDBG idbg(); #endif // register results to file get_cycle( &instru_cycle ); int ret = fprintf( stream , "\n----- %s -----\n" " - sequencial : %d cycles\n" " - parallel : %d cycles\n", name, sequencial, parallel ); if( ret < 0 ) { printf("\n[sort] ERROR : cannot write to instrumentation file <%s>\n", path ); exit(0); } printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle ); #if IDBG idbg(); #endif // close instrumentation file get_cycle( &instru_cycle ); ret = fclose( stream ); if( ret ) { printf("\n[sort] ERROR : cannot close instrumentation file <%s>\n", path ); exit(0); } printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle ); #endif exit( 0 ); } // end main() /* vim: tabstop=4 : shiftwidth=4 : expandtab */