diff options
-rw-r--r-- | Makefile | 6 | ||||
-rw-r--r-- | child.c | 154 | ||||
-rw-r--r-- | dist.h | 5 | ||||
-rw-r--r-- | ipc.c | 10 | ||||
-rw-r--r-- | lamport.c | 2 | ||||
-rw-r--r-- | lamport.h | 4 | ||||
-rw-r--r-- | pa3.c | 49 | ||||
-rw-r--r-- | parent.c | 59 |
8 files changed, 78 insertions, 211 deletions
@@ -1,13 +1,13 @@ CFLAGS = -std=c99 -Werror -Wall -pedantic CC = clang -all: pa3 +all: pa4 -pa3: lamport.o bank_robbery.o banking.o pa3.o parent.o child.o ipc.o dist.o +pa4: cs.o queue.o lamport.o pa3.o parent.o child.o ipc.o dist.o $(CC) -o $@ $^ -Llib64 -lruntime %.o: %.c $(CC) -c -o $@ $< $(CFLAGS) clean: - rm -f *.o pa3 + rm -f *.o pa4 @@ -1,13 +1,14 @@ #include <stdint.h> #include <string.h> -#include "banking.h" #include "ipc.h" #include "pa2345.h" #include "lamport.h" #include "dist.h" +#define MAX_LOOP_STR 1024 + void child_phase0( dist_info_t *info, uint8_t id ) { lamport_init( &gl_lamport_time ); close_redundant_pipes( info, id ); @@ -26,7 +27,7 @@ void child_phase1( dist_info_t *info, uint8_t id ) { int size = snprintf( buf, sizeof(buf), log_started_fmt, get_lamport_time(), id, child_pid, info->parent_pid, - info->balances[ id ] ); + 0 ); fputs( buf, info->events_log ); fflush( info->events_log ); fputs( buf, stdout ); @@ -64,104 +65,30 @@ void child_phase1( dist_info_t *info, uint8_t id ) { fflush( info->events_log ); } -void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { - Message msg; - BalanceState balanceState; - char buf[MAX_MESSAGE_LEN]; - +void child_phase2( dist_info_t *info, uint8_t id ) { my_info_t me = { .id = id, .dist_info = info }; - uint8_t i; - uint8_t last_order_time = 0; - - balanceState.s_balance = info->balances[ id ]; - balanceState.s_time = last_order_time; - balanceState.s_balance_pending_in = 0; + if( info->mutex ) + request_cs(&me); + + uint8_t i, loop_cnt; + char buf[ MAX_LOOP_STR ]; - history->s_history[ last_order_time ] = balanceState; - history->s_history_len = 1; - - int running = 1; - while( running ) { - if( !receive_any( &me, &msg ) ) { - lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); - switch( msg.s_header.s_type ) { - case TRANSFER: { - TransferOrder *order = (TransferOrder *) msg.s_payload; - if( order->s_src == id ) { - lamport_increase( &gl_lamport_time ); - - info->balances[ id ] -= order->s_amount; - int oldBp = history->s_history[ history->s_history_len - 1 ].s_balance_pending_in; - balanceState.s_balance = info->balances[ id ]; - balanceState.s_time = get_lamport_time(); - balanceState.s_balance_pending_in = oldBp + order->s_amount; - - msg.s_header.s_local_time = get_lamport_time(); - send( &me, order->s_dst, &msg ); - - snprintf( buf, sizeof(buf), log_transfer_out_fmt, - get_lamport_time(), order->s_src, - order->s_amount, order->s_dst ); - fputs( buf, info->events_log ); - fflush( info->events_log ); - fputs( buf, stdout ); - } else if( order->s_dst == id ) { - lamport_increase( &gl_lamport_time ); - - info->balances[ id ] += order->s_amount; - int oldBp = history->s_history[ history->s_history_len - 1 ].s_balance_pending_in; - - balanceState.s_balance = info->balances[ id ]; - balanceState.s_time = get_lamport_time(); - balanceState.s_balance_pending_in = oldBp - order->s_amount; - - Message ackMsg = { - .s_header = { - .s_magic = MESSAGE_MAGIC, - .s_payload_len = 0, - .s_type = ACK, - .s_local_time = get_lamport_time() - } - }; - send( &me, PARENT_ID, &ackMsg ); - - snprintf( buf, sizeof(buf), log_transfer_in_fmt, - get_lamport_time(), order->s_dst, - order->s_amount, order->s_src); - fputs( buf, info->events_log ); - fflush( info->events_log ); - fputs( buf, stdout ); - } - - int cur_time = get_lamport_time(); - - history->s_history[ cur_time ] = balanceState; - history->s_history_len = cur_time + 1; - - for( i = last_order_time + 1; i < cur_time; i++ ) { - balanceState = history->s_history[ i-1 ]; - balanceState.s_time = i; - history->s_history[ i ] = balanceState; - } - - last_order_time = cur_time; - break; - } - case STOP: - running = 0; - break; - } - } + loop_cnt = id * 5; + for( i = 1; i <= loop_cnt; i++ ) { + snprintf( buf, sizeof(buf), log_loop_operation_fmt, id, i, loop_cnt ); + print( buf ); } + + if( info->mutex ) + release_cs( &me ); } -void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { +void child_phase3( dist_info_t *info, uint8_t id ) { char buf[MAX_MESSAGE_LEN]; - uint8_t i; my_info_t me = { .id = id, @@ -170,7 +97,7 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { int size = snprintf( buf, sizeof(buf), log_done_fmt, get_lamport_time(), id, - info->balances[ id ] ); + 0 ); fputs( buf, info->events_log ); fflush( info->events_log ); fputs( buf, stdout ); @@ -187,17 +114,13 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { }; strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); send_multicast( &me, &msg ); + info->workers--; - for( i = 1; i <= info->x; i++ ) { - if( i == me.id ) - continue; - - if( !receive( &me, i, &msg ) ) { + while( info->workers ) { + if( !receive_any( &me, &msg ) ) { lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); - if( msg.s_header.s_type != DONE ) { - fprintf( stderr, "Message type INVALID (not DONE)!\n" ); - break; - } + if( msg.s_header.s_type == DONE ) + info->workers--; } } @@ -206,36 +129,9 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { fputs( buf, info->events_log ); fflush( info->events_log ); fputs( buf, stdout ); - - int time = get_lamport_time(); - for( i = history->s_history_len; i <= time; i++ ) { - history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance; - history->s_history[i].s_balance_pending_in = history->s_history[ history->s_history_len - 1].s_balance_pending_in; - history->s_history[i].s_time = i; - } - - history->s_history_len = time+1; - - lamport_increase( &gl_lamport_time ); - - int wr_size = history->s_history_len * sizeof(history->s_history[0]); - Message historyMsg = { - .s_header = { - .s_magic = MESSAGE_MAGIC, - .s_payload_len = wr_size, - .s_type = BALANCE_HISTORY, - .s_local_time = get_lamport_time() - } - }; - memcpy( historyMsg.s_payload, history, wr_size ); - send( &me, PARENT_ID, &historyMsg ); } void child_workflow( dist_info_t *info, uint8_t id ) { - BalanceHistory history = { - .s_id = id - }; - /* CHILD PHASE 0: Close all redundant descriptors */ child_phase0(info, id); @@ -244,9 +140,9 @@ void child_workflow( dist_info_t *info, uint8_t id ) { child_phase1(info, id); /* CHILD PHASE 2: Do some useful work */ - child_phase2(info, id, &history); + child_phase2(info, id); /* CHILD PHASE 3: Send DONE message for all, receive DONE messages from * other childs and exit */ - child_phase3(info, id, &history); + child_phase3(info, id); } @@ -2,7 +2,6 @@ #define __IFMO_DISTRIBUTED_CLASS_DIST__H #include "pa3.h" -#include "banking.h" #define MAX_PIPES ( MAX_PROCCNT ) * ( MAX_PROCCNT - 1 ) @@ -10,7 +9,8 @@ typedef struct { int pipes[MAX_PIPES][2]; uint8_t proccnt; uint8_t x; - balance_t balances[MAX_X]; + int mutex; + uint8_t workers; FILE* events_log; FILE* pipes_log; @@ -20,6 +20,7 @@ typedef struct { typedef struct { int id; dist_info_t *dist_info; + int msg_author; } my_info_t; void close_redundant_pipes( dist_info_t *info, uint8_t id ); @@ -99,6 +99,7 @@ int receive_any(void * self, Message * msg) { uint8_t i, j = 0; int *descriptors = (int *)malloc( sizeof(int) * (me->dist_info->proccnt - 1) ); + int *senders = (int *)malloc( sizeof(int) * (me->dist_info->proccnt - 1) ); for(i = 0; i < me->dist_info->proccnt; i++) { // don't see at pipes as ( ID -> Y ) if( me->id == i ) @@ -110,7 +111,9 @@ int receive_any(void * self, Message * msg) { // index number in block of pipes for ( X -> ID ) uint8_t idx = ( me->id > i ? base + me->id - 1 : base + me->id ); - descriptors[j++] = me->dist_info->pipes[idx][0]; + descriptors[j] = me->dist_info->pipes[idx][0]; + senders[j] = i; + j++; } char buf[MAX_MESSAGE_LEN]; @@ -123,6 +126,10 @@ int receive_any(void * self, Message * msg) { memcpy( msgbuf, buf, size_hdr ); size_payload = read( descriptors[i], buf, msg->s_header.s_payload_len ); memcpy( msgbuf + size_hdr, buf, size_payload ); + + me->msg_author = senders[i]; + + free( senders ); free( descriptors ); return 0; } @@ -130,6 +137,7 @@ int receive_any(void * self, Message * msg) { usleep(100000); // sleep 100 ms } + free( senders ); free( descriptors ); return 1; } @@ -1,5 +1,5 @@ #include "lamport.h" -#include "banking.h" +#include "ipc.h" timestamp_t gl_lamport_time; // TODO: remove global variable with Lamport time @@ -1,10 +1,10 @@ #ifndef __IFMO_DISTRIBUTED_CLASS_LAMPORT__H #define __IFMO_DISTRIBUTED_CLASS_LAMPORT__H - -#include "banking.h" +#include "ipc.h" extern timestamp_t gl_lamport_time; +timestamp_t get_lamport_time(); void lamport_message_handler( timestamp_t *lamport_time, timestamp_t local_time); void lamport_increase( timestamp_t *lamport_time ); @@ -2,6 +2,7 @@ #include <fcntl.h> /* Obtain O_* constant definitions */ #include <unistd.h> +#include <getopt.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -13,54 +14,62 @@ #include "parent.h" #include "common.h" -#include "banking.h" static char* app_name; void usage( FILE* output ) { - fprintf(output, "usage: %s -p <X> <BAL1> <BAL2> ... <BALX>\n", app_name); + fprintf(output, "usage: %s [--mutexl] -p <X>\n", app_name); fprintf(output, "`X` must be int between 1 and %d\n", MAX_X); } int main(int argc, char* argv[]) { uint8_t i; + int opt; dist_info_t info; app_name = argv[0]; - if( argc < 3 || strcmp( argv[1], "-p") != 0 ) { - usage(stdout); - return 1; - } + struct option long_options[] = { + {"mutexl", no_argument, &info.mutex, 1}, + {0, 0, 0, 0} + }; + int option_index; - // fill info about distributed network - info.x = atoi(argv[2]); - if( info.x < 0 || info.x > MAX_X ) { - usage(stdout); - return 1; + info.x = 0; + while ((opt = getopt_long(argc, argv, "p:", long_options, &option_index)) != -1) { + switch (opt) { + case 0: + break; + case 'p': + info.x = atoi(optarg); + break; + default: /* '?' */ + usage(stdout); + exit(EXIT_FAILURE); + } } - if( argc != (3 + info.x) ) { + if( info.x <= 0 || info.x > MAX_X ) { usage(stdout); - return 1; - } - - for( i = 1; i <= info.x; i++ ) { - int balance = atoi(argv[3 + i - 1]); - info.balances[ i ] = balance; + exit(EXIT_FAILURE); } info.proccnt = info.x + 1; + info.workers = info.x; info.parent_pid = getpid(); info.events_log = fopen( events_log, "w" ); info.pipes_log = fopen( pipes_log, "w" ); + if( !info.pipes_log || !info.events_log ) { + fprintf( stderr, "Events log or pipes log can't be open" ); + return 1; + } + // open all (N * (N-1)) pipes for a full mesh topology for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ ) { pipe2( info.pipes[i], O_NONBLOCK ); fprintf( info.pipes_log, "Opened pipe #%d (%d;%d)\n", i, info.pipes[i][0], info.pipes[i][1] ); } fclose( info.pipes_log ); - for( i = 1; i < info.proccnt; i++ ) { pid_t pid = fork(); switch(pid) { @@ -77,7 +86,7 @@ int main(int argc, char* argv[]) { break; } } - + parent_workflow( &info ); return 0; } @@ -5,47 +5,8 @@ #include "dist.h" #include "ipc.h" -#include "banking.h" #include "lamport.h" -void parent_phase4( my_info_t *me, dist_info_t *info ) { - Message msg; - BalanceHistory *b_history; - AllHistory history; - - uint8_t i; - for( i = 1; i <= info->x; i++ ) { - if( !receive( me, i, &msg ) ) { - lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); - //TODO: what if s_payload_len more than s_history[i]? - b_history = (BalanceHistory *) msg.s_payload; - memcpy( &history.s_history[i-1], b_history, sizeof( *b_history ) ); - } - } - - lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); - - history.s_history_len = info->x; - print_history( &history ); -} - -void parent_phase2( my_info_t *me, dist_info_t *info ) { - bank_robbery( me, info->x ); - - lamport_increase( &gl_lamport_time ); - - Message msg = { - .s_header = { - .s_magic = MESSAGE_MAGIC, - .s_payload_len = 0, - .s_type = STOP, - .s_local_time = get_lamport_time() - } - }; - - send_multicast( me, &msg ); -} - void parent_workflow( dist_info_t *info ) { Message msg; uint8_t i, receive_cnt; @@ -72,25 +33,17 @@ void parent_workflow( dist_info_t *info ) { } } } - - // PARENT PHASE 2: Bank robbery & send STOP for all childs - parent_phase2( &me, info ); - // PARENT PHASE 3: Receive all DONE messages - for( i = 1; i <= info->x; i++ ) { - if( !receive( &me, i, &msg ) ) { + // PARENT PHASE 2: Receive all DONE messages + while( info->workers ) { + if( !receive_any( &me, &msg ) ) { lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); - if( msg.s_header.s_type != DONE ) { - ret = 1; - break; - } + if( msg.s_header.s_type == DONE ) + info->workers--; } } - // PARENT PHASE 4: Receive all BALANCE_HISTORY and print all history - parent_phase4( &me, info ); - - // PARENT PHASE 5: Wait for the end of all processes + // PARENT PHASE 3: Wait for the end of all processes receive_cnt = info->x; while( receive_cnt != 0 ) { if( (wpid = wait(&status)) > 0 ) { |