File Quorum.java
File List > algorithm > migration > Quorum.java
Go to the documentation of this file
package skydata.behaviour.algorithm.migration;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.javatuples.Pair;
import org.javatuples.Quintet;
import jade.core.AID;
import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import skydata.internal.agents.SKAgent;
import skydata.internal.agents.SKD;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;
/**
 * Quorum-based migration behaviour for an {@link SKD} agent.
 *
 * <p>The behaviour keeps its state in the agent storage under the key {@code "MIGRATION_QUORUM"} and
 * exchanges messages whose protocol is {@code "MIGRATION"}:
 * <ul>
 * <li><b>Quorum election</b>: {@code QUORUM_ASK} is broadcast to the possible members; every
 * {@code QUORUM_ANSWER} adds its sender to the candidate quorum, which becomes the current quorum as soon
 * as it holds more than half of the possible members.</li>
 * <li><b>Migration</b>: on the {@code WANT_MIGRATE} internal update the agent starts a new migration round
 * and broadcasts {@code MIGRATION_REQUEST}; receivers reply with {@code MIGRATION_ANSWER}; once the
 * answers cover every member of one recorded quorum, {@code skd.migrate(...)} is called.</li>
 * </ul>
 *
 * <p>Messages are addressed "twice": to the family entry carrying the receiver's name and, when one is
 * recorded, to the receiver's announced future location.
 *
 * <p>NOTE(review): this class sends {@code MIGRATION_END} but only listens for {@code MIGRATION_UPDATE};
 * neither the consumer of the former nor the producer of the latter is visible in this file - confirm.
 */
public class Quorum extends SKAgentBehaviour {

    /** Key of the algorithm state in the agent storage. */
    private static final String STORAGE_KEY = "MIGRATION_QUORUM";

    /** Protocol set on every message of this algorithm. */
    private static final String PROTOCOL = "MIGRATION";

    /** Period (ms) of the ticker that (re)starts a quorum election when needed. */
    private static final long QUORUM_ASK_PERIOD_MS = 20000;

    /** Period (ms) of the ticker that re-sends the pending migration request. */
    private static final long MIGRATION_RETRY_PERIOD_MS = 5000;

    private SKD skd;
    private DataAlgorithm d;

    /**
     * Creates the behaviour for the given agent.
     *
     * @param agent the owning agent; it is cast to {@link SKD}
     */
    public Quorum(SKAgent agent) {
        super(agent);
        skd = (SKD) agent;
    }

    /** Mutable state of the algorithm, stored as a single object in the agent storage. */
    private static class DataAlgorithm implements Serializable {
        /** Harbour of the pending migration, or {@code null} when no migration is in progress. */
        AID wantedHarbour;

        // Quorum detector
        boolean haveQuorum;
        Set<SKAID> quorum;
        Set<SKAID> nextQuorum;
        /** Members asked during an election; stays {@code null} until the first election starts. */
        Set<SKAID> possibleMembers;
        ArrayList<Set<SKAID>> allQuorums;
        Set<SKAID> allOldMembers;

        // Algorithm
        /** Announced future location of each member, keyed by member name. */
        HashMap<String, SKAID> future_locs;
        /** Last known migration round of each member, keyed by member name. */
        HashMap<String, Integer> fam_rnds;
        /** Senders of the MIGRATION_ANSWER messages accepted so far. */
        Set<SKAID> answers;
    }

    /**
     * Adds {@code receiver} to {@code msg} under its current family entry (matched by name, falling back
     * to {@code receiver} itself) and, when one is recorded, under its future location as well.
     */
    private void addTwice(SKAID receiver, SKLMessage msg) {
        SKAID recentReceiver = receiver;
        for (SKAID m : skd.getFamily()) {
            if (m.getName().equals(receiver.getName())) {
                recentReceiver = m;
                break;
            }
        }
        msg.addReceiver(recentReceiver);
        // A missing (or null) future location simply means there is no second address to use.
        SKAID futureLocation = d.future_locs.get(receiver.getName());
        if (futureLocation != null) {
            msg.addReceiver(futureLocation);
        }
    }

    /** Addresses {@code msg} to every agent of {@code agents} via {@link #addTwice} and sends it reliably. */
    private void broadcastTwice(Set<SKAID> agents, SKLMessage msg) {
        for (SKAID s : agents) {
            this.addTwice(s, msg);
        }
        agent.skSendReliable(msg);
    }

