Skip to content

File SKAgent.java

File List > agents > SKAgent.java

Go to the documentation of this file

package skydata.internal.agents;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import jade.core.AID;
import jade.core.Agent;
import jade.core.ContainerID;
import jade.core.Location;
import jade.core.PlatformID;
import jade.core.behaviours.Behaviour;
import jade.core.behaviours.WakerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import jade.lang.acl.UnreadableException;
import skydata.internal.behaviours.CleanMessagesBuffer;
import skydata.internal.behaviours.ReceiveOnReliableSend;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.behaviours.UpdateStatsHarbour;
import skydata.internal.message.BroadcastI;
import skydata.internal.message.IDGenerator;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;

public class SKAgent extends Agent {

    public long startingDate = 0;

    public Map<String, Object> args = null;

    private HashMap<String, Serializable> behaviourStorage = new HashMap<>();

    private HashMap<String, List<SKAgentBehaviour>> internalBehaviour = new HashMap<>();

    private ArrayList<Behaviour> myBehaviours = new ArrayList<>();

    public Boolean inDebug = false;

    protected SKAID myAID;

    private HashMap<String, HashMap<String, SKLMessage>> msgInfoBF = new HashMap<>();

    private HashMap<String, HashMap<String, Long>> receivedMsgs = new HashMap<>();

    protected Boolean inMigration;

    private SKAID oldHarbour;

    // if the agent cannot migrate, this number is greater than 0
    protected int nbBlockingMigrate = 0;

    // Stats on the harbours

    protected Set<SKAID> harbours;

    protected HashMap<SKAID, HashMap<String, Comparable>> harboursStats = new HashMap<SKAID, HashMap<String, Comparable>>();

    protected Set<SKAID> membersHarbour = new HashSet<SKAID>();

    public int size;

    public long getStartingDate() {
        return this.startingDate;
    }

    final public SKAID getSKAID() {
        return (SKAID) myAID.clone();
    }

    final public SKAID getRealSKAID() {
        return myAID;
    }

    public void setMsgInfoBF(HashMap<String, HashMap<String, SKLMessage>> inf) {
        this.msgInfoBF = inf;
    }

    public HashMap<String, HashMap<String, SKLMessage>> getMsgInfoBF() {
        return this.msgInfoBF;
    }

    public HashMap<String, HashMap<String, Long>> getReceivedMsgs() {
        return this.receivedMsgs;
    }

    final public void createStorageObject(String property, Serializable value) {
        behaviourStorage.put(property, value);
    }

    final public Serializable getStorageObject(String property) {
        return behaviourStorage.get(property);
    }

    final public boolean hasStorageObject(String property) {
        return behaviourStorage.containsKey(property);
    }

    @SuppressWarnings("unchecked")
    @Override
    protected void setup() {
        myAID = new SKAID();
        myAID.updateMigration(0);
        startingDate = Instant.now().toEpochMilli();
        this.args = (Map<String, Object>) getArguments()[0];
        this.harbours = (Set<SKAID>) getArguments()[1];
        SKAID harbour = myHarbour();
        if (harbour != null) {
            SKLMessage joinHarbour = new SKLMessage("JOIN_HARBOUR", "HARBOUR");
            joinHarbour.addReceiver(myHarbour());
            skSendNormal(joinHarbour);
        }
        reset();
        this.internalUpdate("AFTER_REPLICATION");
    }

    protected void reset() {
        this.msgInfoBF.clear();
        this.startingDate = Instant.now().toEpochMilli();
        String[] address = getAID().getAddressesArray();
        myAID.setName(getName());
        myAID.updateAddress(address[address.length - 1]);

        this.nbBlockingMigrate = 0;
        this.size = (Integer) this.args.getOrDefault("size", 0);
        if (this.args.get("actions") != null) {
            String[] actions = ((String) this.args.get("actions")).split(",");
            for (String action : actions) {
                run(action.trim());
            }
        }

        new ReceiveOnReliableSend(this).action();
        new CleanMessagesBuffer(this).action();
    }

    final protected void internalUpdate(String type, Object o) {
        if (this.internalBehaviour.containsKey(type)) {
            for (SKAgentBehaviour b : this.internalBehaviour.get(type)) {
                if (o == null) {
                    b.action();
                } else {
                    b.actionWithParameters(o);
                }
            }
        }
    }

