source: trunk/user/sort/sort.c @ 626

Last change on this file since 626 was 626, checked in by alain, 5 years ago

This version has been tested on the sort multithreaded application
for TSAR_IOB architectures ranging from 1 to 8 clusters.
It fixes three bigs bugs:
1) the dev_ioc device API has been modified: the dev_ioc_sync_read()
and dev_ioc_sync_write() function use now extended pointers on the
kernel buffer to access a mapper stored in any cluster.
2) the hal_uspace API has been modified: the hal_copy_to_uspace()
and hal_copy_from_uspace() functions use now a (cxy,ptr) couple
to identify the target buffer (equivalent to an extended pointer.
3) an implementation bug has been fixed in the assembly code contained
in the hal_copy_to_uspace() and hal_copy_from_uspace() functions.

  • Property svn:executable set to *
File size: 15.8 KB
RevLine 
[417]1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
[596]26#include <unistd.h>
[417]27#include <pthread.h>
[445]28#include <almosmkh.h>
[459]29#include <hal_macros.h>
[417]30
[626]31#define ARRAY_LENGTH        1024       // number of items
[588]32#define MAX_THREADS         1024       // 16 * 16 * 4
[440]33
[623]34#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
35#define DISPLAY_ARRAY       0          // display items values before and after
36#define VERBOSE             0          // for debug
37#define INTERACTIVE_MODE    0          // for debug
38#define CHECK_RESULT        0          // for debug
39#define INSTRUMENTATION     1          // register computation times on file
[626]40#define IDBG                0          // activate interactive debug in main
[623]41
[417]42/////////////////////////////////////////////////////////////
43// argument for the sort() function (one thread per core)
44/////////////////////////////////////////////////////////////
45
46typedef struct
47{
[619]48    unsigned int threads;       // total number of threads
[417]49    unsigned int thread_uid;    // thread user index (0 to threads -1)
50    unsigned int main_uid;      // main thread user index
51}
52args_t;
53
54//////////////////////////////////////////
55//      Global variables
56//////////////////////////////////////////
57
58int                 array0[ARRAY_LENGTH];    // values to sort
59int                 array1[ARRAY_LENGTH];   
60
61pthread_barrier_t   barrier;                 // synchronisation variables
62
[440]63pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
64args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
[417]65
66////////////////////////////////////
[588]67static void bubbleSort( int * array,
68                        unsigned int length,
69                        unsigned int init_pos )
[417]70{
[473]71    unsigned int i;
72    unsigned int j;
73    int          aux;
[417]74
75    for(i = 0; i < length; i++)
76    {
77        for(j = init_pos; j < (init_pos + length - i - 1); j++)
78        {
79            if(array[j] > array[j + 1])
80            {
81                aux          = array[j + 1];
82                array[j + 1] = array[j];
83                array[j]     = aux;
84            }
85        }
86    }
87}  // end bubbleSort()
88
89
[588]90///////////////////////////////////
[623]91static void merge( const int * src,               // source array
92                   int       * dst,               // destination array
93                   int         length,            // number of items in a subset
94                   int         init_pos_src_a,    // index first item in src subset A
95                   int         init_pos_src_b,    // index first item in src subset B
96                   int         init_pos_dst )     // index first item in destination
[417]97{
98    int i;
99    int j;
100    int k;
101
102    i = 0;
103    j = 0;
104    k = init_pos_dst;
105
106    while((i < length) || (j < length))
107    {
108        if((i < length) && (j < length))
109        {
110            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
111            {
112                dst[k++] = src[init_pos_src_a + i];
113                i++;
114            }
115            else
116            {
117                dst[k++] = src[init_pos_src_b + j];
118                j++;
119            }
120        }
121        else if(i < length)
122        {
123            dst[k++] = src[init_pos_src_a + i];
124            i++;
125        }
126        else
127        {
128            dst[k++] = src[init_pos_src_b + j];
129            j++;
130        }
131    }
132}  // end merge()
133
[588]134//////////////////////////////////////
[504]135static void sort( const args_t * ptr )
[417]136{
137    unsigned int       i;
138    unsigned long long cycle;
[440]139    unsigned int       cxy;
140    unsigned int       lid;
[417]141
[623]142    int              * src_array  = NULL;
143    int              * dst_array  = NULL;
[417]144
[440]145    // get core coordinates an date
146    get_core( &cxy , &lid );
147    get_cycle( &cycle );
148
[417]149    unsigned int  thread_uid = ptr->thread_uid;
150    unsigned int  threads    = ptr->threads;
151    unsigned int  main_uid   = ptr->main_uid;
152
[623]153#if DISPLAY_ARRAY
154unsigned int n;
155if( thread_uid == main_uid )
156{
157    printf("\n*** array before sort\n");
158    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
159}
160#endif
161
162    /////////////////////////////////
163    pthread_barrier_wait( &barrier ); 
164
165#if VERBOSE
166printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
167#endif
168
[417]169    unsigned int  items      = ARRAY_LENGTH / threads;
170    unsigned int  stages     = __builtin_ctz( threads ) + 1;
171
[623]172#if VERBOSE
173printf("\n[sort] thread[%d] : start\n", thread_uid );
174#endif
[442]175
[417]176    bubbleSort( array0, items, items * thread_uid );
177
[623]178#if VERBOSE
179printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
180#endif
[417]181
182    /////////////////////////////////
183    pthread_barrier_wait( &barrier ); 
[440]184
[623]185#if VERBOSE
186printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
187#endif
188
189#if DISPLAY_ARRAY
190if( thread_uid == main_uid )
191{
192    printf("\n*** array after bubble sort\n");
193    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
194}
195#endif
196
197    // the number of threads contributing to sort is divided by 2
198    // and the number of items is multiplied by 2 at each next stage
[417]199    for ( i = 1 ; i < stages ; i++ )
200    {
[623]201        if((i % 2) == 1)               // odd stage
202        {
203            src_array = array0;
204            dst_array = array1;
205        }
206        else                           // even stage
207        {
208            src_array = array1;
209            dst_array = array0;
210        }
[417]211
212        if( (thread_uid & ((1<<i)-1)) == 0 )
213        {
214
[623]215#if VERBOSE
216printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
217#endif
[417]218            merge( src_array, 
219                   dst_array,
[623]220                   items << (i-1),
[417]221                   items * thread_uid,
222                   items * (thread_uid + (1 << (i-1))),
223                   items * thread_uid );
224
[623]225#if VERBOSE
226printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
227#endif
[417]228        }
229
230        /////////////////////////////////
231        pthread_barrier_wait( &barrier );
232
[623]233#if VERBOSE
234printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
235#endif
[417]236
[623]237#if DISPLAY_ARRAY
238if( thread_uid == main_uid )
239{
240    printf("\n*** array after merge %d\n", i );
241    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
242}
243#endif
244
245    }  // en for stages
246
[417]247    // all threads but the main thread exit
248    if( thread_uid != main_uid ) pthread_exit( NULL );
249
250} // end sort()
251
252
[574]253/////////////////
[475]254void main( void )
[417]255{
[619]256    int                    error;
[417]257    unsigned int           x_size;             // number of rows
258    unsigned int           y_size;             // number of columns
259    unsigned int           ncores;             // number of cores per cluster
[619]260    unsigned int           total_threads;      // total number of threads
[417]261    unsigned int           thread_uid;         // user defined thread index
262    unsigned int           main_cxy;           // cluster identifier for main
263    unsigned int           main_x;             // X coordinate for main thread
264    unsigned int           main_y;             // Y coordinate for main thread
265    unsigned int           main_lid;           // core local index for main thread
266    unsigned int           main_uid;           // thread user index for main thread
267    unsigned int           x;                  // X coordinate for a thread
268    unsigned int           y;                  // Y coordinate for a thread
269    unsigned int           lid;                // core local index for a thread
270    unsigned int           n;                  // index in array to sort
271    pthread_t              trdid;              // kernel allocated thread index (unused)
272    pthread_barrierattr_t  barrier_attr;       // barrier attributes
273
[623]274    unsigned long long     start_cycle;
275    unsigned long long     seq_end_cycle;
276    unsigned long long     para_end_cycle;
277
278    /////////////////////////
279    get_cycle( &start_cycle );
280 
[596]281    // compute number of threads (one thread per core)
[417]282    get_config( &x_size , &y_size , &ncores );
[619]283    total_threads = x_size * y_size * ncores;
[417]284
285    // get core coordinates and user index for the main thread
286    get_core( &main_cxy , & main_lid );
[459]287    main_x   = HAL_X_FROM_CXY( main_cxy );
288    main_y   = HAL_Y_FROM_CXY( main_cxy );
[417]289    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
290
291    // checks number of threads
[619]292    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
293         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
294         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
295         (total_threads != 512) && (total_threads != 1024) )
[417]296    {
[623]297        printf("\n[sort error] number of cores must be power of 2\n");
[434]298        exit( 0 );
[417]299    }
300
301    // check array size
[619]302    if ( ARRAY_LENGTH % total_threads) 
[417]303    {
[623]304        printf("\n[sort error] array size must be multiple of number of threads\n");
[434]305        exit( 0 );
[417]306    }
307
[624]308    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
[623]309    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
[417]310
[619]311    // initialize barrier
312    if( USE_DQT_BARRIER )
[417]313    {
[619]314        barrier_attr.x_size   = x_size; 
315        barrier_attr.y_size   = y_size;
316        barrier_attr.nthreads = ncores;
317        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
318    }
319    else // use SIMPLE_BARRIER
320    {
321        error = pthread_barrier_init( &barrier, NULL , total_threads );
322    }
323
324    if( error )
325    {
[623]326        printf("\n[sort error] cannot initialise barrier\n" );
[434]327        exit( 0 );
[417]328    }
329
[623]330#if VERBOSE
331printf("\n[sort] main completes barrier init\n");
332#endif
[417]333
334    // Array to sort initialization
335    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
336    {
[596]337        array0[n] = ARRAY_LENGTH - n - 1;
[417]338    }
339
[623]340#if VERBOSE
341printf("\n[sort] main completes array init\n");
[417]342#endif
343
344    // launch other threads to execute sort() function
345    // on cores other than the core running the main thread
346    for ( x=0 ; x<x_size ; x++ )
347    {
348        for ( y=0 ; y<y_size ; y++ )
349        {
350            for ( lid=0 ; lid<ncores ; lid++ )
351            {
352                thread_uid = (((x * y_size) + y) * ncores) + lid;
353
354                // set sort arguments for all threads
[619]355                arg[thread_uid].threads      = total_threads;
[417]356                arg[thread_uid].thread_uid   = thread_uid;
357                arg[thread_uid].main_uid     = main_uid;
358
359                // set thread attributes for all threads
[574]360                attr[thread_uid].attributes = PT_ATTR_DETACH          |
361                                              PT_ATTR_CLUSTER_DEFINED | 
362                                              PT_ATTR_CORE_DEFINED;
[459]363                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
[417]364                attr[thread_uid].lid        = lid;
365
366                if( thread_uid != main_uid )
367                {
368                    if ( pthread_create( &trdid,              // not used because no join
369                                         &attr[thread_uid],   // thread attributes
370                                         &sort,               // entry function
371                                         &arg[thread_uid] ) ) // sort arguments
372                    {
[623]373                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
[434]374                        exit( 0 );
[417]375                    }
[442]376                    else
377                    {
[623]378#if VERBOSE
379printf("\n[sort] main created thread %x \n", thread_uid );
380#endif
[442]381                    }
[440]382                }
[417]383            }
384        }
385    }
[442]386   
[623]387    ///////////////////////////
388    get_cycle( &seq_end_cycle );
[417]389
[623]390#if VERBOSE
391printf("\n[sort] main completes sequencial init at cycle %d\n",
392(unsigned int)seq_end_cycle );
393#endif
394
[442]395#if INTERACTIVE_MODE
396idbg();
397#endif
[441]398   
[445]399    // the main thread run also the sort() function
400    sort( &arg[main_uid] );
401
[623]402    ////////////////////////////
403    get_cycle( &para_end_cycle );
[619]404
[623]405    printf("\n[sort] main completes parallel sort at cycle %d\n", 
406    (unsigned int)para_end_cycle );
407
[619]408    // destroy barrier
409    pthread_barrier_destroy( &barrier );
410
411#if INTERACTIVE_MODE
412idbg();
413#endif
414
[625]415#if CHECK_RESULT
416   
417    int    success = 1;
418    int *  res_array = ( (total_threads ==   2) ||
419                         (total_threads ==   8) || 
420                         (total_threads ==  32) || 
421                         (total_threads == 128) || 
422                         (total_threads == 512) ) ? array1 : array0;
[623]423
[625]424    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
[417]425    {
[625]426        if ( res_array[n] > res_array[n+1] )
427        {
428            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
429            n , res_array[n] , n+1 , res_array[n+1] );
430            success = 0;
431            break;
432        }
[417]433    }
434
[625]435    if ( success ) printf("\n[sort] success\n");
436    else           printf("\n[sort] failure\n");
437
[417]438#endif
439
[623]440#if INSTRUMENTATION
[417]441
[626]442    char               name[64];
443    char               path[128];
444    unsigned long long instru_cycle;
[417]445
[625]446    // build a file name from n_items / n_clusters / n_cores
447    if( USE_DQT_BARRIER ) snprintf( name , 64 , "sort_dqt_%d_%d_%d", 
448                          ARRAY_LENGTH, x_size * y_size, ncores );
449    else                  snprintf( name , 64 , "sort_smp_%d_%d_%d", 
450                          ARRAY_LENGTH, x_size * y_size, ncores );
[623]451
[625]452    // build file pathname
453    snprintf( path , 128 , "home/%s" , name );
[623]454
[625]455    // compute results
456    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
457    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
[623]458
[625]459    // display results on process terminal
460    printf("\n----- %s -----\n"
461           " - sequencial : %d cycles\n"
462           " - parallel   : %d cycles\n", 
463           name, sequencial, parallel );
[623]464
[626]465#if IDBG
466idbg();
467#endif
468
[625]469    // open file
[626]470    get_cycle( &instru_cycle );
[625]471    FILE * stream = fopen( path , NULL );
[626]472
[625]473    if( stream == NULL )
474    {
[626]475        printf("\n[sort error] cannot open instrumentation file <%s>\n", path );
[625]476        exit(0);
477    }
[623]478
[626]479    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
[625]480
[626]481#if IDBG
482idbg();
483#endif
484
[625]485    // register results to file
[626]486    get_cycle( &instru_cycle );
[625]487    int ret = fprintf( stream , "\n----- %s -----\n"
488                                " - sequencial : %d cycles\n"
489                                " - parallel   : %d cycles\n", name, sequencial, parallel );
490    if( ret < 0 )
491    {
[626]492        printf("\n[sort error] cannot write to instrumentation file <%s>\n", path );
[625]493        exit(0);
494    }
495
[626]496    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
[625]497
[626]498#if IDBG
499idbg();
500#endif
501
[625]502    // close instrumentation file
[626]503    get_cycle( &instru_cycle );
504    ret = fclose( stream );
[625]505
[626]506    if( ret )
[625]507    {
[626]508        printf("\n[sort error] cannot close instrumentation file <%s>\n", path );
[625]509        exit(0);
510    }
511
[626]512    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
[625]513
[619]514#endif
515
516    exit( 0 );
517
[417]518}  // end main()
519
520/*
521vim: tabstop=4 : shiftwidth=4 : expandtab
522*/
Note: See TracBrowser for help on using the repository browser.