Решение на Concurrent Crawling от Георги Фарашев

Обратно към всички решения

Към профила на Георги Фарашев

Резултати

  • 6 точки от тестове
  • 0 бонус точки
  • 6 точки общо
  • 7 успешни тест(а)
  • 4 неуспешни тест(а)

Код

package main
import (
"bytes"
"errors"
"net/http"
"sync/atomic"
"time"
)
type Helper struct {
Workers int32
MaxWorkers int
Result string
Queue []string
}
func (h Helper) AddWorker() {
atomic.AddInt32(&h.Workers, 1)
}
func (h Helper) ReleaseWorker() {
atomic.AddInt32(&h.Workers, -1)
}
func doWork(callback func(string) bool, h* Helper) {
for len(h.Queue) > 0 && h.Workers < int32(h.MaxWorkers) {
if len(h.Result) == 0 {
h.AddWorker()
elemIndex := len(h.Queue) - 1
go func(url string) {
defer h.ReleaseWorker()
timeout := time.Duration(3 * time.Second)
client := http.Client{
Timeout: timeout,
}
content, err := client.Get(url) // Fetch the URL.
if err != nil || content.StatusCode < 200 || content.StatusCode >= 300 {
return
}
buf := new(bytes.Buffer)
buf.ReadFrom(content.Body)
s := buf.String()
if callback(s) {
h.Result = url
}
}(h.Queue[elemIndex])
h.Queue = h.Queue[:elemIndex]
} else {
return
}
}
}
func RepeateDoWork(callback func(string) bool, h Helper) (string, error) {
for len(h.Queue) > 0 {
doWork(callback, &h)
if len(h.Result) > 0 {
return h.Result, nil
}
}
for h.Workers != 0 && len(h.Result) == 0 {
}
if len(h.Result) > 0 {
return h.Result, nil
}
return "", errors.New("Stopped listening for urls")
}
func SeekAndDestroy(callback func(string) bool,
chunkedUrlsToCheck <-chan []string,
workersCount int) (string, error) {
if workersCount <= 0 {
return "", errors.New("workersCount is not positive")
}
if chunkedUrlsToCheck == nil {
return "", errors.New("chunkedUrlsToCheck is not initialized")
}
h := Helper{0, workersCount, "", make([]string, 0, workersCount)}
for {
select {
case urls, isOk := <-chunkedUrlsToCheck:
if !isOk {
return RepeateDoWork(callback, h)
}
h.Queue = append(h.Queue, urls...)
doWork(callback, &h)
if len(h.Result) > 0 {
return h.Result, nil
}
case <-time.After(15 * time.Second):
return RepeateDoWork(callback, h)
}
}
}

Лог от изпълнението