    /** Forwards every given position to {@code skd.updatePositionFamily}. Not called within this class. */
    private void mergePositions(Set<SKAID> newPositions) {
        for (SKAID a : newPositions) {
            skd.updatePositionFamily(a);
        }
    }

    /** Returns whether the round carried by {@code member} is at least the last round recorded for it. */
    private boolean isUpdate(SKAID member) {
        int lastRound = d.fam_rnds.getOrDefault(member.getName(), 0);
        return lastRound <= member.getNbMigration();
    }

    /** Returns whether the collected answers contain every member of at least one recorded quorum. */
    private boolean migrationAccepted() {
        for (Set<SKAID> quorum : d.allQuorums) {
            if (d.answers.containsAll(quorum)) {
                return true;
            }
        }
        return false;
    }

    /** Builds the template matching INFORM messages of this protocol with the given ontology. */
    private static MessageTemplate migrationTemplate(String ontology) {
        return MessageTemplate.and(
                MessageTemplate.MatchOntology(ontology),
                MessageTemplate.and(
                        MessageTemplate.MatchPerformative(ACLMessage.INFORM),
                        MessageTemplate.MatchProtocol(PROTOCOL)));
    }

    /** Creates the algorithm state in the agent storage if it is absent, then loads it into {@code d}. */
    private void loadState() {
        if (!agent.hasStorageObject(STORAGE_KEY)) {
            DataAlgorithm fresh = new DataAlgorithm();
            fresh.quorum = new HashSet<>();
            fresh.allQuorums = new ArrayList<>();
            fresh.allOldMembers = new HashSet<>();
            fresh.nextQuorum = new HashSet<>();
            fresh.haveQuorum = false;
            fresh.wantedHarbour = null;
            fresh.future_locs = new HashMap<>();
            fresh.fam_rnds = new HashMap<>();
            fresh.answers = new HashSet<>();
            // possibleMembers is deliberately left null: it marks "no election started yet".
            agent.createStorageObject(STORAGE_KEY, fresh);
        }
        d = (DataAlgorithm) skd.getStorageObject(STORAGE_KEY);
    }

    /**
     * Starts a quorum election unless a quorum exists and all of its members are still in the family.
     * The set of possible members is frozen the first time the family reaches {@code rg / 2 + 1} members.
     */
    private void askForQuorumIfNeeded() {
        if (d.haveQuorum && skd.getFamily().containsAll(d.quorum)) {
            return;
        }
        if (d.possibleMembers == null) {
            if (skd.getFamilySize() < (Integer) skd.args.get("rg") / 2 + 1) {
                return;
            }
            d.possibleMembers = new HashSet<>(skd.getFamily());
        }
        d.nextQuorum.clear();
        broadcastTwice(d.possibleMembers, new SKLMessage("QUORUM_ASK", PROTOCOL));
    }

    /** Broadcasts MIGRATION_REQUEST (current round, first address of the wanted harbour) to {@code recipients}. */
    private void sendMigrationRequest(Set<SKAID> recipients) {
        SKLMessage request = new SKLMessage("MIGRATION_REQUEST", PROTOCOL);
        Pair<Integer, String> payload = Pair.with(skd.getSKAID().getNbMigration(),
                d.wantedHarbour.getAddressesArray()[0]);
        request.setContent(payload);
        broadcastTwice(recipients, request);
    }

    /**
     * Starts a migration towards {@code target} (an {@link AID}); ignored when there is no quorum yet or
     * when a migration is already pending.
     */
    private void beginMigration(Object target) {
        if (!d.haveQuorum || d.wantedHarbour != null) {
            return;
        }
        d.wantedHarbour = (AID) target;
        d.answers.clear();
        d.allOldMembers.clear();
        d.allQuorums.clear();
        d.allOldMembers.addAll(d.quorum);
        d.allQuorums.add(new HashSet<>(d.quorum));
        // The round must be advanced before the request is built, since the request carries it.
        skd.getRealSKAID().nextMigration();
        sendMigrationRequest(d.allOldMembers);
    }

    /** Announces the end of the migration to the family and clears the pending target. */
    private void announceMigrationEnd() {
        broadcastTwice(skd.getFamily(), new SKLMessage("MIGRATION_END", PROTOCOL));
        d.wantedHarbour = null;
    }

