Concurrency 102

12.11.2014

Въпрос за мъфин #1

Как постигаме паралелизъм в Go?

Въпрос за мъфин #2

Какво е отношението между go рутини и нишки?

Въпрос за мъфин #3

Каква е разликата между паралелизъм и конкурентност?

Въпрос за мъфин #4

Как Дойчо ви препоръчва да изчакваме много go рутини да си свършат работата?

Въпрос за мъфин #5

Какво става в go, когато всички go рутини блокират? Т.е. при deadblock?

Cond

type Cond struct {
    // L is held while observing or changing the condition
    L Locker
    // contains filtered or unexported fields
}

Communicate by sharing vs. Share by communicating

Channels

Употреба на канали

IO в канал

Операциите по изпращане и получаване се изпълняват с оператора <-

Канал може да бъде затворен

Създаване на канали

intChannel := make(chan int)
stringBufferedChannel := make(chan string, 5)
readOnlyChannel := make(<-chan int)
writeOnlyChannel := make(chan<- int)

Каналите са първокласни обекти в Go

c := make(chan chan int)
func doSomething(input <-chan string) {}
func doSomethingElse() chan string {
  result := make(chan string)
  return result
}

nil channel

Никога не използвайте неинициализиран канал!

package main

func main() {
    var c chan string
    c <- "ping" // deadlock
}
package main

import "fmt"

func main() {
    var c chan string
    fmt.Println(<-c) // deadlock
}

Пример

c := make(chan int)

go func() {
    list.Sort()
    c <- 1
}()

doSomethingForAWhile()
<-c

По-сложен пример

var sem = make(chan struct{}, MaxOutstanding)

func handle(r *Request) {
    <-sem
    process(r)
    sem <- struct{}{}
}

func init() {
    for i := 0; i < MaxOutstanding; i++ {
        sem <- struct{}{}
    }
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)
    }
}

Deadlock

func main() {
     c := make(chan int)
     c <- 42
     val := <-c
     println(val)
}

Затваряне на канали

package main

import "fmt"

func main() {
    ch := make(chan string)

    go func(output chan string) {
        for i := 0; i < 5; i++ {
            output <- fmt.Sprintf("sending N=%d", i)
        }
        close(output)
    }(ch)

    for i := 0; i < 7; i++ {
        val, ok := <-ch
        fmt.Printf("Recieved: %#v, %#v\n", val, ok)
    }

    ch <- fmt.Sprintf("where is your towel?")
}

range

Помните ли как ви казахме, че range е нещо супер яко?

for val := range ch {
    fmt.Printf("Recieved: %#v\n", val)
}

select

select {
case v1 := <-c1:
    fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
    fmt.Printf("received %v from c2\n", v2)
default:
    fmt.Printf("no one was ready to communicate\n")
}

Накратко: switch за канали.
Надълго: Изчаква първия канал, по който е изпратена стойност

Concurrency patterns

Timeout

select {
case result := <-google:
    fmt.Printf("received %v from google\n", result)
case result := <-bing:
    fmt.Printf("received %v from bing\n", result)
case <- time.After(1 * time.Second):
    fmt.Printf("timed out\n")
}

Игра на развален телефон

Игра на развален телефон

package main

import (
	"fmt"
)

func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    const n = 100000
    leftmost := make(chan int)
    right := leftmost
    left := leftmost

    for i := 0; i < n; i++ {
        right = make(chan int)
        go f(left, right)
        left = right
    }

    go func(c chan int) { c <- 1 }(right)
    fmt.Println(<-leftmost)
}

Generators

package main

func fib() <-chan int {
    c := make(chan int)
    go func() {
        for a, b := 0, 1; ; a, b = b, a+b {
            c <- a
        }
    }()
    return c
}

func main() {
    fibonacci := fib()
    for i := 0; i < 10; i++ {
        println(<-fibonacci)
    }
}

Fan In

func talk(msg string) <-chan string {
    c := make(chan string)
    go func() {
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s: %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
    kiro := talk("Kiro")
    ned := talk("Ned")
    for i := 0; i < 5; i++ {
        fmt.Println(<-kiro)
        fmt.Println(<-ned)
    }
}

// TALK START OMIT
func talk(msg string) <-chan string { // HL
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

// TALK END OMIT

Fan In <- Concurrency

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            }
        }
    }()
    return c
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// TALK START OMIT
func talk(msg string) <-chan string { // HL
	c := make(chan string)
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(r.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

// TALK END OMIT

// FANIN START OMIT
func fanIn(input1, input2 <-chan string) <-chan string { // HL
	c := make(chan string)
	go func() { // HL
		for {
			select { // HL
			case s := <-input1:
				c <- s // HL
			case s := <-input2:
				c <- s // HL
			} // HL
		}
	}()
	return c
}

// FANIN END OMIT

func main() {
    c := fanIn(talk("Ned"), talk("Kiril"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
}

Finish channel

func fanIn(input1, input2 <-chan string, finish chan struct{}) <-chan string {
    c := make(chan string)
    go func() {
        defer wg.Done()
        defer close(c)
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            case <-finish:
                return
            }
        }
    }()
    return c
}

Finish channel

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var wg sync.WaitGroup

// TALK START OMIT
func talk(msg string) <-chan string { // HL
	c := make(chan string)
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(r.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

// TALK END OMIT

// FANIN START OMIT
func fanIn(input1, input2 <-chan string, finish chan struct{}) <-chan string { // HL
	c := make(chan string)
	go func() {
		defer wg.Done()
		defer close(c) // HL
		for {
			select {
			case s := <-input1:
				c <- s
			case s := <-input2:
				c <- s
			case <-finish: // HL
				return
			}
		}
	}()
	return c
}

// FANIN END OMIT

func main() {
    wg.Add(1)
    finish := make(chan struct{})
    c := fanIn(talk("Ned"), talk("Kiril"), finish)
    for value := range c {
        fmt.Println(value)
        if len(value) > 8 {
            close(finish)
        }
    }
    wg.Wait()
}

Въпроси?