/*
 * File SKD.java
 *
 * File List > agents > SKD.java
 */

package skydata.internal.agents;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import jade.core.AID;
import jade.core.ContainerID;
import jade.core.Location;
import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import skydata.internal.behaviours.RegularlyUpdateKnowledge;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.behaviours.UpdateFamilyKnowledge;
import skydata.internal.message.NBroadcast;
import skydata.internal.message.RSBroadcast;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;

public class SKD extends SKAgent {

    /**
     * Replicate counter: incremented by {@code doClone}, reset to 0 by
     * {@code reset()} and {@code afterClone()}.
     */
    public int lastReplicateIndex = 0;

    /**
     * Number of outstanding replicate blocks; {@code canReplicate()} answers
     * true only while this is 0.
     */
    protected int nbBlockingReplicate = 0;

    /** Members recorded through {@code mergeWithDeletedFamilyMembers}. */
    public Set<SKAID> deletedFamilyMembers = new HashSet<>();

    /**
     * Members evicted by the crash ticker registered in {@code setupFamily()};
     * {@code mergeWithFamily} strips these from the family after merging.
     */
    public Set<SKAID> crashedFamilyMembers = new HashSet<>();

    /** Known family members; {@code setupFamily()} adds this agent's own SKAID. */
    protected Set<SKAID> family = new HashSet<>();

    /** Suspected members mapped to the time the suspicion was recorded. */
    protected HashMap<SKAID, LocalDateTime> suspectedFamily = new HashMap<>();

    /**
     * Part of the harbour name after the "@", stored by {@code clone(AID, String)}
     * and used by {@code afterClone()} to build the "ams@..." identifier.
     */
    private String cloningContainerName;

    /**
     * Last address of the harbour passed to {@code clone(AID, String)};
     * {@code afterClone()} compares it with the clone's own last address.
     */
    private String cloningContainerAddress;

    /**
     * Time of the last message seen from each family member. Rebuilt by
     * {@code reset()}; entries are dropped when a member becomes suspected.
     */
    Map<SKAID, LocalDateTime> lastReceived;

    /**
     * Threshold, compared against elapsed whole seconds by the tickers in
     * {@code setupFamily()}. Loaded from the "failureDetectionAfter" argument in
     * {@code reset()} and doubled by {@code unsuspectMemberFamily} on a false suspicion.
     */
    long failureDetectionAfter;

    /** Randomness used by {@code doClone} when generating the clone name suffix. */
    Random rand = new Random();

    // -----------------------------------------------------------------------------------
    // Initialisation
    // -----------------------------------------------------------------------------------

    public SKD() {
        super();
    }

    /**
     * Builds a fresh "last heard from" table for the given members.
     *
     * @param family members to track
     * @return a new map holding every member except this agent itself, each
     *         stamped with the current time
     */
    public Map<SKAID, LocalDateTime> createLastReceived(Set<SKAID> family) {
        Map<SKAID, LocalDateTime> timestamps = new HashMap<>();
        for (SKAID candidate : family) {
            // This agent does not monitor itself.
            if (candidate.equals(getSKAID())) {
                continue;
            }
            timestamps.put(candidate, LocalDateTime.now());
        }
        return timestamps;
    }

    /** Agent start-up hook; delegates entirely to the superclass. */
    @SuppressWarnings("unchecked")
    @Override
    protected void setup() {
        super.setup();
    }

    /**
     * Restores this agent's replication and failure-detection state, then
     * re-registers statistics and family handling.
     *
     * The "failureDetectionAfter" argument is read as an {@code Integer}; a
     * missing value fails on unboxing.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    protected void reset() {
        super.reset();

        // Replication bookkeeping starts from scratch.
        nbBlockingReplicate = 0;
        lastReplicateIndex = 0;

        // Failure-detector state: fresh timestamps, configured delay, no suspects.
        lastReceived = createLastReceived(family);
        final Integer configuredDelay = (Integer) (args.get("failureDetectionAfter"));
        failureDetectionAfter = configuredDelay;
        suspectedFamily.clear();

        setupStats();
        setupFamily();
    }

    /**
     * Tells whether replication is currently allowed.
     *
     * @return {@code true} when no replicate block is outstanding
     */
    public boolean canReplicate() {
        final boolean noBlockOutstanding = (this.nbBlockingReplicate == 0);
        return noBlockOutstanding;
    }

    /** Registers one more replicate block; see {@code canReplicate()}. */
    public void blockReplicate() {
        this.nbBlockingReplicate += 1;
    }

