diff --git a/llama-dist/src/main/conf/llama-site.xml b/llama-dist/src/main/conf/llama-site.xml
old mode 100644
new mode 100755
index fa230629..1aefad0b
--- a/llama-dist/src/main/conf/llama-site.xml
+++ b/llama-dist/src/main/conf/llama-site.xml
@@ -425,7 +425,7 @@
Per queue setting that indicates if Llama should cache allocated resources
on release for the #QUEUE# queue. If not set, the
- 'llama.am.caching.enabled' is used.
+ 'llama.am.cache.enabled' is used.
@@ -472,6 +472,14 @@
       before the cache.
     </description>
   </property>
+  <property>
+    <name>llama.am.resource.normalizing.enabled.#QUEUE#</name>
+    <value>true</value>
+    <description>
+      Per queue setting that indicates whether to break resource requests
+      into smaller requests of standard size before the cache.
+    </description>
+  </property>
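+  <!-- Example (hypothetical queue name): to disable normalization for just
+       the 'root.default' queue, set
+       'llama.am.resource.normalizing.enabled.root.default' to 'false'. -->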
   <property>
     <name>llama.am.resource.normalizing.standard.mbs</name>
     <value>1024</value>
diff --git a/llama/src/main/java/com/cloudera/llama/am/api/LlamaAM.java b/llama/src/main/java/com/cloudera/llama/am/api/LlamaAM.java
old mode 100644
new mode 100755
index c88cfae6..08d8542f
--- a/llama/src/main/java/com/cloudera/llama/am/api/LlamaAM.java
+++ b/llama/src/main/java/com/cloudera/llama/am/api/LlamaAM.java
@@ -95,7 +95,7 @@ public abstract class LlamaAM {
public static final long GANG_ANTI_DEADLOCK_BACKOFF_MAX_DELAY_DEFAULT = 30000;
public static final String CACHING_ENABLED_KEY =
- PREFIX_KEY + "caching.enabled";
+ PREFIX_KEY + "cache.enabled";
public static final boolean CACHING_ENABLED_DEFAULT = true;
public static final String THROTTLING_ENABLED_KEY =
diff --git a/llama/src/main/java/com/cloudera/llama/am/impl/SingleQueueLlamaAM.java b/llama/src/main/java/com/cloudera/llama/am/impl/SingleQueueLlamaAM.java
old mode 100644
new mode 100755
index a07f717a..0e48f8af
--- a/llama/src/main/java/com/cloudera/llama/am/impl/SingleQueueLlamaAM.java
+++ b/llama/src/main/java/com/cloudera/llama/am/impl/SingleQueueLlamaAM.java
@@ -122,6 +122,8 @@ private RMConnector createRMConnector() {
NORMALIZING_ENABLED_DEFAULT);
caching = getConf().getBoolean(
CACHING_ENABLED_KEY + "." + queue, caching);
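+    // Per-queue override for resource normalization; when the queue-specific
+    // key is absent, the global value read above is used.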
+ normalizing = getConf().getBoolean(
+ NORMALIZING_ENABLED_KEY + "." + queue, normalizing);
LOG.info("Caching for queue '{}' enabled '{}'", queue,
caching);
if (caching && normalizing) {
diff --git a/llama/src/main/java/com/cloudera/llama/am/yarn/YarnRMConnector.java b/llama/src/main/java/com/cloudera/llama/am/yarn/YarnRMConnector.java
old mode 100644
new mode 100755
index 723d70f6..9a9c7977
--- a/llama/src/main/java/com/cloudera/llama/am/yarn/YarnRMConnector.java
+++ b/llama/src/main/java/com/cloudera/llama/am/yarn/YarnRMConnector.java
@@ -573,6 +573,11 @@ private void _reserve(Collection<RMResource> resources)
resource.getRmData().put("request", request);
resource.getRmData().put(YARN_RM_CONNECTOR_KEY, this);
+
+      /* Keep requests that relax locality in a separate map so unclaimed
+         containers can be matched against them later. */
+      if (resource.getLocalityAsk() != com.cloudera.llama.am.api.Resource.Locality.MUST) {
+        anyLocationResourceIdToRequestMap.put(resource.getResourceId(), request);
+      }
}
}
@@ -660,6 +665,9 @@ public void emptyCache() throws LlamaException {
  ConcurrentHashMap<ContainerId, UUID> containerToResourceMap =
      new ConcurrentHashMap<ContainerId, UUID>();
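+  /* Requests that relax locality (PREFERRED or DONT_CARE), keyed by resource
+     id; matched by capacity against containers with no strict-locality match. */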
+  ConcurrentHashMap<UUID, LlamaContainerRequest> anyLocationResourceIdToRequestMap =
+      new ConcurrentHashMap<UUID, LlamaContainerRequest>();
+
@Override
  public void onContainersCompleted(List<ContainerStatus> containerStatuses) {
    List<RMEvent> changes = new ArrayList<RMEvent>();
@@ -768,10 +776,27 @@ private RMEvent createResourceAllocation(RMResource resources,
resources.getRmData());
}
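+  /* Grants an allocated container to its matching request: records the
+     container, emits the allocation event, removes the request from the
+     AMRM client and queues the container start. */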
+  private void handleContainerMatchingRequest(Container container,
+      LlamaContainerRequest req, List<RMEvent> changes) {
+ RMResource resource = req.getResourceAsk();
+
+ LOG.debug("New allocation for '{}' container '{}', node '{}'",
+ resource, container.getId(), container.getNodeId());
+
+ resource.getRmData().put("container", container);
+ containerToResourceMap.put(container.getId(),
+ resource.getResourceId());
+ changes.add(createResourceAllocation(resource, container));
+ amRmClientAsync.removeContainerRequest(req);
+ LOG.trace("Reservation resource '{}' removed from YARN", resource);
+
+ queue(new ContainerHandler(ugi, resource, container, Action.START));
+ }
+
@Override
  public void onContainersAllocated(List<Container> containers) {
    List<RMEvent> changes = new ArrayList<RMEvent>();
// no need to use a ugi.doAs() as this is called from within Yarn client
+    List<Container> unclaimedContainers = new ArrayList<Container>();
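+    /* First pass: match each container against outstanding requests with
+       the same priority, location and capability. */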
for (Container container : containers) {
      List<? extends Collection<LlamaContainerRequest>> matchingContainerReqs =
          amRmClientAsync.getMatchingRequests(container.getPriority(),
@@ -793,23 +818,36 @@ public void onContainersAllocated(List<Container> containers) {
LOG.error("There was a match for container '{}', " +
"LlamaContainerRequest cannot be NULL", container);
} else {
- RMResource resource = req.getResourceAsk();
-
- LOG.debug("New allocation for '{}' container '{}', node '{}'",
- resource, container.getId(), container.getNodeId());
-
- resource.getRmData().put("container", container);
- containerToResourceMap.put(container.getId(),
- resource.getResourceId());
- changes.add(createResourceAllocation(resource, container));
- amRmClientAsync.removeContainerRequest(req);
- LOG.trace("Reservation resource '{}' removed from YARN", resource);
-
- queue(new ContainerHandler(ugi, resource, container, Action.START));
+          handleContainerMatchingRequest(container, req, changes);
+          /* Remove the granted request from anyLocationResourceIdToRequestMap, if present. */
+          anyLocationResourceIdToRequestMap.remove(req.getResourceAsk().getResourceId());
}
} else {
- LOG.error("No matching request for {}. Releasing the container.",
+          LOG.debug("No strict-locality match for {}. Adding to the list of unclaimed containers.",
container);
+ unclaimedContainers.add(container);
+ }
+ }
+    /* Second pass: match unclaimed containers against requests that relax locality. */
+    for (Container container : unclaimedContainers) {
+      /* Look for a 'DONT_CARE' or 'PREFERRED' locality request that fits this container. */
+      boolean containerIsClaimed = false;
+      Iterator<Map.Entry<UUID, LlamaContainerRequest>> iterator =
+          anyLocationResourceIdToRequestMap.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<UUID, LlamaContainerRequest> entry = iterator.next();
+        LlamaContainerRequest request = entry.getValue();
+        /* Match by capacity only; these requests accept any node. */
+        if (request.getResourceAsk().getCpuVCoresAsk() == container.getResource().getVirtualCores() &&
+            request.getResourceAsk().getMemoryMbsAsk() == container.getResource().getMemory()) {
+          handleContainerMatchingRequest(container, request, changes);
+          iterator.remove();
+          containerIsClaimed = true;
+          break;
+        }
+      }
+      if (!containerIsClaimed) {
+        LOG.error("No matching request for {}. Releasing the container.",
+            container);
         containerToResourceMap.remove(container.getId());
         amRmClientAsync.releaseAssignedContainer(container.getId());
       }