package forgepb

import (
	"fmt"
	"sync"
	"time"

	"github.com/destel/rill"
	"go.wit.com/lib/cobol"
	"go.wit.com/lib/config"
	"go.wit.com/lib/env"
	"go.wit.com/lib/protobuf/gitpb"
	"go.wit.com/log"
)

// rill is awesome. long live rill

// RillReload calls HasChanged() on every valid repo in f.Repos, running the
// calls concurrently through rill, and returns how many of them came back
// non-nil.
//
// Repos for which IsValidDir() is false are logged, deleted from f.Repos and
// the "repos" config is flagged as changed; they are not checked.
//
// x (env "RillX") is the size of the queued up pool (shouldn't matter here for this I think)
// y (env "RillY") is how many simultaneous functions will run
// todo: tune and compute x,y by # of CPUs and disk io
// todo: store x,y in forge config ? (or compute them. notsure)
func (f *Forge) RillReload() int {
	var all []*gitpb.Repo
	for repo := range f.Repos.IterAll() {
		if !repo.IsValidDir() {
			log.Printf("%s %-50s\n", "got an invalid repo in forgepb.RillReload()", repo.GetFullPath())
			f.Repos.Delete(repo)
			config.SetChanged("repos", true)
			continue
		}
		all = append(all, repo)
	}

	// Turn the slice of repos into a rill stream.
	ids := rill.FromSlice(all, nil)

	// Pass-through stage; its concurrency comes from RillX.
	dirs := rill.Map(ids, cobol.Int(env.Get("RillX")), func(repo *gitpb.Repo) (*gitpb.Repo, error) {
		return repo, nil
	})

	// The ForEach callback runs on several goroutines at once, so the
	// counter has to be guarded.
	var (
		mu      sync.Mutex
		counter int
	)
	rill.ForEach(dirs, cobol.Int(env.Get("RillY")), func(repo *gitpb.Repo) error {
		// NOTE(review): a non-nil result from HasChanged() is counted as
		// "this repo changed" — confirm against gitpb.
		if err := repo.HasChanged(); err != nil {
			mu.Lock()
			counter++
			mu.Unlock()
		}
		// Always return nil: rill.ForEach stops at the first error it is
		// handed, which would end the scan after the first changed repo
		// and make the returned count meaningless.
		return nil
	})

	return counter
}