    /**
     * Releases one replicate block previously taken with {@code blockReplicate()}.
     * The counter never drops below zero, so an unmatched call is a no-op.
     */
    public void unblockReplicate() {
        // Decrement the replicate counter itself: it is the one that
        // canReplicate() tests and blockReplicate() increments.
        if (nbBlockingReplicate > 0) {
            nbBlockingReplicate--;
        }
    }

    /**
     * Receives a message through the superclass and, when the sender is a
     * family member, records the reception time and clears any suspicion on it.
     *
     * @param mt template passed through to the superclass receive
     * @return the received message, or {@code null} when the superclass returned none
     */
    @Override
    public SKLMessage skReceive(MessageTemplate mt) {
        final SKLMessage received = super.skReceive(mt);
        if (received == null) {
            return null;
        }

        final SKAID from = received.getSender();
        if (family.contains(from)) {
            // Any traffic from a family member counts as a sign of life.
            lastReceived.put(from, LocalDateTime.now());
            unsuspectMemberFamily(from);
        }
        return received;
    }

    // -----------------------------------------------------------------------------------
    // Manage family
    // -----------------------------------------------------------------------------------

    /**
     * Renders the family as a list of "(name : address)" entries.
     *
     * @return the {@code toString()} of the list of entries, e.g. "[]" for an
     *         empty family
     */
    public String getFamilyString() {
        ArrayList<String> entries = new ArrayList<>();
        // Iterate over a snapshot of the family.
        for (SKAID member : new ArrayList<>(getFamily())) {
            String address = member.getAddress();
            entries.add("(" + member.getName() + " : " + address + ")");
        }
        return entries.toString();
    }

    /**
     * Selects, among the agents listed in {@code membersHarbour}, those whose
     * name shares this agent's prefix (the text before the first "-").
     *
     * @return a new set with the matching agents
     */
    public Set<SKAID> getFamilySameHarbour() {
        final Set<SKAID> sameFamily = new HashSet<>();
        final String ownPrefix = getName().split("-")[0];
        for (SKAID resident : membersHarbour) {
            final String residentPrefix = resident.getName().split("-")[0];
            if (!residentPrefix.equals(ownPrefix)) {
                continue;
            }
            sameFamily.add(resident);
        }
        return sameFamily;
    }

    /**
     * Refreshes the stored position of a family member, or adds it when no
     * entry can be refreshed.
     *
     * The first member with the same name and a migration count not greater
     * than {@code m}'s is updated in place and a "FAMILY_UPDATE" internal
     * update is raised. Otherwise {@code m} is handed to {@code addFamily}.
     *
     * @param m identity carrying the address and migration count to apply
     */
    public void updatePositionFamily(SKAID m) {
        for (SKAID known : family) {
            final boolean sameAgent = known.getName().equals(m.getName());
            // Skip other agents, and entries that are already newer than m.
            if (!sameAgent || known.getNbMigration() > m.getNbMigration()) {
                continue;
            }
            known.update(m.getAddress(), m.getNbMigration());
            internalUpdate("FAMILY_UPDATE", m);
            return;
        }
        addFamily(m);
    }

    /**
     * Adds the given agents to the family, then removes every member recorded
     * as crashed. Raises "FAMILY_ADDED" when the family size ends up different.
     *
     * @param agents agents to merge in
     */
    public void mergeWithFamily(Set<SKAID> agents) {
        final int sizeBefore = family.size();

        family.addAll(agents);
        // Crashed members must not come back through a merge.
        family.removeAll(crashedFamilyMembers);

        if (sizeBefore == family.size()) {
            return;
        }
        internalUpdate("FAMILY_ADDED");
    }

    /**
     * Adds one agent to the family and raises "FAMILY_ADDED" when the set
     * actually grew.
     *
     * @param aid agent to add
     */
    public void addFamily(SKAID aid) {
        // Set.add reports whether the set changed, i.e. whether its size grew.
        final boolean grew = family.add(aid);
        if (grew) {
            internalUpdate("FAMILY_ADDED");
        }
    }

    /**
     * Adds the given agents to the set of deleted family members.
     *
     * NOTE(review): the "FAMILY_REMOVED" notification is guarded by a change in
     * the size of {@code family}, which this method does not modify. Unless both
     * fields reference the same set, the notification cannot fire from here.
     * Confirm whether the check was meant to look at {@code deletedFamilyMembers}.
     *
     * @param agents agents to record as deleted
     */
    public void mergeWithDeletedFamilyMembers(Set<SKAID> agents) {
        final int familySizeBefore = family.size();
        deletedFamilyMembers.addAll(agents);
        if (family.size() != familySizeBefore) {
            internalUpdate("FAMILY_REMOVED");
        }
    }

