Решение на Concurrent Crawling от Иван Боршуков

Обратно към всички решения

Към профила на Иван Боршуков

Резултати

  • 8 точки от тестове
  • 0 бонус точки
  • 8 точки общо
  • 9 успешни тест(а)
  • 2 неуспешни тест(а)

Код

package main
import (
"errors"
"io/ioutil"
"net/http"
"sync"
"time"
)
type manager struct {
maxWorkers int
inProgress int
mtx sync.Mutex
input chan string
workQueue chan string
output chan string
callback func(string) bool
done chan struct{}
}
func NewManager(maxWorkers int, input chan string, callback func(string) bool) *manager {
m := &manager{}
m.maxWorkers = maxWorkers
m.callback = callback
m.input = input
m.inProgress = 0
m.done = make(chan struct{})
m.output = make(chan string)
m.workQueue = make(chan string, 1)
go m.start()
return m
}
func (m *manager) OutputChan() chan string {
return m.output
}
func (m *manager) start() {
for {
select {
case work := <-m.input:
m.workQueue <- work
if m.inProgress < m.maxWorkers {
m.mtx.Lock()
m.inProgress++
m.mtx.Unlock()
m.startWorker()
}
case <-m.done:
return
}
}
}
func (m *manager) startWorker() {
go func() {
MainLoop:
for {
select {
case url := <-m.workQueue:
client := http.Client{
Timeout: time.Duration(time.Second * 3),
}
response, err := client.Get(url)
if err != nil {
continue MainLoop
} else {
defer response.Body.Close()
content, err := ioutil.ReadAll(response.Body)
if err != nil {
continue MainLoop
}
if m.callback(string(content)) {
m.output <- url
m.done <- struct{}{}
}
}
case <-time.After(time.Millisecond * 200):
m.mtx.Lock()
m.inProgress--
m.mtx.Unlock()
return
}
}
}()
}
func dispatch(from <-chan []string, to chan<- string, stopped chan struct{}) {
for input := range from {
go func(urls []string) {
for _, url := range urls {
to <- url
}
}(input)
}
stopped <- struct{}{}
}
func SeekAndDestroy(callback func(string) bool, chunkedUrlsToCheck <-chan []string, workersCount int) (string, error) {
if workersCount < 1 {
return "", errors.New("Expected positive integer")
}
if chunkedUrlsToCheck == nil {
return "", errors.New("Expected initialized channel")
}
input := make(chan string)
inputClosed := make(chan struct{})
go dispatch(chunkedUrlsToCheck, input, inputClosed)
m := NewManager(workersCount, input, callback)
result := m.OutputChan()
select {
case result := <-result:
return result, nil
case <-inputClosed:
return "", errors.New("Input closed")
case <-time.After(time.Second * 15):
return "", errors.New("An error occurred - probably a timeout :)")
}
}

Лог от изпълнението