    /** Replies QUORUM_ANSWER to the sender of a QUORUM_ASK. */
    private void processQuorumAsk(SKLMessage message) {
        SKLMessage answer = new SKLMessage("QUORUM_ANSWER", PROTOCOL);
        addTwice(message.getSender(), answer);
        agent.skSendReliable(answer);
    }

    /** Records a QUORUM_ANSWER and installs the candidate quorum once it is a strict majority. */
    private void processQuorumAnswer(SKLMessage message) {
        if (d.possibleMembers == null) {
            // No election has been started by this agent: nothing to measure the answer against.
            return;
        }
        d.nextQuorum.add(message.getSender());
        if (d.nextQuorum.size() <= d.possibleMembers.size() / 2) {
            return;
        }
        d.quorum.clear();
        d.quorum.addAll(d.nextQuorum);
        d.allOldMembers.addAll(d.quorum);
        d.allQuorums.add(new HashSet<>(d.quorum));
        d.haveQuorum = true;
    }

    /**
     * Handles a MIGRATION_REQUEST: records the sender's round and future location, then replies with a
     * MIGRATION_ANSWER carrying the request data plus this agent's rounds, family and future locations.
     */
    private void processMigrationRequest(SKLMessage message) {
        @SuppressWarnings("unchecked") // content is the Pair built by sendMigrationRequest
        Pair<Integer, String> data = (Pair<Integer, String>) message.getContent();
        SKAID sender = message.getSender();
        if (!isUpdate(sender)) {
            skd.print("Not update !");
            return;
        }
        d.fam_rnds.put(sender.getName(), data.getValue0());
        SKAID futureLocation = (SKAID) sender.clone();
        futureLocation.updateAddress(data.getValue1());
        d.future_locs.put(sender.getName(), futureLocation);

        SKLMessage answer = new SKLMessage("MIGRATION_ANSWER", PROTOCOL);
        Quintet<Integer, String, HashMap<String, Integer>, Set<SKAID>, HashMap<String, SKAID>> payload = Quintet
                .with(data.getValue0(), data.getValue1(), d.fam_rnds, skd.getFamily(), d.future_locs);
        answer.setContent(payload);
        addTwice(sender, answer);
        skd.skSendReliable(answer);
    }

    /**
     * Handles a MIGRATION_ANSWER for the current round: merges the newer rounds and future locations the
     * peer knows about, records the answer and migrates once a whole recorded quorum has answered.
     */
    private void processMigrationAnswer(SKLMessage message) {
        @SuppressWarnings("unchecked") // content is the Quintet built by processMigrationRequest
        Quintet<Integer, String, HashMap<String, Integer>, Set<SKAID>, HashMap<String, SKAID>> data =
                (Quintet<Integer, String, HashMap<String, Integer>, Set<SKAID>, HashMap<String, SKAID>>) message
                        .getContent();
        // Compare round numbers by value, never by boxed identity.
        if (data.getValue0().intValue() != skd.getSKAID().getNbMigration()) {
            return;
        }
        skd.print(("Je reçoit une réponse"));
        HashMap<String, Integer> peerRounds = data.getValue2();
        HashMap<String, SKAID> peerFutureLocations = data.getValue4();
        for (SKAID member : data.getValue3()) {
            String name = member.getName();
            int knownRound = d.fam_rnds.getOrDefault(name, 0);
            int peerRound = peerRounds.getOrDefault(name, 0);
            if (peerRound > 0 && knownRound < peerRound) {
                d.fam_rnds.put(name, peerRound);
                skd.updatePositionFamily(member);
                SKAID peerFutureLocation = peerFutureLocations.get(name);
                if (peerFutureLocation != null) {
                    // Never store null: addTwice would otherwise add a null receiver.
                    d.future_locs.put(name, peerFutureLocation);
                }
                skd.print("J'apprend que " + name + "a bougé !");
            }
        }
        d.answers.add(message.getSender());
        // Late answers of the same round can arrive after the target was cleared: do not migrate to null.
        if (d.wantedHarbour != null && migrationAccepted()) {
            skd.migrate(d.wantedHarbour, true);
        }
    }

