source: trunk/user/sort/sort.c @ 627

Last change on this file since 627 was 627, checked in by alain, 5 years ago

Replace the queuelock protectingthe FAT by a rwlock in the FATFS.

  • Property svn:executable set to *
File size: 16.1 KB
Line 
1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
26#include <unistd.h>
27#include <pthread.h>
28#include <almosmkh.h>
29#include <hal_macros.h>
30
31#define ARRAY_LENGTH        4096       // number of items
32#define MAX_THREADS         1024       // 16 * 16 * 4
33
34#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
35#define DISPLAY_ARRAY       0          // display items values before and after
36#define DEBUG_MAIN          1          // trace main function
37#define DEBUG_SORT          0          // trace sort function
38#define INTERACTIVE_MODE    0          // activate idbg() during instrumentation
39#define CHECK_RESULT        0          // for debug
40#define INSTRUMENTATION     1          // register computation times on file
41#define IDBG                0          // activate interactive debug in main
42
43/////////////////////////////////////////////////////////////
44// argument for the sort() function (one thread per core)
45/////////////////////////////////////////////////////////////
46
47typedef struct
48{
49    unsigned int threads;       // total number of threads
50    unsigned int thread_uid;    // thread user index (0 to threads -1)
51    unsigned int main_uid;      // main thread user index
52}
53args_t;
54
55//////////////////////////////////////////
56//      Global variables
57//////////////////////////////////////////
58
59int                 array0[ARRAY_LENGTH];    // values to sort
60int                 array1[ARRAY_LENGTH];   
61
62pthread_barrier_t   barrier;                 // synchronisation variables
63
64pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
65args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
66
67////////////////////////////////////
68static void bubbleSort( int * array,
69                        unsigned int length,
70                        unsigned int init_pos )
71{
72    unsigned int i;
73    unsigned int j;
74    int          aux;
75
76    for(i = 0; i < length; i++)
77    {
78        for(j = init_pos; j < (init_pos + length - i - 1); j++)
79        {
80            if(array[j] > array[j + 1])
81            {
82                aux          = array[j + 1];
83                array[j + 1] = array[j];
84                array[j]     = aux;
85            }
86        }
87    }
88}  // end bubbleSort()
89
90
91///////////////////////////////////
92static void merge( const int * src,               // source array
93                   int       * dst,               // destination array
94                   int         length,            // number of items in a subset
95                   int         init_pos_src_a,    // index first item in src subset A
96                   int         init_pos_src_b,    // index first item in src subset B
97                   int         init_pos_dst )     // index first item in destination
98{
99    int i;
100    int j;
101    int k;
102
103    i = 0;
104    j = 0;
105    k = init_pos_dst;
106
107    while((i < length) || (j < length))
108    {
109        if((i < length) && (j < length))
110        {
111            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
112            {
113                dst[k++] = src[init_pos_src_a + i];
114                i++;
115            }
116            else
117            {
118                dst[k++] = src[init_pos_src_b + j];
119                j++;
120            }
121        }
122        else if(i < length)
123        {
124            dst[k++] = src[init_pos_src_a + i];
125            i++;
126        }
127        else
128        {
129            dst[k++] = src[init_pos_src_b + j];
130            j++;
131        }
132    }
133}  // end merge()
134
135//////////////////////////////////////
136static void sort( const args_t * ptr )
137{
138    unsigned int       i;
139    unsigned long long cycle;
140    unsigned int       cxy;
141    unsigned int       lid;
142
143    int              * src_array  = NULL;
144    int              * dst_array  = NULL;
145
146    // get core coordinates an date
147    get_core( &cxy , &lid );
148    get_cycle( &cycle );
149
150    unsigned int  thread_uid = ptr->thread_uid;
151    unsigned int  threads    = ptr->threads;
152    unsigned int  main_uid   = ptr->main_uid;
153
154#if DISPLAY_ARRAY
155unsigned int n;
156if( thread_uid == main_uid )
157{
158    printf("\n*** array before sort\n");
159    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
160}
161#endif
162
163    /////////////////////////////////
164    pthread_barrier_wait( &barrier ); 
165
166#if DEBUG_SORT
167if( thread_uid == 0 )
168printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
169#endif
170
171    unsigned int  items      = ARRAY_LENGTH / threads;
172    unsigned int  stages     = __builtin_ctz( threads ) + 1;
173
174#if DEBUG_SORT
175if( thread_uid == 0 )
176printf("\n[sort] thread[%d] : start\n", thread_uid );
177#endif
178
179    bubbleSort( array0, items, items * thread_uid );
180
181#if DEBUG_SORT
182if( thread_uid == 0 )
183printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
184#endif
185
186    /////////////////////////////////
187    pthread_barrier_wait( &barrier ); 
188
189#if DEBUG_SORT
190if( thread_uid == 0 )
191printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
192#endif
193
194#if DISPLAY_ARRAY
195if( thread_uid == main_uid )
196{
197    printf("\n*** array after bubble sort\n");
198    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
199}
200#endif
201
202    // the number of threads contributing to sort is divided by 2
203    // and the number of items is multiplied by 2 at each next stage
204    for ( i = 1 ; i < stages ; i++ )
205    {
206        if((i % 2) == 1)               // odd stage
207        {
208            src_array = array0;
209            dst_array = array1;
210        }
211        else                           // even stage
212        {
213            src_array = array1;
214            dst_array = array0;
215        }
216
217        if( (thread_uid & ((1<<i)-1)) == 0 )
218        {
219
220#if DEBUG_SORT
221if( thread_uid == 0 )
222printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
223#endif
224            merge( src_array, 
225                   dst_array,
226                   items << (i-1),
227                   items * thread_uid,
228                   items * (thread_uid + (1 << (i-1))),
229                   items * thread_uid );
230
231#if DEBUG_SORT
232if( thread_uid == 0 )
233printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
234#endif
235        }
236
237        /////////////////////////////////
238        pthread_barrier_wait( &barrier );
239
240#if DEBUG_SORT
241if( thread_uid == 0 )
242printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
243#endif
244
245#if DISPLAY_ARRAY
246if( thread_uid == main_uid )
247{
248    printf("\n*** array after merge %d\n", i );
249    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
250}
251#endif
252
253    }  // en for stages
254
255    // all threads but the main thread exit
256    if( thread_uid != main_uid ) pthread_exit( NULL );
257
258} // end sort()
259
260
261/////////////////
262void main( void )
263{
264    int                    error;
265    unsigned int           x_size;             // number of rows
266    unsigned int           y_size;             // number of columns
267    unsigned int           ncores;             // number of cores per cluster
268    unsigned int           total_threads;      // total number of threads
269    unsigned int           thread_uid;         // user defined thread index
270    unsigned int           main_cxy;           // cluster identifier for main
271    unsigned int           main_x;             // X coordinate for main thread
272    unsigned int           main_y;             // Y coordinate for main thread
273    unsigned int           main_lid;           // core local index for main thread
274    unsigned int           main_uid;           // thread user index for main thread
275    unsigned int           x;                  // X coordinate for a thread
276    unsigned int           y;                  // Y coordinate for a thread
277    unsigned int           lid;                // core local index for a thread
278    unsigned int           n;                  // index in array to sort
279    pthread_t              trdid;              // kernel allocated thread index (unused)
280    pthread_barrierattr_t  barrier_attr;       // barrier attributes
281
282    unsigned long long     start_cycle;
283    unsigned long long     seq_end_cycle;
284    unsigned long long     para_end_cycle;
285
286    /////////////////////////
287    get_cycle( &start_cycle );
288 
289    // compute number of threads (one thread per core)
290    get_config( &x_size , &y_size , &ncores );
291    total_threads = x_size * y_size * ncores;
292
293    // get core coordinates and user index for the main thread
294    get_core( &main_cxy , & main_lid );
295    main_x   = HAL_X_FROM_CXY( main_cxy );
296    main_y   = HAL_Y_FROM_CXY( main_cxy );
297    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
298
299    // checks number of threads
300    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
301         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
302         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
303         (total_threads != 512) && (total_threads != 1024) )
304    {
305        printf("\n[sort error] number of cores must be power of 2\n");
306        exit( 0 );
307    }
308
309    // check array size
310    if ( ARRAY_LENGTH % total_threads) 
311    {
312        printf("\n[sort error] array size must be multiple of number of threads\n");
313        exit( 0 );
314    }
315
316    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
317    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
318
319    // initialize barrier
320    if( USE_DQT_BARRIER )
321    {
322        barrier_attr.x_size   = x_size; 
323        barrier_attr.y_size   = y_size;
324        barrier_attr.nthreads = ncores;
325        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
326    }
327    else // use SIMPLE_BARRIER
328    {
329        error = pthread_barrier_init( &barrier, NULL , total_threads );
330    }
331
332    if( error )
333    {
334        printf("\n[sort error] cannot initialise barrier\n" );
335        exit( 0 );
336    }
337
338#if DEBUG_MAIN
339printf("\n[sort] main completes barrier init\n");
340#endif
341
342    // Array to sort initialization
343    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
344    {
345        array0[n] = ARRAY_LENGTH - n - 1;
346    }
347
348#if DEBUG_MAIN
349printf("\n[sort] main completes array init\n");
350#endif
351
352    // launch other threads to execute sort() function
353    // on cores other than the core running the main thread
354    for ( x=0 ; x<x_size ; x++ )
355    {
356        for ( y=0 ; y<y_size ; y++ )
357        {
358            for ( lid=0 ; lid<ncores ; lid++ )
359            {
360                thread_uid = (((x * y_size) + y) * ncores) + lid;
361
362                // set sort arguments for all threads
363                arg[thread_uid].threads      = total_threads;
364                arg[thread_uid].thread_uid   = thread_uid;
365                arg[thread_uid].main_uid     = main_uid;
366
367                // set thread attributes for all threads
368                attr[thread_uid].attributes = PT_ATTR_DETACH          |
369                                              PT_ATTR_CLUSTER_DEFINED | 
370                                              PT_ATTR_CORE_DEFINED;
371                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
372                attr[thread_uid].lid        = lid;
373
374                if( thread_uid != main_uid )
375                {
376                    if ( pthread_create( &trdid,              // not used because no join
377                                         &attr[thread_uid],   // thread attributes
378                                         &sort,               // entry function
379                                         &arg[thread_uid] ) ) // sort arguments
380                    {
381                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
382                        exit( 0 );
383                    }
384                    else
385                    {
386#if DEBUG_MAIN
387printf("\n[sort] main created thread %x \n", thread_uid );
388#endif
389                    }
390                }
391            }
392        }
393    }
394   
395    ///////////////////////////
396    get_cycle( &seq_end_cycle );
397
398#if DEBUG_MAIN
399printf("\n[sort] main completes sequencial init at cycle %d\n",
400(unsigned int)seq_end_cycle );
401#endif
402
403#if INTERACTIVE_MODE
404idbg();
405#endif
406   
407    // the main thread run also the sort() function
408    sort( &arg[main_uid] );
409
410    ////////////////////////////
411    get_cycle( &para_end_cycle );
412
413    printf("\n[sort] main completes parallel sort at cycle %d\n", 
414    (unsigned int)para_end_cycle );
415
416    // destroy barrier
417    pthread_barrier_destroy( &barrier );
418
419#if INTERACTIVE_MODE
420idbg();
421#endif
422
423#if CHECK_RESULT
424   
425    int    success = 1;
426    int *  res_array = ( (total_threads ==   2) ||
427                         (total_threads ==   8) || 
428                         (total_threads ==  32) || 
429                         (total_threads == 128) || 
430                         (total_threads == 512) ) ? array1 : array0;
431
432    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
433    {
434        if ( res_array[n] > res_array[n+1] )
435        {
436            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
437            n , res_array[n] , n+1 , res_array[n+1] );
438            success = 0;
439            break;
440        }
441    }
442
443    if ( success ) printf("\n[sort] success\n");
444    else           printf("\n[sort] failure\n");
445
446#endif
447
448#if INSTRUMENTATION
449
450    char               name[64];
451    char               path[128];
452    unsigned long long instru_cycle;
453
454    // build a file name from n_items / n_clusters / n_cores
455    if( USE_DQT_BARRIER ) snprintf( name , 64 , "sort_dqt_%d_%d_%d", 
456                          ARRAY_LENGTH, x_size * y_size, ncores );
457    else                  snprintf( name , 64 , "sort_smp_%d_%d_%d", 
458                          ARRAY_LENGTH, x_size * y_size, ncores );
459
460    // build file pathname
461    snprintf( path , 128 , "home/%s" , name );
462
463    // compute results
464    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
465    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
466
467    // display results on process terminal
468    printf("\n----- %s -----\n"
469           " - sequencial : %d cycles\n"
470           " - parallel   : %d cycles\n", 
471           name, sequencial, parallel );
472
473#if IDBG
474idbg();
475#endif
476
477    // open file
478    get_cycle( &instru_cycle );
479    FILE * stream = fopen( path , NULL );
480
481    if( stream == NULL )
482    {
483        printf("\n[sort error] cannot open instrumentation file <%s>\n", path );
484        exit(0);
485    }
486
487    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
488
489#if IDBG
490idbg();
491#endif
492
493    // register results to file
494    get_cycle( &instru_cycle );
495    int ret = fprintf( stream , "\n----- %s -----\n"
496                                " - sequencial : %d cycles\n"
497                                " - parallel   : %d cycles\n", name, sequencial, parallel );
498    if( ret < 0 )
499    {
500        printf("\n[sort error] cannot write to instrumentation file <%s>\n", path );
501        exit(0);
502    }
503
504    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
505
506#if IDBG
507idbg();
508#endif
509
510    // close instrumentation file
511    get_cycle( &instru_cycle );
512    ret = fclose( stream );
513
514    if( ret )
515    {
516        printf("\n[sort error] cannot close instrumentation file <%s>\n", path );
517        exit(0);
518    }
519
520    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
521
522#endif
523
524    exit( 0 );
525
526}  // end main()
527
528/*
529vim: tabstop=4 : shiftwidth=4 : expandtab
530*/
Note: See TracBrowser for help on using the repository browser.