source: trunk/user/sort/sort.c @ 629

Last change on this file since 629 was 629, checked in by alain, 5 years ago

Remove the "giant" rwlock protecting the GPT, and
use the GPT_LOCKED attribute in each PTE to prevent
concurrent modifications of one GPT entry.
The version number has been incremented to 2.1.

  • Property svn:executable set to *
File size: 16.1 KB
RevLine 
[417]1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
[596]26#include <unistd.h>
[417]27#include <pthread.h>
[445]28#include <almosmkh.h>
[459]29#include <hal_macros.h>
[417]30
[629]31#define ARRAY_LENGTH        4096       // number of items
[588]32#define MAX_THREADS         1024       // 16 * 16 * 4
[440]33
[623]34#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
35#define DISPLAY_ARRAY       0          // display items values before and after
[627]36#define DEBUG_MAIN          1          // trace main function
[628]37#define DEBUG_SORT          1          // trace sort function
[627]38#define INTERACTIVE_MODE    0          // activate idbg() during instrumentation
[623]39#define CHECK_RESULT        0          // for debug
40#define INSTRUMENTATION     1          // register computation times on file
[626]41#define IDBG                0          // activate interactive debug in main
[623]42
[417]43/////////////////////////////////////////////////////////////
44// argument for the sort() function (one thread per core)
45/////////////////////////////////////////////////////////////
46
47typedef struct
48{
[619]49    unsigned int threads;       // total number of threads
[417]50    unsigned int thread_uid;    // thread user index (0 to threads -1)
51    unsigned int main_uid;      // main thread user index
52}
53args_t;
54
55//////////////////////////////////////////
56//      Global variables
57//////////////////////////////////////////
58
59int                 array0[ARRAY_LENGTH];    // values to sort
60int                 array1[ARRAY_LENGTH];   
61
62pthread_barrier_t   barrier;                 // synchronisation variables
63
[440]64pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
65args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
[417]66
67////////////////////////////////////
[588]68static void bubbleSort( int * array,
69                        unsigned int length,
70                        unsigned int init_pos )
[417]71{
[473]72    unsigned int i;
73    unsigned int j;
74    int          aux;
[417]75
76    for(i = 0; i < length; i++)
77    {
78        for(j = init_pos; j < (init_pos + length - i - 1); j++)
79        {
80            if(array[j] > array[j + 1])
81            {
82                aux          = array[j + 1];
83                array[j + 1] = array[j];
84                array[j]     = aux;
85            }
86        }
87    }
88}  // end bubbleSort()
89
90
[588]91///////////////////////////////////
[623]92static void merge( const int * src,               // source array
93                   int       * dst,               // destination array
94                   int         length,            // number of items in a subset
95                   int         init_pos_src_a,    // index first item in src subset A
96                   int         init_pos_src_b,    // index first item in src subset B
97                   int         init_pos_dst )     // index first item in destination
[417]98{
99    int i;
100    int j;
101    int k;
102
103    i = 0;
104    j = 0;
105    k = init_pos_dst;
106
107    while((i < length) || (j < length))
108    {
109        if((i < length) && (j < length))
110        {
111            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
112            {
113                dst[k++] = src[init_pos_src_a + i];
114                i++;
115            }
116            else
117            {
118                dst[k++] = src[init_pos_src_b + j];
119                j++;
120            }
121        }
122        else if(i < length)
123        {
124            dst[k++] = src[init_pos_src_a + i];
125            i++;
126        }
127        else
128        {
129            dst[k++] = src[init_pos_src_b + j];
130            j++;
131        }
132    }
133}  // end merge()
134
[588]135//////////////////////////////////////
[504]136static void sort( const args_t * ptr )
[417]137{
138    unsigned int       i;
139    unsigned long long cycle;
[440]140    unsigned int       cxy;
141    unsigned int       lid;
[417]142
[623]143    int              * src_array  = NULL;
144    int              * dst_array  = NULL;
[417]145
[440]146    // get core coordinates an date
147    get_core( &cxy , &lid );
148    get_cycle( &cycle );
149
[417]150    unsigned int  thread_uid = ptr->thread_uid;
151    unsigned int  threads    = ptr->threads;
152    unsigned int  main_uid   = ptr->main_uid;
153
[623]154#if DISPLAY_ARRAY
155unsigned int n;
156if( thread_uid == main_uid )
157{
158    printf("\n*** array before sort\n");
159    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
160}
161#endif
162
163    /////////////////////////////////
164    pthread_barrier_wait( &barrier ); 
165
[627]166#if DEBUG_SORT
167if( thread_uid == 0 )
[623]168printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
169#endif
170
[417]171    unsigned int  items      = ARRAY_LENGTH / threads;
172    unsigned int  stages     = __builtin_ctz( threads ) + 1;
173
[627]174#if DEBUG_SORT
175if( thread_uid == 0 )
[623]176printf("\n[sort] thread[%d] : start\n", thread_uid );
177#endif
[442]178
[417]179    bubbleSort( array0, items, items * thread_uid );
180
[627]181#if DEBUG_SORT
182if( thread_uid == 0 )
[623]183printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
184#endif
[417]185
186    /////////////////////////////////
187    pthread_barrier_wait( &barrier ); 
[440]188
[627]189#if DEBUG_SORT
190if( thread_uid == 0 )
[623]191printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
192#endif
193
194#if DISPLAY_ARRAY
195if( thread_uid == main_uid )
196{
197    printf("\n*** array after bubble sort\n");
198    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
199}
200#endif
201
202    // the number of threads contributing to sort is divided by 2
203    // and the number of items is multiplied by 2 at each next stage
[417]204    for ( i = 1 ; i < stages ; i++ )
205    {
[623]206        if((i % 2) == 1)               // odd stage
207        {
208            src_array = array0;
209            dst_array = array1;
210        }
211        else                           // even stage
212        {
213            src_array = array1;
214            dst_array = array0;
215        }
[417]216
217        if( (thread_uid & ((1<<i)-1)) == 0 )
218        {
219
[627]220#if DEBUG_SORT
221if( thread_uid == 0 )
[623]222printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
223#endif
[417]224            merge( src_array, 
225                   dst_array,
[623]226                   items << (i-1),
[417]227                   items * thread_uid,
228                   items * (thread_uid + (1 << (i-1))),
229                   items * thread_uid );
230
[627]231#if DEBUG_SORT
232if( thread_uid == 0 )
[623]233printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
234#endif
[417]235        }
236
237        /////////////////////////////////
238        pthread_barrier_wait( &barrier );
239
[627]240#if DEBUG_SORT
241if( thread_uid == 0 )
[623]242printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
243#endif
[417]244
[623]245#if DISPLAY_ARRAY
246if( thread_uid == main_uid )
247{
248    printf("\n*** array after merge %d\n", i );
249    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
250}
251#endif
252
253    }  // en for stages
254
[417]255    // all threads but the main thread exit
256    if( thread_uid != main_uid ) pthread_exit( NULL );
257
258} // end sort()
259
260
[574]261/////////////////
[475]262void main( void )
[417]263{
[619]264    int                    error;
[417]265    unsigned int           x_size;             // number of rows
266    unsigned int           y_size;             // number of columns
267    unsigned int           ncores;             // number of cores per cluster
[619]268    unsigned int           total_threads;      // total number of threads
[417]269    unsigned int           thread_uid;         // user defined thread index
270    unsigned int           main_cxy;           // cluster identifier for main
271    unsigned int           main_x;             // X coordinate for main thread
272    unsigned int           main_y;             // Y coordinate for main thread
273    unsigned int           main_lid;           // core local index for main thread
274    unsigned int           main_uid;           // thread user index for main thread
275    unsigned int           x;                  // X coordinate for a thread
276    unsigned int           y;                  // Y coordinate for a thread
277    unsigned int           lid;                // core local index for a thread
278    unsigned int           n;                  // index in array to sort
279    pthread_t              trdid;              // kernel allocated thread index (unused)
280    pthread_barrierattr_t  barrier_attr;       // barrier attributes
281
[623]282    unsigned long long     start_cycle;
283    unsigned long long     seq_end_cycle;
284    unsigned long long     para_end_cycle;
285
286    /////////////////////////
287    get_cycle( &start_cycle );
288 
[596]289    // compute number of threads (one thread per core)
[417]290    get_config( &x_size , &y_size , &ncores );
[619]291    total_threads = x_size * y_size * ncores;
[417]292
293    // get core coordinates and user index for the main thread
294    get_core( &main_cxy , & main_lid );
[459]295    main_x   = HAL_X_FROM_CXY( main_cxy );
296    main_y   = HAL_Y_FROM_CXY( main_cxy );
[417]297    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
298
299    // checks number of threads
[619]300    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
301         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
302         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
303         (total_threads != 512) && (total_threads != 1024) )
[417]304    {
[623]305        printf("\n[sort error] number of cores must be power of 2\n");
[434]306        exit( 0 );
[417]307    }
308
309    // check array size
[619]310    if ( ARRAY_LENGTH % total_threads) 
[417]311    {
[623]312        printf("\n[sort error] array size must be multiple of number of threads\n");
[434]313        exit( 0 );
[417]314    }
315
[624]316    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
[623]317    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
[417]318
[619]319    // initialize barrier
320    if( USE_DQT_BARRIER )
[417]321    {
[619]322        barrier_attr.x_size   = x_size; 
323        barrier_attr.y_size   = y_size;
324        barrier_attr.nthreads = ncores;
325        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
326    }
327    else // use SIMPLE_BARRIER
328    {
329        error = pthread_barrier_init( &barrier, NULL , total_threads );
330    }
331
332    if( error )
333    {
[623]334        printf("\n[sort error] cannot initialise barrier\n" );
[434]335        exit( 0 );
[417]336    }
337
[627]338#if DEBUG_MAIN
[628]339if( USE_DQT_BARRIER ) printf("\n[sort] main completes DQT barrier init\n");
340else                  printf("\n[sort] main completes simple barrier init\n");
[623]341#endif
[417]342
343    // Array to sort initialization
344    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
345    {
[596]346        array0[n] = ARRAY_LENGTH - n - 1;
[417]347    }
348
[627]349#if DEBUG_MAIN
[623]350printf("\n[sort] main completes array init\n");
[417]351#endif
352
353    // launch other threads to execute sort() function
354    // on cores other than the core running the main thread
355    for ( x=0 ; x<x_size ; x++ )
356    {
357        for ( y=0 ; y<y_size ; y++ )
358        {
359            for ( lid=0 ; lid<ncores ; lid++ )
360            {
361                thread_uid = (((x * y_size) + y) * ncores) + lid;
362
363                // set sort arguments for all threads
[619]364                arg[thread_uid].threads      = total_threads;
[417]365                arg[thread_uid].thread_uid   = thread_uid;
366                arg[thread_uid].main_uid     = main_uid;
367
368                // set thread attributes for all threads
[574]369                attr[thread_uid].attributes = PT_ATTR_DETACH          |
370                                              PT_ATTR_CLUSTER_DEFINED | 
371                                              PT_ATTR_CORE_DEFINED;
[459]372                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
[417]373                attr[thread_uid].lid        = lid;
374
375                if( thread_uid != main_uid )
376                {
377                    if ( pthread_create( &trdid,              // not used because no join
378                                         &attr[thread_uid],   // thread attributes
379                                         &sort,               // entry function
380                                         &arg[thread_uid] ) ) // sort arguments
381                    {
[623]382                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
[434]383                        exit( 0 );
[417]384                    }
[442]385                    else
386                    {
[627]387#if DEBUG_MAIN
[623]388printf("\n[sort] main created thread %x \n", thread_uid );
389#endif
[442]390                    }
[440]391                }
[417]392            }
393        }
394    }
[442]395   
[623]396    ///////////////////////////
397    get_cycle( &seq_end_cycle );
[417]398
[627]399#if DEBUG_MAIN
[623]400printf("\n[sort] main completes sequencial init at cycle %d\n",
401(unsigned int)seq_end_cycle );
402#endif
403
[442]404#if INTERACTIVE_MODE
405idbg();
406#endif
[441]407   
[445]408    // the main thread run also the sort() function
409    sort( &arg[main_uid] );
410
[623]411    ////////////////////////////
412    get_cycle( &para_end_cycle );
[619]413
[623]414    printf("\n[sort] main completes parallel sort at cycle %d\n", 
415    (unsigned int)para_end_cycle );
416
[619]417    // destroy barrier
418    pthread_barrier_destroy( &barrier );
419
420#if INTERACTIVE_MODE
421idbg();
422#endif
423
[625]424#if CHECK_RESULT
425    int    success = 1;
426    int *  res_array = ( (total_threads ==   2) ||
427                         (total_threads ==   8) || 
428                         (total_threads ==  32) || 
429                         (total_threads == 128) || 
430                         (total_threads == 512) ) ? array1 : array0;
[623]431
[625]432    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
[417]433    {
[625]434        if ( res_array[n] > res_array[n+1] )
435        {
436            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
437            n , res_array[n] , n+1 , res_array[n+1] );
438            success = 0;
439            break;
440        }
[417]441    }
442
[625]443    if ( success ) printf("\n[sort] success\n");
444    else           printf("\n[sort] failure\n");
[417]445#endif
446
[623]447#if INSTRUMENTATION
[626]448    char               name[64];
449    char               path[128];
450    unsigned long long instru_cycle;
[417]451
[629]452    // build file name
453    if( USE_DQT_BARRIER )
454    snprintf( name , 64 , "sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
455    else
456    snprintf( name , 64 , "sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
[623]457
[625]458    // build file pathname
459    snprintf( path , 128 , "home/%s" , name );
[623]460
[625]461    // compute results
462    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
463    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
[623]464
[625]465    // display results on process terminal
466    printf("\n----- %s -----\n"
467           " - sequencial : %d cycles\n"
468           " - parallel   : %d cycles\n", 
469           name, sequencial, parallel );
[623]470
[625]471    // open file
[626]472    get_cycle( &instru_cycle );
[625]473    FILE * stream = fopen( path , NULL );
[626]474
[625]475    if( stream == NULL )
476    {
[626]477        printf("\n[sort error] cannot open instrumentation file <%s>\n", path );
[625]478        exit(0);
479    }
[623]480
[626]481    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
[625]482
[626]483#if IDBG
484idbg();
485#endif
486
[625]487    // register results to file
[626]488    get_cycle( &instru_cycle );
[625]489    int ret = fprintf( stream , "\n----- %s -----\n"
490                                " - sequencial : %d cycles\n"
491                                " - parallel   : %d cycles\n", name, sequencial, parallel );
492    if( ret < 0 )
493    {
[626]494        printf("\n[sort error] cannot write to instrumentation file <%s>\n", path );
[625]495        exit(0);
496    }
497
[626]498    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
[625]499
[626]500#if IDBG
501idbg();
502#endif
503
[625]504    // close instrumentation file
[626]505    get_cycle( &instru_cycle );
506    ret = fclose( stream );
[625]507
[626]508    if( ret )
[625]509    {
[626]510        printf("\n[sort error] cannot close instrumentation file <%s>\n", path );
[625]511        exit(0);
512    }
513
[626]514    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
[625]515
[619]516#endif
517
518    exit( 0 );
519
[417]520}  // end main()
521
522/*
523vim: tabstop=4 : shiftwidth=4 : expandtab
524*/
Note: See TracBrowser for help on using the repository browser.