source: trunk/kernel/kern/pipe.c @ 669

Last change on this file since 669 was 669, checked in by alain, 3 years ago

1) Introduce up to 4 command-line arguments in the KSH "load" command.
These arguments are transferred to the user process through the
argc/argv mechanism, using the user space "args" vseg.

2) Introduce the named and anonymous "pipes", for inter-process communication
through the pipe() and mkfifo() syscalls.

3) Introduce the "chat" application to validate the two above mechanisms.

4) Improve printk() and assert() functions in printk.c.

File size: 7.5 KB
Line 
1/*
2 * pipe.c - single writer, single reader pipe implementation           
3 *
4 * Author     Alain Greiner (2016,2017,2018,2019,2020)
5 *
6 * Copyright (c) UPMC Sorbonne Universites
7 *
8 * This file is part of ALMOS-MKH.
9 *
10 * ALMOS-MKH is free software; you can redistribute it and/or modify it
11 * under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; version 2.0 of the License.
13 *
14 * ALMOS-MKH is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 * General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with ALMOS-MKH; if not, write to the Free Software Foundation,
21 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 */
23
24#include <kernel_config.h>
25#include <hal_kernel_types.h>
26#include <printk.h>
27#include <thread.h>
28#include <pipe.h>
29
30///////////////////////////////////
31pipe_t * pipe_create( cxy_t    cxy,
32                      uint32_t size )
33{
34    kmem_req_t     req;
35    remote_buf_t * buf;
36    pipe_t       * pipe;
37    error_t        error;
38
39    // 2. allocate memory for remote_buf descriptor
40    buf = remote_buf_alloc( cxy );
41
42    if( buf == NULL )
43    {
44       return NULL;
45    }
46
47    // 1. initialize it, and allocate memory for associated data buffer
48    error = remote_buf_init( XPTR( cxy , buf ),
49                             bits_log2( size ) );
50    if( error )
51    {
52        remote_buf_destroy( XPTR( cxy , buf ) );
53        return NULL;
54    }
55
56    // 3. allocate memory for pipe descriptor
57    req.type  = KMEM_KCM;
58    req.order = bits_log2( sizeof(pipe_t) );
59    req.flags = AF_ZERO;
60    pipe = kmem_remote_alloc( cxy , &req );
61
62    if( pipe == NULL )
63    {
64        remote_buf_destroy( XPTR( cxy , buf ) );
65        return NULL;
66    }
67
68    // initialise it
69    hal_remote_spt( XPTR( cxy , &pipe->buffer ) , buf );
70
71    return pipe;
72
73}  // end pipe_create()
74
75///////////////////////////////////
76void pipe_destroy( xptr_t pipe_xp )
77{
78    kmem_req_t req;
79
80    pipe_t * pipe_ptr = GET_PTR( pipe_xp );
81    cxy_t    pipe_cxy = GET_CXY( pipe_xp );
82
83    // get pointer on remote_buf descriptor
84    remote_buf_t * buf = hal_remote_lpt( XPTR( pipe_cxy , &pipe_ptr->buffer ));
85
86    // release remote_buf descriptor and data buffer
87    remote_buf_destroy( XPTR( pipe_cxy , buf ) );
88
89    // release pipe descriptor
90    req.type = KMEM_KCM;
91    req.ptr  = pipe_ptr;
92    kmem_remote_free( pipe_cxy , &req );
93
94}  // end pipe_destroy()
95
96
97//////////////////////////////////////////
98void pipe_register_writer( xptr_t pipe_xp,
99                           xptr_t thread_xp )
100{
101    // get cluster and local pointer on pipe descriptor
102    cxy_t    pipe_cxy = GET_CXY( pipe_xp );
103    pipe_t * pipe_ptr = GET_PTR( pipe_xp );
104
105    // update "writer_xp" field
106    hal_remote_s64( XPTR( pipe_cxy , &pipe_ptr->writer_xp ) , thread_xp );
107
108}  // end pipe_register_writer()
109
110//////////////////////////////////////////
111void pipe_register_reader( xptr_t pipe_xp,
112                           xptr_t thread_xp )
113{
114    // get cluster and local pointer on pipe descriptor
115    cxy_t    pipe_cxy = GET_CXY( pipe_xp );
116    pipe_t * pipe_ptr = GET_PTR( pipe_xp );
117
118    // update "reader_xp" field
119    hal_remote_s64( XPTR( pipe_cxy , &pipe_ptr->reader_xp ) , thread_xp );
120
121}  // end pipe_register_reader()
122
123///////////////////////////////////////
124int pipe_user_move( bool_t   to_buffer,
125                    xptr_t   file_xp,
126                    void   * u_buf,
127                    uint32_t size )
128{
129    int        nbytes;      // number of bytes actually moved
130
131    thread_t * this  = CURRENT_THREAD;
132   
133assert( __FUNCTION__, ( file_xp != XPTR_NULL ) , "file_xp == XPTR_NULL" );
134
135#if DEBUG_PIPE_MOVE
136uint32_t   cycle = (uint32_t)hal_get_cycles();
137if( DEBUG_PIPE_MOVE < cycle )
138printk("\n[%s] thread[%x,%x] enter / to_user %d / cycle %d\n",
139__FUNCTION__, this->process->pid, this->trdid, to_buffer, cycle );
140#endif
141
142    // get cluster and local pointer on file descriptor
143    cxy_t        file_cxy = GET_CXY( file_xp );
144    vfs_file_t * file_ptr = GET_PTR( file_xp );
145
146    // get local pointer on remote pipe, and extended pointers on reader & writer
147    pipe_t * pipe_ptr = hal_remote_lpt( XPTR( file_cxy , &file_ptr->pipe ));
148    xptr_t reader_xp  = hal_remote_l64( XPTR( file_cxy , &pipe_ptr->reader_xp ));
149    xptr_t writer_xp  = hal_remote_l64( XPTR( file_cxy , &pipe_ptr->writer_xp ));
150
151assert( __FUNCTION__ , (pipe_ptr != NULL)       , "pipe_ptr cannot be NULL" );
152assert( __FUNCTION__ , (reader_xp != XPTR_NULL) , "reader_xp cannot be NULL" );
153assert( __FUNCTION__ , (writer_xp != XPTR_NULL) , "writer_xp cannot be NULL" );
154
155    // get pointers on associated remote_buf
156    remote_buf_t * buf_ptr = hal_remote_lpt( XPTR( file_cxy , &pipe_ptr->buffer ));
157    xptr_t         buf_xp  = XPTR( file_cxy , buf_ptr );
158
159    ///////////////// pipe read
160    if( to_buffer )
161    {
162
163assert( __FUNCTION__, (reader_xp == XPTR(local_cxy , this)) , "illegal reader thread" );
164
165        // wait remote_buf_t non empty
166        while( 1 )
167        {
168            // get remote_buf status
169            int status = (int)remote_buf_status( buf_xp );
170
171            // move data when remote_buf non empty
172            if( status )   
173            {
174                // compute min(status , count)
175                nbytes = (status < (int)size) ? status : (int)size;
176
177                // move nbytes to user buffer
178                remote_buf_get_to_user( buf_xp,
179                                        u_buf,
180                                        nbytes );
181
182                // unblock writer thread
183                thread_unblock( writer_xp , THREAD_BLOCKED_IO );
184
185                // exit loop
186                break; 
187            }
188            else  // block and deschedule reader thread when remote_buf empty
189            {
190                thread_block( reader_xp , THREAD_BLOCKED_IO );
191                sched_yield( "pipe empty" );
192            }
193        }  // end while
194    }
195    ////////////////// pipe write
196    else
197    {
198
199assert( __FUNCTION__, (writer_xp == XPTR(local_cxy , this)) , "illegal writer thread" );
200
201        // wait remote_buf_t non full
202        while( 1 )
203        {
204            // get remote_buf status
205            int status = (int)remote_buf_status( buf_xp );
206
207            // get remote_buf length
208            int buflen  = 1 << hal_remote_l32( XPTR( file_cxy , &buf_ptr->order )); 
209
210            // move data when remote_buf non full
211            if( status < buflen )   
212            {
213                // compute min( buflen - status , size)
214                nbytes = ((buflen - status) < (int)size) ? (buflen - status) : (int)size;
215
216                // move nbytes from user buffer
217                remote_buf_put_from_user( buf_xp,
218                                          u_buf,
219                                          nbytes );
220
221                // unblock reader thread
222                thread_unblock( reader_xp , THREAD_BLOCKED_IO );
223
224                // exit loop
225                break; 
226            }
227            else  // block and deschedule writer thread when remote_buf full
228            {
229                thread_block( writer_xp , THREAD_BLOCKED_IO );
230                sched_yield( "pipe full" );
231            }
232        }  // end while
233    }
234
235#if DEBUG_PIPE_MOVE
236cycle = (uint32_t)hal_get_cycles();
237if( DEBUG_PIPE_MOVE < cycle )
238printk("\n[%s] thread[%x,%x] exit / to_user %d / nbytes %d / cycle %d\n",
239__FUNCTION__, this->process->pid, this->trdid, to_buffer, nbytes, cycle );
240#endif
241
242    return nbytes;
243
244}  // end pipe_user_move()
245
246
247
248
Note: See TracBrowser for help on using the repository browser.