// Listing of SKHM.java (File List > agents > SKHM.java), extracted from the generated documentation page.

package skydata.internal.agents;

import jade.core.AID;
import jade.wrapper.AgentContainer;
import jade.wrapper.AgentController;
import java.util.Map;
import java.io.IOException;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.HashSet;

import jade.core.Runtime;
import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.MessageTemplate;
import jade.lang.acl.UnreadableException;
import jade.lang.acl.ACLMessage;
import java.util.Set;
import skydata.centralized.CentralizedClient;
import skydata.internal.message.NBroadcast;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

/**
 * Agent that keeps track of a harbour's capacity and of the agents that
 * joined it.
 *
 * <p>During {@link #setup()} it decodes its JSON configuration, registers
 * itself with the {@link CentralizedClient}, starts the agents listed in the
 * configuration and then installs cyclic behaviours answering:
 * <ul>
 * <li>{@code LOCK_REQUEST} (protocols {@code HARBOUR} and {@code REPLICATION}),
 * replied to with a boolean {@code LOCK_RESPONSE};</li>
 * <li>{@code STATS_REQUEST}, replied to with a {@code STATS_RESPONSE} map;</li>
 * <li>{@code JOIN_HARBOUR} / {@code QUIT_HARBOUR}, each followed by an
 * {@code UPDATE_LIST} broadcast of the current member set.</li>
 * </ul>
 *
 * <p>Message contents are validated before use so that a malformed message
 * does not throw out of a behaviour's {@code action()}.
 */
public class SKHM extends SKAgent {

    /** Harbour configuration decoded from the JSON {@code "params"} argument. */
    private Map<String, Object> harbour = new HashMap<>();

    /** Total capacity, taken from the {@code "capacity"} configuration entry. */
    private int capacity = 0;

    /** Capacity that is not currently reserved. */
    private int available;

    CentralizedClient centralizedClient = new CentralizedClient();

    /** Agents that announced themselves through {@code JOIN_HARBOUR}. */
    Set<SKAID> agents = new HashSet<SKAID>();

    public SKHM() {
        super();
    }

    /**
     * Initialises the harbour from the start-up arguments and installs the
     * message-handling behaviours.
     *
     * <p>Reads {@code "params"} (JSON object with at least {@code "capacity"}
     * and optionally {@code "agents"}) and {@code "platform"} from the
     * inherited {@code args}. Any failure while decoding the configuration,
     * registering, or starting configured agents is printed and the remaining
     * configuration steps are skipped; the behaviours are installed in every
     * case.
     */
    @SuppressWarnings("unchecked")
    @Override
    protected void setup() {
        super.setup();
        String params = (String) args.get("params");
        Object[] platformParams = (Object[]) args.get("platform");
        try {
            ObjectMapper mapper = new ObjectMapper();
            this.harbour = mapper.readValue(params, new TypeReference<Map<String, Object>>() {
            });
            this.capacity = (Integer) this.harbour.get("capacity");
            this.available = capacity;

            centralizedClient.addTuple(getName(), getAID().getAddressesArray()[0]);
            // NOTE(review): platformParams[3] looks like a start-up delay given
            // to doWait() before the harbour list is fetched - confirm with caller.
            int startupDelay = (Integer) platformParams[3];
            if (startupDelay > 0) {
                doWait(startupDelay);
            }

            Set<SKAID> allHarbours = centralizedClient.getTuples();
            AgentContainer container = getContainerController();

            Map<String, Object> configuredAgents = (Map<String, Object>) this.harbour.get("agents");
            if (configuredAgents != null) {
                for (Map.Entry<String, Object> entry : configuredAgents.entrySet()) {
                    Map<String, Object> agentInfos = (Map<String, Object>) entry.getValue();
                    // Read the size first so a missing "size" fails before an
                    // agent has been created.
                    int size = (Integer) agentInfos.get("size");
                    Object[] agentArgs = new Object[] { agentInfos, allHarbours };
                    AgentController controller = container.createNewAgent(entry.getKey(),
                            "skydata.internal.agents." + agentInfos.get("class"),
                            agentArgs);
                    controller.start();
                    // Reserve capacity only once the agent has actually started.
                    this.available -= size;
                }
            }
        } catch (Exception e) {
            // Best effort: the harbour still answers requests with whatever
            // state was initialised before the failure.
            e.printStackTrace();
        }
        setupManagement("HARBOUR", ACLMessage.INFORM);
        setupManagement("REPLICATION", ACLMessage.REQUEST);
        setupStats();
        setupMembership();
    }

    /**
     * Builds a template matching messages with the given ontology, protocol
     * and performative.
     */
    private static MessageTemplate ontologyTemplate(String ontology, String protocol, int performative) {
        return MessageTemplate.and(
                MessageTemplate.MatchOntology(ontology),
                MessageTemplate.and(
                        MessageTemplate.MatchProtocol(protocol),
                        MessageTemplate.MatchPerformative(performative)));
    }

    /**
     * Reserves {@code size} units of capacity.
     *
     * @param size  amount to subtract from the available capacity
     * @param agent requester; currently not used
     */
    private void lock(int size, AID agent) {
        available -= size;
    }

