Skip to content

File Quorum.java

File List > algorithm > migration > Quorum.java

Go to the documentation of this file

package skydata.behaviour.algorithm.migration;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

import org.javatuples.Pair;
import org.javatuples.Quintet;

import jade.core.AID;
import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import skydata.internal.agents.SKAgent;
import skydata.internal.agents.SKD;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;

public class Quorum extends SKAgentBehaviour {

    private SKD skd;

    private DataAlgorithm d;

    public Quorum(SKAgent agent) {
        super(agent);
        skd = (SKD) agent;
    }

    static private class DataAlgorithm implements Serializable {
        @SuppressWarnings("unused")
        AID wantedHarbour;
        // Quorum detector
        boolean haveQuorum;
        Set<SKAID> quorum;
        Set<SKAID> nextQuorum;
        Set<SKAID> possibleMembers;
        ArrayList<Set<SKAID>> allQuorums;
        Set<SKAID> allOldMembers;
        // Algorithm
        HashMap<String, SKAID> future_locs;
        HashMap<String, Integer> fam_rnds;
        Set<SKAID> answers;
    }

    private void addTwice(SKAID receiver, SKLMessage msg) {
        SKAID recentReceiver = receiver;
        for (SKAID m : skd.getFamily()) {
            if (m.getName().equals(receiver.getName())) {
                recentReceiver = m;
                break;
            }
        }
        msg.addReceiver(recentReceiver);
        if (d.future_locs.containsKey(receiver.getName())) {
            msg.addReceiver(d.future_locs.get(receiver.getName()));
        }
    }

    private void broadcastTwice(Set<SKAID> agents, SKLMessage msg) {
        for (SKAID s : agents) {
            this.addTwice(s, msg);
        }
        agent.skSendReliable(msg);
    }

    private void mergePositions(Set<SKAID> newPositions) {
        for (SKAID a : newPositions) {
            skd.updatePositionFamily(a);
        }
    }

    private boolean isUpdate(SKAID member) {
        Integer last_r = d.fam_rnds.getOrDefault(member.getName(), 0);
        if (last_r <= member.getNbMigration())
            return true;
        return false;
    }

    private boolean migrationAccepted() {
        for(Set<SKAID> quorum : d.allQuorums) {
            if(d.answers.containsAll(quorum)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public void action() {

        // Store the data of the algorithm
        if (!agent.hasStorageObject("MIGRATION_QUORUM")) {
            d = new DataAlgorithm();
            d.quorum = new HashSet<>();
            d.allQuorums = new ArrayList<>();
            d.allOldMembers = new HashSet<>();
            d.nextQuorum = new HashSet<>();
            d.haveQuorum = false;
            d.wantedHarbour = null;
            d.future_locs = new HashMap<>();
            d.fam_rnds = new HashMap<>();
            d.answers = new HashSet<>();
            agent.createStorageObject("MIGRATION_QUORUM", d);
        }

        d = (DataAlgorithm) skd.getStorageObject("MIGRATION_QUORUM");

        // When the SKD wants to migrate
        SKAgentBehaviour whenWantMigrate = new SKAgentBehaviour(agent) {
            @Override
            public void actionWithParameters(Object o) {
                if (d.haveQuorum && d.wantedHarbour == null) {
                    AID wantedAid = (AID) o;
                    d.wantedHarbour = wantedAid;
                    d.answers.clear();
                    d.allOldMembers.clear();
                    d.allQuorums.clear();
                    d.allOldMembers.addAll(d.quorum);
                    Set<SKAID> copy = new HashSet<>();
                    copy.addAll(d.quorum);
                    d.allQuorums.add(copy);
                    skd.getRealSKAID().nextMigration();
                    SKLMessage request = new SKLMessage("MIGRATION_REQUEST", "MIGRATION");
                    Pair<Integer, String> data = Pair.with(skd.getSKAID().getNbMigration(),
                            d.wantedHarbour.getAddressesArray()[0]);
                    request.setContent(data);
                    broadcastTwice(d.allOldMembers, request);
                }
            }
        };

        skd.addInternalUpdate("WANT_MIGRATE", whenWantMigrate);

        SKAgentBehaviour whenFamilyRemoved = new SKAgentBehaviour(agent) {
            @Override
            public void actionWithParameters(Object o) {
                if (!d.haveQuorum || !skd.getFamily().containsAll(d.quorum)) {

                    if (d.possibleMembers == null) {
                        if (skd.getFamilySize() >= (Integer) skd.args.get("rg") / 2 + 1) {
                            d.possibleMembers = new HashSet<>();
                            d.possibleMembers.addAll(skd.getFamily());
                        } else {
                            return;
                        }
                    }
                    d.nextQuorum.clear();
                    SKLMessage message = new SKLMessage("QUORUM_ASK", "MIGRATION");
                    broadcastTwice(d.possibleMembers, message);
                }
            }
        };
        skd.addInternalUpdate("FAMILY_REMOVED", whenFamilyRemoved);

        // When the migration is finished
        SKAgentBehaviour afterMigration = new SKAgentBehaviour(agent) {
            public void action() {
                SKLMessage success = new SKLMessage("MIGRATION_END", "MIGRATION");
                broadcastTwice(skd.getFamily(), success);
                d.wantedHarbour = null;
            }
        };

        skd.addInternalUpdate("AFTER_MIGRATION", afterMigration);

        // Start the creation of a new quorum
        skd.addBehaviour(new TickerBehaviour(skd, 20000) {
            @Override
            public void onTick() {
                if (!d.haveQuorum || !skd.getFamily().containsAll(d.quorum)) {
                    if (d.possibleMembers == null) {
                        if (skd.getFamilySize() >= (Integer) skd.args.get("rg") / 2 + 1) {
                            d.possibleMembers = new HashSet<>();
                            d.possibleMembers.addAll(skd.getFamily());
                        } else {
                            return;
                        }
                    }
                    d.nextQuorum.clear();
                    SKLMessage message = new SKLMessage("QUORUM_ASK", "MIGRATION");
                    broadcastTwice(d.possibleMembers, message);
                }
            }
        });

        skd.addBehaviour(new TickerBehaviour(skd, 5000) {
            public void onTick() {
                if (d.wantedHarbour != null) {
                    SKLMessage request = new SKLMessage("MIGRATION_REQUEST", "MIGRATION");
                    Pair<Integer, String> new_data = Pair.with(skd.getSKAID().getNbMigration(),
                            d.wantedHarbour.getAddressesArray()[0]);
                    request.setContent(new_data);
                    broadcastTwice(d.quorum, request);
                }
            }
        });

        // I receive a request for a quorum
        MessageTemplate mtQuorumRequest = MessageTemplate.and(
                MessageTemplate.MatchOntology("QUORUM_ASK"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtQuorumRequest);
                if (message == null) {
                    block();
                    return;
                }
                SKLMessage answer = new SKLMessage("QUORUM_ANSWER", "MIGRATION");
                SKAID sender = message.getSender();
                addTwice(sender, answer);
                agent.skSendReliable(answer);
            }
        });

        // I receive an answer for a quorum
        MessageTemplate mtQuorumAnswer = MessageTemplate.and(
                MessageTemplate.MatchOntology("QUORUM_ANSWER"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtQuorumAnswer);
                if (message == null) {
                    block();
                    return;
                }
                SKAID sender = message.getSender();
                d.nextQuorum.add(sender);
                if (d.nextQuorum.size() > d.possibleMembers.size() / 2) {
                    d.quorum.clear();
                    d.quorum.addAll(d.nextQuorum);
                    d.allOldMembers.addAll(d.quorum);
                    Set<SKAID> copy = new HashSet<>();
                    copy.addAll(d.quorum);
                    d.allQuorums.add(copy);
                    d.haveQuorum = true;
                }

            }
        });

