Question: Why do the goroutines not scale?

Added at 2017-09-07 18:09

Background

I am currently working on my bachelor thesis, and my task is basically to optimise a given piece of Go code, i.e. make it run as fast as possible. First, I optimised the serial function and then tried to introduce parallelism via goroutines. After researching on the internet, I now understand the difference between concurrency and parallelism thanks to the slides from talks.golang. I have taken some parallel programming courses where we parallelised C/C++ code with the help of pthreads/OpenMP, so I tried to apply these paradigms in Go. That said, in this particular case I am optimising a function which computes the moving average of a slice of length len := n + (window_size - 1) (it equals either 9393 or 10175); hence we have n windows, for each of which we compute the arithmetic average and store it at the proper position in the output slice.

Note that this task is inherently embarrassingly parallel.

My optimisation attempts and results

In moving_avg_concurrent2 I split the slice into num_goroutines smaller pieces and processed each piece with one goroutine. With one goroutine this function performed, for some reason (I could not find out why yet, but we are getting off on a tangent here), better than moving_avg_serial4, but with more than one goroutine it started to perform worse than moving_avg_serial4.
In moving_avg_concurrent3 I adopted the master/worker paradigm. The performance was worse than moving_avg_serial4 when using one goroutine. Here I at least got better performance when increasing num_goroutines, but it was still not better than moving_avg_serial4. To compare the performance of moving_avg_serial4, moving_avg_concurrent2 and moving_avg_concurrent3 I wrote a benchmark and tabulated the results:

fct & num_goroutines | timing in ns/op | percentage  
---------------------------------------------------------------------   
          serial4    |         4357893 |   100.00%  
          concur2_1  |         5174818 |   118.75%  
          concur2_4  |         9986386 |   229.16%  
          concur2_8  |        18973443 |   435.38%  
          concur2_32 |        75602438 |  1734.84%  
          concur3_1  |        32423150 |   744.01%  
          concur3_4  |        21083897 |   483.81%  
          concur3_8  |        16427430 |   376.96%  
          concur3_32 |        15157314 |   347.81%  

Question

Since, as mentioned above, this problem is embarrassingly parallel, I was expecting to see a tremendous performance increase, but that was not the case.

Why does moving_avg_concurrent2 not scale at all?
And why is moving_avg_concurrent3 that much slower than moving_avg_serial4?
I know that goroutines are cheap but not free; still, is it possible that they generate so much overhead that we end up even slower than moving_avg_serial4?

Code

Functions:

// returns a slice containing the moving average of the input (given, i.e. not optimised)
func moving_avg_serial(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise buffer with NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i, val := range input {
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = val
            if !NaN_in_slice(buffer) && first_time {
                sum := 0.0
                for _, entry := range buffer {
                    sum += entry
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) && !NaN_in_slice(buffer) {
                output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// reordering the control structures to exploit the short-circuit evaluation
func moving_avg_serial4(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise buffer with NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i := range input {
            //            fmt.Printf("in mvg_avg4: i=%v\n", i)
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
            if first_time && !NaN_in_slice(buffer) {
                sum := 0.0
                for j := range buffer {
                    sum += buffer[j]
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) /* && !NaN_in_slice(buffer)*/ {
                output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// splitting up slice into smaller pieces for the goroutines but without using the serial version, i.e. we only have NaN's in the beginning, thus hope to reduce some overhead
// still does not scale (decreasing performance with increasing size and num_goroutines)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_items := len(input) - (window_size - 1)
        var barrier_wg sync.WaitGroup
        n := num_items / num_goroutines
        go_avg := make([][]float64, num_goroutines)
        for i := 0; i < num_goroutines; i++ {
            go_avg[i] = make([]float64, 0, num_goroutines)
        }

        for i := 0; i < num_goroutines; i++ {
            barrier_wg.Add(1)
            go func(go_id int) {
                defer barrier_wg.Done()

                // computing boundaries
                var start, stop int
                start = go_id*int(n) + (window_size - 1) // starting index
                // ending index
                if go_id != (num_goroutines - 1) {
                    stop = start + n // Ending index
                } else {
                    stop = num_items + (window_size - 1) // Ending index
                }

                loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)

                loc_avg = make([]float64, stop-start)
                current_sum := 0.0
                for i := start - (window_size - 1); i < start+1; i++ {
                    current_sum += input[i]
                }
                loc_avg[0] = current_sum / float64(window_size)
                idx := 1

                for i := start + 1; i < stop; i++ {
                    loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
                    idx++
                }

                go_avg[go_id] = append(go_avg[go_id], loc_avg...)

            }(i)
        }
        barrier_wg.Wait()

        for i := 0; i < num_goroutines; i++ {
            output = append(output, go_avg[i]...)
        }

    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// change of paradigm, we opt for a master worker pattern and spawn all windows which each will be computed by a goroutine
func compute_window_avg(input, output []float64, start, end int) {
    sum := 0.0
    size := end - start
    for _, val := range input[start:end] {
        sum += val
    }
    output[end-1] = sum / float64(size)
}

func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_windows := len(input) - (window_size - 1)
        var output = make([]float64, len(input))
        for i := 0; i < window_size-1; i++ {
            output[i] = math.NaN()
        }

        pending := make(chan *Work)
        done := make(chan *Work)

        // creating work
        go func() {
            for i := 0; i < num_windows; i++ {
                pending <- NewWork(compute_window_avg, input, output, i, i+window_size)
            }
        }()

        // start goroutines which work through pending till there is nothing left
        for i := 0; i < num_goroutines; i++ {
            go func() {
                Worker(pending, done)
            }()
        }

        // wait till every work is done
        for i := 0; i < num_windows; i++ {
            <-done
        }

        return output

    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

Benchmarks:

//############### BENCHMARKS ###############
var import_data_res11 []float64
func benchmarkMoving_avg_serial(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial(BackTest_res.F["Trading DrawDowns"], window)
    }
    import_data_res11 = r
}

var import_data_res14 []float64
func benchmarkMoving_avg_serial4(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial4(BackTest_res.F["Trading DrawDowns"], window)
    }
    import_data_res14 = r
}

var import_data_res16 []float64
func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent2(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
    }
    import_data_res16 = r
}

var import_data_res17 []float64
func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent3(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
    }
    import_data_res17 = r
}



func BenchmarkMoving_avg_serial_261x10(b *testing.B) {
    benchmarkMoving_avg_serial(b, 261*10)
}

func BenchmarkMoving_avg_serial4_261x10(b *testing.B) {
    benchmarkMoving_avg_serial4(b, 261*10)
}


func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 8)
}


func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 8)
}
//############### BENCHMARKS end ###############

Remarks:
This is my very first post, I am still learning, so any constructive criticism is also welcome.

Answers

Answer #1, added: 2017-09-08 23:09

Fact #0: Premature optimisation efforts often have negative yields,
showing they are just a waste of time and effort


Why?
A single "wrong" SLOC may degrade performance by more than about +37%,
while a better one may cut the run time by about 57% relative to the baseline processing time:

51.151µs on MA(200) [10000]float64    ~ 22.017µs on MA(200) [10000]int
70.325µs on MA(200) [10000]float64

Why []int-s?
You can see it for yourself above: this is the bread and butter of efficient sub-[us] processing strategies in HPC/fintech ( and we are still speaking of just [SERIAL] process scheduling ).

This one can be tested on any scale, but first test your own implementations ( here ) on the very same scale, the MA(200) [10000]float64 setup, and post your baseline durations in [us], so as to see the initial process performance and to compare apples to apples against the posted 51.2 [us] threshold.
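For readers who want a starting point for such a baseline, here is a minimal sketch of a serial running-sum moving average with a crude timing loop. It is not the code behind the timings quoted above; the function name movingAvg, the synthetic input data and the number of runs are assumptions made for illustration, and a proper measurement would use a testing.B benchmark on the real series.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// movingAvg returns the simple moving average of input over a window of
// the given size. The first window-1 entries are NaN, matching the
// convention used in the question's code. (Hypothetical helper.)
func movingAvg(input []float64, window int) []float64 {
	out := make([]float64, len(input))
	if window <= 0 || window > len(input) {
		for i := range out {
			out[i] = math.NaN()
		}
		return out
	}
	sum := 0.0
	// Accumulate the first window-1 values; no full window exists yet.
	for i := 0; i < window-1; i++ {
		sum += input[i]
		out[i] = math.NaN()
	}
	// Slide the window: add the newest value, emit, drop the oldest.
	for i := window - 1; i < len(input); i++ {
		sum += input[i]
		out[i] = sum / float64(window)
		sum -= input[i-window+1]
	}
	return out
}

func main() {
	// Synthetic data (assumption): the real series is not available here.
	data := make([]float64, 10000)
	for i := range data {
		data[i] = float64(i % 97)
	}
	const runs = 1000
	var r []float64
	start := time.Now()
	for n := 0; n < runs; n++ {
		r = movingAvg(data, 200)
	}
	elapsed := time.Since(start)
	fmt.Printf("MA(200) on [10000]float64: %v per call (last value %.3f)\n",
		elapsed/runs, r[len(r)-1])
}
```

The absolute numbers depend on the machine and compiler version, so only durations measured on the same setup are comparable.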

Next comes the harder part:


Fact #1: This task is NOT embarrassingly parallel

Yes, one may implement a Moving Average calculation so that it proceeds through the heaps of data using some "just"-[CONCURRENT] processing approach ( whether due to some kind of error, some authority's "advice", professional blindness or plain honest ignorance ). That obviously does not mean that the convolutional stream-processing at the heart of the Moving Average mathematical formulation has stopped being a pure [SERIAL] process, just because of an attempt to force it to be calculated with some degree of "just"-[CONCURRENT] processing.

( Btw., hard computer scientists and dual-domain nerds will also object here that the Go language is by design, drawing on the best of Rob Pike's skills, a framework of concurrent coroutines, not any true-[PARALLEL] process scheduling, even though Hoare's CSP tools, available in the language concept, may add some salt and pepper and introduce stop-block inter-process communication tools that block "just"-[CONCURRENT] code sections into some hardwired CSP point-to-point synchronisation. )


Fact #2: Go distributed ( for any kind of speedup ) only AT THE END

A poor level of performance in [SERIAL] does not set any yardstick. Only after a reasonable amount of single-thread performance tuning may one benefit from going distributed ( while still having to pay additional serial costs, which brings Amdahl's Law ( or rather the overhead-strict Amdahl's Law ) into the game ).

If one can keep the additional setup overheads that low and still achieve remarkable parallelism, scaled across the non-[SEQ] part of the processing, then and only then is there a chance to increase the effective performance of the process.

It is not hard to lose much more than one gains here, so always benchmark the pure-[SEQ] run against the potential tradeoffs of the theoretical, overhead-naive speedup ( non-[SEQ] / N[PAR]_processes ), for which one pays the sum of all add-on-[SEQ] overheads. Going parallel pays off if and only if:

(         pure-[SEQ]_processing      [ns]
+       add-on-[SEQ]-setup-overheads [ns]
+        ( non-[SEQ]_processing      [ns] / N[PAR]_processes )
  ) << (  pure-[SEQ]_processing      [ns]
       + ( non-[SEQ]_processing      [ns] / 1 )
         )

Without this jet-fighter's advantage of having both surplus height and the Sun behind you, never attempt any kind of HPC / parallelisation: such attempts will never pay for themselves unless they are remarkably << better than a smart [SEQ] process.
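The inequality above can be simplified a little. The symbols below are shorthand introduced here, not the answer's own notation: T_seq for the pure-[SEQ] processing time, T_o for the add-on setup overheads, T_par for the non-[SEQ] processing time and N for the number of [PAR] processes.

```latex
T_{seq} + T_{o} + \frac{T_{par}}{N} \;\ll\; T_{seq} + T_{par}
\quad\Longleftrightarrow\quad
T_{o} \;\ll\; T_{par}\left(1 - \frac{1}{N}\right)
```

The pure-[SEQ] term cancels on both sides, so the decision depends only on how the overhead compares with the parallelisable work: for N = 1 the right-hand side is zero and any overhead is a net loss, and even for large N the overhead has to stay well below T_par itself.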



Epilogue: on overhead-strict Amdahl's Law interactive experiment UI

One animation is worth a million words.

An interactive animation is even better:

So,
assume a process-under-test, which has both a [SERIAL] and a [PARALLEL] part of the process schedule.

Let p be the [PARALLEL] fraction of the process duration ~ ( 0.0 .. 1.0 ) thus the [SERIAL] part does not last longer than ( 1 - p ), right?

So, let's start the interactive experimentation from a test case where p == 1.0, meaning the whole process duration is spent in the [PARALLEL] part, and both the initial and the terminating parts of the process flow ( which in principle are always [SERIAL] ) have zero duration ( ( 1 - p ) == 0. )

Assume the system does no particular magic and thus needs to spend some real steps on the initialisation of each [PARALLEL] part, so as to run it on a different processor ( (1), 2, .., N ). So let's add some overheads for re-organising the process flow and for marshalling + distributing + un-marshalling all the necessary instructions and data, so that the intended process can start and run on N processors in parallel.

These costs are called o ( here initially assumed, for simplicity, to be constant and invariant to N, which is not always the case on real silicon / NUMA / distributed infrastructures ).

By clicking on the Epilogue headline above, an interactive environment opens and is free for one's own experimentation.

With p == 1. && o == 0. && N > 1 the performance grows steeply up to the currently achievable [PARALLEL]-hardware and O/S limits for a still monolithic O/S code execution ( still with no additional distribution costs for MPI-style and similar distributions of work-units ( where one would immediately have to add a large number of [ms], while our best [SERIAL] implementation so far obviously did the whole job in less than ~ 22.1 [us] ) ).

But apart from such an artificially optimistic case, the job does not look so cheap to parallelise efficiently.

  • Try setting the setup overhead o not to zero but to just about ~ 0.01%, and the line starts to show a very different nature of overhead-aware scaling, even for the most extreme [PARALLEL] case ( still having p == 1.0 ), with the potential speedup ending up at only about half of the initially super-idealistic linear speedup.

  • Now turn p to something closer to reality, less artificial than the initial super-idealistic case of 1.00 --> { 0.99, 0.98, 0.95 } and ... bingo, this is the reality against which process scheduling ought to be tested and pre-validated.

What does that mean?

As an example, if the overhead ( of launching + finally joining a pool of coroutines ) takes more than ~ 0.1% of the actual [PARALLEL] processing section duration, the speedup will not exceed about 4x ( about 1/4 of the original duration ) for 5 coroutines ( having p ~ 0.95 ), and will not exceed about 10x ( a 10-times shorter duration ) for 20 coroutines. All this assumes that the system has 5 CPU cores, resp. 20 CPU cores, free, available and ready ( best with O/S-level CPU-core-affinity mapped processes / threads ) to serve all those coroutines uninterrupted during their whole life-span, so as to achieve the speedups expected above.
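These figures can be roughly reproduced with a small sketch. The exact model behind the interactive graphs is not given in the text, so the formula below, 1 / ((1 - p) + o + p / N) with o expressed as a fraction of the original run time, is an assumption, and the function name speedup is made up for illustration.

```go
package main

import "fmt"

// speedup returns an overhead-aware Amdahl speedup for a parallel
// fraction p, a setup overhead o (as a fraction of the original run
// time; an assumption of this sketch) and n processors.
func speedup(p, o float64, n int) float64 {
	return 1.0 / ((1.0 - p) + o + p/float64(n))
}

func main() {
	// Idealised case: everything parallel, no overhead -> linear speedup.
	fmt.Printf("p=1.00 o=0      N=5     -> %.2fx\n", speedup(1.00, 0, 5))
	// Tiny overhead already halves the ideal speedup once N is large.
	fmt.Printf("p=1.00 o=0.0001 N=10000 -> %.2fx\n", speedup(1.00, 0.0001, 10000))
	// The two cases from the example above.
	fmt.Printf("p=0.95 o=0.001  N=5     -> %.2fx\n", speedup(0.95, 0.001, 5))
	fmt.Printf("p=0.95 o=0.001  N=20    -> %.2fx\n", speedup(0.95, 0.001, 20))
}
```

Under this assumed model the last two cases come out slightly above 4x and 10x respectively, and any larger overhead pushes them lower.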

Without that amount of hardware resources free and ready for all of the task-units that implement the [PARALLEL] part of the process schedule, blocking/waiting states introduce additional absolute wait times. These new [SERIAL] blocking/waiting sections add to the overall process duration, the initially wished-for speedups suddenly cease to exist, and the performance factor falls well under << 1.00 ( meaning that, due to the blocking states, the effective run time was way slower than the non-parallelised, just-[SERIAL] workflow ).
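If one nevertheless experiments with a goroutine-based version, a first step is to avoid asking for more workers than the runtime can actually run at once. The sketch below is an illustration under stated assumptions, not a recommended solution: movingAvgChunked is a hypothetical function, it caps the worker count at runtime.GOMAXPROCS(0), and each goroutine writes into its own disjoint region of a preallocated output slice, so no channels or appends are needed. Whether it beats the serial version still has to be benchmarked.

```go
package main

import (
	"fmt"
	"math"
	"runtime"
	"sync"
)

// movingAvgChunked computes the moving average using at most
// GOMAXPROCS goroutines. Each goroutine fills a disjoint range of out.
// (Hypothetical helper for illustration.)
func movingAvgChunked(input []float64, window, workers int) []float64 {
	out := make([]float64, len(input))
	if window <= 0 || window > len(input) {
		for i := range out {
			out[i] = math.NaN()
		}
		return out
	}
	first := window - 1 // index of the first complete window
	for i := 0; i < first; i++ {
		out[i] = math.NaN()
	}
	numWindows := len(input) - first
	// Never request more workers than can run in parallel or than
	// there are windows to compute.
	if limit := runtime.GOMAXPROCS(0); workers > limit {
		workers = limit
	}
	if workers > numWindows {
		workers = numWindows
	}
	if workers < 1 {
		workers = 1
	}
	chunk := (numWindows + workers - 1) / workers // ceiling division

	var wg sync.WaitGroup
	for start := first; start < len(input); start += chunk {
		stop := start + chunk
		if stop > len(input) {
			stop = len(input)
		}
		wg.Add(1)
		go func(start, stop int) {
			defer wg.Done()
			// Seed the running sum with the window ending at start.
			sum := 0.0
			for i := start - first; i <= start; i++ {
				sum += input[i]
			}
			out[start] = sum / float64(window)
			// Slide the window through the rest of this chunk.
			for i := start + 1; i < stop; i++ {
				sum += input[i] - input[i-window]
				out[i] = sum / float64(window)
			}
		}(start, stop)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(movingAvgChunked([]float64{1, 2, 3, 4, 5}, 3, 8))
}
```

Each chunk re-sums one full window to seed its running sum, which is extra serial work per goroutine on top of the launch and join costs.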

This may sound complicated to keen new experimenters, but we can put it in a reversed perspective. Given that the whole process of distributing the intended [PARALLEL] pool of tasks is known to take no less than, say, about 10 [us], the overhead-strict graphs show that there needs to be at least about 1000 x 10 [us] of non-blocking, compute-intensive processing inside the [PARALLEL] section so as not to devastate the efficiency of the parallelised processing.
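To put one's own numbers into this rule of thumb, the launch + join cost can be estimated directly. The following is a rough sketch, not a rigorous benchmark: spawnJoinOverhead and minParallelWork are hypothetical helpers, the factor of 1000 is taken from the paragraph above, and timings from time.Now are noisy at this scale.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// spawnJoinOverhead estimates the average time needed to launch n empty
// goroutines and wait for all of them, averaged over the given rounds.
func spawnJoinOverhead(n, rounds int) time.Duration {
	start := time.Now()
	for r := 0; r < rounds; r++ {
		var wg sync.WaitGroup
		wg.Add(n)
		for i := 0; i < n; i++ {
			go func() { wg.Done() }()
		}
		wg.Wait()
	}
	return time.Since(start) / time.Duration(rounds)
}

// minParallelWork returns the amount of parallel work needed so that the
// overhead is at most 1/ratio of it (ratio = 1000 corresponds to ~0.1%).
func minParallelWork(overhead time.Duration, ratio int) time.Duration {
	return overhead * time.Duration(ratio)
}

func main() {
	for _, n := range []int{1, 4, 8, 32} {
		o := spawnJoinOverhead(n, 10000)
		fmt.Printf("%2d goroutines: overhead ~%v, needs >= %v of parallel work\n",
			n, o, minParallelWork(o, 1000))
	}
}
```

With the 10 [us] example from the text, minParallelWork(10*time.Microsecond, 1000) gives 10ms, which can then be compared with the measured serial run time of the function one intends to parallelise.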

If there is no sufficiently "fat" piece of processing, the overhead costs ( going remarkably above the above-cited threshold of ~ 0.1% ) brutally devastate the net efficiency of the parallelised processing ( which then runs at unjustifiably high relative setup costs compared with the limited net effect of any N processors, as demonstrated in the available live graphs ).

It is no surprise to distributed-computing nerds that the overhead o also comes with additional dependencies: on N ( the more processes, the more effort has to be spent on distributing work-packages ), on the sizes of the marshalled data-BLOBs ( the larger the BLOBs, the longer the MEM-/IO-devices remain blocked before serving the next of the 2..N-th receiving processes ), and on CSP-signalled, channel-mediated inter-process coordination ( call it additional per-incident blocking, reducing p further and further below the ultimately nice ideal of 1. ).

So, the real world is rather far from the initially idealised, nice and promising p == 1.0, ( 1 - p ) == 0.0 and o == 0.0

As was obvious from the very beginning, rather try to beat the 22.1 [us] [SERIAL] threshold than try to beat it by going [PARALLEL] and getting worse and worse, where realistic overheads and scaling, applied to already under-performing approaches, do not help a single bit.
