TaskSupport overridden in ForkJoinTasks #160

Open
scabug opened this issue Mar 30, 2016 · 12 comments

@scabug

scabug commented Mar 30, 2016

The wrong forkJoinPool is used in the following example:

implicit val someForkJoinPoolExecutionContext = ...
val otherForkJoinPoolEC = ...
val someCollection = ...
def foo(): Unit = {
   // Future runs in someForkJoinPoolExecutionContext
   Future{
       val parCollection = someCollection.par
       parCollection.tasksupport = new ExecutionContextTaskSupport(otherForkJoinPoolEC)
       // this is executed in the someForkJoinPoolExecutionContext rather than otherForkJoinPoolEC
       parCollection.foreach(...)
    }
} 

In Tasks.scala, we see at line 420:
https://github.com/scala/scala/blob/5cb3d4ec14488ce2fc5a1cc8ebdd12845859c57d/src/library/scala/collection/parallel/Tasks.scala#L420

if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {

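This check overrides the given tasksupport: if the current thread is already a ForkJoinWorkerThread, its pool (here, the one behind someForkJoinPoolExecutionContext) is used instead of the configured one.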
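
To make the effect of that check concrete, here is a minimal sketch that uses only java.util.concurrent, not the parallel collections library. The names ForkVsExecute, currentPool, dispatch and demo are hypothetical, and dispatch only imitates the "fork if already on a worker thread, otherwise execute on the configured pool" pattern quoted above; it is not the code from Tasks.scala.

```scala
import java.util.concurrent.{Callable, ForkJoinPool, ForkJoinWorkerThread, RecursiveTask}

object ForkVsExecute {
  /** Pool that owns the current thread, if it is a fork/join worker. */
  def currentPool(): Option[ForkJoinPool] = Thread.currentThread match {
    case w: ForkJoinWorkerThread => Some(w.getPool)
    case _                       => None
  }

  /** Hypothetical stand-in for the dispatch pattern in the issue:
    * fork when already on a worker thread, otherwise hand the task
    * to `configured`. Returns the pool that actually ran the task.
    */
  def dispatch(configured: ForkJoinPool): Option[ForkJoinPool] = {
    val task = new RecursiveTask[Option[ForkJoinPool]] {
      override def compute(): Option[ForkJoinPool] = currentPool()
    }
    if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) task.fork()
    else configured.execute(task)
    task.join()
  }

  /** Returns (configured pool used from a plain thread,
    *          outer pool used from one of its own workers).
    * Assumes the caller is not itself a fork/join worker thread.
    */
  def demo(): (Boolean, Boolean) = {
    val outer      = new ForkJoinPool(2)
    val configured = new ForkJoinPool(2)
    try {
      // Plain caller thread: the task goes to the configured pool.
      val fromPlainThread = dispatch(configured).exists(_ eq configured)
      // Caller is a worker of `outer`: fork() keeps the task in `outer`.
      val fromWorker = outer.submit(new Callable[Boolean] {
        def call(): Boolean = dispatch(configured).exists(_ eq outer)
      }).get()
      (fromPlainThread, fromWorker)
    } finally {
      outer.shutdown()
      configured.shutdown()
    }
  }
}
```

Calling ForkVsExecute.demo() should show the configured pool being honoured from an ordinary thread and bypassed from inside another pool's worker, which matches the behaviour reported here.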

@scabug
Author

scabug commented Mar 30, 2016

Imported From: https://issues.scala-lang.org/browse/SI-9727?orig=1
Reporter: Sietse Au (@stau)
Affected Versions: 2.12.0-M3
See scala/bug#6998

@scabug
Author

scabug commented Apr 5, 2016

@soc said (edited on Apr 5, 2016 1:43:02 PM UTC):
Thanks for the report, Sietse!

Only slightly related to this, I think the mutable bits (mostly taskSupport) need to be cleaned up and removed, and a better immutable way should be provided.

@scabug
Author

scabug commented Apr 6, 2016

@szeiger said:
Assigning to M5 because it changes semantics. Maybe out of scope for 2.12 anyway?

@scabug
Author

scabug commented Apr 7, 2016

@soc said:
I don't think it is out of scope. Even if we change the syntax later, the tests written for this issue will still be useful.

@michellemay

When porting from Scala 2.11 to 2.12.12, we found something similar. Here is a simple piece of code to reproduce the problem.

        // Imports needed on 2.12 (on 2.13, also scala.collection.parallel.CollectionConverters._)
        import java.util.concurrent.ForkJoinPool
        import java.util.concurrent.atomic.AtomicInteger
        import scala.collection.parallel.ForkJoinTaskSupport

        val nbThreads = 2 // less than the number of cores
        val stuff = (0 until 100).par
        stuff.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nbThreads))

        val simultaneousExecutions = new AtomicInteger(0)
        stuff.foreach { userIndex =>
          val sim = simultaneousExecutions.incrementAndGet()
          println(sim) // expected to stay at or below nbThreads
          (0 to 1000000).par.sum // Doing a parallel collection operation here seems to change 'stuff' collection tasksupport
          Thread.sleep(50)
          simultaneousExecutions.decrementAndGet()
        }

@SethTisue
Member

is the problem also reproducible on Scala 2.13.5?

@som-snytt
Contributor

@michellemay

scala/bug#11036

@michellemay

is the problem also reproducible on Scala 2.13.5?

Yes, it is.
https://scastie.scala-lang.org/I9WfbV6qS5WRR7zrmrIYHA

@som-snytt
Contributor

som-snytt commented Feb 28, 2021

The umbrella issue for TaskSupport ergonomics might be #152

Probably @SethTisue intended to close this ticket with other module-related tickets?

I tried to follow the conclusions from a few years ago. For par, using the ForkJoinPool ctor with maximumPoolSize and minimumRunnable isn't sufficient to limit the "outer loop" to 2 submitted tasks and max 2 threads.

A workaround is to call sum inside a Future (using the default context, aka the common pool). (When the inner par operation is run, on the common pool I assume, is it run on the same thread? Is it a fork or a submit?)

Here is the updated snippet. The printlns make it easier to see order of operations. In the absence of blocking, the outer loop happily submits tasks, and the FJP may spawn an extra thread or 2 to ensure parallelism. It's crucial to distinguish tasks from the threads. I assume the expectation or goal is to run 2 tasks on 2 threads, with the next task (or foreach iteration) running when a thread is available.

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit
import scala.collection.parallel.CollectionConverters._

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object Test extends App {
  def t = Thread.currentThread
  def count = Thread.activeCount()
  def custom = {
    val nbThreads = 2 // less than the number of cores
    val asyncly = true
    val limited = false
    if (!limited) new ForkJoinPool(nbThreads)
    else new ForkJoinPool(
      nbThreads,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      /*handler=*/ null,
      /*asyncMode=*/ asyncly,
      /*corePoolSize=*/ nbThreads,
      /*maximumPoolSize=*/ nbThreads, // no excess, default is + 256
      /*minimumRunnable=*/ 0, // allow all blocked, default is 1
      /*saturate=*/ null,
      /*keepAliveTime=*/ 60L,
      TimeUnit.SECONDS,
    )
  }
  val stuff = (0 until 100).par
  stuff.tasksupport = new ForkJoinTaskSupport(custom)
  val commonsupport = new ForkJoinTaskSupport(ForkJoinPool.commonPool)

  val simultaneousExecutions = new AtomicInteger(0)
  val total = new AtomicInteger(0)
  stuff.foreach { userIndex =>
    val sim = simultaneousExecutions.incrementAndGet()
    val loop = total.incrementAndGet()
    println(s"$t: of $count: loop $loop ($sim): ${stuff.tasksupport.environment}")
    //(0 to 1000000).par.sum // Doing a parallel collection operation here seems to change 'stuff' collection tasksupport
    def sum = Future {
      val p = (0 to 10).par
      p.tasksupport = commonsupport
      p.sum
    }
    println(s"$t: of $count: loop $loop ($sim): sum: ${Await.result(sum, Duration.Inf)}")
    //Thread.sleep(50L)
    simultaneousExecutions.decrementAndGet()
  }
}

SethTisue transferred this issue from scala/bug Feb 28, 2021
@michellemay

This is an interesting solution, but not one that is possible with the code I currently have: the inner loop is not directly accessible.
It would require changing all method signatures and objects to add a commonsupport parameter. That does not make sense, since the inner code might run parallel operations anywhere.

My workaround is roughly the opposite of your code sample: I execute the top-level loop with futures on a fixed execution pool and let the inner code do whatever it wants.
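
For readers who want to try that approach, here is a minimal sketch of what such a top-level loop could look like. It is not the code from my project: BoundedOuterLoop, run and the peak counter are hypothetical names added for illustration, and the fixed pool is assumed to be a plain Executors.newFixedThreadPool, whose threads are not ForkJoinWorkerThreads.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object BoundedOuterLoop {
  /** Runs `body` for every item on a fixed pool of `nbThreads` threads and
    * returns the highest number of bodies observed running at the same time.
    */
  def run(items: Seq[Int], nbThreads: Int)(body: Int => Unit): Int = {
    val executor = Executors.newFixedThreadPool(nbThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    val running = new AtomicInteger(0)
    val peak    = new AtomicInteger(0)
    try {
      // One Future per item; the fixed pool queues them and runs
      // at most nbThreads of them concurrently.
      val all = Future.traverse(items) { i =>
        Future {
          val now = running.incrementAndGet()
          peak.accumulateAndGet(now, (a, b) => math.max(a, b))
          try body(i) finally running.decrementAndGet()
        }
      }
      // Block the calling thread (not a pool thread) until every item is done.
      Await.result(all, Duration.Inf)
      peak.get()
    } finally executor.shutdown()
  }
}
```

For example, BoundedOuterLoop.run(0 until 100, 2)(i => innerWork(i)), where innerWork stands in for whatever the inner code does, should never report more than 2 simultaneous executions. The inner code stays free to call .par wherever it likes, without any extra tasksupport parameter being threaded through.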

@som-snytt
Contributor

I agree your solution is best. The promise of .par was that I can sprinkle it around and benefit.
