Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
- package-ecosystem: "nuget" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
5 changes: 3 additions & 2 deletions Alpheus.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29009.5
# Visual Studio Version 17
VisualStudioVersion = 17.4.33122.133
MinimumVisualStudioVersion = 10.0.40219.1
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "AlpheusCore", "AlpheusCore\AlpheusCore.fsproj", "{F429F84A-B34A-4FA2-9610-6D50B921D5B1}"
EndProject
Expand All @@ -12,6 +12,7 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution items", "Solution items", "{11C4123F-810D-46CE-90E3-F6151573483C}"
ProjectSection(SolutionItems) = preProject
.drone.yml = .drone.yml
.github\dependabot.yml = .github\dependabot.yml
README.md = README.md
EndProjectSection
EndProject
Expand Down
46 changes: 45 additions & 1 deletion AlpheusCore/AngaraGraphCommon.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ let getInputTypes<'a> (v:MethodVertex) =
| Source src -> List.empty
| Command cmd -> cmd.Inputs |> Seq.map(fun a -> max 0 (a.Artefact.Rank - rank) |> arrayType<'a>) |> List.ofSeq

let toJaggedArrayOrValue (mapValue: (string list * 'a) -> 'c) (map: MdMap<string,'a>) : obj =
/// Returns either a jagged array of 'c or a single 'c element, hence the return type obj. This breaks the type checking,
/// but unfortunately Angara expects the artefact to be of that type.
let toJaggedArrayOrValue (mapValue: (string list * 'a) -> 'c) (map: MdMap<string,'a>) =
let rec toJaggedArrayOrValueRec (mapValue: (string list * 'a) -> 'c) (index: string list) (map: MdMapTree<string,'a>) : obj =
let isValue = function
| MdMapTree.Value _ -> true
Expand Down Expand Up @@ -69,6 +71,48 @@ let toJaggedArrayOrValue (mapValue: (string list * 'a) -> 'c) (map: MdMap<string
| MdMapTree.Value _ -> failwith "Data is incomplete and has missing elements"))
toJaggedArrayOrValueRec mapValue [] (map |> MdMap.toTree)

/// Asynchronous variant of the jagged-array conversion: converts `map` to its tree form and returns either
/// a single mapped 'c element (when the tree is a lone value) or a (possibly nested) array of mapped elements.
/// Thus the result type is obj. This breaks the type checking,
/// but unfortunately Angara expects the artefact to be of that type.
/// `mapValue` is called with the index accumulated during the walk and the value stored at that position.
/// Throws (via failwith) if a level that contains nested maps also contains a plain value.
let toJaggedArrayOrValueAsync (mapValue: (string list * 'a) -> Async<'c>) (map: MdMap<string,'a>) : Async<obj> =
let rec toJaggedArrayOrValueRec (mapValue: (string list * 'a) -> Async<'c>) (index: string list) (map: MdMapTree<string,'a>) : Async<obj> =
// true for leaf (Value) nodes, false for nested maps
let isValue = function
| MdMapTree.Value _ -> true
| MdMapTree.Map _ -> false

// Maps every entry of one tree level (ordered by key) to an async computation
// and combines them into a single computation of an array via Async.Parallel
let mapToArray (getElement: (string * MdMapTree<string,'a>) -> Async<'b>) (map: Map<string,MdMapTree<string,'a>>) : Async<'b[]> =
map |> Map.toSeq |> Seq.sortBy fst |> Seq.map getElement |> Seq.toArray |> Async.Parallel

// NOTE(review): despite its name, `List.append [v] list` evaluates to `[v] @ list`, i.e. `v` ends up in FRONT of `list`.
// This makes no difference for rank-1 maps, but for deeper maps the index handed to `mapValue` is built
// innermost key first. Confirm that this ordering is intended.
let append v list = list |> List.append [v]

match map with
| MdMapTree.Value v ->
// scalar case: map the single value and box the result
async {
let! mappedValue = mapValue (index, v)
return upcast(mappedValue)
}
| MdMapTree.Map subMap ->
match subMap |> Map.forall(fun _ -> isValue) with
| true -> // final level
// every child is a value, so this level becomes a flat array of mapped values
async {
let! arr = subMap |> mapToArray (fun (k,t) ->
let newIndex = index |> append k
match t with
| MdMapTree.Value v -> mapValue (newIndex, v)
| MdMapTree.Map _ -> failwith "Unreachable case")
return arr
}
| false ->
// at least one child is a nested map: recurse into each child;
// a plain value at this level is reported as incomplete data
async {
let! arr = subMap |> mapToArray (fun (k,t) ->
let newIndex = index |> append k
match t with
| MdMapTree.Map _ -> toJaggedArrayOrValueRec mapValue newIndex t
| MdMapTree.Value _ -> failwith "Data is incomplete and has missing elements")
return arr
}
// start the walk from the root of the tree with an empty index
toJaggedArrayOrValueRec mapValue [] (map |> MdMap.toTree)


/// Truncates `index` so its length is `rank`, if rank is less or equal to the length of the index.
/// Throws if the rank is greater than the length of the index.
let rec internal truncateIndex (rank: int) (index: string list) =
Expand Down
3 changes: 2 additions & 1 deletion AlpheusCore/ComputationGraph.fs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ type CommandMethod(command: CommandLineVertex,
outputItems |> toJaggedArrayOrValue (fun (extraIndex, itemFullPath) -> { FullPath = itemFullPath; Index = index @ extraIndex; UpdateType=Process })
else
upcast { FullPath = outputPath; Index = index; UpdateType=Process }
return Seq.singleton(outputPaths |> List.map outPathToArtefactItem, null)
let outputs = outputPaths |> List.map outPathToArtefactItem
return Seq.singleton(outputs, null)


} |> Async.RunSynchronously
Expand Down
2 changes: 1 addition & 1 deletion AlpheusCore/DependencyGraph.fs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ and LinkToArtefact(artefact: ArtefactVertex, expectedVersion: ArtefactVersion) =
member s.AnalyzeStatus checkStoragePresence (index: string list) =
if index.Length <> artefact.Rank then invalidArg "index" "Index doesn't correspond to the rank of the artefact"
async {
let expectedVersion = MdMap.find index expected
let expectedVersion = MdMap.tryFind index expected |> Option.flatten
let! artefactItemActualVersion = artefact.ActualVersion.Get index
let expectation =
match expectedVersion with
Expand Down
78 changes: 57 additions & 21 deletions AlpheusCore/StatusGraph.fs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,14 @@ type SourceMethod(source: SourceVertex, experimentRoot, checkStoragePresense) =
let expectedArtefact = source.Output
let artefact = expectedArtefact.Artefact

let getItemStatus (link:LinkToArtefact) index =
link.AnalyzeStatus checkStoragePresense index

// Output of the method is a scalar or a vector of full paths to the data of the artefact.
let indices =
artefact.Id
|> PathUtils.enumerateItems experimentRoot
|> MdMap.toSeq |> Seq.map fst |> Array.ofSeq
let diskItems = artefact.Id |> PathUtils.enumerateItems experimentRoot
let expectedVersions = expectedArtefact.ExpectedVersion
let actualIndices = diskItems |> MdMap.toSeq |> Seq.map fst
let! actualVersionsStrached = actualIndices |> Seq.map artefact.ActualVersion.Get |> Async.Parallel
let actualVersions = Seq.zip actualIndices actualVersionsStrached |> Seq.fold (fun s e -> let k,v = e in MdMap.add k v s) MdMap.empty

let! itemStatuses =
indices |> Array.map (getItemStatus expectedArtefact) |> Async.Parallel
let merger _ expectedOpt actualOpt = (Option.flatten expectedOpt), (Option.flatten actualOpt)
let expAndAct = Utils.mdmapMerge merger expectedVersions actualVersions

let linkStatusToCommandVertextStatus status =
match status with
Expand All @@ -53,12 +50,45 @@ type SourceMethod(source: SourceVertex, experimentRoot, checkStoragePresense) =
failwith "The artefact data is not found neither on disk nor in any of the available storages"
| Local -> UpToDate [ArtefactLocation.Local]
| Remote -> UpToDate [ArtefactLocation.Remote]
let result =
Array.map2 (fun index status -> {ArtefactId = artefact.Id; Index = index; Status = linkStatusToCommandVertextStatus status}) indices itemStatuses
|> Seq.map (fun x -> x :> Artefact)
|> List.ofSeq

return seq{ yield (result, null) }

let getItemLinkStatus (link:LinkToArtefact) index =
link.AnalyzeStatus checkStoragePresense index

let outputArtefactAsync : Async<Artefact> =
async {
// the behavior significantly differs for scalar and vector source vertices
// for scalar, we always evaluate the expected version
if expAndAct.IsScalar then
let! status = getItemLinkStatus expectedArtefact []
let vertStatus = linkStatusToCommandVertextStatus status
return upcast { ArtefactId = artefact.Id; Index = []; Status = vertStatus}
else
// For the vector (e.g. having "*" in the path) we distinguish two cases:
// * Nothing on disk matches the pattern.
// In this case we respect the expected version and the vector indices stored there
// * There is at least one item on disk that matches the pattern.
// We consider this to be a new source of vector map indices. We completely ignore expected version indices
if Seq.isEmpty actualIndices then
// nothing is on disk. Evaluating the expected version indices
let itemMapper (arg:string list * HashString option): Async<ArtefactItem> =
async {
let idx, _ = arg // idx, expectedVer
let! status = getItemLinkStatus expectedArtefact idx
let vertItemStatus = linkStatusToCommandVertextStatus status
return { ArtefactId= artefact.Id; Index = idx; Status=vertItemStatus }
}
let! artefactItems = expectedVersions |> toJaggedArrayOrValueAsync itemMapper
return artefactItems
else
// there is something on disk that matches the pattern
// we emit the indices based on what is on the disk
let itemMapper (pair:string list * string): ArtefactItem =
let idx,_ = pair
{ ArtefactId= artefact.Id; Index = idx; Status=UpToDate[ArtefactLocation.Local] }
return diskItems |> toJaggedArrayOrValue itemMapper
}
let! outputArtefact = outputArtefactAsync
return Seq.singleton([outputArtefact], null)
} |> Async.RunSynchronously


Expand All @@ -70,7 +100,7 @@ type CommandMethod(command: CommandLineVertex,
override s.Execute(inputs, _) = //ignoring checkpoint.
async{
// Rules of execution
// The artefact is valid either if actual disk version matches expected version or if the disk version is absend and expected version is restorable from storage
// The artefact is valid either if actual disk version matches expected version or if the disk version is absent and expected version is restorable from storage
// We can bypass the computation entirely if inputs and outputs are valid

let inputItems = inputs |> List.map (fun inp -> inp :?> ArtefactItem)
Expand Down Expand Up @@ -142,10 +172,16 @@ let getStatuses (g:FlowGraph<AngaraGraphNode<ArtefactItem>>) =
ArtefactStatus.UpToDate(List.item outputIdx outputs)
| Outdated reason ->
NeedsRecomputation reason
let outputNumToRes idx : (ArtefactId * string list* ArtefactStatus) =
let artItem: ArtefactItem = downcast x.TryGet(idx).Value
artItem.ArtefactId, artItem.Index, (methodInstanceStatusToOutputStatus artItem.Status idx)
Seq.init outputsCount outputNumToRes
let outputNumToRes idx : (ArtefactId * string list* ArtefactStatus) seq =
let genTuple artItem =
artItem.ArtefactId, artItem.Index, (methodInstanceStatusToOutputStatus artItem.Status idx)
match x.TryGet(idx).Value with
| :? ArtefactItem as artItem ->
artItem |> genTuple |> Seq.singleton
| :? (ArtefactItem[]) as vector ->
vector |> Seq.map genTuple
| _ -> failwith "Unexpected type of the artItem"
Seq.init outputsCount outputNumToRes |> Seq.collect id
let itemsStatus = state |> MdMap.toSeq |> Seq.collect (fun x -> let _,v = x in v.Data.Value |> toArtefactItemStatus)
itemsStatus

Expand Down
1 change: 1 addition & 0 deletions AlpheusUnitTests/AlpheusUnitTests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<Compile Include="TestUtils.fs" />
<Compile Include="OsCommands.fs" />
<Compile Include="GitIgnoreManagerTests.fs" />
<Compile Include="MethodCommandSyntaxTests.fs" />
<Compile Include="HashingTests.fs" />
Expand Down
70 changes: 70 additions & 0 deletions AlpheusUnitTests/ApiTests.Vectors.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
open Xunit
open ItisLab.Alpheus.Tests
open ItisLab.Alpheus.Tests.Utils
open ItisLab.Alpheus.Tests.OsCommands
open ItisLab.Alpheus.API
open System.Collections
open System.Collections.Generic
Expand Down Expand Up @@ -78,6 +79,22 @@ type ``Vector scenarios``(output) as this =
| TargetPlatform.Linux -> strings |> Seq.map(fun s -> s + "\n") |> String.concat ""
| _ -> failwith "Unknown platform"

/// Prepares a vector experiment under `path` (resolved to a full path):
/// creates the experiment directory, the `vec1` and `vec2` sub-directories and three text files
/// `vec1/a.txt`, `vec1/b.txt`, `vec1/c.txt`, then calls `API.buildAsync` with input `vec1/*.txt`,
/// output `vec2/*.txt` and `copyFileCommand`, asserting that the build result is Ok.
let buildVectorExperiment(path) =
async {
let path = Path.GetFullPath path
let! _ = API.createExperimentDirectory path
Directory.CreateDirectory(Path.Combine(path,"vec1")) |> ignore
Directory.CreateDirectory(Path.Combine(path,"vec2")) |> ignore

// three source items; their file names (a, b, c) are the indices the status tests expect
do! File.WriteAllTextAsync(Path.Combine(path,"vec1","a.txt"),"File 1\\r\\n") |> Async.AwaitTask
do! File.WriteAllTextAsync(Path.Combine(path,"vec1","b.txt"),"File 2\\r\\n") |> Async.AwaitTask
do! File.WriteAllTextAsync(Path.Combine(path,"vec1","c.txt"),"File 3\\r\\n") |> Async.AwaitTask

// NOTE(review): `path` is passed twice; presumably experiment root and working directory — confirm against API.buildAsync
let! res1 = API.buildAsync path path ["vec1/*.txt"] ["vec2/*.txt"] copyFileCommand DependencyGraph.CommandExecutionSettings.Default
assertResultOk res1
return ()
}


[<Fact>]
member s.``Runs same method for multiple input files``() =
Expand Down Expand Up @@ -725,6 +742,59 @@ type ``Vector scenarios``(output) as this =
//["sample2"] |> concatStrings |> assertFileContent (Path.Combine(root, "samples", "sample2.txt"))
//["sample3"] |> concatStrings |> assertFileContent (Path.Combine(root, "samples", "sample3.txt"))
}

/// Builds the vector experiment and queries the status of the source vector `vec1/*.txt`.
/// Expects the result to contain exactly that artefact with items a, b and c, each reported as UpToDate Local.
[<Fact>]
member s.``Status: Source vector``() =
async {
let path = Path.GetFullPath s.RelativeExperimentRoot
do! buildVectorExperiment(path)

let res = API.status(path,ArtefactId.Path("vec1/*.txt"))
match res with
| Ok r ->
// expected per-item statuses of the source vector
let expectedVector = MdMap.empty
let expectedVector = MdMap.add ["a"] (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local) expectedVector
let expectedVector = MdMap.add ["b"] (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local) expectedVector
let expectedVector = MdMap.add ["c"] (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local) expectedVector
let expectedStatuses:Map<ArtefactId,MdMap<string,StatusGraph.ArtefactStatus>> =
[
ArtefactId.Path "vec1/*.txt",expectedVector;
] |> Map.ofList
Assert.True(equalStatuses expectedStatuses r)
()
| Error e->
// any error from API.status fails the test and prints the error
Assert.True(false, sprintf "Error: %A" e)
}

/// Builds the vector experiment and queries the status of the produced vector `vec2/*.txt`.
/// Expects two artefacts in the result: `vec1/*.txt` with items a, b, c reported as UpToDate Local
/// and `vec2/*.txt` with items a, b, c reported as NeedsRecomputation InputsOutdated.
[<Fact>]
member s.``Status: vector``() =
async {
let path = Path.GetFullPath s.RelativeExperimentRoot
do! buildVectorExperiment(path)

let res = API.status(path,ArtefactId.Path("vec2/*.txt"))
match res with
| Ok r ->
// expected per-item statuses of the source vector (vec1)
let expectedVector = MdMap.empty
let expectedVector = MdMap.add ["a"] (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local) expectedVector
let expectedVector = MdMap.add ["b"] (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local) expectedVector
let expectedVector = MdMap.add ["c"] (StatusGraph.ArtefactStatus.UpToDate DependencyGraph.Local) expectedVector

// expected per-item statuses of the output vector (vec2)
let expectedVector2 = MdMap.empty
let expectedVector2 = MdMap.add ["a"] (StatusGraph.ArtefactStatus.NeedsRecomputation InputsOutdated) expectedVector2
let expectedVector2 = MdMap.add ["b"] (StatusGraph.ArtefactStatus.NeedsRecomputation InputsOutdated) expectedVector2
let expectedVector2 = MdMap.add ["c"] (StatusGraph.ArtefactStatus.NeedsRecomputation InputsOutdated) expectedVector2

let expectedStatuses:Map<ArtefactId,MdMap<string,StatusGraph.ArtefactStatus>> =
[
ArtefactId.Path "vec1/*.txt",expectedVector;
ArtefactId.Path "vec2/*.txt",expectedVector2;
] |> Map.ofList
Assert.True(equalStatuses expectedStatuses r)
()
| Error e->
// any error from API.status fails the test and prints the error
Assert.True(false, sprintf "Error: %A" e)
}



Expand Down
38 changes: 2 additions & 36 deletions AlpheusUnitTests/ApiTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
open Xunit
open ItisLab.Alpheus.Tests
open ItisLab.Alpheus.Tests.Utils
open ItisLab.Alpheus.Tests.OsCommands
open ItisLab.Alpheus.API
open System.Collections
open System.Collections.Generic
Expand Down Expand Up @@ -375,49 +376,15 @@ type DepGraphSaveRestore(output) =

} |> toAsyncFact

let equalStatuses expected actual =
let s1 = Map.toSeq expected
let s2 = Map.toSeq actual
Seq.forall2 (fun x y -> let idx1,v1 = x in let idx2,v2 = y in (idx1=idx2) && MdMap.equal (fun _ elem1 elem2 -> elem1=elem2) v1 v2) s1 s2



type ScalarScenarios(output) =
inherit SingleUseOneTimeDirectory(output)

let copyFileCommand =
if isTestRuntimeWindows then
"cmd /C \"copy $in1 $out1\""
else
"/bin/sh -c \"cp $in1 $out1\""


let twoOutputsCommand =
if isTestRuntimeWindows then
"cmd /C \"cmd.exe /C copy $in1 $out1 & cmd.exe /C copy $in2 $out2\""
else
"/bin/sh -c \"cat $in1 > $out1; cat $in2 >> $out2\""

let copyDirCommand =
if isTestRuntimeWindows then
"robocopy /E $in1 $out1"
else
// ubuntu cp works differently if the dest dir exists.
// as alpheus creates output dirs before running the command we need to delete it
"/bin/sh -c \"rm -Rv $out1 ; cp -Rv $in1 $out1\""

let concatCommand =
if isTestRuntimeWindows then
"cmd /C \"cat.cmd $out1 $in1 $in2\""
else
"/bin/sh -c \"cat $in1 > $out1; cat $in2 >> $out1\""

// first file is duplicated
let concatCommand2 =
if isTestRuntimeWindows then
"cmd /C \"cat.cmd $out1 $in1 $in1\""
else
"/bin/sh -c \"cat $in1 > $out1; cat $in1 >> $out1\""


let buildExperiment(path) =
async {
Expand Down Expand Up @@ -1448,4 +1415,3 @@ type ScalarScenarios(output) =
| Error e->
Assert.True(false, sprintf "Error: %A" e)
}

Loading