Concurrency 101

28.10.2014

Но преди това...

Въпрос за мъфин #1

За какво можем да ползваме тестове

Въпрос за мъфин #2

За какво не можем да ползваме тестове

Въпрос за мъфин #3

Какво трябва да направим за да напишем тест за пакет foo

Въпрос за мъфин #4

Какво прави t.Parallel()

Въпрос за мъфин #5

Как документираме go код

Що е то concurrency?

Concurrency vs. Parallelism

Обяснение с малко повече gophers

Moore's law

А какво става, когато имаме много ядра?

IO-bound vs. CPU-bound

Processes vs. Threads (Green & Native)

Подходи

В C ползват вилици

#include <stdio.h>

int main()
{
    printf("before\n");
    if (fork())
        printf("father\n");
    else
        printf("son\n");
    printf("both\n");
}

Синхронизация на вилици

#include <stdio.h>
#include <unistd.h>

int main()
{
    pid_t pid = fork();
    if (pid == 0)
    {
        execl("/bin/sleep", "/bin/sleep", "2", (char *) 0);
    }
    else
    {
        waitpid(pid, NULL, 0);
    }
    printf("done!\n");
    return 0;
}

Как работи 'ps aux | grep myprocess'?

Demo pipes.c

Предимства и недостатъци на fork

Против:

За:

В Go се правим на модерни

Нишки

Goroutines

Скучно

За да се съсредоточим върху това, което се опитваме да кажем ще дадем скучен пример.

package main

import (
	"fmt"
	"time"
)

func main() {
	boring("boring!")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(1 * time.Second)
    }
}

За конкурентноста тайминга е важен. Нека е малко по - непредвидим.

Малко по - малко скучно

Ще сложим случайно време за сън.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
    boring("boring!")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

Скучната ни програма ще продължи да работи така до безкрайност. Като много скучна лекция, от която ви е неудобно да си тръгнете.

Да я игнорираме

Скучната програма не заслужава вниманието ни, нека не я чакаме.

С go пускаме функция нормално, но пускащия няма нужда чака приключването й.

Пускаме goroutine.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    go boring("boring!")
}

func boring(msg string) {
	for i := 0; ; i++ {
		fmt.Println(msg, i)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

Когато main приключи програмата спира.

Да я игнорираме малко по - малко

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    go boring("boring!")
    fmt.Println("Listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You are way too boring. I am leaving.")
}

func boring(msg string) {
	for i := 0; ; i++ {
		fmt.Println(msg, i)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

Изпълнявахме main и скучната функция едновременно.

С края на main дойде и края на скучната функция.

Какво е Goroutine

Вдъхновено от

Проблеми, свързани с нишки

От това, че имат една и съща памет, следва, че могат да достъпват едни и същи променливи

int i = 0

thread1 { i++ }
thread2 { i++ }

wait { thread1 } { thread2 }
print i

Тук i накрая може да бъде 1 или 2.

Критични секции

В Go имаме Semaphors и Message passing

sync

Пакет, който ни дава синхронизационни примитиви от ниско ниво:

WaitGroup

Изчаква колекция от горутини да приключат и чак тогава продължава с изпълнението.
Така не правим простотии със time.Sleep, както одеве.

package sync

type WaitGroup struct {}

func (*WaitGroup) Add()
func (*WaitGroup) Done()
func (*WaitGroup) Wait()

Пример

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
    var wg sync.WaitGroup

    boring := func(msg string) {
        defer wg.Done()
        for i := 0; i < 8; i++ {
            fmt.Println(msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }

    wg.Add(1)
    go boring("boring!")
    fmt.Println("Waiting for the boring function to do its work")
    wg.Wait()
}

Пак ли скуката?

По - интересен пример

package main

import (
	"fmt"
	"net/http"
	"sync"
)

func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }
    for _, url := range urls {
        wg.Add(1) // Increment the WaitGroup counter.
        // Launch a goroutine to fetch the URL.
        go func(url string) {
            defer wg.Done()               // Decrement the counter when the goroutine completes.
            content, err := http.Get(url) // Fetch the URL.
            if err == nil {
                fmt.Println(url, content.Status)
            } else {
                fmt.Println(url, "has failed")
            }
        }(url)
    }
    // Wait for all HTTP fetches to complete.
    wg.Wait()
}

Mutex

package sync

type Mutex struct {}

func (*Mutex) Lock()
func (*Mutex) Unlock()

Once

Обект от този тип ще изпълни точно една функция.

package main

import (
	"fmt"
	"sync"
)

func main() {
    var once sync.Once
    var wg sync.WaitGroup

    onceBody := func() {
        fmt.Println("Only once")
    }
    anotherBody := func() {
        fmt.Println("Another")
    }

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            once.Do(onceBody)
            once.Do(anotherBody)
        }()
    }
    wg.Wait()
}

99% шанс за паралелизъм

// In your source
runtime.GOMAXPROCS(runtime.NumCPU())
# With environment variable
$ GOMAXPROCS=4 go run main.go

Въпроси?