10 changes: 9 additions & 1 deletion llama-dist/src/main/conf/llama-site.xml
100644 → 100755
@@ -425,7 +425,7 @@
   <description>
     Per queue setting that indicates if Llama should cache allocated resources
     on release for the #QUEUE# queue. If not set, the
-    'llama.am.caching.enabled' is used.
+    'llama.am.cache.enabled' is used.
   </description>
 </property>
 <property>
@@ -472,6 +472,14 @@
     before the cache.
   </description>
 </property>
+<property>
+  <name>llama.am.resource.normalizing.enabled.#QUEUE#</name>
+  <value>true</value>
+  <description>
+    Per queue setting that indicates whether to break resource requests into
+    smaller requests of standard size before the cache.
+  </description>
+</property>
 <property>
   <name>llama.am.resource.normalizing.standard.mbs</name>
   <value>1024</value>
2 changes: 1 addition & 1 deletion llama/src/main/java/com/cloudera/llama/am/api/LlamaAM.java
100644 → 100755
@@ -95,7 +95,7 @@ public abstract class LlamaAM {
   public static final long GANG_ANTI_DEADLOCK_BACKOFF_MAX_DELAY_DEFAULT = 30000;

   public static final String CACHING_ENABLED_KEY =
-      PREFIX_KEY + "caching.enabled";
+      PREFIX_KEY + "cache.enabled";
Contributor: Thanks for updating this. Can we also update the reference to "caching.enabled" in llama-site.xml as well?

Author: Done
   public static final boolean CACHING_ENABLED_DEFAULT = true;

   public static final String THROTTLING_ENABLED_KEY =
2 changes: 2 additions & 0 deletions llama/src/main/java/com/cloudera/llama/am/impl/SingleQueueLlamaAM.java
100644 → 100755
Expand Up @@ -122,6 +122,8 @@ private RMConnector createRMConnector() {
NORMALIZING_ENABLED_DEFAULT);
caching = getConf().getBoolean(
CACHING_ENABLED_KEY + "." + queue, caching);
normalizing = getConf().getBoolean(
NORMALIZING_ENABLED_KEY + "." + queue, normalizing);
LOG.info("Caching for queue '{}' enabled '{}'", queue,
caching);
if (caching && normalizing) {
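For context, a minimal sketch (not part of the PR) of how such a per-queue override resolves, assuming Hadoop's Configuration API and the key names from the diffs above; the queue names used here are hypothetical:

    import org.apache.hadoop.conf.Configuration;

    public class PerQueueFlagDemo {
      // Key and default taken from the diffs above; the per-queue form appends the queue name.
      static final String NORMALIZING_ENABLED_KEY = "llama.am.resource.normalizing.enabled";
      static final boolean NORMALIZING_ENABLED_DEFAULT = true;

      // Resolve the global value first, then let the per-queue key override it when present.
      static boolean isNormalizingEnabled(Configuration conf, String queue) {
        boolean normalizing =
            conf.getBoolean(NORMALIZING_ENABLED_KEY, NORMALIZING_ENABLED_DEFAULT);
        return conf.getBoolean(NORMALIZING_ENABLED_KEY + "." + queue, normalizing);
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.setBoolean(NORMALIZING_ENABLED_KEY, true);
        conf.setBoolean(NORMALIZING_ENABLED_KEY + ".root.queue1", false); // hypothetical queue
        System.out.println(isNormalizingEnabled(conf, "root.queue1")); // false (per-queue override)
        System.out.println(isNormalizingEnabled(conf, "root.queue2")); // true  (falls back to global)
      }
    }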
66 changes: 52 additions & 14 deletions llama/src/main/java/com/cloudera/llama/am/yarn/YarnRMConnector.java
100644 → 100755
Expand Up @@ -573,6 +573,11 @@ private void _reserve(Collection<RMResource> resources)
resource.getRmData().put("request", request);

resource.getRmData().put(YARN_RM_CONNECTOR_KEY, this);

/*Keeping resources which relax locality in the separate map to handle them when possible*/
if(resource.getLocalityAsk()!= com.cloudera.llama.am.api.Resource.Locality.MUST) {
anyLocationResourceIdToRequestMap.put(resource.getResourceId(), request);
}
}
}
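As an aside, a tiny self-contained sketch of the filter above, assuming the three locality values this PR refers to (MUST, PREFERRED, DONT_CARE); the Request class is a simplified stand-in for the real Llama types:

    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    public class LocalitySketch {
      // Locality values as referenced in this PR; simplified stand-in enum.
      enum Locality { MUST, PREFERRED, DONT_CARE }

      static class Request {
        final UUID resourceId = UUID.randomUUID();
        final Locality locality;
        Request(Locality locality) { this.locality = locality; }
      }

      public static void main(String[] args) {
        ConcurrentHashMap<UUID, Request> anyLocationMap =
            new ConcurrentHashMap<UUID, Request>();
        for (Locality locality : Locality.values()) {
          Request req = new Request(locality);
          // Only requests that relax locality (anything but MUST) are tracked
          // for the capacity-only matching pass later in the connector.
          if (req.locality != Locality.MUST) {
            anyLocationMap.put(req.resourceId, req);
          }
        }
        System.out.println(anyLocationMap.size()); // 2 (PREFERRED and DONT_CARE)
      }
    }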

@@ -660,6 +665,9 @@ public void emptyCache() throws LlamaException {
   ConcurrentHashMap<ContainerId, UUID> containerToResourceMap =
       new ConcurrentHashMap<ContainerId, UUID>();

+  ConcurrentHashMap<UUID, LlamaContainerRequest> anyLocationResourceIdToRequestMap =
+      new ConcurrentHashMap<UUID, LlamaContainerRequest>();
+
   @Override
   public void onContainersCompleted(List<ContainerStatus> containerStatuses) {
     List<RMEvent> changes = new ArrayList<RMEvent>();
@@ -768,10 +776,27 @@ private RMEvent createResourceAllocation(RMResource resources,
         resources.getRmData());
   }

+  private void handleContainerMatchingRequest(Container container, LlamaContainerRequest req, List<RMEvent> changes) {
+    RMResource resource = req.getResourceAsk();
+
+    LOG.debug("New allocation for '{}' container '{}', node '{}'",
+        resource, container.getId(), container.getNodeId());
+
+    resource.getRmData().put("container", container);
+    containerToResourceMap.put(container.getId(),
+        resource.getResourceId());
+    changes.add(createResourceAllocation(resource, container));
+    amRmClientAsync.removeContainerRequest(req);
+    LOG.trace("Reservation resource '{}' removed from YARN", resource);
+
+    queue(new ContainerHandler(ugi, resource, container, Action.START));
+  }
+
   @Override
   public void onContainersAllocated(List<Container> containers) {
     List<RMEvent> changes = new ArrayList<RMEvent>();
     // no need to use a ugi.doAs() as this is called from within Yarn client
+    List<Container> unclaimedContainers = new ArrayList<Container>();
     for (Container container : containers) {
       List<? extends Collection<LlamaContainerRequest>> matchingContainerReqs =
           amRmClientAsync.getMatchingRequests(container.getPriority(),
@@ -793,23 +818,36 @@ public void onContainersAllocated(List<Container> containers) {
           LOG.error("There was a match for container '{}', " +
               "LlamaContainerRequest cannot be NULL", container);
         } else {
-          RMResource resource = req.getResourceAsk();
-
-          LOG.debug("New allocation for '{}' container '{}', node '{}'",
-              resource, container.getId(), container.getNodeId());
-
-          resource.getRmData().put("container", container);
-          containerToResourceMap.put(container.getId(),
-              resource.getResourceId());
-          changes.add(createResourceAllocation(resource, container));
-          amRmClientAsync.removeContainerRequest(req);
-          LOG.trace("Reservation resource '{}' removed from YARN", resource);
-
-          queue(new ContainerHandler(ugi, resource, container, Action.START));
+          handleContainerMatchingRequest(container, req, changes);
+          /* Remove the granted request from anyLocationResourceIdToRequestMap if it is there */
+          anyLocationResourceIdToRequestMap.remove(req.getResourceAsk().getResourceId());
         }
       } else {
-        LOG.error("No matching request for {}. Releasing the container.",
+        LOG.debug("No strong request match for {}. Adding to the list of unclaimed containers.",
             container);
+        unclaimedContainers.add(container);
       }
     }
+    /* Match the remaining containers against requests that relax locality */
Contributor: In the interest of readability, would it make sense to do this in a separate private method?

Author: After moving the duplicated code into a separate private method, onContainersAllocated looks quite concise and readable. Please let me know if you would still like the weak-match logic moved into its own method as well.
+    for (Container container : unclaimedContainers) {
+      /* Look for requests with DONT_CARE or PREFERRED locality that match the container we got */
+      boolean containerIsClaimed = false;
+      Iterator<Map.Entry<UUID, LlamaContainerRequest>> iterator = anyLocationResourceIdToRequestMap.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<UUID, LlamaContainerRequest> entry = iterator.next();
+        LlamaContainerRequest request = entry.getValue();
+        /* Match by capacity only */
+        if (request.getResourceAsk().getCpuVCoresAsk() == container.getResource().getVirtualCores() &&
+            request.getResourceAsk().getMemoryMbsAsk() == container.getResource().getMemory()) {
+          handleContainerMatchingRequest(container, request, changes);
+          iterator.remove();
+          containerIsClaimed = true;
+          break;
+        }
+      }
+      if (!containerIsClaimed) {
         LOG.error("No matching request for {}. Releasing the container.",
             container);
         containerToResourceMap.remove(container.getId());
         amRmClientAsync.releaseAssignedContainer(container.getId());
       }
}
Expand Down