Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify data in #1127

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3bb395f
Simpler version of data in
rowleya Nov 30, 2023
ca54bbd
Style
rowleya Nov 30, 2023
af3f58d
Bigger timeout for packets
rowleya Nov 30, 2023
08db657
Style
rowleya Nov 30, 2023
a8b299f
Try just one at at time
rowleya Nov 30, 2023
33720a2
Style
rowleya Nov 30, 2023
a146979
No need to close
rowleya Nov 30, 2023
c225f47
Back to 1 for testing
rowleya Nov 30, 2023
22d4a22
Use board local coordinates
rowleya Dec 1, 2023
a2e8627
Style
rowleya Dec 1, 2023
7a19bef
Does more channels help
rowleya Dec 1, 2023
df1614b
Move data through a stream
rowleya Dec 5, 2023
9b43acf
Debugging... need thread-safe multi-channel system!
rowleya Dec 7, 2023
e55d9a7
Working multi-threaded
rowleya Dec 7, 2023
ceba9c7
Working with gathering of packets!
rowleya Dec 7, 2023
6039f1c
Fixes
rowleya Dec 7, 2023
c16ec72
Write the last packet (doh)
rowleya Dec 7, 2023
9f44da8
Revert "Write the last packet (doh)"
rowleya Dec 7, 2023
3c5844a
Revert "Fixes"
rowleya Dec 7, 2023
8c1693c
Revert "Working with gathering of packets!"
rowleya Dec 7, 2023
746a521
Revert "Working multi-threaded"
rowleya Dec 7, 2023
3835ecb
Revert "Debugging... need thread-safe multi-channel system!"
rowleya Dec 7, 2023
b1a192f
Revert "Move data through a stream"
rowleya Dec 7, 2023
72101ce
Remove bits no longer needed
rowleya Dec 8, 2023
316ac97
Fix tests
rowleya Dec 11, 2023
0796ec9
Missed one!
rowleya Dec 11, 2023
98b97f3
Merge branch 'master' into simplify_data_in
rowleya Jul 22, 2024
0a12207
Merge branch 'master' into simplify_data_in
rowleya Jul 22, 2024
1783b5c
Merge branch 'simplify_data_in' of https://github.com/SpiNNakerManche…
rowleya Jul 22, 2024
a489539
The constant has changed name...
rowleya Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2019 The University of Manchester
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.manchester.spinnaker.messages.scp;

import static uk.ac.manchester.spinnaker.messages.sdp.SDPHeader.Flag.REPLY_EXPECTED;
import static uk.ac.manchester.spinnaker.messages.sdp.SDPPort.COPY_DATA_IN_PORT;
import static uk.ac.manchester.spinnaker.messages.scp.SCPCommand.CMD_WRITE;
import static uk.ac.manchester.spinnaker.messages.Constants.WORD_SIZE;


import java.nio.ByteBuffer;

import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
import uk.ac.manchester.spinnaker.messages.model.UnexpectedResponseCodeException;
import uk.ac.manchester.spinnaker.messages.sdp.SDPHeader;
import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation;