// x is the size of the queued up pool (shouldn't matter here for this I think)
// y is how many simultaneous functions will run
// todo: tune and compute x,y by # of CPUs and disk io
// todo: store x,y in forge config ? (or compute them.
notsure) func (f *Forge) RillFuncError(rillf func(*gitpb.Repo) error) map[string]*RillStats { return f.RillRepos(rillf) } func (f *Forge) ConfigRill(rillX int, rillY int) { env.Set("RillX", fmt.Sprintf("%d", rillX)) env.Set("RillY", fmt.Sprintf("%d", rillY)) // log.Infof("Setting rill values to %d,%d\n", f.Config.RillX, f.Config.RillY) } type RillStats struct { Name string Err error Start time.Time End time.Time } var rillMu sync.Mutex func rillSetError(stats map[string]*RillStats, fullpath string, err error) { rillMu.Lock() defer rillMu.Unlock() if s, ok := stats[fullpath]; ok { s.Err = err return } log.Info("WHAT THE FUCK STATS ERROR", fullpath) } func rillSetStartTime(stats map[string]*RillStats, fullpath string) { rillMu.Lock() defer rillMu.Unlock() if s, ok := stats[fullpath]; ok { s.Start = time.Now() return } var s *RillStats s = new(RillStats) s.Start = time.Now() stats[fullpath] = s } func rillSetEndTime(stats map[string]*RillStats, fullpath string) { rillMu.Lock() defer rillMu.Unlock() if s, ok := stats[fullpath]; ok { s.End = time.Now() return } log.Info("WHAT THE FUCK STATS END TIME", fullpath) } // x is the size of the queued up pool (shouldn't matter here for this I think) // y is how many simultanous functions will run // todo: tune and compute x,y by # of CPUs and disk io // todo: store x,y in forge config ? (or compute them. notsure) func (f *Forge) RillRepos(rillf func(*gitpb.Repo) error) map[string]*RillStats { var all []*gitpb.Repo var stats map[string]*RillStats stats = make(map[string]*RillStats) for repo := range f.Repos.IterAll() { if !repo.IsValidDir() { log.Printf("got an invalid repo in forgepb.RillRepos() %-50s\n", repo.GetFullPath()) f.Repos.Delete(repo) config.SetChanged("repos", true) continue } all = append(all, repo) } // log.Info("Rill Repos len =", len(all)) // Convert a slice of user IDs into a channel ids := rill.FromSlice(all, nil) var counter int var watch int = 10 // Read users from the API. 
// Concurrency = 20 dirs := rill.Map(ids, cobol.Int(env.Get("RillX")), func(id *gitpb.Repo) (*gitpb.Repo, error) { return id, nil }) rill.ForEach(dirs, cobol.Int(env.Get("RillY")), func(repo *gitpb.Repo) error { // todo: make this a goroutine to show stats to the user rillMu.Lock() counter += 1 if counter > watch { // log.Info("Processed", watch, "repos") // this doesn't work watch += 50 } rillMu.Unlock() rillSetStartTime(stats, repo.GetFullPath()) if err := rillf(repo); err != nil { rillSetError(stats, repo.GetFullPath(), err) } rillSetEndTime(stats, repo.GetFullPath()) return nil }) return stats } // returns the set of failed repos func (f *Forge) RunOnReposNew(repos *gitpb.Repos, rillf func(*gitpb.Repo) error) *gitpb.Repos { stats := f.RunOnRepos(repos, rillf) failed := gitpb.NewRepos() for s, stat := range stats { if stat.Err == nil { continue } found := f.Repos.FindByFullPath(s) if found == nil { log.Info("found", found, "'"+s+"'") panic("wrong namespace logic. couldn't find repo from stats") } newr := failed.Clone(found) newr.State = fmt.Sprintf("%v", stat.Err) } return failed } // dumb & slow, but needed. This is old school dumb. Before we had the awesome machines we have today. // Use this if you think SMP processing might be the problem. // if this works, and GO Rill doesn't work, then you, yes you, are the problem. Your code sucks. // fix it, happy hacking func (f *Forge) RunOnReposNewDumb(repos *gitpb.Repos, rillf func(*gitpb.Repo) error) *gitpb.Repos { failed := gitpb.NewRepos() counter := 1 for repo := range repos.IterAll() { err := rillf(repo) if err == nil { continue } newr := failed.Clone(repo) newr.State = fmt.Sprintf("%v) (%d)", err, counter) counter += 1 } return failed } // x is the size of the queued up pool (shouldn't matter here for this I think) // y is how many simultanous functions will run // todo: tune and compute x,y by # of CPUs and disk io // todo: store x,y in forge config ? (or compute them. 
notsure) func (f *Forge) RunOnRepos(repos *gitpb.Repos, rillf func(*gitpb.Repo) error) map[string]*RillStats { var all []*gitpb.Repo var stats map[string]*RillStats stats = make(map[string]*RillStats) for repo := range repos.IterAll() { if !repo.IsValidDir() { log.Printf("got an invalid repo in forgepb.RillRepos() %-50s\n", repo.GetFullPath()) continue } all = append(all, repo) } // log.Info("Rill Repos len =", len(all)) // Convert a slice of user IDs into a channel ids := rill.FromSlice(all, nil) var counter int var watch int = 10 // Read users from the API. // Concurrency = 20 dirs := rill.Map(ids, cobol.Int(env.Get("RillX")), func(id *gitpb.Repo) (*gitpb.Repo, error) { return id, nil }) rill.ForEach(dirs, cobol.Int(env.Get("RillY")), func(repo *gitpb.Repo) error { // todo: make this a goroutine to show stats to the user rillMu.Lock() counter += 1 if counter > watch { // log.Info("Processed", watch, "repos") // this doesn't work watch += 50 } rillMu.Unlock() rillSetStartTime(stats, repo.GetFullPath()) if err := rillf(repo); err != nil { rillSetError(stats, repo.GetFullPath(), err) } rillSetEndTime(stats, repo.GetFullPath()) return nil }) return stats }