summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile6
-rw-r--r--child.c154
-rw-r--r--dist.h5
-rw-r--r--ipc.c10
-rw-r--r--lamport.c2
-rw-r--r--lamport.h4
-rw-r--r--pa3.c49
-rw-r--r--parent.c59
8 files changed, 78 insertions, 211 deletions
diff --git a/Makefile b/Makefile
index 67dd8b7..e0d2413 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,13 @@
CFLAGS = -std=c99 -Werror -Wall -pedantic
CC = clang
-all: pa3
+all: pa4
-pa3: lamport.o bank_robbery.o banking.o pa3.o parent.o child.o ipc.o dist.o
+pa4: cs.o queue.o lamport.o pa3.o parent.o child.o ipc.o dist.o
$(CC) -o $@ $^ -Llib64 -lruntime
%.o: %.c
$(CC) -c -o $@ $< $(CFLAGS)
clean:
- rm -f *.o pa3
+ rm -f *.o pa4
diff --git a/child.c b/child.c
index e62ad15..7966844 100644
--- a/child.c
+++ b/child.c
@@ -1,13 +1,14 @@
#include <stdint.h>
#include <string.h>
-#include "banking.h"
#include "ipc.h"
#include "pa2345.h"
#include "lamport.h"
#include "dist.h"
+#define MAX_LOOP_STR 1024
+
void child_phase0( dist_info_t *info, uint8_t id ) {
lamport_init( &gl_lamport_time );
close_redundant_pipes( info, id );
@@ -26,7 +27,7 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
int size = snprintf( buf, sizeof(buf), log_started_fmt, get_lamport_time(),
id, child_pid,
info->parent_pid,
- info->balances[ id ] );
+ 0 );
fputs( buf, info->events_log );
fflush( info->events_log );
fputs( buf, stdout );
@@ -64,104 +65,30 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
fflush( info->events_log );
}
-void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
- Message msg;
- BalanceState balanceState;
- char buf[MAX_MESSAGE_LEN];
-
+void child_phase2( dist_info_t *info, uint8_t id ) {
my_info_t me = {
.id = id,
.dist_info = info
};
- uint8_t i;
- uint8_t last_order_time = 0;
-
- balanceState.s_balance = info->balances[ id ];
- balanceState.s_time = last_order_time;
- balanceState.s_balance_pending_in = 0;
+ if( info->mutex )
+ request_cs(&me);
+
+ uint8_t i, loop_cnt;
+ char buf[ MAX_LOOP_STR ];
- history->s_history[ last_order_time ] = balanceState;
- history->s_history_len = 1;
-
- int running = 1;
- while( running ) {
- if( !receive_any( &me, &msg ) ) {
- lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
- switch( msg.s_header.s_type ) {
- case TRANSFER: {
- TransferOrder *order = (TransferOrder *) msg.s_payload;
- if( order->s_src == id ) {
- lamport_increase( &gl_lamport_time );
-
- info->balances[ id ] -= order->s_amount;
- int oldBp = history->s_history[ history->s_history_len - 1 ].s_balance_pending_in;
- balanceState.s_balance = info->balances[ id ];
- balanceState.s_time = get_lamport_time();
- balanceState.s_balance_pending_in = oldBp + order->s_amount;
-
- msg.s_header.s_local_time = get_lamport_time();
- send( &me, order->s_dst, &msg );
-
- snprintf( buf, sizeof(buf), log_transfer_out_fmt,
- get_lamport_time(), order->s_src,
- order->s_amount, order->s_dst );
- fputs( buf, info->events_log );
- fflush( info->events_log );
- fputs( buf, stdout );
- } else if( order->s_dst == id ) {
- lamport_increase( &gl_lamport_time );
-
- info->balances[ id ] += order->s_amount;
- int oldBp = history->s_history[ history->s_history_len - 1 ].s_balance_pending_in;
-
- balanceState.s_balance = info->balances[ id ];
- balanceState.s_time = get_lamport_time();
- balanceState.s_balance_pending_in = oldBp - order->s_amount;
-
- Message ackMsg = {
- .s_header = {
- .s_magic = MESSAGE_MAGIC,
- .s_payload_len = 0,
- .s_type = ACK,
- .s_local_time = get_lamport_time()
- }
- };
- send( &me, PARENT_ID, &ackMsg );
-
- snprintf( buf, sizeof(buf), log_transfer_in_fmt,
- get_lamport_time(), order->s_dst,
- order->s_amount, order->s_src);
- fputs( buf, info->events_log );
- fflush( info->events_log );
- fputs( buf, stdout );
- }
-
- int cur_time = get_lamport_time();
-
- history->s_history[ cur_time ] = balanceState;
- history->s_history_len = cur_time + 1;
-
- for( i = last_order_time + 1; i < cur_time; i++ ) {
- balanceState = history->s_history[ i-1 ];
- balanceState.s_time = i;
- history->s_history[ i ] = balanceState;
- }
-
- last_order_time = cur_time;
- break;
- }
- case STOP:
- running = 0;
- break;
- }
- }
+ loop_cnt = id * 5;
+ for( i = 1; i <= loop_cnt; i++ ) {
+ snprintf( buf, sizeof(buf), log_loop_operation_fmt, id, i, loop_cnt );
+ print( buf );
}
+
+ if( info->mutex )
+ release_cs( &me );
}
-void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
+void child_phase3( dist_info_t *info, uint8_t id ) {
char buf[MAX_MESSAGE_LEN];
- uint8_t i;
my_info_t me = {
.id = id,
@@ -170,7 +97,7 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
int size = snprintf( buf, sizeof(buf), log_done_fmt,
get_lamport_time(), id,
- info->balances[ id ] );
+ 0 );
fputs( buf, info->events_log );
fflush( info->events_log );
fputs( buf, stdout );
@@ -187,17 +114,13 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
};
strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) );
send_multicast( &me, &msg );
+ info->workers--;
- for( i = 1; i <= info->x; i++ ) {
- if( i == me.id )
- continue;
-
- if( !receive( &me, i, &msg ) ) {
+ while( info->workers ) {
+ if( !receive_any( &me, &msg ) ) {
lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
- if( msg.s_header.s_type != DONE ) {
- fprintf( stderr, "Message type INVALID (not DONE)!\n" );
- break;
- }
+ if( msg.s_header.s_type == DONE )
+ info->workers--;
}
}
@@ -206,36 +129,9 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
fputs( buf, info->events_log );
fflush( info->events_log );
fputs( buf, stdout );
-
- int time = get_lamport_time();
- for( i = history->s_history_len; i <= time; i++ ) {
- history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance;
- history->s_history[i].s_balance_pending_in = history->s_history[ history->s_history_len - 1].s_balance_pending_in;
- history->s_history[i].s_time = i;
- }
-
- history->s_history_len = time+1;
-
- lamport_increase( &gl_lamport_time );
-
- int wr_size = history->s_history_len * sizeof(history->s_history[0]);
- Message historyMsg = {
- .s_header = {
- .s_magic = MESSAGE_MAGIC,
- .s_payload_len = wr_size,
- .s_type = BALANCE_HISTORY,
- .s_local_time = get_lamport_time()
- }
- };
- memcpy( historyMsg.s_payload, history, wr_size );
- send( &me, PARENT_ID, &historyMsg );
}
void child_workflow( dist_info_t *info, uint8_t id ) {
- BalanceHistory history = {
- .s_id = id
- };
-
/* CHILD PHASE 0: Close all redundant descriptors */
child_phase0(info, id);
@@ -244,9 +140,9 @@ void child_workflow( dist_info_t *info, uint8_t id ) {
child_phase1(info, id);
/* CHILD PHASE 2: Do some useful work */
- child_phase2(info, id, &history);
+ child_phase2(info, id);
/* CHILD PHASE 3: Send DONE message for all, receive DONE messages from
* other childs and exit */
- child_phase3(info, id, &history);
+ child_phase3(info, id);
}
diff --git a/dist.h b/dist.h
index 9c5c4b4..d3834a5 100644
--- a/dist.h
+++ b/dist.h
@@ -2,7 +2,6 @@
#define __IFMO_DISTRIBUTED_CLASS_DIST__H
#include "pa3.h"
-#include "banking.h"
#define MAX_PIPES ( MAX_PROCCNT ) * ( MAX_PROCCNT - 1 )
@@ -10,7 +9,8 @@ typedef struct {
int pipes[MAX_PIPES][2];
uint8_t proccnt;
uint8_t x;
- balance_t balances[MAX_X];
+ int mutex;
+ uint8_t workers;
FILE* events_log;
FILE* pipes_log;
@@ -20,6 +20,7 @@ typedef struct {
typedef struct {
int id;
dist_info_t *dist_info;
+ int msg_author;
} my_info_t;
void close_redundant_pipes( dist_info_t *info, uint8_t id );
diff --git a/ipc.c b/ipc.c
index 71ec9aa..f04f017 100644
--- a/ipc.c
+++ b/ipc.c
@@ -99,6 +99,7 @@ int receive_any(void * self, Message * msg) {
uint8_t i, j = 0;
int *descriptors = (int *)malloc( sizeof(int) * (me->dist_info->proccnt - 1) );
+ int *senders = (int *)malloc( sizeof(int) * (me->dist_info->proccnt - 1) );
for(i = 0; i < me->dist_info->proccnt; i++) {
// don't see at pipes as ( ID -> Y )
if( me->id == i )
@@ -110,7 +111,9 @@ int receive_any(void * self, Message * msg) {
// index number in block of pipes for ( X -> ID )
uint8_t idx = ( me->id > i ? base + me->id - 1 : base + me->id );
- descriptors[j++] = me->dist_info->pipes[idx][0];
+ descriptors[j] = me->dist_info->pipes[idx][0];
+ senders[j] = i;
+ j++;
}
char buf[MAX_MESSAGE_LEN];
@@ -123,6 +126,10 @@ int receive_any(void * self, Message * msg) {
memcpy( msgbuf, buf, size_hdr );
size_payload = read( descriptors[i], buf, msg->s_header.s_payload_len );
memcpy( msgbuf + size_hdr, buf, size_payload );
+
+ me->msg_author = senders[i];
+
+ free( senders );
free( descriptors );
return 0;
}
@@ -130,6 +137,7 @@ int receive_any(void * self, Message * msg) {
usleep(100000); // sleep 100 ms
}
+ free( senders );
free( descriptors );
return 1;
}
diff --git a/lamport.c b/lamport.c
index 65c76bc..a7ad7ff 100644
--- a/lamport.c
+++ b/lamport.c
@@ -1,5 +1,5 @@
#include "lamport.h"
-#include "banking.h"
+#include "ipc.h"
timestamp_t gl_lamport_time; // TODO: remove global variable with Lamport time
diff --git a/lamport.h b/lamport.h
index edd19c2..b197c32 100644
--- a/lamport.h
+++ b/lamport.h
@@ -1,10 +1,10 @@
#ifndef __IFMO_DISTRIBUTED_CLASS_LAMPORT__H
#define __IFMO_DISTRIBUTED_CLASS_LAMPORT__H
-
-#include "banking.h"
+#include "ipc.h"
extern timestamp_t gl_lamport_time;
+timestamp_t get_lamport_time();
void lamport_message_handler( timestamp_t *lamport_time,
timestamp_t local_time);
void lamport_increase( timestamp_t *lamport_time );
diff --git a/pa3.c b/pa3.c
index c7e0ae2..d1465d8 100644
--- a/pa3.c
+++ b/pa3.c
@@ -2,6 +2,7 @@
#include <fcntl.h> /* Obtain O_* constant definitions */
#include <unistd.h>
+#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -13,54 +14,62 @@
#include "parent.h"
#include "common.h"
-#include "banking.h"
static char* app_name;
void usage( FILE* output ) {
- fprintf(output, "usage: %s -p <X> <BAL1> <BAL2> ... <BALX>\n", app_name);
+ fprintf(output, "usage: %s [--mutexl] -p <X>\n", app_name);
fprintf(output, "`X` must be int between 1 and %d\n", MAX_X);
}
int main(int argc, char* argv[]) {
uint8_t i;
+ int opt;
dist_info_t info;
app_name = argv[0];
- if( argc < 3 || strcmp( argv[1], "-p") != 0 ) {
- usage(stdout);
- return 1;
- }
+ struct option long_options[] = {
+ {"mutexl", no_argument, &info.mutex, 1},
+ {0, 0, 0, 0}
+ };
+ int option_index;
- // fill info about distributed network
- info.x = atoi(argv[2]);
- if( info.x < 0 || info.x > MAX_X ) {
- usage(stdout);
- return 1;
+ info.x = 0;
+ while ((opt = getopt_long(argc, argv, "p:", long_options, &option_index)) != -1) {
+ switch (opt) {
+ case 0:
+ break;
+ case 'p':
+ info.x = atoi(optarg);
+ break;
+ default: /* '?' */
+ usage(stdout);
+ exit(EXIT_FAILURE);
+ }
}
- if( argc != (3 + info.x) ) {
+ if( info.x <= 0 || info.x > MAX_X ) {
usage(stdout);
- return 1;
- }
-
- for( i = 1; i <= info.x; i++ ) {
- int balance = atoi(argv[3 + i - 1]);
- info.balances[ i ] = balance;
+ exit(EXIT_FAILURE);
}
info.proccnt = info.x + 1;
+ info.workers = info.x;
info.parent_pid = getpid();
info.events_log = fopen( events_log, "w" );
info.pipes_log = fopen( pipes_log, "w" );
+ if( !info.pipes_log || !info.events_log ) {
+ fprintf( stderr, "Events log or pipes log can't be open" );
+ return 1;
+ }
+
// open all (N * (N-1)) pipes for a full mesh topology
for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ ) {
pipe2( info.pipes[i], O_NONBLOCK );
fprintf( info.pipes_log, "Opened pipe #%d (%d;%d)\n", i, info.pipes[i][0], info.pipes[i][1] );
}
fclose( info.pipes_log );
-
for( i = 1; i < info.proccnt; i++ ) {
pid_t pid = fork();
switch(pid) {
@@ -77,7 +86,7 @@ int main(int argc, char* argv[]) {
break;
}
}
-
+
parent_workflow( &info );
return 0;
}
diff --git a/parent.c b/parent.c
index fea2f96..c4742ff 100644
--- a/parent.c
+++ b/parent.c
@@ -5,47 +5,8 @@
#include "dist.h"
#include "ipc.h"
-#include "banking.h"
#include "lamport.h"
-void parent_phase4( my_info_t *me, dist_info_t *info ) {
- Message msg;
- BalanceHistory *b_history;
- AllHistory history;
-
- uint8_t i;
- for( i = 1; i <= info->x; i++ ) {
- if( !receive( me, i, &msg ) ) {
- lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
- //TODO: what if s_payload_len more than s_history[i]?
- b_history = (BalanceHistory *) msg.s_payload;
- memcpy( &history.s_history[i-1], b_history, sizeof( *b_history ) );
- }
- }
-
- lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
-
- history.s_history_len = info->x;
- print_history( &history );
-}
-
-void parent_phase2( my_info_t *me, dist_info_t *info ) {
- bank_robbery( me, info->x );
-
- lamport_increase( &gl_lamport_time );
-
- Message msg = {
- .s_header = {
- .s_magic = MESSAGE_MAGIC,
- .s_payload_len = 0,
- .s_type = STOP,
- .s_local_time = get_lamport_time()
- }
- };
-
- send_multicast( me, &msg );
-}
-
void parent_workflow( dist_info_t *info ) {
Message msg;
uint8_t i, receive_cnt;
@@ -72,25 +33,17 @@ void parent_workflow( dist_info_t *info ) {
}
}
}
-
- // PARENT PHASE 2: Bank robbery & send STOP for all childs
- parent_phase2( &me, info );
- // PARENT PHASE 3: Receive all DONE messages
- for( i = 1; i <= info->x; i++ ) {
- if( !receive( &me, i, &msg ) ) {
+ // PARENT PHASE 2: Receive all DONE messages
+ while( info->workers ) {
+ if( !receive_any( &me, &msg ) ) {
lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
- if( msg.s_header.s_type != DONE ) {
- ret = 1;
- break;
- }
+ if( msg.s_header.s_type == DONE )
+ info->workers--;
}
}
- // PARENT PHASE 4: Receive all BALANCE_HISTORY and print all history
- parent_phase4( &me, info );
-
- // PARENT PHASE 5: Wait for the end of all processes
+ // PARENT PHASE 3: Wait for the end of all processes
receive_cnt = info->x;
while( receive_cnt != 0 ) {
if( (wpid = wait(&status)) > 0 ) {