diff options
author | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-05-14 20:38:18 +0000 |
---|---|---|
committer | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-05-14 20:38:18 +0000 |
commit | b26d56275e4d7e88d1317372c335d2797e3928eb (patch) | |
tree | b53b4b585114b1b0587601e76cc905b8266d3299 | |
parent | a100fc191af0babcea7e702fa910abd76b233de9 (diff) | |
download | pa3-b26d56275e4d7e88d1317372c335d2797e3928eb.zip pa3-b26d56275e4d7e88d1317372c335d2797e3928eb.tar.gz pa3-b26d56275e4d7e88d1317372c335d2797e3928eb.tar.bz2 |
Lamport realisation
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | banking.c | 11 | ||||
-rw-r--r-- | child.c | 51 | ||||
-rw-r--r-- | dist.h | 2 | ||||
-rw-r--r-- | lamport.c | 24 | ||||
-rw-r--r-- | lamport.h | 13 | ||||
-rw-r--r-- | pa3.c (renamed from pa23.c) | 2 | ||||
-rw-r--r-- | pa3.h (renamed from pa23.h) | 0 | ||||
-rw-r--r-- | parent.c | 11 |
10 files changed, 92 insertions, 33 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5636990 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.o +*.log +tags @@ -1,13 +1,13 @@ -CFLAGS = -std=c99 -Wall -pedantic +CFLAGS = -std=c99 -Werror -Wall -pedantic CC = clang -all: pa2 +all: pa3 -pa2: bank_robbery.o banking.o pa23.o parent.o child.o ipc.o dist.o +pa3: lamport.o bank_robbery.o banking.o pa3.o parent.o child.o ipc.o dist.o $(CC) -o $@ $^ -L. -lruntime %.o: %.c $(CC) -c -o $@ $< $(CFLAGS) clean: - rm -f *.o pa2 + rm -f *.o pa3 @@ -4,6 +4,7 @@ #include "ipc.h" #include "dist.h" +#include "lamport.h" void transfer(void * parent_data, local_id src, local_id dst, balance_t amount) { @@ -17,22 +18,22 @@ void transfer(void * parent_data, local_id src, local_id dst, balance_t amount) .s_amount = amount }; + lamport_increase( &gl_lamport_time ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = sizeof( order ), .s_type = TRANSFER, - .s_local_time = get_physical_time() + .s_local_time = get_lamport_time() } }; memcpy( msg.s_payload, &order, sizeof( order ) ); - + send( me, src, &msg ); if( !receive( me, dst, &msg ) ) { - if( msg.s_header.s_type != ACK ) { + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); + if( msg.s_header.s_type != ACK ) return; - } } - return; } @@ -4,10 +4,12 @@ #include "banking.h" #include "ipc.h" #include "pa2345.h" +#include "lamport.h" #include "dist.h" void child_phase0( dist_info_t *info, uint8_t id ) { + lamport_init( &gl_lamport_time ); close_redundant_pipes( info, id ); } @@ -21,23 +23,23 @@ void child_phase1( dist_info_t *info, uint8_t id ) { .dist_info = info }; - int size = snprintf( buf, sizeof(buf), log_started_fmt, get_physical_time(), + int size = snprintf( buf, sizeof(buf), log_started_fmt, get_lamport_time(), id, child_pid, info->parent_pid, info->balances[ id ] ); + fputs( buf, info->events_log ); + fputs( buf, stdout ); + + lamport_increase( &gl_lamport_time ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = size + 1, .s_type = STARTED, - .s_local_time = get_physical_time() + .s_local_time = get_lamport_time() } }; - - fputs( buf, info->events_log ); - fputs( buf, stdout ); - strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); send_multicast( &me, &msg ); @@ -46,6 +48,7 @@ void child_phase1( dist_info_t *info, uint8_t id ) { continue; if( !receive( &me, i, &msg ) ) { + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); if( msg.s_header.s_type != STARTED ) { fprintf( stderr, "Message type INVALID (not STARTED)!\n" ); break; @@ -53,7 +56,7 @@ void child_phase1( dist_info_t *info, uint8_t id ) { } } - snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_physical_time(), id ); + snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_lamport_time(), id ); fputs( buf, info->events_log ); fputs( buf, stdout ); } @@ -79,10 +82,11 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { int running = 1; while( running ) { if( !receive_any( &me, &msg ) ) { + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); switch( msg.s_header.s_type ) { case TRANSFER: { TransferOrder *order = (TransferOrder *) msg.s_payload; - uint8_t cur_order_time = get_physical_time(); + uint8_t cur_order_time = get_lamport_time(); for( i = last_order_time + 1; i < cur_order_time; i++ ) { balanceState.s_balance = info->balances[ id ]; @@ -93,28 +97,31 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { if( order->s_src == id ) { snprintf( buf, sizeof(buf), log_transfer_out_fmt, - get_physical_time(), order->s_src, + get_lamport_time(), order->s_src, order->s_amount, order->s_dst ); fputs( buf, info->events_log ); fputs( buf, stdout ); info->balances[ id ] -= order->s_amount; - msg.s_header.s_local_time = get_physical_time(); + //lamport_increase( &gl_lamport_time ); + //msg.s_header.s_local_time = get_lamport_time(); send( &me, order->s_dst, &msg ); } else if( order->s_dst == id ) { snprintf( buf, sizeof(buf), log_transfer_in_fmt, - get_physical_time(), order->s_dst, + get_lamport_time(), order->s_dst, order->s_amount, order->s_src); fputs( buf, info->events_log ); fputs( buf, stdout ); + lamport_increase( &gl_lamport_time ); + Message stopMsg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = 0, .s_type = ACK, - .s_local_time = get_physical_time() + .s_local_time = get_lamport_time() } }; info->balances[ id ] += order->s_amount; @@ -148,22 +155,22 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { }; int size = snprintf( buf, sizeof(buf), log_done_fmt, - get_physical_time(), id, + get_lamport_time(), id, info->balances[ id ] ); + fputs( buf, info->events_log ); + fputs( buf, stdout ); + + lamport_increase( &gl_lamport_time ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = size + 1, .s_type = DONE, - .s_local_time = get_physical_time() + .s_local_time = get_lamport_time() } }; - - fputs( buf, info->events_log ); - fputs( buf, stdout ); strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); - send_multicast( &me, &msg ); for( i = 1; i <= info->x; i++ ) { @@ -171,6 +178,7 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { continue; if( !receive( &me, i, &msg ) ) { + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); if( msg.s_header.s_type != DONE ) { fprintf( stderr, "Message type INVALID (not DONE)!\n" ); break; @@ -179,11 +187,11 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { } snprintf( buf, sizeof(buf), log_received_all_done_fmt, - get_physical_time(), id ); + get_lamport_time(), id ); fputs( buf, info->events_log ); fputs( buf, stdout ); - int time = get_physical_time(); + int time = get_lamport_time(); for( i = history->s_history_len; i <= time; i++ ) { history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance; history->s_history[i].s_time = i; @@ -191,6 +199,8 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { history->s_history_len = time+1; + lamport_increase( &gl_lamport_time ); + int wr_size = history->s_history_len * sizeof(history->s_history[0]); Message historyMsg = { .s_header = { @@ -200,7 +210,6 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { .s_local_time = time } }; - memcpy( historyMsg.s_payload, history, wr_size ); send( &me, PARENT_ID, &historyMsg ); } @@ -1,7 +1,7 @@ #ifndef __IFMO_DISTRIBUTED_CLASS_DIST__H #define __IFMO_DISTRIBUTED_CLASS_DIST__H -#include "pa23.h" +#include "pa3.h" #include "banking.h" #define MAX_PIPES ( MAX_PROCCNT ) * ( MAX_PROCCNT - 1 ) diff --git a/lamport.c b/lamport.c new file mode 100644 index 0000000..65c76bc --- /dev/null +++ b/lamport.c @@ -0,0 +1,24 @@ +#include "lamport.h" +#include "banking.h" + +timestamp_t gl_lamport_time; // TODO: remove global variable with Lamport time + +timestamp_t get_lamport_time() +{ + return gl_lamport_time; +} + +void lamport_init( timestamp_t *lamport_time ) +{ + *lamport_time = 0; +} + +void lamport_increase( timestamp_t *lamport_time ) +{ + (*lamport_time)++; +} + +void lamport_message_handler( timestamp_t *lamport_time, timestamp_t local_time ) +{ + *lamport_time = ( local_time > *lamport_time ? local_time + 1 : *lamport_time + 1 ); +} diff --git a/lamport.h b/lamport.h new file mode 100644 index 0000000..edd19c2 --- /dev/null +++ b/lamport.h @@ -0,0 +1,13 @@ +#ifndef __IFMO_DISTRIBUTED_CLASS_LAMPORT__H +#define __IFMO_DISTRIBUTED_CLASS_LAMPORT__H + +#include "banking.h" + +extern timestamp_t gl_lamport_time; + +void lamport_message_handler( timestamp_t *lamport_time, + timestamp_t local_time); +void lamport_increase( timestamp_t *lamport_time ); +void lamport_init( timestamp_t *lamport_time ); + +#endif @@ -6,7 +6,7 @@ #include <stdlib.h> #include <string.h> -#include "pa23.h" +#include "pa3.h" #include "dist.h" #include "child.h" @@ -6,6 +6,7 @@ #include "dist.h" #include "ipc.h" #include "banking.h" +#include "lamport.h" void parent_phase4( my_info_t *me, dist_info_t *info ) { Message msg; @@ -15,12 +16,15 @@ void parent_phase4( my_info_t *me, dist_info_t *info ) { uint8_t i; for( i = 1; i <= info->x; i++ ) { if( !receive( me, i, &msg ) ) { + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); //TODO: what if s_payload_len more than s_history[i]? b_history = (BalanceHistory *) msg.s_payload; memcpy( &history.s_history[i-1], b_history, sizeof( *b_history ) ); } } + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); + history.s_history_len = info->x; print_history( &history ); } @@ -28,12 +32,14 @@ void parent_phase4( my_info_t *me, dist_info_t *info ) { void parent_phase2( my_info_t *me, dist_info_t *info ) { bank_robbery( me, info->x ); + lamport_increase( &gl_lamport_time ); + Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = 0, .s_type = STOP, - .s_local_time = get_physical_time() + .s_local_time = get_lamport_time() } }; @@ -47,6 +53,7 @@ void parent_workflow( dist_info_t *info ) { int status; int ret = 0; + lamport_init( &gl_lamport_time ); my_info_t me = { .id = PARENT_ID, .dist_info = info @@ -58,6 +65,7 @@ void parent_workflow( dist_info_t *info ) { // PARENT PHASE 1: Receive all STARTED messages from all childs for( i = 1; i <= info->x; i++ ) { if( !receive( &me, i, &msg ) ) { + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); if( msg.s_header.s_type != STARTED ) { ret = 1; break; @@ -71,6 +79,7 @@ void parent_workflow( dist_info_t *info ) { // PARENT PHASE 3: Receive all DONE messages for( i = 1; i <= info->x; i++ ) { if( !receive( &me, i, &msg ) ) { + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); if( msg.s_header.s_type != DONE ) { ret = 1; break; |