    final protected void internalUpdate(String type) {
        this.internalUpdate(type, null);
    }

    final public void addInternalUpdate(String type, SKAgentBehaviour b) {
        if (!this.internalBehaviour.containsKey(type)) {
            this.internalBehaviour.put(type, new ArrayList<>());
        }
        this.internalBehaviour.get(type).add(b);
    }

    // -----------------------------------------------------------------------------------
    // Migration
    // -----------------------------------------------------------------------------------

    @Override
    public void afterMove() {
        super.afterMove();
        if (inMigration) {
            String[] address = getAID().getAddressesArray();
            myAID.update(address[address.length - 1]);
            ContainerID cID = new ContainerID();
            cID.setName("Harbour");
            cID.setAddress("localhost");
            inMigration = false;
            this.doMove(cID);
        } else {
            takeDown();

            if (oldHarbour != null) {
                SKLMessage quit = new SKLMessage("QUIT_HARBOUR", "HARBOUR");
                quit.addReceiver(oldHarbour);
                quit.setContent(size);
                skSendNormal(quit);
                oldHarbour = null;
            }

            SKLMessage joinHarbour = new SKLMessage("JOIN_HARBOUR", "HARBOUR");
            joinHarbour.addReceiver(myHarbour());
            skSendNormal(joinHarbour);
            this.internalUpdate("AFTER_MIGRATION");

        }
    }

    @Override
    protected void beforeMove() {
        super.beforeMove();
        oldHarbour = myHarbour();
    }

    public void migrate(AID harbour) {
        if (canMigrate()) {
            this.migrate(harbour, false);
        } else {
            this.print("I cannot migrate");
        }
    }

    public void migrate(AID harbour, boolean forced) {
        if (forced) {
            String[] harbourAddresses = harbour.getAddressesArray();
            String mtp = harbourAddresses[harbourAddresses.length - 1];
            String[] nameSplitted = harbour.getName().split("@");
            String containerAddress = nameSplitted[1];

            String[] addresses = getAID().getAddressesArray();
            if (addresses[addresses.length - 1].equals(mtp)) {
                debug("migration local");
                this.inMigration = false;
                this.takeDown();
                ContainerID cID = new ContainerID();
                cID.setName(nameSplitted[0].split("_")[0]);
                cID.setAddress(mtp);
                this.doMove(cID);
            } else {
                debug("migration distante");
                this.inMigration = true;
                AID remoteAMS = new AID("ams@" + containerAddress, AID.ISGUID);
                remoteAMS.addAddresses(mtp);
                PlatformID destination = new PlatformID(remoteAMS);
                this.doMove(destination);
            }
        } else if (canMigrate()) {
            this.internalUpdate("WANT_MIGRATE", harbour);
        } else {
            this.print("I cannot migrate");
        }
    }

    @Override
    public void doMove(Location loc) {
        super.doMove(loc);
        membersHarbour.clear();
    }

    @Override
    public void afterClone() {
        super.afterClone();
        for (Behaviour b : myBehaviours) {
            removeBehaviour(b);
        }
        internalBehaviour.clear();
        myBehaviours.clear();
        this.reset();
        this.internalUpdate("AFTER_REPLICATION");
    }

    @Override
    public void doClone(Location loc, String newName) {
        super.doClone(loc, newName);
    }

