Skip to content

File FreezeLocal.java

File List > behaviour > SKD > FreezeLocal.java

Go to the documentation of this file

package skydata.behaviour.SKD;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.Instant;

import com.google.common.collect.Sets;

import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.core.behaviours.WakerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import jade.lang.acl.UnreadableException;
import skydata.internal.agents.SKAgent;
import skydata.internal.agents.SKD;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.message.RSBroadcast;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;

public class FreezeLocal extends SKAgentBehaviour {

    static long deltaLocal = 5000;

    enum FREEZING_ANSWER {
        YES, NO, ALREADY
    }

    static class Data implements Serializable {
        enum Stat {
            NONE, IN_PROGRESS, ENDED, CANCELLED
        }

        Stat isFreezing = Stat.NONE;
        Set<SKAID> listFreezing = new HashSet<>();
        Set<SKAID> listSended = new HashSet<>();
        Set<SKAID> listAnswer = new HashSet<>();
        Set<SKAID> listCrashed = new HashSet<>();
        SKAID initiator;
        HashMap<SKAID, Long> lastAlive = new HashMap<>();
    }

    public FreezeLocal(SKAgent agent) {
        super(agent);
    }

    public boolean isFinished(Data data) {
        for (SKAID a : data.listSended) {
            if (!(agent.getSKAID().equals(a) || data.listAnswer.contains(a) || data.listCrashed.contains(a))) {
                return false;
            }
        }
        return true;
    }

    protected void afterFreeze(Data data) {
        agent.print("Freezing ended" + data.listFreezing);
    }

    protected void unfreeze(Data data) {
        SKD agent = (SKD) this.agent;
        data.isFreezing = Data.Stat.CANCELLED;
        agent.addBehaviour(new WakerBehaviour(agent, deltaLocal) {
            @Override
            public void onWake() {
                agent.print("Je quitte !");
                agent.unblockMigrate();
                data.isFreezing = Data.Stat.NONE;
            }
        });
    }

    @Override
    public void action() {
        SKD agent = (SKD) this.agent;
        int rg = (Integer) agent.args.get("rg");
        int rgLocal = 1;
        agent.createStorageObject("FREEZING_DATA", new Data());
        Data data = (Data) agent.getStorageObject("FREEZING_DATA");

        // Start the algorithm
        agent.addBehaviour(new TickerBehaviour(agent, deltaLocal / 2) {
            @Override
            public void onTick() {
                Set<SKAID> sameHarbour = agent.getFamilySameHarbour();
                if (sameHarbour.size() > rgLocal && agent.getFamilySize() > rg) {
                    if (data.isFreezing == Data.Stat.NONE) {
                        agent.print("Je débute le freeze");
                        agent.blockMigrate();
                        data.listSended.addAll(sameHarbour);
                        data.listSended.remove(agent.getSKAID());
                        data.lastAlive.clear();
                        long now = Instant.now().getMillis();
                        for (SKAID a : data.listSended) {
                            data.lastAlive.put(a, now + deltaLocal);
                        }
                        data.isFreezing = Data.Stat.IN_PROGRESS;
                        data.initiator = agent.getSKAID();
                        data.listFreezing.add(agent.getSKAID());
                        SKLMessage msg = new SKLMessage("FREEZE_LOCAL", "FREEZING");
                        agent.broadcast(new RSBroadcast(), msg, data.listSended);
                    }
                }
            }
        });

        // Heartbeat
        agent.addBehaviour(new TickerBehaviour(agent, deltaLocal) {
            @Override
            public void onTick() {
                if (data.isFreezing != Data.Stat.CANCELLED) {
                    if (data.isFreezing != Data.Stat.NONE) {
                        SKLMessage msg = new SKLMessage("FREEZE_LOCAL", "ALIVE");
                        msg.setContent(true);
                        msg.addReceiver(data.initiator);
                        agent.broadcast(new RSBroadcast(), msg, data.listSended);
                        long now = Instant.now().getMillis();
                        Iterator<SKAID> iter = data.listSended.iterator();
                        while (iter.hasNext()) {
                            SKAID a = iter.next();
                            if (data.lastAlive.containsKey(a) && now - data.lastAlive.get(a) >= deltaLocal) {
                                agent.print(a.getName() + " a crash");
                                data.listCrashed.add(a);
                                iter.remove();
                            }
                        }
                        if ((data.isFreezing == Data.Stat.ENDED
                                && Sets.difference(data.listFreezing, data.listCrashed).size() <= rgLocal)
                                || (data.listSended.size() == 0 && data.isFreezing == Data.Stat.IN_PROGRESS)) {
                            unfreeze(data);
                        }
                    }
                }
            }
        });

        // Receive heartbeat
        MessageTemplate mtAlive = MessageTemplate.and(
                MessageTemplate.MatchProtocol("FREEZE_LOCAL"),
                MessageTemplate.and(
                        MessageTemplate.MatchOntology("ALIVE"),
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM)));
        agent.addBehaviour(new CyclicBehaviour(agent) {
            @Override
            public void action() {
                if (data.isFreezing != Data.Stat.CANCELLED) {
                    SKLMessage msg = agent.skReceive(mtAlive);
                    if (msg == null) {
                        block();
                        return;
                    }
                    SKAID sender = msg.getSender();
                    data.lastAlive.put(sender, Instant.now().getMillis());

                }
            }
        });

