source: trunk/user/sort/sort.c @ 596

Last change on this file since 596 was 596, checked in by alain, 5 years ago

Fix a big bug in ksh: wrong handling of illegal command, blocking ksh.

  • Property svn:executable set to *
File size: 12.0 KB
RevLine 
[417]1///////////////////////////////////////////////////////////////////////////////
2// File   :  sort.c
3// Date   :  November 2013
4// Author :  Cesar Fuguet Tortolero <cesar.fuguet-tortolero@lip6.fr>
5///////////////////////////////////////////////////////////////////////////////
6// This multi-threaded application implement a multi-stage sort application.
7// The various stages are separated by synchronisation barriers.
8// There is one thread per physical cores.
9// Computation is organised as a binary tree:
10// - All threads execute in parallel a buble sort on a sub-array during the
11//   the first stage of parallel sort,
12// - The number of participating threads is divided by 2 at each next stage,
13//   to make a merge sort, on two subsets of previous stage.
14//
15//       Number_of_stages = number of barriers = log2(Number_of_threads)
16//
17// Constraints :
18// - It supports up to 1024 cores: x_size, y_size, and ncores must be
19//   power of 2 (max 16*16 clusters / max 4 cores per cluster)
20// _ The array of values to be sorted (ARRAY_LENGTH) must be power of 2
21//   larger than the number of cores.
22///////////////////////////////////////////////////////////////////////////////
23
24#include <stdio.h>
25#include <stdlib.h>
[596]26#include <unistd.h>
[417]27#include <pthread.h>
[445]28#include <almosmkh.h>
[459]29#include <hal_macros.h>
[417]30
[588]31#define ARRAY_LENGTH        0x1000    // 4096 values
[417]32
[588]33#define MAX_THREADS         1024       // 16 * 16 * 4
[440]34
35#define DISPLAY_ARRAY       0
[457]36#define INTERACTIVE_MODE    0
[440]37
[417]38/////////////////////////////////////////////////////////////
39// argument for the sort() function (one thread per core)
40/////////////////////////////////////////////////////////////
41
42typedef struct
43{
44    unsigned int threads;      // total number of threads
45    unsigned int thread_uid;    // thread user index (0 to threads -1)
46    unsigned int main_uid;      // main thread user index
47}
48args_t;
49
50//////////////////////////////////////////
51//      Global variables
52//////////////////////////////////////////
53
54int                 array0[ARRAY_LENGTH];    // values to sort
55int                 array1[ARRAY_LENGTH];   
56
57pthread_barrier_t   barrier;                 // synchronisation variables
58
[440]59pthread_attr_t      attr[MAX_THREADS];       // thread attributes (one per thread)
60args_t              arg[MAX_THREADS];        // sort function arguments (one per thread)
[417]61
62////////////////////////////////////
[588]63static void bubbleSort( int * array,
64                        unsigned int length,
65                        unsigned int init_pos )
[417]66{
[473]67    unsigned int i;
68    unsigned int j;
69    int          aux;
[417]70
71    for(i = 0; i < length; i++)
72    {
73        for(j = init_pos; j < (init_pos + length - i - 1); j++)
74        {
75            if(array[j] > array[j + 1])
76            {
77                aux          = array[j + 1];
78                array[j + 1] = array[j];
79                array[j]     = aux;
80            }
81        }
82    }
83}  // end bubbleSort()
84
85
[588]86///////////////////////////////////
87static void merge( const int * src,
88                   int       * dst,
89                   int         length,
90                   int         init_pos_src_a,
91                   int         init_pos_src_b,
92                   int         init_pos_dst )
[417]93{
94    int i;
95    int j;
96    int k;
97
98    i = 0;
99    j = 0;
100    k = init_pos_dst;
101
102    while((i < length) || (j < length))
103    {
104        if((i < length) && (j < length))
105        {
106            if(src[init_pos_src_a + i] < src[init_pos_src_b + j])
107            {
108                dst[k++] = src[init_pos_src_a + i];
109                i++;
110            }
111            else
112            {
113                dst[k++] = src[init_pos_src_b + j];
114                j++;
115            }
116        }
117        else if(i < length)
118        {
119            dst[k++] = src[init_pos_src_a + i];
120            i++;
121        }
122        else
123        {
124            dst[k++] = src[init_pos_src_b + j];
125            j++;
126        }
127    }
128}  // end merge()
129
[588]130//////////////////////////////////////
[504]131static void sort( const args_t * ptr )
[417]132{
133    unsigned int       i;
134    unsigned long long cycle;
[440]135    unsigned int       cxy;
136    unsigned int       lid;
[417]137
138    int         * src_array  = NULL;
139    int         * dst_array  = NULL;
140
[440]141    // get core coordinates an date
142    get_core( &cxy , &lid );
143    get_cycle( &cycle );
144
[417]145    unsigned int  thread_uid = ptr->thread_uid;
146    unsigned int  threads    = ptr->threads;
147    unsigned int  main_uid   = ptr->main_uid;
148
149    unsigned int  items      = ARRAY_LENGTH / threads;
150    unsigned int  stages     = __builtin_ctz( threads ) + 1;
151
[442]152    printf("\n[SORT] thread[%d] : start\n", thread_uid );
153
[417]154    bubbleSort( array0, items, items * thread_uid );
155
[445]156    printf("\n[SORT] thread[%d] : stage 0 completed\n", thread_uid );
[417]157
158    /////////////////////////////////
159    pthread_barrier_wait( &barrier ); 
[596]160    printf("\n[SORT] thread[%d] exit barrier 0\n", thread_uid );
[440]161
[417]162    // the number of threads contributing to sort
163    // is divided by 2 at each next stage
164    for ( i = 1 ; i < stages ; i++ )
165    {
166        pthread_barrier_wait( &barrier );
167
168        if( (thread_uid & ((1<<i)-1)) == 0 )
169        {
[445]170            printf("\n[SORT] thread[%d] : stage %d start\n", thread_uid , i );
[417]171
172            if((i % 2) == 1)               // odd stage
173            {
174                src_array = array0;
175                dst_array = array1;
176            }
177            else                           // even stage
178            {
179                src_array = array1;
180                dst_array = array0;
181            }
182
183            merge( src_array, 
184                   dst_array,
185                   items << i,
186                   items * thread_uid,
187                   items * (thread_uid + (1 << (i-1))),
188                   items * thread_uid );
189
[445]190            printf("\n[SORT] thread[%d] : stage %d completed\n", thread_uid , i );
[417]191        }
192
193        /////////////////////////////////
194        pthread_barrier_wait( &barrier );
[596]195        printf("\n[SORT] thread[%d] exit barrier %d\n", thread_uid , i );
[417]196
197    }
198
199    // all threads but the main thread exit
200    if( thread_uid != main_uid ) pthread_exit( NULL );
201
202} // end sort()
203
204
[574]205/////////////////
[475]206void main( void )
[417]207{
208    unsigned int           x_size;             // number of rows
209    unsigned int           y_size;             // number of columns
210    unsigned int           ncores;             // number of cores per cluster
211    unsigned int           threads;            // total number of threads
212    unsigned int           thread_uid;         // user defined thread index
213    unsigned int           main_cxy;           // cluster identifier for main
214    unsigned int           main_x;             // X coordinate for main thread
215    unsigned int           main_y;             // Y coordinate for main thread
216    unsigned int           main_lid;           // core local index for main thread
217    unsigned int           main_uid;           // thread user index for main thread
218    unsigned int           x;                  // X coordinate for a thread
219    unsigned int           y;                  // Y coordinate for a thread
220    unsigned int           lid;                // core local index for a thread
221    unsigned int           n;                  // index in array to sort
222    unsigned long long     cycle;              // current date for log
223    pthread_t              trdid;              // kernel allocated thread index (unused)
224    pthread_barrierattr_t  barrier_attr;       // barrier attributes
225
[596]226    // compute number of threads (one thread per core)
[417]227    get_config( &x_size , &y_size , &ncores );
228    threads = x_size * y_size * ncores;
229
230    // get core coordinates and user index for the main thread
231    get_core( &main_cxy , & main_lid );
[459]232    main_x   = HAL_X_FROM_CXY( main_cxy );
233    main_y   = HAL_Y_FROM_CXY( main_cxy );
[417]234    main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid; 
235
236    // checks number of threads
237    if ( (threads != 1)   && (threads != 2)   && (threads != 4)   && 
238         (threads != 8)   && (threads != 16 ) && (threads != 32)  && 
239         (threads != 64)  && (threads != 128) && (threads != 256) && 
240         (threads != 512) && (threads != 1024) )
241    {
242        printf("\n[SORT ERROR] number of cores must be power of 2\n");
[434]243        exit( 0 );
[417]244    }
245
246    // check array size
247    if ( ARRAY_LENGTH % threads) 
248    {
249        printf("\n[SORT ERROR] array size must be multiple of number of threads\n");
[434]250        exit( 0 );
[417]251    }
252
[588]253    printf("\n[SORT] main starts on core[%x,%d] / %d thread(s) / %d values / PID %x\n",
254    main_cxy, main_lid, threads, ARRAY_LENGTH, getpid() );
[417]255
256    // Barrier initialization
257    barrier_attr.x_size   = x_size; 
258    barrier_attr.y_size   = y_size;
259    barrier_attr.nthreads = ncores;
260    if( pthread_barrier_init( &barrier, &barrier_attr , threads ) )
261    {
262        printf("\n[SORT ERROR] cannot initialise barrier\n" );
[434]263        exit( 0 );
[417]264    }
265
[473]266    printf("\n[SORT] main completes barrier init\n");
[417]267
268    // Array to sort initialization
269    for ( n = 0 ; n < ARRAY_LENGTH ; n++ )
270    {
[596]271        array0[n] = ARRAY_LENGTH - n - 1;
[417]272    }
273
[440]274#if DISPLAY_ARRAY
[417]275printf("\n*** array before sort\n");
276for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
277#endif
278
[473]279    printf("\n[SORT] main completes array init\n");
[417]280
281    // launch other threads to execute sort() function
282    // on cores other than the core running the main thread
283    for ( x=0 ; x<x_size ; x++ )
284    {
285        for ( y=0 ; y<y_size ; y++ )
286        {
287            for ( lid=0 ; lid<ncores ; lid++ )
288            {
289                thread_uid = (((x * y_size) + y) * ncores) + lid;
290
291                // set sort arguments for all threads
292                arg[thread_uid].threads      = threads;
293                arg[thread_uid].thread_uid   = thread_uid;
294                arg[thread_uid].main_uid     = main_uid;
295
296                // set thread attributes for all threads
[574]297                attr[thread_uid].attributes = PT_ATTR_DETACH          |
298                                              PT_ATTR_CLUSTER_DEFINED | 
299                                              PT_ATTR_CORE_DEFINED;
[459]300                attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
[417]301                attr[thread_uid].lid        = lid;
302
303                if( thread_uid != main_uid )
304                {
305                    if ( pthread_create( &trdid,              // not used because no join
306                                         &attr[thread_uid],   // thread attributes
307                                         &sort,               // entry function
308                                         &arg[thread_uid] ) ) // sort arguments
309                    {
[442]310                        printf("\n[SORT ERROR] main cannot create thread %x \n", thread_uid );
[434]311                        exit( 0 );
[417]312                    }
[442]313                    else
314                    {
315                        printf("\n[SORT] main created thread %x \n", thread_uid );
316                    }
[440]317                }
[417]318            }
319        }
320    }
[442]321   
[417]322    get_cycle( &cycle );
[440]323    printf("\n[SORT] main completes threads create at cycle %d\n", (unsigned int)cycle );
[417]324
[442]325#if INTERACTIVE_MODE
326idbg();
327#endif
[441]328   
[445]329    // the main thread run also the sort() function
330    sort( &arg[main_uid] );
331
[417]332    // Check result
333    int    success = 1;
334    int*   res_array = ( (threads==  2) ||
335                         (threads==  8) || 
336                         (threads== 32) || 
337                         (threads==128) || 
338                         (threads==512) ) ? array1 : array0;
339   
[441]340    for( n=0 ; n<(ARRAY_LENGTH-2) ; n++ )
[417]341    {
342        if ( res_array[n] > res_array[n+1] )
343        {
[440]344            printf("\n[SORT] array[%d] = %d > array[%d] = %d\n",
345            n , res_array[n] , n+1 , res_array[n+1] );
[417]346            success = 0;
347            break;
348        }
349    }
350
[440]351#if DISPLAY_ARRAY
[417]352printf("\n*** array after sort\n");
353for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , res_array[n] );
354#endif
355
356    get_cycle( &cycle );
357
358    if ( success )
359    {
360        printf("\n[SORT] success at cycle %d\n", (unsigned int)cycle );
[434]361        exit( 0 );
[417]362    }
363    else
364    {
365        printf("\n[SORT] failure at cycle %d\n", (unsigned int)cycle );
[436]366        exit( 1 );
[417]367    }
368
369}  // end main()
370
371
372/*
373vim: tabstop=4 : shiftwidth=4 : expandtab
374*/
Note: See TracBrowser for help on using the repository browser.