source: trunk/user/sort/sort.c @ 629

Last change on this file since 629 was 629, checked in by alain, 18 months ago

Remove the "giant" rwlock protecting the GPT, and
use the GPT_LOCKED attribute in each PTE to prevent
concurrent modifications of one GPT entry.
The version number has been incremented to 2.1.

  • Property svn:executable set to *
File size: 16.1 KB
Line 
1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
26#include <unistd.h>
27#include <pthread.h>
28#include <almosmkh.h>
29#include <hal_macros.h>
30
31#define ARRAY_LENGTH        4096       // number of items
32#define MAX_THREADS         1024       // 16 * 16 * 4
33
34#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
35#define DISPLAY_ARRAY       0          // display items values before and after
36#define DEBUG_MAIN          1          // trace main function
37#define DEBUG_SORT          1          // trace sort function
38#define INTERACTIVE_MODE    0          // activate idbg() during instrumentation
39#define CHECK_RESULT        0          // for debug
40#define INSTRUMENTATION     1          // register computation times on file
41#define IDBG                0          // activate interactive debug in main
42
43/////////////////////////////////////////////////////////////
44// argument for the sort() function (one thread per core)
45/////////////////////////////////////////////////////////////
46
47typedef struct
48{
49    unsigned int threads;       // total number of threads
50    unsigned int thread_uid;    // thread user index (0 to threads -1)
51    unsigned int main_uid;      // main thread user index
52}
53args_t;
54
55//////////////////////////////////////////
56//      Global variables
57//////////////////////////////////////////
58
59int                 array0[ARRAY_LENGTH];    // values to sort
60int                 array1[ARRAY_LENGTH];   
61
62pthread_barrier_t   barrier;                 // synchronisation variables
63
64pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
65args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
66
67////////////////////////////////////
68static void bubbleSort( int * array,
69                        unsigned int length,
70                        unsigned int init_pos )
71{
72    unsigned int i;
73    unsigned int j;
74    int          aux;
75
76    for(i = 0; i < length; i++)
77    {
78        for(j = init_pos; j < (init_pos + length - i - 1); j++)
79        {
80            if(array[j] > array[j + 1])
81            {
82                aux          = array[j + 1];
83                array[j + 1] = array[j];
84                array[j]     = aux;
85            }
86        }
87    }
88}  // end bubbleSort()
89
90
91///////////////////////////////////
92static void merge( const int * src,               // source array
93                   int       * dst,               // destination array
94                   int         length,            // number of items in a subset
95                   int         init_pos_src_a,    // index first item in src subset A
96                   int         init_pos_src_b,    // index first item in src subset B
97                   int         init_pos_dst )     // index first item in destination
98{
99    int i;
100    int j;
101    int k;
102
103    i = 0;
104    j = 0;
105    k = init_pos_dst;
106
107    while((i < length) || (j < length))
108    {
109        if((i < length) && (j < length))
110        {
111            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
112            {
113                dst[k++] = src[init_pos_src_a + i];
114                i++;
115            }
116            else
117            {
118                dst[k++] = src[init_pos_src_b + j];
119                j++;
120            }
121        }
122        else if(i < length)
123        {
124            dst[k++] = src[init_pos_src_a + i];
125            i++;
126        }
127        else
128        {
129            dst[k++] = src[init_pos_src_b + j];
130            j++;
131        }
132    }
133}  // end merge()
134
135//////////////////////////////////////
136static void sort( const args_t * ptr )
137{
138    unsigned int       i;
139    unsigned long long cycle;
140    unsigned int       cxy;
141    unsigned int       lid;
142
143    int              * src_array  = NULL;
144    int              * dst_array  = NULL;
145
146    // get core coordinates an date
147    get_core( &cxy , &lid );
148    get_cycle( &cycle );
149
150    unsigned int  thread_uid = ptr->thread_uid;
151    unsigned int  threads    = ptr->threads;
152    unsigned int  main_uid   = ptr->main_uid;
153
154#if DISPLAY_ARRAY
155unsigned int n;
156if( thread_uid == main_uid )
157{
158    printf("\n*** array before sort\n");
159    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
160}
161#endif
162
163    /////////////////////////////////
164    pthread_barrier_wait( &barrier ); 
165
166#if DEBUG_SORT
167if( thread_uid == 0 )
168printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
169#endif
170
171    unsigned int  items      = ARRAY_LENGTH / threads;
172    unsigned int  stages     = __builtin_ctz( threads ) + 1;
173
174#if DEBUG_SORT
175if( thread_uid == 0 )
176printf("\n[sort] thread[%d] : start\n", thread_uid );
177#endif
178
179    bubbleSort( array0, items, items * thread_uid );
180
181#if DEBUG_SORT
182if( thread_uid == 0 )
183printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
184#endif
185
186    /////////////////////////////////
187    pthread_barrier_wait( &barrier ); 
188
189#if DEBUG_SORT
190if( thread_uid == 0 )
191printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
192#endif
193
194#if DISPLAY_ARRAY
195if( thread_uid == main_uid )
196{
197    printf("\n*** array after bubble sort\n");
198    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
199}
200#endif
201
202    // the number of threads contributing to sort is divided by 2
203    // and the number of items is multiplied by 2 at each next stage
204    for ( i = 1 ; i < stages ; i++ )
205    {
206        if((i % 2) == 1)               // odd stage
207        {
208            src_array = array0;
209            dst_array = array1;
210        }
211        else                           // even stage
212        {
213            src_array = array1;
214            dst_array = array0;
215        }
216
217        if( (thread_uid & ((1<<i)-1)) == 0 )
218        {
219
220#if DEBUG_SORT
221if( thread_uid == 0 )
222printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
223#endif
224            merge( src_array, 
225                   dst_array,
226                   items << (i-1),
227                   items * thread_uid,
228                   items * (thread_uid + (1 << (i-1))),
229                   items * thread_uid );
230
231#if DEBUG_SORT
232if( thread_uid == 0 )
233printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
234#endif
235        }
236
237        /////////////////////////////////
238        pthread_barrier_wait( &barrier );
239
240#if DEBUG_SORT
241if( thread_uid == 0 )
242printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
243#endif
244
245#if DISPLAY_ARRAY
246if( thread_uid == main_uid )
247{
248    printf("\n*** array after merge %d\n", i );
249    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
250}
251#endif
252
253    }  // en for stages
254
255    // all threads but the main thread exit
256    if( thread_uid != main_uid ) pthread_exit( NULL );
257
258} // end sort()
259
260
261/////////////////
262void main( void )
263{
264    int                    error;
265    unsigned int           x_size;             // number of rows
266    unsigned int           y_size;             // number of columns
267    unsigned int           ncores;             // number of cores per cluster
268    unsigned int           total_threads;      // total number of threads
269    unsigned int           thread_uid;         // user defined thread index
270    unsigned int           main_cxy;           // cluster identifier for main
271    unsigned int           main_x;             // X coordinate for main thread
272    unsigned int           main_y;             // Y coordinate for main thread
273    unsigned int           main_lid;           // core local index for main thread
274    unsigned int           main_uid;           // thread user index for main thread
275    unsigned int           x;                  // X coordinate for a thread
276    unsigned int           y;                  // Y coordinate for a thread
277    unsigned int           lid;                // core local index for a thread
278    unsigned int           n;                  // index in array to sort
279    pthread_t              trdid;              // kernel allocated thread index (unused)
280    pthread_barrierattr_t  barrier_attr;       // barrier attributes
281
282    unsigned long long     start_cycle;
283    unsigned long long     seq_end_cycle;
284    unsigned long long     para_end_cycle;
285
286    /////////////////////////
287    get_cycle( &start_cycle );
288 
289    // compute number of threads (one thread per core)
290    get_config( &x_size , &y_size , &ncores );
291    total_threads = x_size * y_size * ncores;
292
293    // get core coordinates and user index for the main thread
294    get_core( &main_cxy , & main_lid );
295    main_x   = HAL_X_FROM_CXY( main_cxy );
296    main_y   = HAL_Y_FROM_CXY( main_cxy );
297    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
298
299    // checks number of threads
300    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
301         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
302         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
303         (total_threads != 512) && (total_threads != 1024) )
304    {
305        printf("\n[sort error] number of cores must be power of 2\n");
306        exit( 0 );
307    }
308
309    // check array size
310    if ( ARRAY_LENGTH % total_threads) 
311    {
312        printf("\n[sort error] array size must be multiple of number of threads\n");
313        exit( 0 );
314    }
315
316    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
317    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
318
319    // initialize barrier
320    if( USE_DQT_BARRIER )
321    {
322        barrier_attr.x_size   = x_size; 
323        barrier_attr.y_size   = y_size;
324        barrier_attr.nthreads = ncores;
325        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
326    }
327    else // use SIMPLE_BARRIER
328    {
329        error = pthread_barrier_init( &barrier, NULL , total_threads );
330    }
331
332    if( error )
333    {
334        printf("\n[sort error] cannot initialise barrier\n" );
335        exit( 0 );
336    }
337
338#if DEBUG_MAIN
339if( USE_DQT_BARRIER ) printf("\n[sort] main completes DQT barrier init\n");
340else                  printf("\n[sort] main completes simple barrier init\n");
341#endif
342
343    // Array to sort initialization
344    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
345    {
346        array0[n] = ARRAY_LENGTH - n - 1;
347    }
348
349#if DEBUG_MAIN
350printf("\n[sort] main completes array init\n");
351#endif
352
353    // launch other threads to execute sort() function
354    // on cores other than the core running the main thread
355    for ( x=0 ; x<x_size ; x++ )
356    {
357        for ( y=0 ; y<y_size ; y++ )
358        {
359            for ( lid=0 ; lid<ncores ; lid++ )
360            {
361                thread_uid = (((x * y_size) + y) * ncores) + lid;
362
363                // set sort arguments for all threads
364                arg[thread_uid].threads      = total_threads;
365                arg[thread_uid].thread_uid   = thread_uid;
366                arg[thread_uid].main_uid     = main_uid;
367
368                // set thread attributes for all threads
369                attr[thread_uid].attributes = PT_ATTR_DETACH          |
370                                              PT_ATTR_CLUSTER_DEFINED | 
371                                              PT_ATTR_CORE_DEFINED;
372                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
373                attr[thread_uid].lid        = lid;
374
375                if( thread_uid != main_uid )
376                {
377                    if ( pthread_create( &trdid,              // not used because no join
378                                         &attr[thread_uid],   // thread attributes
379                                         &sort,               // entry function
380                                         &arg[thread_uid] ) ) // sort arguments
381                    {
382                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
383                        exit( 0 );
384                    }
385                    else
386                    {
387#if DEBUG_MAIN
388printf("\n[sort] main created thread %x \n", thread_uid );
389#endif
390                    }
391                }
392            }
393        }
394    }
395   
396    ///////////////////////////
397    get_cycle( &seq_end_cycle );
398
399#if DEBUG_MAIN
400printf("\n[sort] main completes sequencial init at cycle %d\n",
401(unsigned int)seq_end_cycle );
402#endif
403
404#if INTERACTIVE_MODE
405idbg();
406#endif
407   
408    // the main thread run also the sort() function
409    sort( &arg[main_uid] );
410
411    ////////////////////////////
412    get_cycle( &para_end_cycle );
413
414    printf("\n[sort] main completes parallel sort at cycle %d\n", 
415    (unsigned int)para_end_cycle );
416
417    // destroy barrier
418    pthread_barrier_destroy( &barrier );
419
420#if INTERACTIVE_MODE
421idbg();
422#endif
423
424#if CHECK_RESULT
425    int    success = 1;
426    int *  res_array = ( (total_threads ==   2) ||
427                         (total_threads ==   8) || 
428                         (total_threads ==  32) || 
429                         (total_threads == 128) || 
430                         (total_threads == 512) ) ? array1 : array0;
431
432    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
433    {
434        if ( res_array[n] > res_array[n+1] )
435        {
436            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
437            n , res_array[n] , n+1 , res_array[n+1] );
438            success = 0;
439            break;
440        }
441    }
442
443    if ( success ) printf("\n[sort] success\n");
444    else           printf("\n[sort] failure\n");
445#endif
446
447#if INSTRUMENTATION
448    char               name[64];
449    char               path[128];
450    unsigned long long instru_cycle;
451
452    // build file name
453    if( USE_DQT_BARRIER )
454    snprintf( name , 64 , "sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
455    else
456    snprintf( name , 64 , "sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
457
458    // build file pathname
459    snprintf( path , 128 , "home/%s" , name );
460
461    // compute results
462    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
463    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
464
465    // display results on process terminal
466    printf("\n----- %s -----\n"
467           " - sequencial : %d cycles\n"
468           " - parallel   : %d cycles\n", 
469           name, sequencial, parallel );
470
471    // open file
472    get_cycle( &instru_cycle );
473    FILE * stream = fopen( path , NULL );
474
475    if( stream == NULL )
476    {
477        printf("\n[sort error] cannot open instrumentation file <%s>\n", path );
478        exit(0);
479    }
480
481    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
482
483#if IDBG
484idbg();
485#endif
486
487    // register results to file
488    get_cycle( &instru_cycle );
489    int ret = fprintf( stream , "\n----- %s -----\n"
490                                " - sequencial : %d cycles\n"
491                                " - parallel   : %d cycles\n", name, sequencial, parallel );
492    if( ret < 0 )
493    {
494        printf("\n[sort error] cannot write to instrumentation file <%s>\n", path );
495        exit(0);
496    }
497
498    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
499
500#if IDBG
501idbg();
502#endif
503
504    // close instrumentation file
505    get_cycle( &instru_cycle );
506    ret = fclose( stream );
507
508    if( ret )
509    {
510        printf("\n[sort error] cannot close instrumentation file <%s>\n", path );
511        exit(0);
512    }
513
514    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
515
516#endif
517
518    exit( 0 );
519
520}  // end main()
521
522/*
523vim: tabstop=4 : shiftwidth=4 : expandtab
524*/
Note: See TracBrowser for help on using the repository browser.