source: trunk/user/sort/sort.c @ 637

Last change on this file since 637 was 637, checked in by alain, 5 years ago

Introduce the non-standard pthread_parallel_create() system call
and re-write the <fft> and <sort> applications to improve the
intrinsic parallelism in applications.

  • Property svn:executable set to *
File size: 16.2 KB
RevLine 
[635]1/*
2 * sort.c - Parallel sort
3 *
4 * Author     Cesar Fuguet Tortolero (2013)
5 *            Alain Greiner (2019)
6 *
7 * Copyright (c) UPMC Sorbonne Universites
8 *
9 * This is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; version 2.0 of the License.
12 *
13 * It is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16 * General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with ALMOS-MKH; if not, write to the Free Software Foundation,
20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
[417]23///////////////////////////////////////////////////////////////////////////////
// This multi-threaded application implements a multi-stage sort.
// It was written by Cesar Fuguet Tortolero in 2013.
// It was ported to ALMOS-MKH by Alain Greiner in 2019.
27//
// There is one thread per physical core.
// Computation is organised as a binary tree:
// - All threads execute in parallel a bubble sort on a sub-array during
//   the first stage of the parallel sort,
// - The number of participating threads is divided by 2 at each next stage,
//   to make a merge sort on two subsets produced by the previous stage.
//
//       Number_of_stages = number of barriers = log2(Number_of_threads) + 1
36//
// The various stages are separated by synchronisation barriers, and the
// main thread uses the join syscall to check that all threads completed
// before printing the computation time (sequential & parallel).
// These results can be - optionally - registered in an instrumentation file.
41//
// Constraints :
// - It supports up to 1024 cores: x_size, y_size, and ncores must be
//   powers of 2 (max 16*16 clusters / max 4 cores per cluster)
// - The array of values to be sorted (ARRAY_LENGTH) must be a power of 2
//   larger than the number of cores.
47///////////////////////////////////////////////////////////////////////////////
48
49#include <stdio.h>
50#include <stdlib.h>
[596]51#include <unistd.h>
[417]52#include <pthread.h>
[445]53#include <almosmkh.h>
[459]54#include <hal_macros.h>
[417]55
// Problem size and platform limits
#define ARRAY_LENGTH        2048            // number of items
#define MAX_THREADS         1024            // 16 * 16 * 4

#define X_MAX               16              // max number of clusters in a row
#define Y_MAX               16              // max number of clusters in a column
#define CORES_MAX           4               // max number of cores in a cluster
#define CLUSTERS_MAX        (X_MAX * Y_MAX) // parenthesized: safe in any expression

// Compile-time switches
#define USE_DQT_BARRIER     1               // use DQT barrier if non zero
#define DISPLAY_ARRAY       0               // display items values before and after
#define DEBUG_MAIN          0               // trace main function
#define DEBUG_SORT          0               // trace sort function
#define CHECK_RESULT        0               // for debug
#define INSTRUMENTATION     1               // register computation times on file
[417]70
[637]71///////////////////////////////////////////////////////////////////////////////////
72//            Arguments for the sort() function
73///////////////////////////////////////////////////////////////////////////////////
74
[417]75typedef struct
76{
[637]77    unsigned int        tid;                // continuous thread index
78    unsigned int        threads;            // total number of threads
79    pthread_barrier_t * parent_barrier;     // pointer on termination barrier
[417]80}
[637]81sort_args_t;
[417]82
[637]83////////////////////////////////////////////////////////////////////////////////////
84//            Sort specific global variables
85////////////////////////////////////////////////////////////////////////////////////
[417]86
87int                 array0[ARRAY_LENGTH];    // values to sort
88int                 array1[ARRAY_LENGTH];   
89
90pthread_barrier_t   barrier;                 // synchronisation variables
91
[637]92/////////////////////////////////////////////////////////////////////////////////////
93//             Global variables required by parallel_pthread_create()
94/////////////////////////////////////////////////////////////////////////////////////
[417]95
[637]96// 2D arrays of input arguments for the <sort> threads
97// These arrays are initialised by the application main thread
98
99sort_args_t       sort_args[CLUSTERS_MAX][CORES_MAX];  // sort function arguments
100sort_args_t     * sort_ptrs[CLUSTERS_MAX][CORES_MAX];  // pointers on arguments
101
102// 1D array of barriers to allow the <sort> threads to signal termination
103// this array is initialised by the pthread_parallel_create() function
104 
105pthread_barrier_t parent_barriers[CLUSTERS_MAX];       // termination barrier
106
107
///////////////////////////////////////////////////////////////////////////////
// This function sorts in place, in ascending order, a slice of <array>
// with a bubble sort. Only the items whose index is in the range
// [init_pos , init_pos + length) are read or written.
// - array    : base of the array containing the slice.
// - length   : number of items in the slice (0 and 1 are accepted: no-op).
// - init_pos : index of the first item of the slice.
///////////////////////////////////////////////////////////////////////////////
static void bubbleSort( int * array,
                        unsigned int length,
                        unsigned int init_pos )
{
    unsigned int pass;      // number of completed passes
    unsigned int pos;       // index of the left item of the compared pair
    unsigned int last;      // index of the last item not yet in final position

    for( pass = 0 ; pass < length ; pass++ )
    {
        // each pass moves the largest remaining item to index <last>,
        // so the scanned range shrinks by one item per pass
        last = init_pos + length - pass - 1;

        for( pos = init_pos ; pos < last ; pos++ )
        {
            if( array[pos] > array[pos + 1] )
            {
                int tmp        = array[pos];
                array[pos]     = array[pos + 1];
                array[pos + 1] = tmp;
            }
        }
    }
}  // end bubbleSort()
130
131
///////////////////////////////////////////////////////////////////////////////
// This function merges two sorted subsets A and B of <length> items each,
// both located in the <src> array, into 2 * <length> consecutive items of
// the <dst> array, in ascending order.
// When the two compared items are equal, the item from subset B is copied first.
// Nothing is read or written when <length> is not strictly positive.
// - src            : source array (read only).
// - dst            : destination array.
// - length         : number of items in each source subset.
// - init_pos_src_a : index of the first item of subset A in <src>.
// - init_pos_src_b : index of the first item of subset B in <src>.
// - init_pos_dst   : index of the first written item in <dst>.
///////////////////////////////////////////////////////////////////////////////
static void merge( const int * src,               // source array
                   int       * dst,               // destination array
                   int         length,            // number of items in a subset
                   int         init_pos_src_a,    // index first item in src subset A
                   int         init_pos_src_b,    // index first item in src subset B
                   int         init_pos_dst )     // index first item in destination
{
    int done_a = 0;                 // number of items consumed in subset A
    int done_b = 0;                 // number of items consumed in subset B
    int out    = init_pos_dst;      // next index to write in dst

    // both subsets still have items: copy the smallest head
    while( (done_a < length) && (done_b < length) )
    {
        int head_a = src[init_pos_src_a + done_a];
        int head_b = src[init_pos_src_b + done_b];

        if( head_a < head_b )
        {
            dst[out++] = head_a;
            done_a++;
        }
        else
        {
            dst[out++] = head_b;
            done_b++;
        }
    }

    // at most one of the two loops below executes: drain the remaining subset
    while( done_a < length )
    {
        dst[out++] = src[init_pos_src_a + done_a];
        done_a++;
    }

    while( done_b < length )
    {
        dst[out++] = src[init_pos_src_b + done_b];
        done_b++;
    }
}  // end merge()
175
[637]176//////////////////////////////
177void sort( sort_args_t * ptr )
[417]178{
[637]179    unsigned int        i;
180    int               * src_array  = NULL;
181    int               * dst_array  = NULL;
[417]182
[637]183    // get arguments
184    unsigned int        tid            = ptr->tid;
185    unsigned int        threads        = ptr->threads;
186    pthread_barrier_t * parent_barrier = ptr->parent_barrier;
[417]187
[637]188    unsigned int        items      = ARRAY_LENGTH / threads;
189    unsigned int        stages     = __builtin_ctz( threads ) + 1;
[440]190
[627]191#if DEBUG_SORT
[637]192printf("\n[sort] start : ptr %x / tid %d / threads %d / barrier %x\n",
193ptr, tid, threads, parent_barrier );
[623]194#endif
195
[637]196    bubbleSort( array0, items, items * tid );
[417]197
[627]198#if DEBUG_SORT
[637]199printf("\n[sort] thread[%d] : stage 0 completed\n", tid );
[623]200#endif
[442]201
[417]202    /////////////////////////////////
203    pthread_barrier_wait( &barrier ); 
[440]204
[627]205#if DEBUG_SORT
[637]206printf("\n[sort] thread[%d] exit barrier 0\n", tid );
[623]207#endif
208
209    // the number of threads contributing to sort is divided by 2
210    // and the number of items is multiplied by 2 at each next stage
[417]211    for ( i = 1 ; i < stages ; i++ )
212    {
[623]213        if((i % 2) == 1)               // odd stage
214        {
215            src_array = array0;
216            dst_array = array1;
217        }
218        else                           // even stage
219        {
220            src_array = array1;
221            dst_array = array0;
222        }
[417]223
[637]224        if( (tid & ((1<<i)-1)) == 0 )
[417]225        {
226
[627]227#if DEBUG_SORT
[637]228printf("\n[sort] thread[%d] : stage %d start\n", tid , i );
[623]229#endif
[417]230            merge( src_array, 
231                   dst_array,
[623]232                   items << (i-1),
[637]233                   items * tid,
234                   items * (tid + (1 << (i-1))),
235                   items * tid );
[417]236
[627]237#if DEBUG_SORT
[637]238printf("\n[sort] thread[%d] : stage %d completed\n", tid , i );
[623]239#endif
[417]240        }
241
242        /////////////////////////////////
243        pthread_barrier_wait( &barrier );
244
[627]245#if DEBUG_SORT
[637]246printf("\n[sort] thread[%d] exit barrier %d\n", tid , i );
[623]247#endif
[417]248
[637]249    }  // en for stages
250
251    // sort thread signal completion to main thread
252    pthread_barrier_wait( parent_barrier );
253
254#if DEBUG_SORT
255printf("\n[sort] thread[%d] exit\n", tid );
[623]256#endif
257
[637]258    // sort thread exit
259    pthread_exit( NULL );
[623]260
[417]261} // end sort()
262
263
[574]264/////////////////
[475]265void main( void )
[417]266{
[619]267    int                    error;
[417]268    unsigned int           x_size;             // number of rows
269    unsigned int           y_size;             // number of columns
270    unsigned int           ncores;             // number of cores per cluster
[619]271    unsigned int           total_threads;      // total number of threads
[637]272    unsigned int           x;                  // X coordinate for a sort thread
273    unsigned int           y;                  // Y coordinate for a sort thread
274    unsigned int           cxy;                // cluster identifier for a sort thead
[417]275    unsigned int           lid;                // core local index for a thread
[637]276    unsigned int           tid;                // sort thread continuous index
277    pthread_barrierattr_t  barrier_attr;       // barrier attributes (used for DQT)
[417]278    unsigned int           n;                  // index in array to sort
279
[623]280    unsigned long long     start_cycle;
281    unsigned long long     seq_end_cycle;
282    unsigned long long     para_end_cycle;
283
284    /////////////////////////
285    get_cycle( &start_cycle );
286 
[596]287    // compute number of threads (one thread per core)
[417]288    get_config( &x_size , &y_size , &ncores );
[619]289    total_threads = x_size * y_size * ncores;
[417]290
[637]291    // compute covering DQT size an level
292    unsigned int z = (x_size > y_size) ? x_size : y_size;
293    unsigned int root_level = (z == 1) ? 0 : (z == 2) ? 1 : (z == 4) ? 2 : (z == 8) ? 3 : 4;
[417]294
295    // checks number of threads
[619]296    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
297         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
298         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
299         (total_threads != 512) && (total_threads != 1024) )
[417]300    {
[637]301        printf("\n[sort] ERROR : number of cores must be power of 2\n");
[434]302        exit( 0 );
[417]303    }
304
305    // check array size
[619]306    if ( ARRAY_LENGTH % total_threads) 
[417]307    {
[637]308        printf("\n[sort] ERROR : array size must be multiple of number of threads\n");
[434]309        exit( 0 );
[417]310    }
311
[624]312    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
[623]313    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
[417]314
[619]315    // initialize barrier
316    if( USE_DQT_BARRIER )
[417]317    {
[619]318        barrier_attr.x_size   = x_size; 
319        barrier_attr.y_size   = y_size;
320        barrier_attr.nthreads = ncores;
321        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
322    }
323    else // use SIMPLE_BARRIER
324    {
325        error = pthread_barrier_init( &barrier, NULL , total_threads );
326    }
327
328    if( error )
329    {
[637]330        printf("\n[sort] ERROR : cannot initialise barrier\n" );
[434]331        exit( 0 );
[417]332    }
333
[627]334#if DEBUG_MAIN
[628]335if( USE_DQT_BARRIER ) printf("\n[sort] main completes DQT barrier init\n");
336else                  printf("\n[sort] main completes simple barrier init\n");
[623]337#endif
[417]338
339    // Array to sort initialization
340    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
341    {
[596]342        array0[n] = ARRAY_LENGTH - n - 1;
[417]343    }
344
[637]345#if DISPLAY_ARRAY
346    printf("\n*** array before sort\n");
347    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
348#endif
349
[627]350#if DEBUG_MAIN
[623]351printf("\n[sort] main completes array init\n");
[417]352#endif
353
[637]354    // build array of arguments for the <sort> threads
355    for (x = 0 ; x < x_size ; x++)
[417]356    {
[637]357        for (y = 0 ; y < y_size ; y++)
[417]358        {
[637]359            // compute cluster identifier
360            cxy = HAL_CXY_FROM_XY( x , y );
361
[635]362            for ( lid = 0 ; lid < ncores ; lid++ )
[417]363            {
[637]364                // compute thread continuous index
365                tid = (((x * y_size) + y) * ncores) + lid;
[417]366
[637]367                // initialize 2D array of arguments
368                sort_args[cxy][lid].tid            = tid;
369                sort_args[cxy][lid].threads        = total_threads;
370                sort_args[cxy][lid].parent_barrier = &parent_barriers[cxy];
[417]371
[637]372                // initialize 2D array of pointers
373                sort_ptrs[cxy][lid] = &sort_args[cxy][lid];
[417]374            }
375        }
376    }
[637]377
[623]378    ///////////////////////////
379    get_cycle( &seq_end_cycle );
[417]380
[627]381#if DEBUG_MAIN
[623]382printf("\n[sort] main completes sequencial init at cycle %d\n",
383(unsigned int)seq_end_cycle );
384#endif
385
[637]386    // create and execute the working threads
387    if( pthread_parallel_create( root_level,
388                                 &sort,
389                                 &sort_ptrs[0][0],
390                                 &parent_barriers[0] ) )
[635]391    {
[637]392        printf("\n[sort] ERROR : cannot create threads\n");
393        exit( 0 );
[635]394    }
395
[623]396    ////////////////////////////
397    get_cycle( &para_end_cycle );
[619]398
[637]399#if DEBUG_main
400printf("\n[sort] main completes parallel sort at cycle %d\n", 
401(unsigned int)para_end_cycle );
402#endif
[623]403
[619]404    // destroy barrier
405    pthread_barrier_destroy( &barrier );
406
[637]407#if DISPLAY_ARRAY
408    printf("\n*** array after merge %d\n", i );
409    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
410#endif
411
[625]412#if CHECK_RESULT
413    int    success = 1;
414    int *  res_array = ( (total_threads ==   2) ||
415                         (total_threads ==   8) || 
416                         (total_threads ==  32) || 
417                         (total_threads == 128) || 
418                         (total_threads == 512) ) ? array1 : array0;
[623]419
[625]420    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
[417]421    {
[625]422        if ( res_array[n] > res_array[n+1] )
423        {
424            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
425            n , res_array[n] , n+1 , res_array[n+1] );
426            success = 0;
427            break;
428        }
[417]429    }
430
[625]431    if ( success ) printf("\n[sort] success\n");
432    else           printf("\n[sort] failure\n");
[417]433#endif
434
[623]435#if INSTRUMENTATION
[626]436    char               name[64];
437    char               path[128];
438    unsigned long long instru_cycle;
[417]439
[629]440    // build file name
441    if( USE_DQT_BARRIER )
[637]442    snprintf( name , 64 , "p_sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
[629]443    else
[637]444    snprintf( name , 64 , "p_sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
[623]445
[625]446    // build file pathname
447    snprintf( path , 128 , "home/%s" , name );
[623]448
[625]449    // compute results
450    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
451    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
[623]452
[625]453    // display results on process terminal
454    printf("\n----- %s -----\n"
455           " - sequencial : %d cycles\n"
456           " - parallel   : %d cycles\n", 
457           name, sequencial, parallel );
[623]458
[625]459    // open file
[626]460    get_cycle( &instru_cycle );
[625]461    FILE * stream = fopen( path , NULL );
[626]462
[625]463    if( stream == NULL )
464    {
[637]465        printf("\n[sort] ERROR : cannot open instrumentation file <%s>\n", path );
[625]466        exit(0);
467    }
[623]468
[626]469    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
[625]470
[626]471#if IDBG
472idbg();
473#endif
474
[625]475    // register results to file
[626]476    get_cycle( &instru_cycle );
[625]477    int ret = fprintf( stream , "\n----- %s -----\n"
478                                " - sequencial : %d cycles\n"
479                                " - parallel   : %d cycles\n", name, sequencial, parallel );
480    if( ret < 0 )
481    {
[637]482        printf("\n[sort] ERROR : cannot write to instrumentation file <%s>\n", path );
[625]483        exit(0);
484    }
485
[626]486    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
[625]487
[626]488#if IDBG
489idbg();
490#endif
491
[625]492    // close instrumentation file
[626]493    get_cycle( &instru_cycle );
494    ret = fclose( stream );
[625]495
[626]496    if( ret )
[625]497    {
[637]498        printf("\n[sort] ERROR : cannot close instrumentation file <%s>\n", path );
[625]499        exit(0);
500    }
501
[626]502    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
[625]503
[619]504#endif
505
506    exit( 0 );
507
[417]508}  // end main()
509
510/*
511vim: tabstop=4 : shiftwidth=4 : expandtab
512*/
Note: See TracBrowser for help on using the repository browser.