[/tmp/go-build834001405/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestWithNegativeWorkersCount -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	0.005s
[/tmp/go-build295754881/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestWithZeroWorkersCount -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	0.005s
[/tmp/go-build499646221/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestWithInvalidCallback -test.timeout=120s]
--- FAIL: TestWithInvalidCallback-2 (1.00 seconds)
	solution_test.go:43: Test exceeded allowed time of 1 seconds: parameter errors should be immediately returned (callback is nil)
FAIL
exit status 1
FAIL	_/tmp/d20150111-16649-1o088nm	1.005s
[/tmp/go-build611679542/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestWithNilChannel -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	0.005s
[/tmp/go-build198778591/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestWithClosedChannelWhenStarting -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	0.005s
[/tmp/go-build374086668/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestWithClosedChannelMidway -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	5.005s
[/tmp/go-build431656211/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestWhetherGlobalTimeoutIsHandled -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	15.008s
[/tmp/go-build032899658/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestWithLoremIpsum -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	2.007s
[/tmp/go-build438459554/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestIfTimeoutAndErrorCodesAreHonoured -test.timeout=120s]
--- FAIL: TestIfTimeoutAndErrorCodesAreHonoured-2 (0.00 seconds)
	solution_test.go:267: Function returned 'http://127.0.0.2:32980/page_with_error_code' when it should have returned 'http://127.0.0.2:32980/correct_page'
FAIL
exit status 1
FAIL	_/tmp/d20150111-16649-1o088nm	0.006s
[/tmp/go-build771977395/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestRaceCondition -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	1.008s
[/tmp/go-build730480348/_/tmp/d20150111-16649-1o088nm/_test/d20150111-16649-1o088nm.test -test.run=TestCloseChannelBeforeFinish -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-1o088nm	1.006s

История (1 версия и 0 коментара)

Иван обнови решението на 10.12.2014 17:12 (преди над 3 години)

+package main
+
+import (
+ "errors"
+ "io/ioutil"
+ "net/http"
+ "sync"
+ "time"
+)
+
+type manager struct {
+ maxWorkers int
+ inProgress int
+ mtx sync.Mutex
+ input chan string
+ workQueue chan string
+ output chan string
+ callback func(string) bool
+ done chan struct{}
+}
+
+func NewManager(maxWorkers int, input chan string, callback func(string) bool) *manager {
+ m := &manager{}
+
+ m.maxWorkers = maxWorkers
+ m.callback = callback
+ m.input = input
+
+ m.inProgress = 0
+ m.done = make(chan struct{})
+ m.output = make(chan string)
+ m.workQueue = make(chan string, 1)
+ go m.start()
+ return m
+}
+
+func (m *manager) OutputChan() chan string {
+ return m.output
+}
+
+func (m *manager) start() {
+ for {
+ select {
+ case work := <-m.input:
+ m.workQueue <- work
+ if m.inProgress < m.maxWorkers {
+ m.mtx.Lock()
+ m.inProgress++
+ m.mtx.Unlock()
+ m.startWorker()
+ }
+ case <-m.done:
+ return
+ }
+ }
+}
+
+func (m *manager) startWorker() {
+ go func() {
+ MainLoop:
+ for {
+
+ select {
+ case url := <-m.workQueue:
+ client := http.Client{
+ Timeout: time.Duration(time.Second * 3),
+ }
+
+ response, err := client.Get(url)
+ if err != nil {
+ continue MainLoop
+ } else {
+ defer response.Body.Close()
+ content, err := ioutil.ReadAll(response.Body)
+ if err != nil {
+ continue MainLoop
+ }
+
+ if m.callback(string(content)) {
+ m.output <- url
+ m.done <- struct{}{}
+ }
+ }
+ case <-time.After(time.Millisecond * 200):
+ m.mtx.Lock()
+ m.inProgress--
+ m.mtx.Unlock()
+ return
+ }
+
+ }
+ }()
+}
+
+func dispatch(from <-chan []string, to chan<- string, stopped chan struct{}) {
+ for input := range from {
+ go func(urls []string) {
+ for _, url := range urls {
+ to <- url
+ }
+ }(input)
+ }
+ stopped <- struct{}{}
+}
+
+func SeekAndDestroy(callback func(string) bool, chunkedUrlsToCheck <-chan []string, workersCount int) (string, error) {
+ if workersCount < 1 {
+ return "", errors.New("Expected positive integer")
+ }
+ if chunkedUrlsToCheck == nil {
+ return "", errors.New("Expected initialized channel")
+ }
+ input := make(chan string)
+ inputClosed := make(chan struct{})
+ go dispatch(chunkedUrlsToCheck, input, inputClosed)
+ m := NewManager(workersCount, input, callback)
+
+ result := m.OutputChan()
+ select {
+ case result := <-result:
+ return result, nil
+ case <-inputClosed:
+ return "", errors.New("Input closed")
+ case <-time.After(time.Second * 15):
+ return "", errors.New("An error occurred - probably a timeout :)")
+ }
+}