source: trunk/user/sort/sort.c @ 625

Last change on this file since 625 was 625, checked in by alain, 5 years ago

Fix a bug in the vmm_remove_vseg() function: the physical pages
associated to an user DATA vseg were released to the kernel when
the target process descriptor was in the reference cluster.
This physical pages release should be done only when the page
forks counter value is zero.
All other modifications are cosmetic.

  • Property svn:executable set to *
File size: 15.4 KB
Line 
1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
26#include <unistd.h>
27#include <pthread.h>
28#include <almosmkh.h>
29#include <hal_macros.h>
30
31#define ARRAY_LENGTH        256        // number of items
32#define MAX_THREADS         1024       // 16 * 16 * 4
33
34#define USE_DQT_BARRIER     1          // use DQT barrier if non zero
35#define DISPLAY_ARRAY       0          // display items values before and after
36#define VERBOSE             0          // for debug
37#define INTERACTIVE_MODE    0          // for debug
38#define CHECK_RESULT        0          // for debug
39#define INSTRUMENTATION     1          // register computation times on file
40
41/////////////////////////////////////////////////////////////
42// argument for the sort() function (one thread per core)
43/////////////////////////////////////////////////////////////
44
45typedef struct
46{
47    unsigned int threads;       // total number of threads
48    unsigned int thread_uid;    // thread user index (0 to threads -1)
49    unsigned int main_uid;      // main thread user index
50}
51args_t;
52
53//////////////////////////////////////////
54//      Global variables
55//////////////////////////////////////////
56
57int                 array0[ARRAY_LENGTH];    // values to sort
58int                 array1[ARRAY_LENGTH];   
59
60pthread_barrier_t   barrier;                 // synchronisation variables
61
62pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
63args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
64
65////////////////////////////////////
66static void bubbleSort( int * array,
67                        unsigned int length,
68                        unsigned int init_pos )
69{
70    unsigned int i;
71    unsigned int j;
72    int          aux;
73
74    for(i = 0; i < length; i++)
75    {
76        for(j = init_pos; j < (init_pos + length - i - 1); j++)
77        {
78            if(array[j] > array[j + 1])
79            {
80                aux          = array[j + 1];
81                array[j + 1] = array[j];
82                array[j]     = aux;
83            }
84        }
85    }
86}  // end bubbleSort()
87
88
89///////////////////////////////////
90static void merge( const int * src,               // source array
91                   int       * dst,               // destination array
92                   int         length,            // number of items in a subset
93                   int         init_pos_src_a,    // index first item in src subset A
94                   int         init_pos_src_b,    // index first item in src subset B
95                   int         init_pos_dst )     // index first item in destination
96{
97    int i;
98    int j;
99    int k;
100
101    i = 0;
102    j = 0;
103    k = init_pos_dst;
104
105    while((i < length) || (j < length))
106    {
107        if((i < length) && (j < length))
108        {
109            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
110            {
111                dst[k++] = src[init_pos_src_a + i];
112                i++;
113            }
114            else
115            {
116                dst[k++] = src[init_pos_src_b + j];
117                j++;
118            }
119        }
120        else if(i < length)
121        {
122            dst[k++] = src[init_pos_src_a + i];
123            i++;
124        }
125        else
126        {
127            dst[k++] = src[init_pos_src_b + j];
128            j++;
129        }
130    }
131}  // end merge()
132
133//////////////////////////////////////
134static void sort( const args_t * ptr )
135{
136    unsigned int       i;
137    unsigned long long cycle;
138    unsigned int       cxy;
139    unsigned int       lid;
140
141    int              * src_array  = NULL;
142    int              * dst_array  = NULL;
143
144    // get core coordinates an date
145    get_core( &cxy , &lid );
146    get_cycle( &cycle );
147
148    unsigned int  thread_uid = ptr->thread_uid;
149    unsigned int  threads    = ptr->threads;
150    unsigned int  main_uid   = ptr->main_uid;
151
152#if DISPLAY_ARRAY
153unsigned int n;
154if( thread_uid == main_uid )
155{
156    printf("\n*** array before sort\n");
157    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
158}
159#endif
160
161    /////////////////////////////////
162    pthread_barrier_wait( &barrier ); 
163
164#if VERBOSE
165printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
166#endif
167
168    unsigned int  items      = ARRAY_LENGTH / threads;
169    unsigned int  stages     = __builtin_ctz( threads ) + 1;
170
171#if VERBOSE
172printf("\n[sort] thread[%d] : start\n", thread_uid );
173#endif
174
175    bubbleSort( array0, items, items * thread_uid );
176
177#if VERBOSE
178printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
179#endif
180
181    /////////////////////////////////
182    pthread_barrier_wait( &barrier ); 
183
184#if VERBOSE
185printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
186#endif
187
188#if DISPLAY_ARRAY
189if( thread_uid == main_uid )
190{
191    printf("\n*** array after bubble sort\n");
192    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
193}
194#endif
195
196    // the number of threads contributing to sort is divided by 2
197    // and the number of items is multiplied by 2 at each next stage
198    for ( i = 1 ; i < stages ; i++ )
199    {
200        if((i % 2) == 1)               // odd stage
201        {
202            src_array = array0;
203            dst_array = array1;
204        }
205        else                           // even stage
206        {
207            src_array = array1;
208            dst_array = array0;
209        }
210
211        if( (thread_uid & ((1<<i)-1)) == 0 )
212        {
213
214#if VERBOSE
215printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
216#endif
217            merge( src_array, 
218                   dst_array,
219                   items << (i-1),
220                   items * thread_uid,
221                   items * (thread_uid + (1 << (i-1))),
222                   items * thread_uid );
223
224#if VERBOSE
225printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
226#endif
227        }
228
229        /////////////////////////////////
230        pthread_barrier_wait( &barrier );
231
232#if VERBOSE
233printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
234#endif
235
236#if DISPLAY_ARRAY
237if( thread_uid == main_uid )
238{
239    printf("\n*** array after merge %d\n", i );
240    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
241}
242#endif
243
244    }  // en for stages
245
246    // all threads but the main thread exit
247    if( thread_uid != main_uid ) pthread_exit( NULL );
248
249} // end sort()
250
251
252/////////////////
253void main( void )
254{
255    int                    error;
256    unsigned int           x_size;             // number of rows
257    unsigned int           y_size;             // number of columns
258    unsigned int           ncores;             // number of cores per cluster
259    unsigned int           total_threads;      // total number of threads
260    unsigned int           thread_uid;         // user defined thread index
261    unsigned int           main_cxy;           // cluster identifier for main
262    unsigned int           main_x;             // X coordinate for main thread
263    unsigned int           main_y;             // Y coordinate for main thread
264    unsigned int           main_lid;           // core local index for main thread
265    unsigned int           main_uid;           // thread user index for main thread
266    unsigned int           x;                  // X coordinate for a thread
267    unsigned int           y;                  // Y coordinate for a thread
268    unsigned int           lid;                // core local index for a thread
269    unsigned int           n;                  // index in array to sort
270    pthread_t              trdid;              // kernel allocated thread index (unused)
271    pthread_barrierattr_t  barrier_attr;       // barrier attributes
272
273    unsigned long long     start_cycle;
274    unsigned long long     seq_end_cycle;
275    unsigned long long     para_end_cycle;
276
277    /////////////////////////
278    get_cycle( &start_cycle );
279 
280    // compute number of threads (one thread per core)
281    get_config( &x_size , &y_size , &ncores );
282    total_threads = x_size * y_size * ncores;
283
284    // get core coordinates and user index for the main thread
285    get_core( &main_cxy , & main_lid );
286    main_x   = HAL_X_FROM_CXY( main_cxy );
287    main_y   = HAL_Y_FROM_CXY( main_cxy );
288    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
289
290    // checks number of threads
291    if ( (total_threads != 1)   && (total_threads != 2)   && (total_threads != 4)   && 
292         (total_threads != 8)   && (total_threads != 16 ) && (total_threads != 32)  && 
293         (total_threads != 64)  && (total_threads != 128) && (total_threads != 256) && 
294         (total_threads != 512) && (total_threads != 1024) )
295    {
296        printf("\n[sort error] number of cores must be power of 2\n");
297        exit( 0 );
298    }
299
300    // check array size
301    if ( ARRAY_LENGTH % total_threads) 
302    {
303        printf("\n[sort error] array size must be multiple of number of threads\n");
304        exit( 0 );
305    }
306
307    printf("\n[sort] main starts / %d threads / %d items / pid %x / cycle %d\n",
308    total_threads, ARRAY_LENGTH, getpid(), (unsigned int)start_cycle );
309
310    // initialize barrier
311    if( USE_DQT_BARRIER )
312    {
313        barrier_attr.x_size   = x_size; 
314        barrier_attr.y_size   = y_size;
315        barrier_attr.nthreads = ncores;
316        error = pthread_barrier_init( &barrier, &barrier_attr , total_threads );
317    }
318    else // use SIMPLE_BARRIER
319    {
320        error = pthread_barrier_init( &barrier, NULL , total_threads );
321    }
322
323    if( error )
324    {
325        printf("\n[sort error] cannot initialise barrier\n" );
326        exit( 0 );
327    }
328
329#if VERBOSE
330printf("\n[sort] main completes barrier init\n");
331#endif
332
333    // Array to sort initialization
334    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
335    {
336        array0[n] = ARRAY_LENGTH - n - 1;
337    }
338
339#if VERBOSE
340printf("\n[sort] main completes array init\n");
341#endif
342
343    // launch other threads to execute sort() function
344    // on cores other than the core running the main thread
345    for ( x=0 ; x<x_size ; x++ )
346    {
347        for ( y=0 ; y<y_size ; y++ )
348        {
349            for ( lid=0 ; lid<ncores ; lid++ )
350            {
351                thread_uid = (((x * y_size) + y) * ncores) + lid;
352
353                // set sort arguments for all threads
354                arg[thread_uid].threads      = total_threads;
355                arg[thread_uid].thread_uid   = thread_uid;
356                arg[thread_uid].main_uid     = main_uid;
357
358                // set thread attributes for all threads
359                attr[thread_uid].attributes = PT_ATTR_DETACH          |
360                                              PT_ATTR_CLUSTER_DEFINED | 
361                                              PT_ATTR_CORE_DEFINED;
362                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
363                attr[thread_uid].lid        = lid;
364
365                if( thread_uid != main_uid )
366                {
367                    if ( pthread_create( &trdid,              // not used because no join
368                                         &attr[thread_uid],   // thread attributes
369                                         &sort,               // entry function
370                                         &arg[thread_uid] ) ) // sort arguments
371                    {
372                        printf("\n[sort error] main cannot create thread %x \n", thread_uid );
373                        exit( 0 );
374                    }
375                    else
376                    {
377#if VERBOSE
378printf("\n[sort] main created thread %x \n", thread_uid );
379#endif
380                    }
381                }
382            }
383        }
384    }
385   
386    ///////////////////////////
387    get_cycle( &seq_end_cycle );
388
389#if VERBOSE
390printf("\n[sort] main completes sequencial init at cycle %d\n",
391(unsigned int)seq_end_cycle );
392#endif
393
394#if INTERACTIVE_MODE
395idbg();
396#endif
397   
398    // the main thread run also the sort() function
399    sort( &arg[main_uid] );
400
401    ////////////////////////////
402    get_cycle( &para_end_cycle );
403
404    printf("\n[sort] main completes parallel sort at cycle %d\n", 
405    (unsigned int)para_end_cycle );
406
407    // destroy barrier
408    pthread_barrier_destroy( &barrier );
409
410#if INTERACTIVE_MODE
411idbg();
412#endif
413
414#if CHECK_RESULT
415   
416    int    success = 1;
417    int *  res_array = ( (total_threads ==   2) ||
418                         (total_threads ==   8) || 
419                         (total_threads ==  32) || 
420                         (total_threads == 128) || 
421                         (total_threads == 512) ) ? array1 : array0;
422
423    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
424    {
425        if ( res_array[n] > res_array[n+1] )
426        {
427            printf("\n[sort] array[%d] = %d > array[%d] = %d\n",
428            n , res_array[n] , n+1 , res_array[n+1] );
429            success = 0;
430            break;
431        }
432    }
433
434    if ( success ) printf("\n[sort] success\n");
435    else           printf("\n[sort] failure\n");
436
437#endif
438
439#if INSTRUMENTATION
440
441    char   name[64];
442    char   path[128];
443
444    // build a file name from n_items / n_clusters / n_cores
445    if( USE_DQT_BARRIER ) snprintf( name , 64 , "sort_dqt_%d_%d_%d", 
446                          ARRAY_LENGTH, x_size * y_size, ncores );
447    else                  snprintf( name , 64 , "sort_smp_%d_%d_%d", 
448                          ARRAY_LENGTH, x_size * y_size, ncores );
449
450    // build file pathname
451    snprintf( path , 128 , "home/%s" , name );
452
453    // compute results
454    unsigned int sequencial = (unsigned int)(seq_end_cycle - start_cycle);
455    unsigned int parallel   = (unsigned int)(para_end_cycle - seq_end_cycle);
456
457    // display results on process terminal
458    printf("\n----- %s -----\n"
459           " - sequencial : %d cycles\n"
460           " - parallel   : %d cycles\n", 
461           name, sequencial, parallel );
462
463    // open file
464    FILE * stream = fopen( path , NULL );
465    if( stream == NULL )
466    {
467        printf("\n[sort error] cannot open instrumentation file <%s>\n", name );
468        exit(0);
469    }
470
471    printf("\n[sort] file %s successfully open\n", path);
472
473    // register results to file
474    int ret = fprintf( stream , "\n----- %s -----\n"
475                                " - sequencial : %d cycles\n"
476                                " - parallel   : %d cycles\n", name, sequencial, parallel );
477    if( ret < 0 )
478    {
479        printf("\n[sort error] cannot write to instrumentation file <%s>\n", name );
480        exit(0);
481    }
482
483    printf("\n[sort] file %s successfully written\n", path);
484
485    // close instrumentation file
486
487    if( fclose( stream ) )
488    {
489        printf("\n[sort error] cannot close the file <%s>\n", name );
490        exit(0);
491    }
492
493    printf("\n[sort] file %s successfully closed\n", path);
494
495#endif
496
497    exit( 0 );
498
499}  // end main()
500
501/*
502vim: tabstop=4 : shiftwidth=4 : expandtab
503*/
Note: See TracBrowser for help on using the repository browser.