[/tmp/go-build641262825/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestWithNegativeWorkersCount -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-19h0djr	0.005s
[/tmp/go-build889770964/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestWithZeroWorkersCount -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-19h0djr	0.005s
[/tmp/go-build751566989/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestWithInvalidCallback -test.timeout=120s]
--- FAIL: TestWithInvalidCallback-2 (1.00 seconds)
	solution_test.go:43: Test exceeded allowed time of 1 seconds: parameter errors should be immediately returned (callback is nil)
FAIL
exit status 1
FAIL	_/tmp/d20150111-16649-19h0djr	1.005s
[/tmp/go-build122642719/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestWithNilChannel -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-19h0djr	0.005s
[/tmp/go-build494604839/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestWithClosedChannelWhenStarting -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-19h0djr	0.005s
[/tmp/go-build147394678/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestWithClosedChannelMidway -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-19h0djr	5.009s
[/tmp/go-build155725753/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestWhetherGlobalTimeoutIsHandled -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-19h0djr	15.006s
[/tmp/go-build899195866/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestWithLoremIpsum -test.timeout=120s]
--- FAIL: TestWithLoremIpsum-2 (4.00 seconds)
	solution_test.go:43: Test exceeded allowed time of 4 seconds: Connecting to localhost should be pretty fast...
FAIL
exit status 1
FAIL	_/tmp/d20150111-16649-19h0djr	4.005s
[/tmp/go-build572553593/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestIfTimeoutAndErrorCodesAreHonoured -test.timeout=120s]
--- FAIL: TestIfTimeoutAndErrorCodesAreHonoured-2 (10.00 seconds)
	solution_test.go:43: Test exceeded allowed time of 10 seconds: This should have finished in approx. 8 seconds
FAIL
exit status 1
FAIL	_/tmp/d20150111-16649-19h0djr	10.005s
[/tmp/go-build574112616/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestRaceCondition -test.timeout=120s]
--- FAIL: TestRaceCondition-2 (5.00 seconds)
	solution_test.go:43: Test exceeded allowed time of 5 seconds: This should have finished in approx. 3 seconds
FAIL
exit status 1
FAIL	_/tmp/d20150111-16649-19h0djr	5.005s
[/tmp/go-build370006041/_/tmp/d20150111-16649-19h0djr/_test/d20150111-16649-19h0djr.test -test.run=TestCloseChannelBeforeFinish -test.timeout=120s]
PASS
ok  	_/tmp/d20150111-16649-19h0djr	1.005s

История (2 версии и 2 коментара)

Георги обнови решението на 11.12.2014 06:54 (преди над 3 години)

+package main
+
+import (
+ "bytes"
+ "errors"
+ "net/http"
+ "sync/atomic"
+ "time"
+)
+
+type Helper struct {
+ Workers int32
+ MaxWorkers int
+ Result string
+ Queue []string
+}
+
+func (h Helper) AddWorker() {
+ atomic.AddInt32(&h.Workers, 1)
+}
+
+func (h Helper) ReleaseWorker() {
+ atomic.AddInt32(&h.Workers, -1)
+}
+
+func doWork(callback func(string) bool, h* Helper) {
+ for len(h.Queue) > 0 && h.Workers < int32(h.MaxWorkers) {
+ if len(h.Result) == 0 {
+ h.AddWorker()
+ elemIndex := len(h.Queue) - 1
+ go func(url string) {
+ defer h.ReleaseWorker()
+ timeout := time.Duration(3 * time.Second)
+ client := http.Client{
+ Timeout: timeout,
+ }
+ content, err := client.Get(url) // Fetch the URL.
+ if err != nil || content.StatusCode < 200 || content.StatusCode >= 300 {
+ return
+ }
+ buf := new(bytes.Buffer)
+ buf.ReadFrom(content.Body)
+ s := buf.String()
+ if callback(s) {
+ h.Result = url
+ }
+ }(h.Queue[elemIndex])
+ h.Queue = h.Queue[:elemIndex]
+ } else {
+ return
+ }
+ }
+}
+
+func RepeateDoWork(callback func(string) bool, h Helper) (string, error) {
+ for len(h.Queue) > 0 {
+ doWork(callback, &h)
+ if len(h.Result) > 0 {
+ return h.Result, nil
+ }
+ }
+ for h.Workers != 0 && len(h.Result) == 0 {
+ }
+ if len(h.Result) > 0 {
+ return h.Result, nil
+ }
+ return "", errors.New("Stopped listening for urls")
+}
+
+func SeekAndDestroy(callback func(string) bool,
+ chunkedUrlsToCheck <-chan []string,
+ workersCount int) (string, error) {
+
+ if workersCount <= 0 {
+ return "", errors.New("workersCount is not positive")
+ }
+ if chunkedUrlsToCheck == nil {
+ return "", errors.New("chunkedUrlsToCheck is not initialized")
+ }
+
+ h := Helper{0, workersCount, "", make([]string, 0, workersCount)}
+ for {
+ select {
+ case urls, isOk := <-chunkedUrlsToCheck:
+ if !isOk {
+ return RepeateDoWork(callback, h)
+ }
+ h.Queue = append(h.Queue, urls...)
+ doWork(callback, &h)
+ if len(h.Result) > 0 {
+ return h.Result, nil
+ }
+ case <-time.After(15 * time.Second):
+ return RepeateDoWork(callback, h)
+ }
+ }
+}
+
+func main() {
+}

Георги обнови решението на 11.12.2014 16:06 (преди над 3 години)

package main
import (
"bytes"
"errors"
"net/http"
"sync/atomic"
"time"
)
type Helper struct {
Workers int32
MaxWorkers int
Result string
Queue []string
}
func (h Helper) AddWorker() {
atomic.AddInt32(&h.Workers, 1)
}
func (h Helper) ReleaseWorker() {
atomic.AddInt32(&h.Workers, -1)
}
func doWork(callback func(string) bool, h* Helper) {
for len(h.Queue) > 0 && h.Workers < int32(h.MaxWorkers) {
if len(h.Result) == 0 {
h.AddWorker()
elemIndex := len(h.Queue) - 1
go func(url string) {
defer h.ReleaseWorker()
timeout := time.Duration(3 * time.Second)
client := http.Client{
Timeout: timeout,
}
content, err := client.Get(url) // Fetch the URL.
if err != nil || content.StatusCode < 200 || content.StatusCode >= 300 {
return
}
buf := new(bytes.Buffer)
buf.ReadFrom(content.Body)
s := buf.String()
if callback(s) {
h.Result = url
}
}(h.Queue[elemIndex])
h.Queue = h.Queue[:elemIndex]
} else {
return
}
}
}
func RepeateDoWork(callback func(string) bool, h Helper) (string, error) {
for len(h.Queue) > 0 {
doWork(callback, &h)
if len(h.Result) > 0 {
return h.Result, nil
}
}
for h.Workers != 0 && len(h.Result) == 0 {
}
if len(h.Result) > 0 {
return h.Result, nil
}
return "", errors.New("Stopped listening for urls")
}
func SeekAndDestroy(callback func(string) bool,
chunkedUrlsToCheck <-chan []string,
workersCount int) (string, error) {
if workersCount <= 0 {
return "", errors.New("workersCount is not positive")
}
if chunkedUrlsToCheck == nil {
return "", errors.New("chunkedUrlsToCheck is not initialized")
}
h := Helper{0, workersCount, "", make([]string, 0, workersCount)}
for {
select {
case urls, isOk := <-chunkedUrlsToCheck:
if !isOk {
return RepeateDoWork(callback, h)
}
h.Queue = append(h.Queue, urls...)
doWork(callback, &h)
if len(h.Result) > 0 {
return h.Result, nil
}
case <-time.After(15 * time.Second):
return RepeateDoWork(callback, h)
}
}
}
-
-func main() {
-}

Не и това е проблем за който знам, но не се сещам в момента как да го реша, а и нямам време за съжаление. Иначе след последния трансфер по канала се изчаква 15-те секунди и тогава се довършва опашката (което е супер глупаво, но се надявам да не е прекалено голям проблем :) ).