    /** Handles a MIGRATION_UPDATE: records the sender's round and position when the round is newer. */
    private void processMigrationUpdate(SKLMessage message) {
        SKAID sender = message.getSender();
        int announcedRound = sender.getNbMigration();
        int knownRound = d.fam_rnds.getOrDefault(sender.getName(), 0);
        if (knownRound < announcedRound) {
            // Store the announced round (storing the old one would make the update a no-op).
            d.fam_rnds.put(sender.getName(), announcedRound);
            skd.updatePositionFamily(sender);
        }
    }

    /**
     * Loads the algorithm state and registers the internal updates ({@code WANT_MIGRATE},
     * {@code FAMILY_REMOVED}, {@code AFTER_MIGRATION}), the two periodic tickers and the five message
     * handlers on the agent.
     */
    @Override
    public void action() {
        // Store the data of the algorithm
        loadState();

        // When the SKD wants to migrate
        SKAgentBehaviour whenWantMigrate = new SKAgentBehaviour(agent) {
            @Override
            public void actionWithParameters(Object o) {
                Quorum.this.beginMigration(o);
            }
        };
        skd.addInternalUpdate("WANT_MIGRATE", whenWantMigrate);

        // When a family member is removed, the quorum may have to be rebuilt
        SKAgentBehaviour whenFamilyRemoved = new SKAgentBehaviour(agent) {
            @Override
            public void actionWithParameters(Object o) {
                Quorum.this.askForQuorumIfNeeded();
            }
        };
        skd.addInternalUpdate("FAMILY_REMOVED", whenFamilyRemoved);

        // When the migration is finished
        SKAgentBehaviour afterMigration = new SKAgentBehaviour(agent) {
            @Override
            public void action() {
                Quorum.this.announceMigrationEnd();
            }
        };
        skd.addInternalUpdate("AFTER_MIGRATION", afterMigration);

        // Start the creation of a new quorum
        skd.addBehaviour(new TickerBehaviour(skd, QUORUM_ASK_PERIOD_MS) {
            @Override
            public void onTick() {
                Quorum.this.askForQuorumIfNeeded();
            }
        });

        // Re-send the migration request while a migration is pending
        skd.addBehaviour(new TickerBehaviour(skd, MIGRATION_RETRY_PERIOD_MS) {
            @Override
            public void onTick() {
                if (d.wantedHarbour != null) {
                    Quorum.this.sendMigrationRequest(d.quorum);
                }
            }
        });

        // I receive a request for a quorum
        final MessageTemplate mtQuorumRequest = migrationTemplate("QUORUM_ASK");
        skd.addBehaviour(new CyclicBehaviour(skd) {
            @Override
            public void action() {
                SKLMessage message = skd.skReceive(mtQuorumRequest);
                if (message == null) {
                    block();
                    return;
                }
                Quorum.this.processQuorumAsk(message);
            }
        });

        // I receive an answer for a quorum
        final MessageTemplate mtQuorumAnswer = migrationTemplate("QUORUM_ANSWER");
        skd.addBehaviour(new CyclicBehaviour(skd) {
            @Override
            public void action() {
                SKLMessage message = skd.skReceive(mtQuorumAnswer);
                if (message == null) {
                    block();
                    return;
                }
                Quorum.this.processQuorumAnswer(message);
            }
        });

        // I receive a migration request
        final MessageTemplate mtMigrationRequest = migrationTemplate("MIGRATION_REQUEST");
        skd.addBehaviour(new CyclicBehaviour(skd) {
            @Override
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationRequest);
                if (message == null) {
                    block();
                    return;
                }
                Quorum.this.processMigrationRequest(message);
            }
        });

        // I receive an answer to my migration request
        final MessageTemplate mtMigrationAnswer = migrationTemplate("MIGRATION_ANSWER");
        skd.addBehaviour(new CyclicBehaviour(skd) {
            @Override
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationAnswer);
                if (message == null) {
                    block();
                    return;
                }
                Quorum.this.processMigrationAnswer(message);
            }
        });

        // I receive an update of migration
        final MessageTemplate mtMigrationUpdate = migrationTemplate("MIGRATION_UPDATE");
        skd.addBehaviour(new CyclicBehaviour(skd) {
            @Override
            public void action() {
                SKLMessage message = skd.skReceive(mtMigrationUpdate);
                if (message == null) {
                    block();
                    return;
                }
                Quorum.this.processMigrationUpdate(message);
            }
        });
    }
}