    /**
     * Installs a behaviour answering {@code LOCK_REQUEST} messages of the given
     * protocol and performative with a boolean {@code LOCK_RESPONSE}.
     *
     * <p>A request is granted only when its content is a non-negative Integer
     * not larger than the available capacity. Anything else (including a
     * negative size, which would otherwise increase the available capacity)
     * is answered with {@code false}.
     *
     * @param protocol     protocol to match and to use for the response
     * @param performative ACL performative to match
     */
    private void setupManagement(final String protocol, int performative) {
        final SKHM agent = this;
        final MessageTemplate mtLock = ontologyTemplate("LOCK_REQUEST", protocol, performative);
        this.addBehaviour(new CyclicBehaviour(this) {
            @Override
            public void action() {
                SKLMessage acl = agent.skReceive(mtLock);
                if (acl == null) {
                    block();
                    return;
                }
                SKLMessage answer = new SKLMessage("LOCK_RESPONSE", protocol);
                answer.addReceiver(acl.getSender());
                agent.print("lock request");
                Object content = acl.getContent();
                boolean granted = false;
                if (content instanceof Integer) {
                    int requested = (Integer) content;
                    if (requested >= 0 && requested <= available) {
                        lock(requested, acl.getSender().toAID());
                        granted = true;
                    }
                }
                answer.setContent(granted);
                skSendNormal(answer);
            }
        });
    }

    /**
     * Resolves one statistic by name.
     *
     * @param name {@code "available"} or {@code "capacity"}
     * @return the boxed value, or {@code null} for an unknown name
     */
    private Object statValue(String name) {
        switch (name) {
            case "available":
                return available;
            case "capacity":
                return capacity;
            default:
                return null;
        }
    }

    /**
     * Installs a behaviour answering {@code STATS_REQUEST} messages.
     *
     * <p>The request content is expected to be a {@code String[]} of statistic
     * names; the response content is a map from each name to its value
     * ({@code null} for unknown names). Content that is not a {@code String[]}
     * yields an empty map, and {@code null} names are skipped.
     */
    @Override
    protected void setupStats() {
        final SKHM agent = this;
        final MessageTemplate mtRequest = ontologyTemplate("STATS_REQUEST", "HARBOUR", ACLMessage.INFORM);
        this.addBehaviour(new CyclicBehaviour(this) {
            @Override
            public void action() {
                SKLMessage acl = agent.skReceive(mtRequest);
                if (acl == null) {
                    block();
                    return;
                }
                SKLMessage answer = new SKLMessage("STATS_RESPONSE", "HARBOUR");
                Object content = acl.getContent();
                Map<String, Object> values = new HashMap<String, Object>();
                if (content instanceof String[]) {
                    for (String valueName : (String[]) content) {
                        if (valueName == null) {
                            // switch on a null String would throw
                            continue;
                        }
                        values.put(valueName, statValue(valueName));
                    }
                }
                answer.addReceiver(acl.getSender());
                answer.setContent((Serializable) values);
                skSendNormal(answer);
            }
        });
    }

    /** Broadcasts the current member set to every member as {@code UPDATE_LIST}. */
    private void broadcastMembers() {
        SKLMessage msg = new SKLMessage("UPDATE_LIST", "HARBOUR");
        msg.setContent((Serializable) agents);
        this.broadcast(new NBroadcast(), msg, agents);
    }

    /**
     * Installs the behaviours handling {@code QUIT_HARBOUR} and
     * {@code JOIN_HARBOUR}.
     *
     * <p>On quit, the Integer content is the size being released; it is added
     * back to the available capacity, which is never raised above
     * {@link #capacity}. A missing, non-Integer or non-positive size releases
     * nothing. The sender is removed from the member set in every case. On
     * join, the sender is added to the member set. Both are followed by a
     * member-list broadcast.
     */
    private void setupMembership() {
        final SKHM agent = this;
        final MessageTemplate mtQuit = ontologyTemplate("QUIT_HARBOUR", "HARBOUR", ACLMessage.INFORM);
        this.addBehaviour(new CyclicBehaviour(this) {
            @Override
            public void action() {
                SKLMessage acl = agent.skReceive(mtQuit);
                if (acl == null) {
                    block();
                    return;
                }
                Object content = acl.getContent();
                if (content instanceof Integer && (Integer) content > 0) {
                    // long arithmetic avoids int overflow on an oversized release
                    long released = (long) available + (Integer) content;
                    available = (int) Math.min((long) capacity, released);
                }
                agents.remove(acl.getSender());
                broadcastMembers();
            }
        });

        final MessageTemplate mtJoin = ontologyTemplate("JOIN_HARBOUR", "HARBOUR", ACLMessage.INFORM);
        this.addBehaviour(new CyclicBehaviour(this) {
            @Override
            public void action() {
                SKLMessage acl = agent.skReceive(mtJoin);
                if (acl == null) {
                    block();
                    return;
                }
                agents.add(acl.getSender());
                broadcastMembers();
            }
        });
    }

}