        MessageTemplate mtMigrationRequest = MessageTemplate.and(
                MessageTemplate.MatchOntology("MIGRATION_REQUEST"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));

        skd.addBehaviour(new CyclicBehaviour() {
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationRequest);
                if (message == null) {
                    block();
                    return;
                }
                Pair<Integer, String> data = (Pair<Integer, String>) message.getContent();
                SKAID sender = message.getSender();
                if (isUpdate(sender)) {
                    d.fam_rnds.put(sender.getName(), data.getValue0());
                    SKAID copy = (SKAID) sender.clone();
                    copy.updateAddress(data.getValue1());
                    d.future_locs.put(message.getSender().getName(), copy);
                    SKLMessage answer = new SKLMessage("MIGRATION_ANSWER", "MIGRATION");
                    Quintet<Integer, String, HashMap<String, Integer>, Set<SKAID>, HashMap<String, SKAID>> newData = Quintet
                            .with(data.getValue0(), data.getValue1(), d.fam_rnds, skd.getFamily(), d.future_locs);
                    answer.setContent(newData);
                    addTwice(sender, answer);
                    skd.skSendReliable(answer);
                } else {
                    skd.print("Not update !");
                }

            }
        });
        MessageTemplate mtMigrationAnswer = MessageTemplate.and(
                MessageTemplate.MatchOntology("MIGRATION_ANSWER"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationAnswer);
                if (message == null) {
                    block();
                    return;
                }
                Quintet<Integer, String, HashMap<String, Integer>, Set<SKAID>, HashMap<String, SKAID>> data = (Quintet<Integer, String, HashMap<String, Integer>, Set<SKAID>, HashMap<String, SKAID>>) message
                        .getContent();
                if (data.getValue0() == skd.getSKAID().getNbMigration()) {
                    skd.print(("Je reçoit une réponse"));
                    for (SKAID k : data.getValue3()) {
                        int last_r = d.fam_rnds.getOrDefault(k.getName(), 0);
                        int last_rj = data.getValue2().getOrDefault(k.getName(), 0);
                        SKAID last_futur_loc_j = data.getValue4().get(k.getName());
                        if (last_rj > 0 && last_r < last_rj) {
                            d.fam_rnds.put(k.getName(), last_rj);
                            skd.updatePositionFamily(k);
                            d.future_locs.put(k.getName(), last_futur_loc_j);
                            skd.print("J'apprend que " + k.getName() + "a bougé !");
                        }
                    }
                    d.answers.add(message.getSender());
                    if(migrationAccepted()) {
                        skd.migrate(d.wantedHarbour, true);
                    }

                }
            }
        });

        // I receive an update of migration
        MessageTemplate mtMigrationUpdate = MessageTemplate.and(
                MessageTemplate.MatchOntology("MIGRATION_UPDATE"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationUpdate);
                if (message == null) {
                    block();
                    return;
                }
                SKAID sender = message.getSender();
                int rnd = sender.getNbMigration();
                int last_r = d.fam_rnds.getOrDefault(message.getSender().getName(), 0);
                if (last_r < rnd) {
                    d.fam_rnds.put(message.getSender().getName(), last_r);
                    skd.updatePositionFamily(message.getSender());
                }
            }
        });

    }

}