Skip to content

File LeaderBased.java

File List > algorithm > migration > LeaderBased.java

Go to the documentation of this file

package skydata.behaviour.algorithm.migration;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

import jade.core.AID;
import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import skydata.internal.agents.SKAgent;
import skydata.internal.agents.SKD;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;

public class LeaderBased extends SKAgentBehaviour {

    private SKD skd;

    public LeaderBased(SKAgent agent) {
        super(agent);
        skd = (SKD) agent;
    }

    static private class DataAlgorithm implements Serializable {
        @SuppressWarnings("unused")
        SKAID leader;
        AID wantedHarbour;
    }

    private void mergePositions(Set<SKAID> newPositions) {
        for (SKAID a : newPositions) {
            skd.updatePositionFamily(a);
        }
    }

    @Override
    public void action() {

        // Store the data of the algorithm
        if (!agent.hasStorageObject("MIGRATION_LEADER")) {
            DataAlgorithm d = new DataAlgorithm();
            d.leader = skd.getSKAID();
            d.wantedHarbour = null;
            agent.createStorageObject("MIGRATION_LEADER", d);
        }

        final DataAlgorithm d = (DataAlgorithm) skd.getStorageObject("MIGRATION_LEADER");

        // Make a heartbeat every 2s
        skd.addBehaviour(new TickerBehaviour(skd, 2000) {
            @Override
            public void onTick() {
                if (d.leader != null && d.leader.equals(skd.getSKAID())) {
                    SKLMessage message = new SKLMessage("ALIVE", "MIGRATION");
                    skd.broadcastFamily(message, false);

                }
            }
        });

        // When the SKD wants to migrate
        SKAgentBehaviour whenWantMigrate = new SKAgentBehaviour(agent) {
            @Override
            public void actionWithParameters(Object o) {
                if (!d.leader.getName().equals(skd.getName())) {
                    AID wantedAid = (AID) o;
                    d.wantedHarbour = wantedAid;
                    SKLMessage request = new SKLMessage("MIGRATION_REQUEST", "MIGRATION");
                    request.addReceiver(d.leader);
                    skd.skSendReliable(request);
                    skd.print("[WANT_MIGRATE]");
                }
            }

            @Override
            public void action() {
                skd.print("here !");
            }
        };

        skd.addInternalUpdate("WANT_MIGRATE", whenWantMigrate);

        // When a member of family is removed
        SKAgentBehaviour whenFamilyRemoved = new SKAgentBehaviour(agent) {
            public void action() {
                if (!skd.getFamily().contains(d.leader)) {
                    d.leader = skd.getSKAID();
                }
            }
        };

        skd.addInternalUpdate("FAMILY_REMOVED", whenFamilyRemoved);

        // When the migration is finished
        SKAgentBehaviour afterMigration = new SKAgentBehaviour(agent) {
            public void action() {
                String address = skd.getSKAID().getAddress();
                SKLMessage success = new SKLMessage("MIGRATION_LEADER", "MIGRATION");
                success.addReceiver(d.leader);
                success.setContent(address);
                skd.skSendReliable(success);
                SKLMessage update = new SKLMessage("UPDATE_POSITION", "MIGRATION");
                skd.broadcastFamily(update);
            }
        };

        skd.addInternalUpdate("AFTER_MIGRATION", afterMigration);

        // I receive a heartbeat
        MessageTemplate mtAlive = MessageTemplate.and(
                MessageTemplate.MatchOntology("ALIVE"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage msg = skd.skReceive(mtAlive);
                if (msg == null) {
                    block();
                    return;
                }
                if (msg.getSender().getName().compareTo(skd.getSKAID().getName()) > 0 &&
                        msg.getSender().getName().compareTo(d.leader.getName()) > 0) {
                    d.leader = msg.getSender();
                }
            }
        });

        // I am the leader and I receive update from a family member
        MessageTemplate mtMigrationLeader = MessageTemplate.and(
                MessageTemplate.MatchOntology("MIGRATION_LEADER"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationLeader);
                if (message == null) {
                    block();
                    return;
                }
                skd.updatePositionFamily(message.getSender());

                SKLMessage answer = new SKLMessage("UPDATE_FAMILY", "MIGRATION");
                Set<SKAID> copiedFamily = new HashSet<>(skd.getFamily());
                answer.setContent((Serializable) copiedFamily);
                skd.broadcastFamily(answer);
            }
        });

        // I receive update from the leader
        MessageTemplate mtMigrationUpdate = MessageTemplate.and(
                MessageTemplate.MatchOntology("UPDATE_FAMILY"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationUpdate);
                if (message == null) {
                    block();
                    return;
                }
                Set<SKAID> newPositions = (Set<SKAID>) message.getContent();
                mergePositions(newPositions);
            }
        });

        MessageTemplate mtMigrationUpdate2 = MessageTemplate.and(
                MessageTemplate.MatchOntology("UPDATE_POSITION"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationUpdate);
                if (message == null) {
                    block();
                    return;
                }
                skd.updatePositionFamily(message.getSender());
            }
        });



        // I am the leader and I receive a request from someone
        MessageTemplate mtRequest = MessageTemplate.and(
                MessageTemplate.MatchOntology("MIGRATION_REQUEST"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtRequest);
                if (message == null) {
                    block();
                    return;
                }

                if (d.leader.equals(skd.getSKAID())) {
                    SKLMessage reply = new SKLMessage("MIGRATION_ANSWER", "MIGRATION");
                    reply.addReceiver(message.getSender());
                    reply.setContent(true);
                    skd.skSendReliable(reply);
                }

            }
        });

        // I receive an answer from the leader
        MessageTemplate mtAnswer = MessageTemplate.and(
                MessageTemplate.MatchOntology("MIGRATION_ANSWER"),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol("MIGRATION")));
        skd.addBehaviour(new CyclicBehaviour(skd) {
            public void action() {
                SKLMessage message = skd.skReceive(mtAnswer);
                if (message == null) {
                    block();
                    return;
                }
                if (d.leader.equals(message.getSender())) {
                    Boolean can = (Boolean) message.getContent();
                    if (d.wantedHarbour != null && can)
                        skd.migrate(d.wantedHarbour, true);
                }

            }
        });

    }

}