    /**
     * Gives access to the family set.
     *
     * @return the backing set itself, not a copy
     */
    public Set<SKAID> getFamily() {
        return family;
    }

    /**
     * Gives access to the deleted-members set.
     *
     * @return the backing set itself, not a copy
     */
    public Set<SKAID> getDeletedFamilyMembers() {
        return deletedFamilyMembers;
    }

    /**
     * Removes one agent from the family and raises "FAMILY_REMOVED" when it
     * was actually present.
     *
     * @param aid agent to remove
     */
    public void removeFromFamily(SKAID aid) {
        // Set.remove reports whether an element was removed, i.e. the size shrank.
        final boolean shrank = family.remove(aid);
        if (shrank) {
            internalUpdate("FAMILY_REMOVED");
        }
    }

    /**
     * Removes several agents from the family and raises a single
     * "FAMILY_REMOVED" when at least one of them was present.
     *
     * @param aids agents to remove
     */
    public void removeFromFamily(Set<SKAID> aids) {
        // Set.removeAll reports whether the set changed, i.e. the size shrank.
        final boolean shrank = family.removeAll(aids);
        if (shrank) {
            internalUpdate("FAMILY_REMOVED");
        }
    }

    /**
     * Counts the family members currently known.
     *
     * @return size of the family set
     */
    public int getFamilySize() {
        final int memberCount = family.size();
        return memberCount;
    }

    /**
     * Broadcasts a message to the whole family.
     *
     * @param message      message to send
     * @param sendReliably {@code true} to broadcast with an {@code RSBroadcast},
     *                     {@code false} to use an {@code NBroadcast}; must not be
     *                     {@code null} (it is unboxed)
     */
    public void broadcastFamily(SKLMessage message, Boolean sendReliably) {
        if (!sendReliably) {
            broadcast(new NBroadcast(), message, family);
            return;
        }
        broadcast(new RSBroadcast(), message, family);
    }

    /**
     * Broadcasts a message to the whole family using the reliable variant.
     *
     * @param message message to send
     */
    public void broadcastFamily(SKLMessage message) {
        final boolean reliable = true;
        broadcastFamily(message, reliable);
    }

    /**
     * Registers this agent in its own family and installs the behaviours that
     * maintain the family and detect failed members:
     * <ol>
     * <li>a 1000 ms ticker that evicts members suspected for more than
     * {@code failureDetectionAfter} seconds and records them as crashed;</li>
     * <li>a cyclic receiver for HARBOUR / UPDATE_LIST messages that replaces
     * the content of {@code membersHarbour};</li>
     * <li>a 1000 ms ticker that broadcasts a FAILURE_DETECTOR / ALIVE message
     * to the family with the non-reliable broadcast;</li>
     * <li>a "FAMILY_UPDATE" internal-update handler that refreshes the
     * last-received time of the updated member;</li>
     * <li>a cyclic receiver for ALIVE messages that refreshes the sender's
     * last-received time and clears any suspicion on it;</li>
     * <li>a 2000 ms ticker that marks as suspected every member not heard
     * from for more than {@code failureDetectionAfter} seconds.</li>
     * </ol>
     * Called from {@code reset()}.
     */
    protected void setupFamily() {
        // The agent is always a member of its own family.
        addFamily(getSKAID());
        // Run one UpdateFamilyKnowledge action right away, outside the scheduler.
        new UpdateFamilyKnowledge(this).action();
        // Crash detection: promote long-standing suspicions to crashes.
        addBehaviour(new TickerBehaviour(this, 1000) {

            @Override
            protected void onTick() {
                LocalDateTime now = LocalDateTime.now();
                // Collected first so the family is not modified while iterating it.
                Set<SKAID> toRemove = new HashSet<>();
                for (SKAID member : family) {
                    if (!member.equals(getSKAID()) && suspectedFamily.containsKey(member)) {
                        // between(now, past) is negative; the test holds once the suspicion
                        // is older than failureDetectionAfter seconds.
                        if (Duration.between(now, suspectedFamily.get(member)).toSeconds() < -failureDetectionAfter) {
                            toRemove.add(member);
                            print("CRASH : " + member.getName());
                        }
                    }
                }
                if (!toRemove.isEmpty()) {
                    crashedFamilyMembers.addAll(toRemove);
                    removeFromFamily(toRemove);
                }
            }
        });
        // Alias used by the anonymous behaviours below to reach this agent.
        SKD agent = this;

        // Harbour membership updates: protocol HARBOUR, ontology UPDATE_LIST.
        MessageTemplate mtUpdate = MessageTemplate.and(
                MessageTemplate.MatchProtocol("HARBOUR"),
                MessageTemplate.MatchOntology("UPDATE_LIST"));
        agent.addBehaviour(new CyclicBehaviour(agent) {
            @SuppressWarnings("unchecked")
            @Override
            public void action() {
                SKLMessage acl = agent.skReceive(mtUpdate);
                if (acl == null) {
                    // Nothing matching yet: block this behaviour and retry on the next run.
                    block();
                    return;
                }
                // The message content replaces the previous list entirely.
                // NOTE(review): the content is cast to Set<SKAID> unchecked - confirm the sender's payload type.
                membersHarbour.clear();
                membersHarbour.addAll((Set<SKAID>) acl.getContent());
            }
        });

        // Heartbeat: periodically tell the family this agent is alive (non-reliable broadcast).
        agent.addBehaviour(new TickerBehaviour(agent, 1000) {
            public void onTick() {
                SKLMessage message = new SKLMessage("FAILURE_DETECTOR", "ALIVE");
                agent.broadcastFamily(message, false);
            }
        });

        // A position update for a member (see updatePositionFamily) also refreshes its timestamp.
        agent.addInternalUpdate("FAMILY_UPDATE", new SKAgentBehaviour(agent) {
            @Override
            public void actionWithParameters(Object o) {
                SKAID b = (SKAID) o;
                lastReceived.put(b, LocalDateTime.now());
            }
        });

        // Heartbeats from others: protocol FAILURE_DETECTOR, ontology ALIVE, performative INFORM.
        MessageTemplate mtAlive = MessageTemplate.and(
                MessageTemplate.MatchProtocol("FAILURE_DETECTOR"),
                MessageTemplate.and(
                        MessageTemplate.MatchOntology("ALIVE"),
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM)));

