8 changes: 8 additions & 0 deletions llama-dist/src/main/conf/llama-site.xml
100644 → 100755
@@ -472,6 +472,14 @@
before the cache.
</description>
</property>
<property>
<name>llama.am.resource.normalizing.enabled.#QUEUE#</name>
<value>true</value>
<description>
Per-queue setting that indicates whether to break resource requests into smaller requests of a standard size
before they reach the cache.
</description>
</property>
<property>
<name>llama.am.resource.normalizing.standard.mbs</name>
<value>1024</value>
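For illustration, this is how the per-queue override might look for a hypothetical queue named root.default (the #QUEUE# placeholder is replaced with the actual queue name; the queue name and value here are assumptions, not part of this change):

  <property>
    <name>llama.am.resource.normalizing.enabled.root.default</name>
    <value>false</value>
    <description>
      Disables normalization for the root.default queue only; queues without
      a per-queue entry fall back to llama.am.resource.normalizing.enabled.
    </description>
  </property>
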
2 changes: 1 addition & 1 deletion llama/src/main/java/com/cloudera/llama/am/api/LlamaAM.java
100644 → 100755
@@ -95,7 +95,7 @@ public abstract class LlamaAM {
public static final long GANG_ANTI_DEADLOCK_BACKOFF_MAX_DELAY_DEFAULT = 30000;

public static final String CACHING_ENABLED_KEY =
PREFIX_KEY + "caching.enabled";
PREFIX_KEY + "cache.enabled";
Contributor: Thanks for updating this. Can we also update the reference to "caching.enabled" in llama-site.xml as well?

Author: Done

public static final boolean CACHING_ENABLED_DEFAULT = true;

public static final String THROTTLING_ENABLED_KEY =
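Assuming PREFIX_KEY resolves to "llama.am." (consistent with the llama.am.* keys in llama-site.xml above), the renamed property in the site file would read as follows; this is a sketch of the expected entry, not a hunk from this PR:

  <property>
    <name>llama.am.cache.enabled</name>
    <value>true</value>
  </property>
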
2 changes: 2 additions & 0 deletions llama/src/main/java/com/cloudera/llama/am/impl/SingleQueueLlamaAM.java
100644 → 100755
@@ -122,6 +122,8 @@ private RMConnector createRMConnector() {
NORMALIZING_ENABLED_DEFAULT);
caching = getConf().getBoolean(
CACHING_ENABLED_KEY + "." + queue, caching);
normalizing = getConf().getBoolean(
NORMALIZING_ENABLED_KEY + "." + queue, normalizing);
LOG.info("Caching for queue '{}' enabled '{}'", queue,
caching);
if (caching && normalizing) {
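The per-queue lookup above follows a global-then-override pattern: read the global flag, then use it as the default for the per-queue key. A minimal, self-contained sketch of that pattern (class and method names here are hypothetical, not from the Llama source):

import org.apache.hadoop.conf.Configuration;

public class PerQueueFlagSketch {
  static final String NORMALIZING_ENABLED_KEY =
      "llama.am.resource.normalizing.enabled";

  static boolean isNormalizingEnabled(Configuration conf, String queue) {
    // Read the global flag first (defaulting to true, per llama-site.xml).
    boolean normalizing = conf.getBoolean(NORMALIZING_ENABLED_KEY, true);
    // A per-queue key, if present, overrides the global value; otherwise the
    // global value serves as the default, as in SingleQueueLlamaAM above.
    return conf.getBoolean(NORMALIZING_ENABLED_KEY + "." + queue, normalizing);
  }
}
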
52 changes: 51 additions & 1 deletion llama/src/main/java/com/cloudera/llama/am/yarn/YarnRMConnector.java
100644 → 100755
@@ -573,6 +573,11 @@ private void _reserve(Collection<RMResource> resources)
resource.getRmData().put("request", request);

resource.getRmData().put(YARN_RM_CONNECTOR_KEY, this);

// Keep resources that relax locality in a separate map so they can be
// matched against unclaimed containers later.
if (resource.getLocalityAsk() != com.cloudera.llama.am.api.Resource.Locality.MUST) {
  anyLocationResourceIdToRequestMap.put(resource.getResourceId(), request);
}
}
}

@@ -660,6 +665,9 @@ public void emptyCache() throws LlamaException {
ConcurrentHashMap<ContainerId, UUID> containerToResourceMap =
new ConcurrentHashMap<ContainerId, UUID>();

ConcurrentHashMap<UUID, LlamaContainerRequest> anyLocationResourceIdToRequestMap =
new ConcurrentHashMap<UUID, LlamaContainerRequest>();

@Override
public void onContainersCompleted(List<ContainerStatus> containerStatuses) {
List<RMEvent> changes = new ArrayList<RMEvent>();
@@ -772,6 +780,7 @@ private RMEvent createResourceAllocation(RMResource resources,
public void onContainersAllocated(List<Container> containers) {
List<RMEvent> changes = new ArrayList<RMEvent>();
// no need to use a ugi.doAs() as this is called from within Yarn client
List<Container> unclaimedContainers = new ArrayList<Container>();
for (Container container : containers) {
List<? extends Collection<LlamaContainerRequest>> matchingContainerReqs =
amRmClientAsync.getMatchingRequests(container.getPriority(),
@@ -806,10 +815,51 @@ public void onContainersAllocated(List<Container> containers) {
LOG.trace("Reservation resource '{}' removed from YARN", resource);

queue(new ContainerHandler(ugi, resource, container, Action.START));

// Remove the granted request from anyLocationResourceIdToRequestMap if it is present.
anyLocationResourceIdToRequestMap.remove(resource.getResourceId());
}
} else {
LOG.error("No matching request for {}. Releasing the container.",
LOG.debug("No strong request match for {}. Adding to the list of unclaimed containers.",
container);
unclaimedContainers.add(container);
}
}
// Match remaining YARN containers against requests that relaxed locality.
Contributor: In the interest of readability, would it make sense to do this in a separate private method?

Author: After moving the duplicate code into a separate private method, the onContainersAllocated method looks quite concise and readable. Please let me know if you would still like me to move the weak-match logic into a separate method as well.

// Process unclaimed containers in a separate loop so that strong (locality) matches take precedence.
for (Container container : unclaimedContainers) {
// Find a pending request that relaxes locality and can use this unclaimed container.
Contributor: Don't follow the comment fully. Rephrase?

Author: Done

boolean containerIsClaimed = false;
Iterator<Map.Entry<UUID, LlamaContainerRequest>> iterator =
    anyLocationResourceIdToRequestMap.entrySet().iterator();
while (iterator.hasNext()) {
  Map.Entry<UUID, LlamaContainerRequest> entry = iterator.next();
  LlamaContainerRequest request = entry.getValue();
  // Match by capacity (vcores and memory) only.
  if (request.getResourceAsk().getCpuVCoresAsk() == container.getResource().getVirtualCores() &&
      request.getResourceAsk().getMemoryMbsAsk() == container.getResource().getMemory()) {

RMResource resource = request.getResourceAsk();
Contributor: This code is duplicated from the case where there is a strong match. Move it to a private helper method and reuse at both places?

Author: Done


LOG.debug("New allocation for '{}' container '{}', node '{}'",
Contributor: There are three {} for two variables?

Author: Hmm, there are actually 3 variables: resource, container.getId(), container.getNodeId(). Am I missing the point of the comment?

resource, container.getId(), container.getNodeId());

resource.getRmData().put("container", container);
containerToResourceMap.put(container.getId(),
resource.getResourceId());
changes.add(createResourceAllocation(resource, container));
amRmClientAsync.removeContainerRequest(request);
LOG.trace("Reservation resource '{}' removed from YARN", resource);

queue(new ContainerHandler(ugi, resource, container, Action.START));

iterator.remove();
containerIsClaimed = true;
break;
}
}
if (!containerIsClaimed) {
LOG.error("No matching request for {}. Releasing the container.",
container);
containerToResourceMap.remove(container.getId());
amRmClientAsync.releaseAssignedContainer(container.getId());
}
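Putting the hunks together, the allocation callback now makes two passes: a strong, locality-aware match via the AMRM client, then a capacity-only fallback against requests that relaxed locality, releasing anything still unclaimed. Below is a self-contained sketch of that flow using stand-in types; none of these names come from the Llama or YARN APIs:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class TwoPassMatchSketch {
  // Stand-in types; the real code uses YARN's Container and Llama's
  // LlamaContainerRequest / RMResource.
  record Capacity(int vcores, int memoryMb) {}
  record Request(UUID resourceId, Capacity ask) {}
  record Container(String id, Capacity size) {}

  // Requests that relaxed locality, keyed by resource id (mirrors
  // anyLocationResourceIdToRequestMap in the diff).
  final Map<UUID, Request> relaxed = new ConcurrentHashMap<>();

  void onContainersAllocated(List<Container> containers,
                             Map<Container, Request> strongMatches) {
    List<Container> unclaimed = new ArrayList<>();
    // Pass 1: prefer strong (locality) matches.
    for (Container c : containers) {
      Request strong = strongMatches.get(c);
      if (strong != null) {
        claim(c, strong);
        relaxed.remove(strong.resourceId());
      } else {
        unclaimed.add(c);
      }
    }
    // Pass 2: capacity-only match against relaxed-locality requests.
    for (Container c : unclaimed) {
      boolean claimed = false;
      Iterator<Map.Entry<UUID, Request>> it = relaxed.entrySet().iterator();
      while (it.hasNext()) {
        Request r = it.next().getValue();
        if (r.ask().equals(c.size())) {  // vcores and memory must both match
          claim(c, r);
          it.remove();
          claimed = true;
          break;
        }
      }
      if (!claimed) {
        release(c);  // no request wants this container; give it back to YARN
      }
    }
  }

  void claim(Container c, Request r) { /* start the container for r */ }
  void release(Container c) { /* release the container back to YARN */ }
}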