#include #include #include "ipc.h" #include "pa2345.h" #include "lamport.h" #include "dist.h" #define MAX_LOOP_STR 1024 void child_phase0( dist_info_t *info, uint8_t id ) { lamport_init( &gl_lamport_time ); close_redundant_pipes( info, id ); } void child_phase1( dist_info_t *info, uint8_t id ) { pid_t child_pid = getpid(); char buf[MAX_MESSAGE_LEN]; uint8_t i; my_info_t me = { .id = id, .dist_info = info }; int size = snprintf( buf, sizeof(buf), log_started_fmt, get_lamport_time(), id, child_pid, info->parent_pid, 0 ); fputs( buf, info->events_log ); fflush( info->events_log ); fputs( buf, stdout ); lamport_increase( &gl_lamport_time ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = size + 1, .s_type = STARTED, .s_local_time = get_lamport_time() } }; strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); send_multicast( &me, &msg ); for( i = 1; i <= info->x; i++ ) { if( i == me.id ) continue; if( !receive( &me, i, &msg ) ) { lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); if( msg.s_header.s_type != STARTED ) { fprintf( stderr, "Message type INVALID (not STARTED)!\n" ); break; } } } snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_lamport_time(), id ); fputs( buf, info->events_log ); fflush( info->events_log ); fputs( buf, stdout ); fflush( info->events_log ); } void child_phase2( dist_info_t *info, uint8_t id ) { my_info_t me = { .id = id, .dist_info = info }; if( info->mutex ) request_cs(&me); uint8_t i, loop_cnt; char buf[ MAX_LOOP_STR ]; loop_cnt = id * 5; for( i = 1; i <= loop_cnt; i++ ) { snprintf( buf, sizeof(buf), log_loop_operation_fmt, id, i, loop_cnt ); print( buf ); } if( info->mutex ) release_cs( &me ); } void child_phase3( dist_info_t *info, uint8_t id ) { char buf[MAX_MESSAGE_LEN]; my_info_t me = { .id = id, .dist_info = info }; int size = snprintf( buf, sizeof(buf), log_done_fmt, get_lamport_time(), id, 0 ); fputs( buf, info->events_log ); fflush( info->events_log ); fputs( buf, stdout ); lamport_increase( &gl_lamport_time ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = size + 1, .s_type = DONE, .s_local_time = get_lamport_time() } }; strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); send_multicast( &me, &msg ); info->workers--; while( info->workers ) { if( !receive_any( &me, &msg ) ) { lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); if( msg.s_header.s_type == DONE ) info->workers--; } } snprintf( buf, sizeof(buf), log_received_all_done_fmt, get_lamport_time(), id ); fputs( buf, info->events_log ); fflush( info->events_log ); fputs( buf, stdout ); } void child_workflow( dist_info_t *info, uint8_t id ) { /* CHILD PHASE 0: Close all redundant descriptors */ child_phase0(info, id); /* CHILD PHASE 1: Send STARTED message for all, receive STARTED * messages from other childs */ child_phase1(info, id); /* CHILD PHASE 2: Do some useful work */ child_phase2(info, id); /* CHILD PHASE 3: Send DONE message for all, receive DONE messages from * other childs and exit */ child_phase3(info, id); }