import java.util.*; /** software library */ public class comm { private int port ; /* port on which client will send */ private int ID ; /* unique id . will get from the server at initialization*/ private int CTR; /* to count number of messages sent . will increase monotonically */ private int MID; /* Message id. used for tracking sequence of messages : abcast,abcast_ack,abcast2 */ final static int READ=1; final static int WRITE=2; final static int QUERYRESULT=-1; final static int ABCAST=0; final static int ABCAST_ACK=1; final static int ABCAST2=2; final static int PROBE=3; final static int PROBE_ACK=4; final static int VIEWREQUEST=1111; final static int VIEWRESULT=2222; final static int UNDELIVERABLE=0; final static int DELIVERABLE=1; final static int CLIENT=0; final static int SERVER=1; public boolean abcastsent=false; private Abcast a; public Message m,mab2,newm;// private made public by Mithun private AbcastAck aack; private Abcast2 a2,ab2; private CoreMessage cm; private String v,u,s; Object o; private Hashtable causalCheck; /* contains client_id and last message delivered -- both string objects */ private Hashtable causalQueue; /* contains "client_id + messagenum" and Message object */ private TreeMap abcastQueue ; /* contains "priority + client_id + Message_ID" and Message object */ private Hashtable abcastAckSet; /* contains abcast_ack received by the clients sent by the servers */ /* contains "client_id + message_ID" */ private TreeMap present_in_history; /* stores a log of all messages delivered to the server */ HashSet viewset; Hashtable viewcheck; Hashtable viewpriority; int viewsize; /* no of servers present in the view... */ int deleteCount; /* will be used to handle cases where a client fails => processProbeAck */ int delayCount; /* user as a timeout mechanism */ boolean deleteFlag; // int abcastcount; /* synchronised methods --- producer consumer */ DataCA dca; /* queue , item produced by comm , consumed by comm */ DataAC dac; /* queue , item produced by App , consumed by comm */ DataCT dct; /* queue , item produced by comm , consumed by transport */ DataTC dtc; /* queue , item produced by transport , consumed by comm */ /* initialization of comm module */ /* this module is called by the application layer */ public comm(int ID, int port, int X, int ReceiverType, DataAC dac, DataCA dca) // int X is boolean to differentiate between client and server. { this.ID=ID; /* initialise this client/server with a unique identitification */ this.CTR=1; this.MID=1; causalCheck = new Hashtable(); causalQueue = new Hashtable(); abcastQueue = new TreeMap(new CompareString()); present_in_history = new TreeMap(new CompareString()); abcastAckSet = new Hashtable(); this.dac=dac; this.dca=dca; dtc=new DataTC(); dct=new DataCT(); viewset=new HashSet(); viewcheck = new Hashtable(); viewpriority = new Hashtable(); deleteFlag = false; deleteCount=0; delayCount=0; Thread sender=new Thread(new Send(port,dct)); /* instantiate the sender , which will open the send port */ /* instantiate the receiver , which will open the receive port at address (port + 1) from listening */ /* threee types of receivers : */ /* point to point receive ,multicast receive , and both multicast and point to point */ if (ReceiverType==0) { Thread t=new Thread(new ReceiveP2P((port+1),dtc)); t.start(); } else if (ReceiverType==1) { Thread t=new Thread(new ReceiveMulticast(dtc)); t.start(); } else { Thread t1=new Thread(new ReceiveP2P((port+1),dtc)); Thread t2=new Thread(new ReceiveMulticast(dtc)); t1.start(); t2.start(); } sender.start(); /* start sender thread : single port for all types ( multicast and point to point ) */ /* these threads start two threads of the commm module */ /* one will consume from the buffer produced by the application layer */ /* other will consume from the buffer produced by the transport layer below */ if (ReceiverType==0) { this.init(); //for client only! } Thread cfa=new Thread(new ConsumeFromApp(dac,this)); cfa.start(); Thread cfd=new Thread(new ConsumeFromT(dtc,this)); cfd.start(); } /* init ends */ /* ----------------------------------------------------------------- */ public synchronized int getCounterValue() { return CTR++; /* vector time of the process */ /* increments every time a message is sent */ } /* ----------------------------------------------------------------- */ public synchronized int getMessageID() { abcastsent=false; return MID++; /* message identification */ /* to identify a message while sending Abcast, Ack, Probe ... */ } /* ----------------------------------------------------------------- */ /* Application layer will issue a read request,by passing an instance of CoreMessage to the comm layer */ public void processMessage(CoreMessage cm) { try { // Debug.dump("Processing CoreMessage "); if(cm.getType() == READ) { /* these lines are added just to change the order of messages int qwc = this.getCounterValue(); if (qwc == 2) { qwc = 3 ; } else if (qwc == 3) { qwc = 2 ; }; int qwm = this.getMessageID(); if (qwm == 2) { qwm = 3 ; } else if (qwm == 3) { qwm = 2 ; }; */ a=new Abcast(cm,READ); m=new Message(ID,ABCAST,this.getMessageID(),0,a,this.getCounterValue(),viewset.size() ,0); // UNCOMENT IT LATER /* ------------------------------------------------------------------------- */ // m= new Message(ID,ABCAST,qwm,0,a,qwc,-1,0); // comment it later /* ------------------------------------------------------------------------- */ HashSet tempviewset=(HashSet)viewset.clone(); Iterator itr=viewset.iterator(); while(itr.hasNext()) { tempviewset.add(itr.next()); } viewcheck.put(""+m.getmessageid(),tempviewset); viewpriority.put(""+m.getmessageid(),""+0); /* obtaining the view , for later processing */ // abcastcount=m.getmessageid(); //System.out.println("abcast count"+abcastcount); dct.Produce(m); } else { /* include WRITE OPERATION */ } } catch(Exception e) { System.out.println("ProcessMessage: "+e); } } /** New method added to enable server to send independent messages*/ void processServerMessage(Message m) { // System.out.println("Query reply"); dct.Produce(m); } /* ------------------------------------------------------------- */ /* read write primitives invoked by this layer and passed to the lower layer */ public void processAbcast(Message m) { viewsize = m.getmessagedest(); try { /* Phase -I check for causal order */ if (causalCheck.containsKey(""+m.getmessagesource())) { int x = Integer.parseInt((String)causalCheck.get(""+m.getmessagesource())); if( x == m.getmessagenum() - 1) { causalCheck.put(""+m.getmessagesource(),""+(x+1)); // Phase -II Atomicity and total ordering ((Abcast)m.getmessageobj()).setmessagestatus(UNDELIVERABLE); u = new String (0+"A"+m.getmessagesource()+"A"+m.getmessageid()); /* 0 stands for initial proiority , which will be changed later */ abcastQueue.put(u,m); sendAbcastAck(m,this.getCounterValue()); //check list of waiting messages in causal_queue check(m); // recursive call } else { //System.out.println( "message out of order "); causalQueue.put(""+m.getmessagesource()+"A"+m.getmessagenum(),m); } } else { causalCheck.put(""+m.getmessagesource(),""+m.getmessagenum()); // Phase -II Atomicity and total ordering //second phase starts for the first message recived from a client ((Abcast)m.getmessageobj()).setmessagestatus(UNDELIVERABLE); u = new String (0 +"A"+ m.getmessagesource()+"A"+m.getmessageid()); abcastQueue.put(u,m); // change it later sendAbcastAck(m,this.getCounterValue()); } } catch(Exception e) { System.out.println("processAbcast :"+e); } } /* ------------------------------------------------------------- */ public void check(Message m) { try { if(causalQueue.containsKey(m.getmessagesource()+"A"+(m.getmessagenum()+1))) { causalCheck.put(""+m.getmessagesource(),""+(m.getmessagenum()+1)); //second phase starts Message temp=(Message)causalQueue.get(m.getmessagesource()+"A"+(m.getmessagenum() +1 )); ((Abcast)temp.getmessageobj()).setmessagestatus(UNDELIVERABLE); u = new String (0+"A"+temp.getmessagesource()+"A"+temp.getmessageid()); // System.out.println("check function"+u); abcastQueue.put(u ,temp); // check this later sendAbcastAck(temp,this.getCounterValue()); check(temp); } else { return; } } catch(Exception e) { System.out.println("Error in check :"+e); } } /* ------------------------------------------------------------- */ public void processAbcast2(Message m) { try { /* Phase -I check for causal order */ //System.out.println("abcast2 received"); if (causalCheck.containsKey(""+m.getmessagesource())) { int x = Integer.parseInt((String)causalCheck.get(""+m.getmessagesource())); // System.out.println("processab2 value of x "+x+" "+m.getmessagenum()); if( x == (m.getmessagenum() - 1)) { causalCheck.put(""+m.getmessagesource(),""+(x+1)); // Phase -2 Atomicity and total ordering v = new String(0+"A"+m.getmessagesource()+"A"+m.getmessageid()); /* 0 stands for priority */ newm = (Message)abcastQueue.remove(v); ((Abcast)newm.getmessageobj()).setpriority( ((Abcast2)m.getmessageobj()).getpriority()); ((Abcast)newm.getmessageobj()).setmessagestatus(DELIVERABLE); // System.out.println("made deliverable"); abcastQueue.put(v,newm); processAbcastQueue(); } else { causalQueue.put(""+m.getmessagesource()+"A"+m.getmessagenum(),m); // System.out.println("causalQueue"); } } else { causalCheck.put(""+m.getmessagesource(),""+m.getmessagenum()); //second phase starts for the first message recived from a client v = new String(0+"A"+ m.getmessagesource() + "A" + m.getmessageid()); newm = (Message)abcastQueue.remove(v); ((Abcast)newm.getmessageobj()).setpriority( ((Abcast2)m.getmessageobj()).getpriority()); ((Abcast)newm.getmessageobj()).setmessagestatus(DELIVERABLE); // System.out.println("made deliverable"); abcastQueue.put(v,newm); processAbcastQueue(); } } catch(Exception e) { System.out.println("processAbcast2 :"+e); } } public synchronized void processAbcastQueue() { try { /* to deliver message whose status bit is "DELIVERABLE" */ boolean flag = true; Message dm; // System.out.println("processAbcastQue"); while (flag) { if(!abcastQueue.isEmpty()) { dm = (Message) abcastQueue.get(abcastQueue.lastKey()); if( ((Abcast)dm.getmessageobj()).getmessagestatus() == DELIVERABLE ) { // System.out.println("finally delivered"); dca.Produce((Message) abcastQueue.remove(abcastQueue.lastKey()) ); //System.out.println("Message delivered to App layer"+abcastQueue.size()); delayCount = 0; } else //potential problem { /* // if it enters this loop ten times => the message might be lost flag=false; delayCount++; if (delayCount == 10) { if(deleteFlag == true) { abcastQueue.remove(abcastQueue.lastKey()); deleteFlag = false; } else { sendProbe( (Message) abcastQueue.get(abcastQueue.lastKey())); } } */ } } else { flag=false; } } } catch(Exception e) { System.out.println("processAbcastQueue :"+e); } } /* ------------------------------------------------------------- */ public void sendAbcastAck(Message m,int priority) { try { // System.out.println("Abcast_aCk sent"); AbcastAck aack = new AbcastAck(priority); Message mack = new Message(ID,ABCAST_ACK,m.getmessageid(),m.getgid(),aack,priority,m.getmessagesource(),-1); /* here priority and CTR will have the same value */ dct.Produce(mack); // Debug.dump(DataSR_comm.Comm_SR," Abcast Ack added to queue "); /* send this message to only one source */ } catch(Exception r) { System.out.println("sendAbcastAck :"+r); } } public void processAbcastAck(Message m) { int currentp,ackp,currentmax; // long waitforack; try { // s = new String(m.getmessagesource()+"A"+m.getmessageid()); // PROBLEM AREA System.out.println("AbcastAck received-Viewsetsize: "+viewset.size()); ((HashSet)viewcheck.get(""+m.getmessageid())).remove(""+m.getmessagesource()); currentp=Integer.parseInt((String)viewpriority.get(""+m.getmessageid())); ackp=((AbcastAck)(m.getmessageobj())).getpriority(); viewpriority.put(""+m.getmessageid(),""+Math.max(currentp,ackp)); currentmax=Math.max(currentp,ackp); /** Forms the message**/ if(((HashSet)viewcheck.get(""+m.getmessageid())).size()==0) { Abcast2 ab2 = new Abcast2(currentmax); mab2 = new Message(ID,ABCAST2,m.getmessageid(),0,ab2,this.getCounterValue(),-1,0); dct.Produce(mab2); /**sends it on receiving all the acknowldegements**/ abcastsent=true; // System.out.println("ABCAST2 sent"); } else { System.out.println("thread started"); Thread t=new Thread(new Timer(10,dtc)); t.start();//timer waits for 10 secs- if no acks are received during this period then the abcast2 is sent // System.out.println("waiting for abcastack"); } } catch(Exception e) { System.out.println("processAbacstAck :"+e); } } public void processAbcastAck() { /** this method is invoked by the fault tolerance mechanism when all acks are not received and the view is updated**/ dct.Produce(mab2); } void init() { boolean ontime=true; viewset.clear(); m = new Message(ID,VIEWREQUEST,0,0,"VIEWREQUEST",0,-1,0); dct.Produce(m); try { while(ontime || !dtc.TC.isEmpty()) { Message m=(Message)dtc.Consume(); System.out.println("enter1"); System.out.println("message type "+m.getmessagetype()+" message source "+m.getmessagesource()+" msg_id"+m.getmessageid()); processViewResult(m); System.out.println("enter2"); Thread.sleep(2000); ontime=false; } } catch(Exception e) { System.out.println("error in init"+e); } } public void sendProbe(Message m) { System.out.println("probe sent &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&"); Message x = new Message(ID,PROBE,m.getmessageid(),0," " ,this.getCounterValue(),m.getmessagesource(),0); // multicast the message /* M_DEST IS SET TO THE CLIENT ID WHICH SENT THE OIGINAL MESSAGE */ deleteCount = viewsize; dct.Produce(x); } public void processProbe(Message m) { /* // check message in causal queue . if present reply .. else check in abcast queue .. and send the message */ System.out.println("processprobe"); String skey = new String (0+"A"+m.getmessagedest()+"A"+m.getmessageid()); int prio; int status; if(abcastQueue.containsKey(skey)) { prio = ((Abcast)((Message)abcastQueue.get(0+"A"+m.getmessagedest()+"A"+m.getmessageid())).getmessageobj()).getpriority(); status = ((Abcast)((Message)abcastQueue.get(0+"A"+m.getmessagedest()+"A"+m.getmessageid())).getmessageobj()).getmessagestatus(); ProbeAck pa = new ProbeAck(status,prio); Message mpa = new Message (m.getmessagedest(),PROBE_ACK,m.getmessageid(),0,pa,this.getCounterValue(),m.getmessagesource(),-1); // HERE ID IS SET TO THE ID OF THE CLIENT WHO HAS FAILED ... i.e m.getmessagedest(); dct.Produce(mpa); } // SEE THIS LATER // else // if(present_in_history.containsKey(skey)) /* { prio = status = ProbeAck pa = new ProbeAck(status,prio); Message mpa = new Message (m.getmessagedest(),PROBE_ACK,m.getmessageid(),0,pa,this.getCounterValue(),m.getmessagesource(),-1); dct.Produce(mpa); } */ else { // message not received ... do nothing .... take care of yourself... others will take care of themselves } } public void processProbeAck(Message m) { System.out.println("processprobeack"); /* if deliverable , set the proirity in the abcast queue */ // else delete the message after waiting for random amount of time String spack = new String(0+"A"+m.getmessagesource()+"A"+m.getmessageid()); if(((ProbeAck)m.getmessageobj()).getmessagestatus()==DELIVERABLE) { deleteCount=0; if (abcastQueue.containsKey(spack)) { Message t1 = (Message)abcastQueue.remove(spack); ((Abcast)t1.getmessageobj()).setpriority( ((ProbeAck)m.getmessageobj()).getpriority()); ((Abcast)t1.getmessageobj()).setmessagestatus(DELIVERABLE); abcastQueue.put(spack,t1); processAbcastQueue(); } else { // this means that the message might have been delivered by receiving ProbeAck from some other server ... } } else { // the message at this client is also undeliverable ..FAILURE . we are in the same boat.. //wait for some time and discard the message .... TIMER deleteCount--; if(deleteCount <= 0) deleteFlag=true; } } public void processViewRequest(Message m) { m.setmessagetype(VIEWRESULT); m.setmessagedest(m.getmessagesource()); m.setmessagesource(this.ID); m.setmessagegrp(-1); dct.Produce(m); } public void processViewResult(Message m) { viewset.add(""+m.getmessagesource()); System.out.println(viewset.size()); } }