source: trunk/user/sort/sort.c @ 626

Last change on this file since 626 was 626, checked in by alain, 5 years ago

This version has been tested on the sort multithreaded application
for TSAR_IOB architectures ranging from 1 to 8 clusters.
It fixes three bigs bugs:
1) the dev_ioc device API has been modified: the dev_ioc_sync_read()
and dev_ioc_sync_write() function use now extended pointers on the
kernel buffer to access a mapper stored in any cluster.
2) the hal_uspace API has been modified: the hal_copy_to_uspace()
and hal_copy_from_uspace() functions use now a (cxy,ptr) couple
to identify the target buffer (equivalent to an extended pointer.
3) an implementation bug has been fixed in the assembly code contained
in the hal_copy_to_uspace() and hal_copy_from_uspace() functions.

  • Property svn:executable set to *
File size: 15.8 KB
Line 
1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
26#include <unistd.h>
27#include <pthread.h>
28#include <almosmkh.h>
29#include <hal_macros.h>
30
31#define ARRAY_LENGTH        1024       // number of items
32#define MAX_THREADS         1024       // 16 * 16 * 4
33
34#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
35#define DISPLAY_ARRAY       0          // display items values before and after
36#define VERBOSE             0          // for debug
37#define INTERACTIVE_MODE    0          // for debug
38#define CHECK_RESULT        0          // for debug
39#define INSTRUMENTATION     1          // register computation times on file
40#define IDBG                0          // activate interactive debug in main
41
42/////////////////////////////////////////////////////////////
43// argument for the sort() function (one thread per core)
44/////////////////////////////////////////////////////////////
45
46typedef struct
47{
48    unsigned int threads;       // total number of threads
49    unsigned int thread_uid;    // thread user index (0 to threads -1)
50    unsigned int main_uid;      // main thread user index
51}
52args_t;
53
54//////////////////////////////////////////
55//      Global variables
56//////////////////////////////////////////
57
58int                 array0[ARRAY_LENGTH];    // values to sort
59int                 array1[ARRAY_LENGTH];   
60
61pthread_barrier_t   barrier;                 // synchronisation variables
62
63pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
64args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
65
66////////////////////////////////////
67static void bubbleSort( int * array,
68                        unsigned int length,
69                        unsigned int init_pos )
70{
71    unsigned int i;
72    unsigned int j;
73    int          aux;
74
75    for(i = 0; i < length; i++)
76    {
77        for(j = init_pos; j < (init_pos + length - i - 1); j++)
78        {
79            if(array[j] > array[j + 1])
80            {
81                aux          = array[j + 1];
82                array[j + 1] = array[j];
83                array[j]     = aux;
84            }
85        }
86    }
87}  // end bubbleSort()
88
89
90///////////////////////////////////
91static void merge( const int * src,               // source array
92                   int       * dst,               // destination array
93                   int         length,            // number of items in a subset
94                   int         init_pos_src_a,    // index first item in src subset A
95                   int         init_pos_src_b,    // index first item in src subset B
96                   int         init_pos_dst )     // index first item in destination
97{
98    int i;
99    int j;
100    int k;
101
102    i = 0;
103    j = 0;
104    k = init_pos_dst;
105
106    while((i < length) || (j < length))
107    {
108        if((i < length) && (j < length))
109        {
110            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
111            {
112                dst[k++] = src[init_pos_src_a + i];
113                i++;
114            }
115            else
116            {
117                dst[k++] = src[init_pos_src_b + j];
118                j++;
119            }
120        }
121        else if(i < length)
122        {
123            dst[k++] = src[init_pos_src_a + i];
124            i++;
125        }
126        else
127        {
128            dst[k++] = src[init_pos_src_b + j];
129            j++;
130        }
131    }
132}  // end merge()
133
134//////////////////////////////////////
135static void sort( const args_t * ptr )
136{
137    unsigned int       i;
138    unsigned long long cycle;
139    unsigned int       cxy;
140    unsigned int       lid;
141
142    int              * src_array  = NULL;
143    int              * dst_array  = NULL;
144
145    // get core coordinates an date
146    get_core( &cxy , &lid );
147    get_cycle( &cycle );
148
149    unsigned int  thread_uid = ptr->thread_uid;
150    unsigned int  threads    = ptr->threads;
151    unsigned int  main_uid   = ptr->main_uid;
152
153#if DISPLAY_ARRAY
154unsigned int n;
155if( thread_uid == main_uid )
156{
157    printf("\n*** array before sort\n");
158    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
159}
160#endif
161
162    /////////////////////////////////
163    pthread_barrier_wait( &barrier ); 
164
165#if VERBOSE
166printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
167#endif
168
169    unsigned int  items      = ARRAY_LENGTH / threads;
170    unsigned int  stages     = __builtin_ctz( threads ) + 1;
171
172#if VERBOSE
173printf("\n[sort] thread[%d] : start\n", thread_uid );
174#endif
175
176    bubbleSort( array0, items, items * thread_uid );
177
178#if VERBOSE
179printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
180#endif
181
182    /////////////////////////////////
183    pthread_barrier_wait( &barrier ); 
184
185#if VERBOSE
186printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
187#endif
188
189#if DISPLAY_ARRAY
190if( thread_uid == main_uid )
191{
192    printf("\n*** array after bubble sort\n");
193    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
194}
195#endif
196
197    // the number of threads contributing to sort is divided by 2
198    // and the number of items is multiplied by 2 at each next stage
199    for ( i = 1 ; i < stages ; i++ )
200    {
201        if((i % 2) == 1)               // odd stage
202        {
203            src_array = array0;
204            dst_array = array1;
205        }
206        else                           // even stage
207        {
208            src_array = array1;
209            dst_array = array0;
210        }
211
212        if( (thread_uid & ((1<<i)-1)) == 0 )
213        {
214
215#if VERBOSE
216printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
217#endif
218            merge( src_array, 
219                   dst_array,
220                   items << (i-1),
221                   items * thread_uid,
222                   items * (thread_uid + (1 << (i-1))),
223                   items * thread_uid );
224
225#if VERBOSE
226printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
227#endif
228        }
229
230        /////////////////////////////////
231        pthread_barrier_wait( &barrier );
232
233#if VERBOSE
234printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
235#endif
236
237#if DISPLAY_ARRAY
238if( thread_uid == main_uid )
239{
240    printf("\n*** array after merge %d\n", i );
241    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
242}
243#endif
244
245    }  // en for stages
246
247    // all threads but the main thread exit
248    if( thread_uid != main_uid ) pthread_exit( NULL );
249
250} // end sort()
251
252
253/////////////////
254void main( void )
255{
256    int                    error;
257    unsigned int           x_size;             // number of rows
258    unsigned int           y_size;             // number of columns
259    unsigned int           ncores;             // number of cores per cluster
260    unsigned int           total_threads;      // total number of threads
261    unsigned int           thread_uid;         // user defined thread index
262    unsigned int           main_cxy;           // cluster identifier for main
263    unsigned int           main_x;             // X coordinate for main thread
264    unsigned int           main_y;             // Y coordinate for main thread
265    unsigned int           main_lid;           // core local index for main thread
266    unsigned int           main_uid;           // thread user index for main thread
267    unsigned int           x;                  // X coordinate for a thread
268    unsigned int           y;                  // Y coordinate for a thread
269    unsigned int           lid;                // core local index for a thread
270    unsigned int           n;                  // index in array to sort
271    pthread_t              trdid;              // kernel allocated thread index (unused)
272    pthread_barrierattr_t  barrier_attr;       // barrier attributes
273
274    unsigned long long     start_cycle;
275    unsigned long long     seq_end_cycle;
276    unsigned long long     para_end_cycle;
277
278    /////////////////////////
279    get_cycle( &start_cycle );
280 
281    // compute number of threads (one thread per core)
282    get_config( &x_size , &y_size , &ncores );
283    total_threads = x_size * y_size * ncores;
284
285    // get core coordinates and user index for the main thread
286    get_core( &main_cxy , & main_lid );
287    main_x   = HAL_X_FROM_CXY( main_cxy );
288    main_y   = HAL_Y_FROM_CXY( main_cxy );
289    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
290
291    // checks number of threads
292    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
293         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
294         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
295         (total_threads != 512) && (total_threads != 1024) )
296    {
297        printf("\n[sort error] number of cores must be power of 2\n");
298        exit( 0 );
299    }
300
301    // check array size
302    if ( ARRAY_LENGTH % total_threads) 
303    {
304        printf("\n[sort error] array size must be multiple of number of threads\n");
305        exit( 0 );
306    }
307
308    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
309    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
310
311    // initialize barrier
312    if( USE_DQT_BARRIER )
313    {
314        barrier_attr.x_size   = x_size; 
315        barrier_attr.y_size   = y_size;
316        barrier_attr.nthreads = ncores;
317        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
318    }
319    else // use SIMPLE_BARRIER
320    {
321        error = pthread_barrier_init( &barrier, NULL , total_threads );
322    }
323
324    if( error )
325    {
326        printf("\n[sort error] cannot initialise barrier\n" );
327        exit( 0 );
328    }
329
330#if VERBOSE
331printf("\n[sort] main completes barrier init\n");
332#endif
333
334    // Array to sort initialization
335    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
336    {
337        array0[n] = ARRAY_LENGTH - n - 1;
338    }
339
340#if VERBOSE
341printf("\n[sort] main completes array init\n");
342#endif
343
344    // launch other threads to execute sort() function
345    // on cores other than the core running the main thread
346    for ( x=0 ; x<x_size ; x++ )
347    {
348        for ( y=0 ; y<y_size ; y++ )
349        {
350            for ( lid=0 ; lid<ncores ; lid++ )
351            {
352                thread_uid = (((x * y_size) + y) * ncores) + lid;
353
354                // set sort arguments for all threads
355                arg[thread_uid].threads      = total_threads;
356                arg[thread_uid].thread_uid   = thread_uid;
357                arg[thread_uid].main_uid     = main_uid;
358
359                // set thread attributes for all threads
360                attr[thread_uid].attributes = PT_ATTR_DETACH          |
361                                              PT_ATTR_CLUSTER_DEFINED | 
362                                              PT_ATTR_CORE_DEFINED;
363                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
364                attr[thread_uid].lid        = lid;
365
366                if( thread_uid != main_uid )
367                {
368                    if ( pthread_create( &trdid,              // not used because no join
369                                         &attr[thread_uid],   // thread attributes
370                                         &sort,               // entry function
371                                         &arg[thread_uid] ) ) // sort arguments
372                    {
373                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
374                        exit( 0 );
375                    }
376                    else
377                    {
378#if VERBOSE
379printf("\n[sort] main created thread %x \n", thread_uid );
380#endif
381                    }
382                }
383            }
384        }
385    }
386   
387    ///////////////////////////
388    get_cycle( &seq_end_cycle );
389
390#if VERBOSE
391printf("\n[sort] main completes sequencial init at cycle %d\n",
392(unsigned int)seq_end_cycle );
393#endif
394
395#if INTERACTIVE_MODE
396idbg();
397#endif
398   
399    // the main thread run also the sort() function
400    sort( &arg[main_uid] );
401
402    ////////////////////////////
403    get_cycle( &para_end_cycle );
404
405    printf("\n[sort] main completes parallel sort at cycle %d\n", 
406    (unsigned int)para_end_cycle );
407
408    // destroy barrier
409    pthread_barrier_destroy( &barrier );
410
411#if INTERACTIVE_MODE
412idbg();
413#endif
414
415#if CHECK_RESULT
416   
417    int    success = 1;
418    int *  res_array = ( (total_threads ==   2) ||
419                         (total_threads ==   8) || 
420                         (total_threads ==  32) || 
421                         (total_threads == 128) || 
422                         (total_threads == 512) ) ? array1 : array0;
423
424    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
425    {
426        if ( res_array[n] > res_array[n+1] )
427        {
428            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
429            n , res_array[n] , n+1 , res_array[n+1] );
430            success = 0;
431            break;
432        }
433    }
434
435    if ( success ) printf("\n[sort] success\n");
436    else           printf("\n[sort] failure\n");
437
438#endif
439
440#if INSTRUMENTATION
441
442    char               name[64];
443    char               path[128];
444    unsigned long long instru_cycle;
445
446    // build a file name from n_items / n_clusters / n_cores
447    if( USE_DQT_BARRIER ) snprintf( name , 64 , "sort_dqt_%d_%d_%d", 
448                          ARRAY_LENGTH, x_size * y_size, ncores );
449    else                  snprintf( name , 64 , "sort_smp_%d_%d_%d", 
450                          ARRAY_LENGTH, x_size * y_size, ncores );
451
452    // build file pathname
453    snprintf( path , 128 , "home/%s" , name );
454
455    // compute results
456    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
457    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
458
459    // display results on process terminal
460    printf("\n----- %s -----\n"
461           " - sequencial : %d cycles\n"
462           " - parallel   : %d cycles\n", 
463           name, sequencial, parallel );
464
465#if IDBG
466idbg();
467#endif
468
469    // open file
470    get_cycle( &instru_cycle );
471    FILE * stream = fopen( path , NULL );
472
473    if( stream == NULL )
474    {
475        printf("\n[sort error] cannot open instrumentation file <%s>\n", path );
476        exit(0);
477    }
478
479    printf("\n[sort] file <%s> open at cycle %d\n", path, (unsigned int)instru_cycle );
480
481#if IDBG
482idbg();
483#endif
484
485    // register results to file
486    get_cycle( &instru_cycle );
487    int ret = fprintf( stream , "\n----- %s -----\n"
488                                " - sequencial : %d cycles\n"
489                                " - parallel   : %d cycles\n", name, sequencial, parallel );
490    if( ret < 0 )
491    {
492        printf("\n[sort error] cannot write to instrumentation file <%s>\n", path );
493        exit(0);
494    }
495
496    printf("\n[sort] file <%s> written at cycle %d\n", path, (unsigned int)instru_cycle );
497
498#if IDBG
499idbg();
500#endif
501
502    // close instrumentation file
503    get_cycle( &instru_cycle );
504    ret = fclose( stream );
505
506    if( ret )
507    {
508        printf("\n[sort error] cannot close instrumentation file <%s>\n", path );
509        exit(0);
510    }
511
512    printf("\n[sort] file <%s> closed at cycle %d\n", path, (unsigned int)instru_cycle );
513
514#endif
515
516    exit( 0 );
517
518}  // end main()
519
520/*
521vim: tabstop=4 : shiftwidth=4 : expandtab
522*/
Note: See TracBrowser for help on using the repository browser.