/**
* A command message to the Data In port to write multicast data.
*/
public class SendMCDataRequest extends SCPRequest<EmptyResponse> {
/** Shift of x in the coordinates for arg2. */
private static final int X_SHIFT = 16;

/**
* @param core
* Where to send the request.
* @param targetCore
* The target core of the write.
* @param baseAddress
* The address to write to on the target core.
* @param data
* The data to write.
*/
public SendMCDataRequest(HasCoreLocation core, HasCoreLocation targetCore,
MemoryLocation baseAddress, ByteBuffer data) {
super(header(core), CMD_WRITE, baseAddress.address,
(targetCore.getX() << X_SHIFT) | targetCore.getY(),
data.remaining() / WORD_SIZE, data);
}

/**
* Make a variant of SDP header that talks to the packet reinjector. It
* <i>always</i> wants a reply and always talks to a particular SDP port
* (the port for the reinjector).
*
* @param core
* The SpiNNaker core that we want to talk to. Should be running
* the extra monitor core (not checked).
* @return The SDP header.
*/
private static SDPHeader header(HasCoreLocation core) {
return new SDPHeader(REPLY_EXPECTED, new SDPLocation(core),
COPY_DATA_IN_PORT);
}

@Override
public EmptyResponse getSCPResponse(ByteBuffer buffer)
throws UnexpectedResponseCodeException {
return new EmptyResponse("Copy Data In", CMD_WRITE, buffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ public enum SDPPort {
/** Extra monitor core data transfer functionality. */
EXTRA_MONITOR_CORE_DATA_SPEED_UP(5),
/** Messages directed at the packet gatherer for the speed up protocols. */
GATHERER_DATA_SPEED_UP(6);
GATHERER_DATA_SPEED_UP(6),
/** Messages directed at the data in loader to simply load data. */
COPY_DATA_IN_PORT(7);

/** The port ID. */
public final int value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2086,6 +2086,16 @@ public void writeMemoryFlood(MemoryLocation baseAddress, ByteBuffer data)
}
}

@Override
@ParallelSafe
public void writeMemoryMulticast(HasCoreLocation core,
HasCoreLocation targetCore, MemoryLocation baseAddress,
ByteBuffer data)
throws IOException, ProcessException, InterruptedException {
new WriteMemoryByMulticastProcess(scpSelector, this).writeMemory(
core, targetCore, baseAddress, data);
}

@Override
@CheckReturnValue
@ParallelSafe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,34 @@ void writeMemory(@Valid HasCoreLocation core,
@NotNull MemoryLocation baseAddress, @NotNull ByteBuffer data)
throws IOException, ProcessException, InterruptedException;

/**
* Write to the board via multicast on the Ethernet chip.
*
* @param core
* The coordinates of the Ethernet core containing the advanced
* monitor support
* @param targetCore
* The coordinates of the core where the memory is that is to be
* written to
* @param baseAddress
* The address in SDRAM where the region of memory is to be
* written
* @param data
* The data that is to be written. The data should be from the
* <i>position</i> to the <i>limit</i>.
* @throws IOException
* If anything goes wrong with networking.
* @throws ProcessException
* If SpiNNaker rejects a message.
* @throws InterruptedException
* If the communications were interrupted.
*/
@ParallelSafe
void writeMemoryMulticast(@Valid HasCoreLocation core,
@Valid HasCoreLocation targetCore,
@NotNull MemoryLocation baseAddress, @NotNull ByteBuffer data)
throws IOException, ProcessException, InterruptedException;

/**
* Write to the user<sub>0</sub> register of a core.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright (c) 2018 The University of Manchester
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.manchester.spinnaker.transceiver;

import static java.lang.Math.max;
import static java.nio.ByteBuffer.allocate;
import static uk.ac.manchester.spinnaker.messages.Constants.UDP_MESSAGE_MAX_SIZE;
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.read;
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.sliceUp;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import uk.ac.manchester.spinnaker.connections.ConnectionSelector;
import uk.ac.manchester.spinnaker.connections.SCPConnection;
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
import uk.ac.manchester.spinnaker.messages.scp.SendMCDataRequest;

/**
* Write to memory on SpiNNaker via multicast (data in only).
*/
class WriteMemoryByMulticastProcess extends TxrxProcess {

/** Timeout for a write request; longer as the write can take some time. */
private static final int TIMEOUT = 10000;

/** The number of simultaneous messages that can be in progress. */
private static final int N_CHANNELS = 8;

/**
* @param connectionSelector
* How to select how to communicate.
* @param retryTracker
* Object used to track how many retries were used in an
* operation. May be {@code null} if no suck tracking is
* required.
*/
WriteMemoryByMulticastProcess(
ConnectionSelector<? extends SCPConnection> connectionSelector,
RetryTracker retryTracker) {
super(connectionSelector, SCP_RETRIES, TIMEOUT, N_CHANNELS,
N_CHANNELS - 1, retryTracker);
}

/**
* @param connectionSelector
* How to select how to communicate.
* @param numChannels
* The number of parallel communications to support
* @param retryTracker
* Object used to track how many retries were used in an
* operation. May be {@code null} if no suck tracking is
* required.
*/
WriteMemoryByMulticastProcess(
ConnectionSelector<? extends SCPConnection> connectionSelector,
int numChannels, RetryTracker retryTracker) {
super(connectionSelector, SCP_RETRIES, TIMEOUT, numChannels,
max(numChannels / 2, 1), retryTracker);
}

/**
* Write to memory.
*
* @param core
* The location to send the message to.
* @param targetCore
* The target to write the data to.
* @param baseAddress
* The base address to write.
* @param data
* The overall block of memory to write
* @throws IOException
* If anything goes wrong with networking.
* @throws ProcessException
* If SpiNNaker rejects a message.
* @throws InterruptedException
* If the communications were interrupted.
*/
public void writeMemory(
HasCoreLocation core, HasCoreLocation targetCore,
MemoryLocation baseAddress, ByteBuffer data)
throws IOException, ProcessException, InterruptedException {
var writePosition = baseAddress;
for (var bb : sliceUp(data, UDP_MESSAGE_MAX_SIZE)) {
sendRequest(new SendMCDataRequest(core, targetCore, writePosition,
bb));
writePosition = writePosition.add(bb.remaining());
}
finishBatch();
}

/**
* Write to memory.
*
* @param core
* The location to send the message to.
* @param targetCore
* The target to write the data to.
* @param baseAddress
* The base address to write.
* @param data
* The stream of data to write.
* @throws IOException
* If anything goes wrong with networking or the input stream.
* @throws ProcessException
* If SpiNNaker rejects a message.
* @throws InterruptedException
* If the communications were interrupted.
*/
public void writeMemory(
HasCoreLocation core, HasCoreLocation targetCore,
MemoryLocation baseAddress, InputStream data)
throws IOException, ProcessException, InterruptedException {
var writePosition = baseAddress;
while (true) {
// One buffer per message; lifetime extends until batch end
var tmp = read(data, allocate(UDP_MESSAGE_MAX_SIZE),
UDP_MESSAGE_MAX_SIZE);
if (tmp == null) {
break;
}
sendRequest(new SendMCDataRequest(core, targetCore, writePosition,
tmp));
writePosition = writePosition.add(tmp.remaining());
}
finishBatch();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import static uk.ac.manchester.spinnaker.alloc.client.SpallocClientFactory.getJobFromProxyInfo;
import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DOWNLOAD_DESC;
import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_APP_DESC;
import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_DESC;
import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_MON_DESC;
import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_MON_DESC_MC;
import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_SYS_DESC;
import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.GATHER_DESC;
import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.IOBUF_DESC;
Expand Down Expand Up @@ -71,7 +70,7 @@
import uk.ac.manchester.spinnaker.front_end.download.RecordingRegionDataGatherer;
import uk.ac.manchester.spinnaker.front_end.download.request.Gather;
import uk.ac.manchester.spinnaker.front_end.download.request.Placement;
import uk.ac.manchester.spinnaker.front_end.dse.FastExecuteDataSpecification;
import uk.ac.manchester.spinnaker.front_end.dse.FastMCExecuteDataSpecification;
import uk.ac.manchester.spinnaker.front_end.dse.HostExecuteDataSpecification;
import uk.ac.manchester.spinnaker.front_end.iobuf.IobufRequest;
import uk.ac.manchester.spinnaker.front_end.iobuf.IobufRetriever;
Expand Down Expand Up @@ -195,19 +194,19 @@ HostExecuteDataSpecification create(TransceiverInterface txrx,
static HostDSEFactory hostFactory = HostExecuteDataSpecification::new;

@FunctionalInterface
interface FastDSEFactory {
FastExecuteDataSpecification create(TransceiverInterface txrx,
interface FastMCDSEFactory {
FastMCExecuteDataSpecification create(TransceiverInterface txrx,
Machine machine, List<Gather> gatherers, File reportDir,
DSEDatabaseEngine db)
throws IOException, SpinnmanException, StorageException,
ExecutionException, InterruptedException, URISyntaxException;
}

/**
* Makes {@link FastExecuteDataSpecification} instances. Allows for
* Makes {@link FastMCExecuteDataSpecification} instances. Allows for
* injection of debugging tooling.
*/
static FastDSEFactory fastFactory = FastExecuteDataSpecification::new;
static FastMCDSEFactory fastMCFactory = FastMCExecuteDataSpecification::new;

/**
* Run the data specifications in parallel.
Expand Down Expand Up @@ -257,7 +256,7 @@ public void runDSEUploadingViaClassicTransfer(Machine machine,
}

/**
* Run the data specifications in parallel.
* Run the data specifications in parallel using monitors and multicast.
*
* @param gatherers
* List of descriptions of gatherers.
Expand All @@ -283,8 +282,8 @@ public void runDSEUploadingViaClassicTransfer(Machine machine,
* @throws URISyntaxException
* If a proxy URI is provided but invalid.
*/
@Command(name = "dse_app_mon", description = DSE_MON_DESC)
public void runDSEForAppCoresUploadingViaMonitorStreaming(
@Command(name = "dse_app_mon_mc", description = DSE_MON_DESC_MC)
public void runDSEForAppCoresUploadingViaMulticast(
@Mixin GatherersParam gatherers,
@Mixin MachineParam machine,
@Mixin DsFileParam dsFile,
Expand All @@ -298,7 +297,7 @@ public void runDSEForAppCoresUploadingViaMonitorStreaming(
var job = getJob(db);

try (var txrx = getTransceiver(machine.get(), job);
var dseExec = fastFactory.create(txrx, machine.get(),
var dseExec = fastMCFactory.create(txrx, machine.get(),
gatherers.get(), reportFolder.orElse(null), db)) {
dseExec.loadCores();
}
Expand Down Expand Up @@ -770,6 +769,14 @@ interface CommandDescriptions {
+ "Requires system cores to be fully configured, so "
+ "can't be used to set up system cores.";

/** Description of {@code dse_app_mon} command. */
String DSE_MON_DESC_MC =
"Evaluate data specifications for application cores "
+ "and upload the results to SpiNNaker using the fast data "
+ "streaming protocol directly with multicast. "
+ "Requires system cores to be fully configured, so "
+ "can't be used to set up system cores.";

/** Description of {@code dse_sys} command. */
String DSE_SYS_DESC = "Evaluate data specifications for system cores and "
+ "upload the results to SpiNNaker (always uses the classic "
Expand Down
Loading
Loading