diff options
| author | Jeff Carr <[email protected]> | 2024-01-03 22:10:13 -0600 |
|---|---|---|
| committer | Jeff Carr <[email protected]> | 2024-01-03 22:10:13 -0600 |
| commit | 973f6411f4a59dcf21a8151d21578775deb340e0 (patch) | |
| tree | 7cd008f5ff5b3e8960dc26e73e4ac1297f7faa7d /chan.go | |
| parent | cbf944b0b305800cb07b2aab380d882c66bee94a (diff) | |
more learning/debugging codev0.2.1
Signed-off-by: Jeff Carr <[email protected]>
Diffstat (limited to 'chan.go')
| -rw-r--r-- | chan.go | 124 |
1 files changed, 124 insertions, 0 deletions
@@ -0,0 +1,124 @@ +package debugger + +// channel communication to the plugins +// https://github.com/sourcegraph/conc +// https://www.reddit.com/r/golang/comments/11x1oek/hello_gophers_show_me_your_concurrent_code/ + +import ( + // "regexp" + // "go.wit.com/gui/toolkit" + "sync" + "runtime" + "github.com/sourcegraph/conc" + "github.com/sourcegraph/conc/stream" + "github.com/sourcegraph/conc/panics" + + "go.wit.com/log" +) + +// this should never exit +// TODO: clean up all this poorly named code +func makeConc() { + var wg conc.WaitGroup + defer wg.Wait() + + startTheThing(&wg) + log.Warn("panic?") + log.Sleep(2) + log.Warn("panic? after sleep(5)") +} + +func startTheThing(wg *conc.WaitGroup) { + f := func() { + log.Warn("startTheThing() == about to panic now") + panic("test conc.WaitGroup") + } + wg.Go(func() { + ExampleCatcher(f) + }) +} + +func ExampleCatcher(f func()) { + var pc panics.Catcher + i := 0 + pc.Try(func() { i += 1 }) + pc.Try(f) + pc.Try(func() { i += 1 }) + + recovered := pc.Recovered() + + log.Warn("panic.Recovered():", recovered.Value.(string)) + frames := runtime.CallersFrames(recovered.Callers) + for { + frame, more := frames.Next() + log.Warn("\t", frame.Function) + + if !more { + break + } + } +} + +func mapStream( + in chan int, + out chan int, + f func(int) int, +) { + tasks := make(chan func()) + taskResults := make(chan chan int) + + // Worker goroutines + var workerWg sync.WaitGroup + for i := 0; i < 10; i++ { + workerWg.Add(1) + go func() { + defer workerWg.Done() + for task := range tasks { + task() + } + }() + } + + // Ordered reader goroutines + var readerWg sync.WaitGroup + readerWg.Add(1) + go func() { + defer readerWg.Done() + for result := range taskResults { + item := <-result + out <- item + } + }() + + // Feed the workers with tasks + for elem := range in { + resultCh := make(chan int, 1) + taskResults <- resultCh + tasks <- func() { + resultCh <- f(elem) + } + } + + // We've exhausted input. + // Wait for everything to finish + close(tasks) + workerWg.Wait() + close(taskResults) + readerWg.Wait() +} + +func mapStream2( + in chan int, + out chan int, + f func(int) int, +) { + s := stream.New().WithMaxGoroutines(10) + for elem := range in { + elem := elem + s.Go(func() stream.Callback { + res := f(elem) + return func() { out <- res } + }) + } + s.Wait() +} |