        agent.addBehaviour(new CyclicBehaviour(agent) {
            @Override
            public void action() {
                SKLMessage message = agent.skReceive(mtAlive);
                if (message == null) {
                    block();
                    return;
                }
                // Unlike skReceive, this is done whether or not the sender is already in the
                // family; unsuspectMemberFamily adds the sender to the family.
                lastReceived.put(message.getSender(), LocalDateTime.now());
                agent.unsuspectMemberFamily(message.getSender());

            }
        });

        // Suspicion: flag members that have been silent for too long.
        agent.addBehaviour(new TickerBehaviour(agent, 2000) {
            public void onTick() {
                LocalDateTime now = LocalDateTime.now();
                for (SKAID member : family) {
                    if (!member.equals(agent.getSKAID()) && lastReceived.containsKey(member)) {
                        // Same negative-duration comparison as in the crash ticker above.
                        if (Duration.between(now, lastReceived.get(member)).toSeconds() < -failureDetectionAfter) {
                            agent.suspectMemberFamily(member, now);
                            // Dropping the entry stops this ticker from re-examining the member
                            // until a new timestamp is recorded for it.
                            lastReceived.remove(member);
                            agent.print("Suspect " + member.getName());
                        }
                    }

                }
            }
        });
    }

    /**
     * Marks a member as suspected. An existing suspicion keeps its original
     * timestamp.
     *
     * @param suspected member to mark
     * @param date      time to record for a new suspicion
     */
    public void suspectMemberFamily(SKAID suspected, LocalDateTime date) {
        if (suspectedFamily.containsKey(suspected)) {
            return;
        }
        suspectedFamily.put(suspected, date);
    }

    /**
     * Clears any suspicion on a member and makes sure it is part of the family.
     * When the member was suspected, the suspicion is reported as false and the
     * detection threshold is doubled.
     *
     * @param unsuspected member that has shown a sign of life
     */
    public void unsuspectMemberFamily(SKAID unsuspected) {
        final boolean wasSuspected = suspectedFamily.containsKey(unsuspected);
        if (wasSuspected) {
            print("False suspicion");
            // Back off: wait twice as long before suspecting anyone again.
            failureDetectionAfter = failureDetectionAfter * 2;
        }
        addFamily(unsuspected);
        suspectedFamily.remove(unsuspected);
    }

    /**
     * After a move, updates this agent's own entry in the family with the last
     * address reported by its AID. Only the first entry with this agent's name
     * is touched.
     */
    @Override
    public void afterMove() {
        super.afterMove();
        final String ownName = getAID().getName();
        for (SKAID member : family) {
            if (!member.getName().equals(ownName)) {
                continue;
            }
            final String[] ownAddresses = getAID().getAddressesArray();
            final String latestAddress = ownAddresses[ownAddresses.length - 1];
            member.update(latestAddress);
            return;
        }
    }

    // -----------------------------------------------------------------------------------
    // Cloning
    // -----------------------------------------------------------------------------------

    /**
     * Runs on the clone after cloning: resets the replicate index, runs one
     * {@code RegularlyUpdateKnowledge} action, requests a migration when the
     * clone's last address differs from the recorded cloning address, and
     * finally sends a JOIN_HARBOUR message to the harbour.
     */
    @Override
    public void afterClone() {
        super.afterClone();
        this.lastReplicateIndex = 0;
        this.family = getFamily();
        new RegularlyUpdateKnowledge(this).action();

        if (cloningContainerAddress != null) {
            final String[] ownAddresses = getAID().getAddressesArray();
            final String currentAddress = ownAddresses[ownAddresses.length - 1];
            final boolean alreadyAtTarget = currentAddress.equals(cloningContainerAddress);
            if (!alreadyAtTarget) {
                // Address the AMS of the target platform and ask to be migrated there.
                final AID targetAms = new AID("ams@" + cloningContainerName, AID.ISGUID);
                targetAms.addAddresses(cloningContainerAddress);
                migrate(targetAms, true);
            }
        }

        final SKLMessage joinRequest = new SKLMessage("JOIN_HARBOUR", "HARBOUR");
        joinRequest.addReceiver(myHarbour());
        skSendNormal(joinRequest);
    }

    /**
     * Clones this agent towards the container derived from a harbour AID.
     *
     * The harbour name is split on "@": the right part is remembered as the
     * cloning container name, and the left part, cut at its first "_", names
     * the target container. The harbour's last address is remembered as the
     * cloning container address and used as the target address.
     *
     * @param harbour AID of the harbour to clone towards
     * @param name    base name for the clone, passed to {@code doClone}
     */
    public void clone(AID harbour, String name) {
        final String[] addresses = harbour.getAddressesArray();
        final String lastAddress = addresses[addresses.length - 1];

        final String[] nameParts = harbour.getName().split("@");
        final String platformPart = nameParts[1];
        final String targetContainer = nameParts[0].split("_")[0];

        // Remembered for afterClone(), which runs on the clone.
        this.cloningContainerName = platformPart;
        this.cloningContainerAddress = lastAddress;

        final ContainerID destination = new ContainerID();
        destination.setName(targetContainer);
        destination.setAddress(lastAddress);
        doClone(destination, name);
    }

    /**
     * Clones this agent under a name made unique by a generated suffix, then
     * records the clone in the family.
     *
     * The suffix is the first five characters of the Base64-encoded SHA-256 of
     * a seed built from the current time, a random separator and a random
     * number below 100. If SHA-256 were unavailable the raw seed is used.
     *
     * @param loc     destination of the clone
     * @param newName base name; "_" and the suffix are appended to it
     */
    @Override
    public void doClone(Location loc, String newName) {
        this.lastReplicateIndex++;

        // Seed: timestamp with a randomly chosen date/time separator, plus 0..99.
        final String[] separators = { ".", "_" };
        final String pattern = "yyyy-MM-dd" + separators[rand.nextInt(2)] + "HH:mm:ss:SS";
        final SimpleDateFormat formatter = new SimpleDateFormat(pattern);
        final Date now = new Date(Instant.now().toEpochMilli());
        final String seed = formatter.format(now) + String.valueOf(rand.nextInt(100));

        String cloneName;
        try {
            final byte[] sha = MessageDigest.getInstance("SHA-256")
                    .digest(seed.getBytes(StandardCharsets.UTF_8));
            final String base64 = Base64.getEncoder().encodeToString(sha);
            cloneName = newName + "_" + base64.substring(0, 5);
        } catch (NoSuchAlgorithmException e) {
            cloneName = newName + "_" + seed;
        }

        super.doClone(loc, cloneName);

        // Register the clone in the family at the destination with zero migrations.
        final SKAID cloneId = new SKAID();
        cloneId.update(loc.getAddress(), 0);
        final String platformPart = this.myHarbour().getName().split("@")[1];
        cloneId.setName(cloneName + "@" + platformPart);
        updatePositionFamily(cloneId);
    }

    /**
     * Gives the index the next replicate would get, without consuming it.
     *
     * @return {@code lastReplicateIndex + 1} as a decimal string
     */
    protected String getNextReplicateIndex() {
        final int next = lastReplicateIndex + 1;
        return Integer.toString(next);
    }

}