    protected void run(String action) {
        try {
            Class<?> dynamicClass = Class.forName("skydata.behaviour." + action);
            Constructor<?> constructor = dynamicClass.getConstructor(SKAgent.class);
            Object instance = constructor.newInstance(this);
            Method method = dynamicClass.getMethod("action");
            method.invoke(instance);

        } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException
                | InvocationTargetException e) {
            e.printStackTrace();
        }
    }

    public void debug(String s) {
        if (inDebug)
            System.out.println("[" + getName() + "][" + Instant.now().toEpochMilli() + "]" + s);
    }

    public void print(String s) {
        SKAID my = this.myHarbour();
        String nameHarbour = my != null ? my.getName() : "Unknown";
        System.out.println(
                "[" + getName() + "][" + Instant.now().toEpochMilli() + "][" + nameHarbour + "]" + s);
    }

    public void broadcast(BroadcastI broadCaster, SKLMessage message, Set<SKAID> agents) {
        broadCaster.broadcast(this, message, agents);
    }

    public void skSendReliable(SKLMessage msg) {
        msg.setSender(this.getSKAID());
        String msg_id = msg.getMessageId();
        if (msg_id == null) {
            msg.setMessageId(IDGenerator.generateID());
        }
        if (msg.getNbSent() >= 1) {
            msg.setArrivalDate(Instant.now().toEpochMilli());
            // Bufferize the message for all receivers
            for (SKAID r : msg.getReceivers()) {
                HashMap<String, SKLMessage> allMsgs;
                if (!msgInfoBF.containsKey(r.getName())) {
                    allMsgs = new HashMap<>();
                    msgInfoBF.put(r.getName(), allMsgs);
                } else {
                    allMsgs = msgInfoBF.get(r.getName());
                }
                allMsgs.put(msg_id, msg);
            }
            ACLMessage aclMsg = msg.toJade();
            this.send(aclMsg);

        }
    }

    public void skSendNormal(SKLMessage msg) {
        msg.setSender(this.getSKAID());
        ACLMessage aclMsg = msg.toJade();
        this.send(aclMsg);
    }

    public SKLMessage skReceive(MessageTemplate mt) {
        ACLMessage acl = this.receive(mt);
        if (acl != null) {
            try {
                SKLMessage skl = (SKLMessage) acl.getContentObject();
                // Retransmit all message that is not ACK by the sender
                SKAID sender = skl.getSender();
                if (msgInfoBF.containsKey(sender.getName())) {
                    for (SKLMessage msg : msgInfoBF.get(sender.getName()).values()) {
                        SKAID r = msg.getReceiver(sender.getName());
                        if (r.getNbMigration() < sender.getNbMigration()
                                && !r.getAddress().equals(sender.getAddress())) {
                            r.update(sender.getAddress(), sender.getNbMigration());
                            msg.setNbSent(msg.getNbSent() + 1);
                            msg.setSender(this.getSKAID());
                            send(msg.toJade(sender));
                        }
                    }
                }

                // Manage sending ACK
                if (skl.getMessageId() != null) {
                    SKLMessage reply = new SKLMessage("ACK", "RELIABLE_SEND");
                    reply.setContent(skl.getMessageId());
                    reply.addReceiver(sender);
                    skSendNormal(reply);

                    // Bufferize message received
                    if (!receivedMsgs.containsKey(skl.getSender().getName())) {
                        receivedMsgs.put(skl.getSender().getName(), new HashMap<>());
                    }
                    HashMap<String, Long> receivedFromsender = receivedMsgs.get(skl.getSender().getName());
                    if (!receivedFromsender.containsKey(skl.getMessageId())) {
                        receivedFromsender.put(skl.getMessageId(), Instant.now().toEpochMilli());
                    } else {
                        return this.skReceive(mt);
                    }
                }

                return skl;
            } catch (UnreadableException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    public Set<SKAID> connectedHarbours() {
        return harbours;
    }

    @SuppressWarnings("rawtypes")
    public HashMap<SKAID, HashMap<String, Comparable>> getHarboursStats() {
        return harboursStats;
    }

    public SKAID myHarbour() {
        try {
            String address = this.getSKAID().getAddress();
            for (SKAID s : connectedHarbours()) {
                if (s.getAddress().equals(address)) {
                    return s;
                }
            }
            return null;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }

    }

    protected void setupStats() {
        new UpdateStatsHarbour(this).action();
    }

    public boolean canMigrate() {
        return nbBlockingMigrate == 0;
    }

    public void blockMigrate() {
        nbBlockingMigrate++;
    }

    public void unblockMigrate() {
        if (nbBlockingMigrate > 0)
            nbBlockingMigrate--;
    }

    // -----------------------------------------------------------------------------------
    // Deletion
    // -----------------------------------------------------------------------------------

    public void delete() {
        SKLMessage quit = new SKLMessage("QUIT_HARBOUR", "HARBOUR");
        quit.addReceiver(myHarbour());
        quit.setContent(size);
        skSendNormal(quit);
        doDelete();
        print("deletion");
        this.internalUpdate("DELETION");
    }

    // -----------------------------------------------------------------------------------
    // Manage Behaviour
    // -----------------------------------------------------------------------------------

    @Override
    public void addBehaviour(Behaviour wakerBehaviour) {
        super.addBehaviour(wakerBehaviour);
        myBehaviours.add(wakerBehaviour);
    }

}