        // Receive request of freeze
        MessageTemplate mtFreezing = MessageTemplate.and(
                MessageTemplate.MatchProtocol("FREEZE_LOCAL"),
                MessageTemplate.and(
                        MessageTemplate.MatchOntology("FREEZING"),
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM)));
        agent.addBehaviour(new CyclicBehaviour(agent) {

            @Override
            public void action() {
                if (data.isFreezing != Data.Stat.CANCELLED) {

                    SKLMessage msg = agent.skReceive(mtFreezing);
                    if (msg == null) {
                        block();
                        return;
                    }
                    SKAID sender = msg.getSender();
                    agent.print(sender.getName() + " veut débuter le freeze");
                    agent.blockMigrate();
                    if (data.isFreezing == Data.Stat.NONE) {
                        data.isFreezing = Data.Stat.IN_PROGRESS;
                        data.initiator = sender;
                        SKLMessage answer = new SKLMessage("FREEZING_ANSWER", "FREEZING");
                        answer.addReceiver(sender);
                        Set<SKAID> sameHarbour = agent.getFamilySameHarbour();
                        answer.setContent(Pair.of(FREEZING_ANSWER.YES, sameHarbour));
                        agent.skSendReliable(answer);
                    } else if (data.isFreezing == Data.Stat.IN_PROGRESS
                            && data.initiator.equals(agent.getSKAID())) {
                        if (sender.compareTo(data.initiator) < 0) {
                            SKLMessage answer = new SKLMessage("FREEZING_ANSWER", "FREEZING");
                            answer.setContent(Pair.of(FREEZING_ANSWER.NO, null));
                            agent.skSendReliable(answer);
                            data.listSended.add(sender);
                            data.listFreezing.add(sender);
                        } else {
                            SKLMessage answer = new SKLMessage("FREEZING_ANSWER", "FREEZING");
                            answer.addReceiver(sender);
                            answer.setContent(Pair.of(FREEZING_ANSWER.YES, data.listSended));
                            data.initiator = sender;
                            agent.skSendReliable(answer);
                            SKLMessage changeInit = new SKLMessage("CHANGE_INIT", "FREEZE_LOCAL");
                            changeInit.setContent(sender);
                            agent.skSendReliable(changeInit);
                        }
                    } else {
                        SKLMessage answer = new SKLMessage("FREEZING_ANSWER", "FREEZING");
                        answer.addReceiver(sender);
                        answer.setContent(Pair.of(FREEZING_ANSWER.ALREADY, null));
                        agent.skSendReliable(answer);
                    }
                }
            }
        });

        // Receive answer of freeze
        MessageTemplate mtFreezingAnswer = MessageTemplate.and(
                MessageTemplate.MatchProtocol("FREEZE_LOCAL"),
                MessageTemplate.and(
                        MessageTemplate.MatchOntology("FREEZING_ANSWER"),
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM)));

        agent.addBehaviour(new CyclicBehaviour(agent) {

            @SuppressWarnings("unchecked")
            public void action() {
                if (data.isFreezing != Data.Stat.CANCELLED) {
                    SKLMessage msg = agent.skReceive(mtFreezingAnswer);
                    if (msg == null) {
                        block();
                        return;
                    }

                    Data data = (Data) agent.getStorageObject("FREEZING_DATA");

                    SKAID sender = msg.getSender();
                    Pair<FREEZING_ANSWER, Set<SKAID>> answer = (Pair<FREEZING_ANSWER, Set<SKAID>>) msg.getContent();
                    agent.print(sender.getName() + " me répond " + answer.getLeft());
                    data.listAnswer.add(sender);
                    if (answer.getLeft() == FREEZING_ANSWER.YES) {
                        Set<SKAID> diff = Sets.difference(answer.getRight(), data.listFreezing);
                        if (diff.size() != 0) {
                            data.listSended.addAll(diff);
                        }
                        data.listFreezing.add(sender);
                        if (isFinished(data)) {
                            data.isFreezing = Data.Stat.ENDED;
                            SKLMessage endFreeze = new SKLMessage("FREEZE_LOCAL", "FREEZING_ENDED");
                            endFreeze.setContent((Serializable) data.listFreezing);
                            agent.broadcast(new RSBroadcast(), endFreeze, data.listFreezing);
                            afterFreeze(data);
                        }
                    }

                }
            }
        });

        // Change of initiator
        MessageTemplate mtChangeInit = MessageTemplate.and(
                MessageTemplate.MatchProtocol("FREEZE_LOCAL"),
                MessageTemplate.and(
                        MessageTemplate.MatchOntology("CHANGE_INIT"),
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM)));
        agent.addBehaviour(new CyclicBehaviour(agent) {

            @Override
            public void action() {
                if (data.isFreezing != Data.Stat.CANCELLED) {

                    SKLMessage msg = agent.skReceive(mtChangeInit);
                    if (msg == null) {
                        block();
                        return;
                    }

                    Data data = (Data) agent.getStorageObject("FREEZING_DATA");
                    SKAID sender = msg.getSender();
                    if (data.initiator.equals(sender)) {
                        data.initiator = (SKAID) msg.getContent();
                        SKLMessage freezingAnswer = new SKLMessage("FREEZING", "FREEZE_LOCAL");
                        agent.skSendReliable(freezingAnswer);
                    }

                }
            }
        });

        // End of freeze
        MessageTemplate mtFreezingEnded = MessageTemplate.and(
                MessageTemplate.MatchProtocol("FREEZE_LOCAL"),
                MessageTemplate.and(
                        MessageTemplate.MatchOntology("FREEZING_ENDED"),
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM)));
        agent.addBehaviour(new CyclicBehaviour(agent) {

            @SuppressWarnings("unchecked")
            @Override
            public void action() {
                if (data.isFreezing != Data.Stat.CANCELLED) {

                    SKLMessage msg = agent.skReceive(mtFreezingEnded);
                    if (msg == null) {
                        block();
                        return;
                    }
                    Data data = (Data) agent.getStorageObject("FREEZING_DATA");
                    data.listFreezing = (Set<SKAID>) msg.getContent();
                    data.isFreezing = Data.Stat.ENDED;
                    afterFreeze(data);

                }
            }
        });
    }

}