summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Nazaryev <sergey@nazaryev.ru>2016-05-14 20:38:18 +0000
committerSergey Nazaryev <sergey@nazaryev.ru>2016-05-14 20:38:18 +0000
commitb26d56275e4d7e88d1317372c335d2797e3928eb (patch)
treeb53b4b585114b1b0587601e76cc905b8266d3299
parenta100fc191af0babcea7e702fa910abd76b233de9 (diff)
downloadpa3-b26d56275e4d7e88d1317372c335d2797e3928eb.zip
pa3-b26d56275e4d7e88d1317372c335d2797e3928eb.tar.gz
pa3-b26d56275e4d7e88d1317372c335d2797e3928eb.tar.bz2
Lamport realisation
-rw-r--r--.gitignore3
-rw-r--r--Makefile8
-rw-r--r--banking.c11
-rw-r--r--child.c51
-rw-r--r--dist.h2
-rw-r--r--lamport.c24
-rw-r--r--lamport.h13
-rw-r--r--pa3.c (renamed from pa23.c)2
-rw-r--r--pa3.h (renamed from pa23.h)0
-rw-r--r--parent.c11
10 files changed, 92 insertions, 33 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5636990
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.o
+*.log
+tags
diff --git a/Makefile b/Makefile
index 8e1e684..e3df556 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,13 @@
-CFLAGS = -std=c99 -Wall -pedantic
+CFLAGS = -std=c99 -Werror -Wall -pedantic
CC = clang
-all: pa2
+all: pa3
-pa2: bank_robbery.o banking.o pa23.o parent.o child.o ipc.o dist.o
+pa3: lamport.o bank_robbery.o banking.o pa3.o parent.o child.o ipc.o dist.o
$(CC) -o $@ $^ -L. -lruntime
%.o: %.c
$(CC) -c -o $@ $< $(CFLAGS)
clean:
- rm -f *.o pa2
+ rm -f *.o pa3
diff --git a/banking.c b/banking.c
index b5285e6..8ae96ea 100644
--- a/banking.c
+++ b/banking.c
@@ -4,6 +4,7 @@
#include "ipc.h"
#include "dist.h"
+#include "lamport.h"
void transfer(void * parent_data, local_id src, local_id dst, balance_t amount)
{
@@ -17,22 +18,22 @@ void transfer(void * parent_data, local_id src, local_id dst, balance_t amount)
.s_amount = amount
};
+ lamport_increase( &gl_lamport_time );
Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = sizeof( order ),
.s_type = TRANSFER,
- .s_local_time = get_physical_time()
+ .s_local_time = get_lamport_time()
}
};
memcpy( msg.s_payload, &order, sizeof( order ) );
-
+
send( me, src, &msg );
if( !receive( me, dst, &msg ) ) {
- if( msg.s_header.s_type != ACK ) {
+ lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
+ if( msg.s_header.s_type != ACK )
return;
- }
}
-
return;
}
diff --git a/child.c b/child.c
index cf7496f..eeac0b9 100644
--- a/child.c
+++ b/child.c
@@ -4,10 +4,12 @@
#include "banking.h"
#include "ipc.h"
#include "pa2345.h"
+#include "lamport.h"
#include "dist.h"
void child_phase0( dist_info_t *info, uint8_t id ) {
+ lamport_init( &gl_lamport_time );
close_redundant_pipes( info, id );
}
@@ -21,23 +23,23 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
.dist_info = info
};
- int size = snprintf( buf, sizeof(buf), log_started_fmt, get_physical_time(),
+ int size = snprintf( buf, sizeof(buf), log_started_fmt, get_lamport_time(),
id, child_pid,
info->parent_pid,
info->balances[ id ] );
+ fputs( buf, info->events_log );
+ fputs( buf, stdout );
+
+ lamport_increase( &gl_lamport_time );
Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = size + 1,
.s_type = STARTED,
- .s_local_time = get_physical_time()
+ .s_local_time = get_lamport_time()
}
};
-
- fputs( buf, info->events_log );
- fputs( buf, stdout );
-
strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) );
send_multicast( &me, &msg );
@@ -46,6 +48,7 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
continue;
if( !receive( &me, i, &msg ) ) {
+ lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
if( msg.s_header.s_type != STARTED ) {
fprintf( stderr, "Message type INVALID (not STARTED)!\n" );
break;
@@ -53,7 +56,7 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
}
}
- snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_physical_time(), id );
+ snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_lamport_time(), id );
fputs( buf, info->events_log );
fputs( buf, stdout );
}
@@ -79,10 +82,11 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
int running = 1;
while( running ) {
if( !receive_any( &me, &msg ) ) {
+ lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
switch( msg.s_header.s_type ) {
case TRANSFER: {
TransferOrder *order = (TransferOrder *) msg.s_payload;
- uint8_t cur_order_time = get_physical_time();
+ uint8_t cur_order_time = get_lamport_time();
for( i = last_order_time + 1; i < cur_order_time; i++ ) {
balanceState.s_balance = info->balances[ id ];
@@ -93,28 +97,31 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
if( order->s_src == id ) {
snprintf( buf, sizeof(buf), log_transfer_out_fmt,
- get_physical_time(), order->s_src,
+ get_lamport_time(), order->s_src,
order->s_amount, order->s_dst );
fputs( buf, info->events_log );
fputs( buf, stdout );
info->balances[ id ] -= order->s_amount;
- msg.s_header.s_local_time = get_physical_time();
+ //lamport_increase( &gl_lamport_time );
+ //msg.s_header.s_local_time = get_lamport_time();
send( &me, order->s_dst, &msg );
} else if( order->s_dst == id ) {
snprintf( buf, sizeof(buf), log_transfer_in_fmt,
- get_physical_time(), order->s_dst,
+ get_lamport_time(), order->s_dst,
order->s_amount, order->s_src);
fputs( buf, info->events_log );
fputs( buf, stdout );
+ lamport_increase( &gl_lamport_time );
+
Message stopMsg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = 0,
.s_type = ACK,
- .s_local_time = get_physical_time()
+ .s_local_time = get_lamport_time()
}
};
info->balances[ id ] += order->s_amount;
@@ -148,22 +155,22 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
};
int size = snprintf( buf, sizeof(buf), log_done_fmt,
- get_physical_time(), id,
+ get_lamport_time(), id,
info->balances[ id ] );
+ fputs( buf, info->events_log );
+ fputs( buf, stdout );
+
+ lamport_increase( &gl_lamport_time );
Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = size + 1,
.s_type = DONE,
- .s_local_time = get_physical_time()
+ .s_local_time = get_lamport_time()
}
};
-
- fputs( buf, info->events_log );
- fputs( buf, stdout );
strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) );
-
send_multicast( &me, &msg );
for( i = 1; i <= info->x; i++ ) {
@@ -171,6 +178,7 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
continue;
if( !receive( &me, i, &msg ) ) {
+ lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
if( msg.s_header.s_type != DONE ) {
fprintf( stderr, "Message type INVALID (not DONE)!\n" );
break;
@@ -179,11 +187,11 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
}
snprintf( buf, sizeof(buf), log_received_all_done_fmt,
- get_physical_time(), id );
+ get_lamport_time(), id );
fputs( buf, info->events_log );
fputs( buf, stdout );
- int time = get_physical_time();
+ int time = get_lamport_time();
for( i = history->s_history_len; i <= time; i++ ) {
history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance;
history->s_history[i].s_time = i;
@@ -191,6 +199,8 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
history->s_history_len = time+1;
+ lamport_increase( &gl_lamport_time );
+
int wr_size = history->s_history_len * sizeof(history->s_history[0]);
Message historyMsg = {
.s_header = {
@@ -200,7 +210,6 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
.s_local_time = time
}
};
-
memcpy( historyMsg.s_payload, history, wr_size );
send( &me, PARENT_ID, &historyMsg );
}
diff --git a/dist.h b/dist.h
index b7758cf..9c5c4b4 100644
--- a/dist.h
+++ b/dist.h
@@ -1,7 +1,7 @@
#ifndef __IFMO_DISTRIBUTED_CLASS_DIST__H
#define __IFMO_DISTRIBUTED_CLASS_DIST__H
-#include "pa23.h"
+#include "pa3.h"
#include "banking.h"
#define MAX_PIPES ( MAX_PROCCNT ) * ( MAX_PROCCNT - 1 )
diff --git a/lamport.c b/lamport.c
new file mode 100644
index 0000000..65c76bc
--- /dev/null
+++ b/lamport.c
@@ -0,0 +1,24 @@
+#include "lamport.h"
+#include "banking.h"
+
+timestamp_t gl_lamport_time; // TODO: remove global variable with Lamport time
+
+timestamp_t get_lamport_time()
+{
+ return gl_lamport_time;
+}
+
+void lamport_init( timestamp_t *lamport_time )
+{
+ *lamport_time = 0;
+}
+
+void lamport_increase( timestamp_t *lamport_time )
+{
+ (*lamport_time)++;
+}
+
+void lamport_message_handler( timestamp_t *lamport_time, timestamp_t local_time )
+{
+ *lamport_time = ( local_time > *lamport_time ? local_time + 1 : *lamport_time + 1 );
+}
diff --git a/lamport.h b/lamport.h
new file mode 100644
index 0000000..edd19c2
--- /dev/null
+++ b/lamport.h
@@ -0,0 +1,13 @@
+#ifndef __IFMO_DISTRIBUTED_CLASS_LAMPORT__H
+#define __IFMO_DISTRIBUTED_CLASS_LAMPORT__H
+
+#include "banking.h"
+
+extern timestamp_t gl_lamport_time;
+
+void lamport_message_handler( timestamp_t *lamport_time,
+ timestamp_t local_time);
+void lamport_increase( timestamp_t *lamport_time );
+void lamport_init( timestamp_t *lamport_time );
+
+#endif
diff --git a/pa23.c b/pa3.c
index 2df9268..b01dfdc 100644
--- a/pa23.c
+++ b/pa3.c
@@ -6,7 +6,7 @@
#include <stdlib.h>
#include <string.h>
-#include "pa23.h"
+#include "pa3.h"
#include "dist.h"
#include "child.h"
diff --git a/pa23.h b/pa3.h
index 42e288e..42e288e 100644
--- a/pa23.h
+++ b/pa3.h
diff --git a/parent.c b/parent.c
index dcce4a2..fea2f96 100644
--- a/parent.c
+++ b/parent.c
@@ -6,6 +6,7 @@
#include "dist.h"
#include "ipc.h"
#include "banking.h"
+#include "lamport.h"
void parent_phase4( my_info_t *me, dist_info_t *info ) {
Message msg;
@@ -15,12 +16,15 @@ void parent_phase4( my_info_t *me, dist_info_t *info ) {
uint8_t i;
for( i = 1; i <= info->x; i++ ) {
if( !receive( me, i, &msg ) ) {
+ lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
//TODO: what if s_payload_len more than s_history[i]?
b_history = (BalanceHistory *) msg.s_payload;
memcpy( &history.s_history[i-1], b_history, sizeof( *b_history ) );
}
}
+ lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
+
history.s_history_len = info->x;
print_history( &history );
}
@@ -28,12 +32,14 @@ void parent_phase4( my_info_t *me, dist_info_t *info ) {
void parent_phase2( my_info_t *me, dist_info_t *info ) {
bank_robbery( me, info->x );
+ lamport_increase( &gl_lamport_time );
+
Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = 0,
.s_type = STOP,
- .s_local_time = get_physical_time()
+ .s_local_time = get_lamport_time()
}
};
@@ -47,6 +53,7 @@ void parent_workflow( dist_info_t *info ) {
int status;
int ret = 0;
+ lamport_init( &gl_lamport_time );
my_info_t me = {
.id = PARENT_ID,
.dist_info = info
@@ -58,6 +65,7 @@ void parent_workflow( dist_info_t *info ) {
// PARENT PHASE 1: Receive all STARTED messages from all childs
for( i = 1; i <= info->x; i++ ) {
if( !receive( &me, i, &msg ) ) {
+ lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
if( msg.s_header.s_type != STARTED ) {
ret = 1;
break;
@@ -71,6 +79,7 @@ void parent_workflow( dist_info_t *info ) {
// PARENT PHASE 3: Receive all DONE messages
for( i = 1; i <= info->x; i++ ) {
if( !receive( &me, i, &msg ) ) {
+ lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
if( msg.s_header.s_type != DONE ) {
ret = 1;
break;