/*
 * sort.c - Parallel sort
 *
 * Author     Cesar Fuguet Tortolero (2013)
 *            Alain Greiner (2019)
 *
 * Copyright (c) UPMC Sorbonne Universites
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2.0 of the License.
 *
 * It is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ALMOS-MKH; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

///////////////////////////////////////////////////////////////////////////////
// This multi-threaded application implements a multi-stage sort.
// It was written by Cesar Fuguet Tortolero in 2013.
// It was ported to ALMOS-MKH by Alain Greiner in 2019.
//
// There is one thread per physical core.
// Computation is organised as a binary tree:
// - All threads execute in parallel a bubble sort on a sub-array during
//   the first stage of the parallel sort,
// - The number of participating threads is divided by 2 at each next stage,
//   to make a merge sort on two subsets of the previous stage.
//
//       Number_of_stages = number of barriers = log2(Number_of_threads)
//
// The various stages are separated by synchronisation barriers, and the
// main thread uses the join syscall to check that all threads completed
// before printing the computation time (sequential & parallel).
// These results can be - optionally - registered in an instrumentation file.
//
// Constraints :
// - It supports up to 1024 cores: x_size, y_size, and ncores must be
//   powers of 2 (max 16*16 clusters / max 4 cores per cluster)
// - The array of values to be sorted (ARRAY_LENGTH) must be a power of 2
//   larger than the number of cores.
///////////////////////////////////////////////////////////////////////////////

// NOTE(review): the six header names below are missing from this copy of the
// file (the text between '<' and '>' appears to have been stripped).
// They must be restored from the original source before this file can build.
#include
#include
#include
#include
#include
#include

#define ARRAY_LENGTH        2048       // number of items
#define MAX_THREADS         1024       // 16 * 16 * 4

#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
#define DISPLAY_ARRAY       0          // display items values before and after
#define DEBUG_MAIN          0          // trace main function
#define DEBUG_SORT          0          // trace sort function
#define CHECK_RESULT        0          // for debug
#define INSTRUMENTATION     1          // register computation times on file

/////////////////////////////////////////////////////////////
// argument for the sort() function (one thread per core)
/////////////////////////////////////////////////////////////

typedef struct
{
    unsigned int threads;       // total number of threads
    unsigned int thread_uid;    // thread user index (0 to threads -1)
    unsigned int main_uid;      // main thread user index
}
args_t;

//////////////////////////////////////////
//      Global variables
//////////////////////////////////////////

int                 array0[ARRAY_LENGTH];    // values to sort
int                 array1[ARRAY_LENGTH];    // second buffer of the same size
                                             // (presumably the alternate merge
                                             // buffer - confirm in sort())

pthread_barrier_t   barrier;                 // synchronisation variables

pthread_t           trdid[MAX_THREADS];      // kernel identifiers
pthread_attr_t      attr[MAX_THREADS];       // thread attributes
args_t              arg[MAX_THREADS];        // sort function arguments

////////////////////////////////////
// Sorts in place, in ascending order, the <length> items of <array>
// that start at index <init_pos>.
// - array    : buffer containing the items to sort
// - length   : number of items to sort
// - init_pos : index of the first item to sort
// Nothing is done when <length> is zero (the outer loop does not execute).
////////////////////////////////////
static void bubbleSort( int        * array,
                        unsigned int length,
                        unsigned int init_pos )
{
    unsigned int i;
    unsigned int j;
    int          aux;

    // each pass moves the largest item of the not yet sorted range to its end,
    // so the range scanned by the inner loop shrinks by one item per pass
    for(i = 0; i < length; i++)
    {
        for(j = init_pos; j < (init_pos + length - i - 1); j++)
        {
            // swap neighbours that are out of order
            if(array[j] > array[j + 1])
            {
                aux          = array[j + 1];
                array[j + 1] = array[j];
                array[j]     = aux;
            }
        }
    }
}  // end bubbleSort()


///////////////////////////////////
// Merges two subsets of <length> items each, read from <src>, into <dst>.
// Exactly 2 * <length> items are written, starting at index <init_pos_dst>.
// The smaller head item is copied first, so the output is in ascending order
// provided both input subsets are already in ascending order.
// When both head items are equal, the item from subset B is copied first.
///////////////////////////////////
static void merge( const int * src,               // source array
                   int       * dst,               // destination array
                   int         length,            // number of items in a subset
                   int         init_pos_src_a,    // index first item in src subset A
                   int         init_pos_src_b,    // index first item in src subset B
                   int         init_pos_dst )     // index first item in destination
{
    int i;      // number of items already taken from subset A
    int j;      // number of items already taken from subset B
    int k;      // next index to write in dst

    i = 0;
    j = 0;
    k = init_pos_dst;

    while((i < length) || (j < length))
    {
        if((i < length) && (j < length))
        {
            // both subsets still have items: take the smaller head item
            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
            {
                dst[k++] = src[init_pos_src_a + i];
                i++;
            }
            else
            {
                dst[k++] = src[init_pos_src_b + j];
                j++;
            }
        }
        else if(i < length)
        {
            // subset B is exhausted: copy the remaining items of subset A
            dst[k++] = src[init_pos_src_a + i];
            i++;
        }
        else
        {
            // subset A is exhausted: copy the remaining items of subset B
            dst[k++] = src[init_pos_src_b + j];
            j++;
        }
    }
}  // end merge()

//////////////////////////////////////
// Function taking a pointer on the args_t describing the calling thread.
// NOTE(review): only the beginning of this function is visible in this copy
// of the file; see the note in the DISPLAY_ARRAY section below.
//////////////////////////////////////
static void sort( const args_t * ptr )
{
    unsigned int       i;
    unsigned long long cycle;
    unsigned int       cxy;
    unsigned int       lid;

    int              * src_array = NULL;
    int              * dst_array = NULL;

    // get core coordinates and date
    // (get_core() and get_cycle() are not defined in this file)
    get_core( &cxy , &lid );
    get_cycle( &cycle );

    // local copies of the thread arguments
    unsigned int  thread_uid = ptr->thread_uid;
    unsigned int  threads    = ptr->threads;
    unsigned int  main_uid   = ptr->main_uid;

#if DISPLAY_ARRAY
    unsigned int n;
    if( thread_uid == main_uid )
    {
        printf("\n*** array before sort\n");

        // NOTE(review): source text is missing inside the next statement,
        // between "n" and "res_array" (it looks like everything between a '<'
        // and a later '>' was stripped). The lost text presumably held the
        // end of sort() and the beginning of main(), including the
        // declarations of <res_array>, <success>, <x_size>, <y_size>,
        // <ncores>, <start_cycle>, <seq_end_cycle> and <para_end_cycle>,
        // which are used below but not declared in the visible code.
        // The statement is kept exactly as found and does not compile as is;
        // restore it from the original file.
        for( n=0; n res_array[n+1] )
        {
            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
            n , res_array[n] , n+1 , res_array[n+1] );
            success = 0;
            break;
        }
    }

    // NOTE(review): from here on the code appears to belong to main()
    // (the function is closed by the "end main()" comment below) - confirm.
    if ( success ) printf("\n[sort] success\n");
    else           printf("\n[sort] failure\n");
#endif

#if INSTRUMENTATION
    char               name[64];
    char               path[128];
    unsigned long long instru_cycle;

    // build file name : barrier type, array length, number of clusters
    // (x_size * y_size), and ncores
    if( USE_DQT_BARRIER )
    snprintf( name , 64 , "sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
    else
    snprintf( name , 64 , "sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );

    // build file pathname (relative path : no leading '/')
    snprintf( path , 128 , "home/%s" , name );

    // compute results : both durations are truncated to 32 bits
    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);

    // display results on process terminal
    printf("\n----- %s -----\n"
           " - sequencial : %d cycles\n"
           " - parallel : %d cycles\n",
           name, sequencial, parallel );

    // open file
    // NOTE(review): the mode argument is NULL; standard C fopen() requires a
    // mode string - confirm that the target libc accepts NULL here.
    get_cycle( &instru_cycle );
    FILE * stream = fopen( path , NULL );

    if( stream == NULL )
    {
        // NOTE(review): this error path (like the two below) terminates
        // with exit status 0 - confirm this is intended.
        printf("\n[sort error] cannot open instrumentation file <%s>\n",
        path );
        exit(0);
    }

    printf("\n[sort] file <%s> open at cycle %d\n",
    path, (unsigned int)instru_cycle );

#if IDBG
    idbg();
#endif

    // register results to file (same text as displayed on the terminal)
    get_cycle( &instru_cycle );
    int ret = fprintf( stream , "\n----- %s -----\n"
                                " - sequencial : %d cycles\n"
                                " - parallel : %d cycles\n",
                                name, sequencial, parallel );
    if( ret < 0 )
    {
        printf("\n[sort error] cannot write to instrumentation file <%s>\n",
        path );
        exit(0);
    }

    printf("\n[sort] file <%s> written at cycle %d\n",
    path, (unsigned int)instru_cycle );

#if IDBG
    idbg();
#endif

    // close instrumentation file
    get_cycle( &instru_cycle );
    ret = fclose( stream );

    if( ret )
    {
        printf("\n[sort error] cannot close instrumentation file <%s>\n",
        path );
        exit(0);
    }

    printf("\n[sort] file <%s> closed at cycle %d\n",
    path, (unsigned int)instru_cycle );

#endif

    exit( 0 );

}  // end main()

/*
vim: tabstop=4 : shiftwidth=4 : expandtab
*/