Mohamed ABDELLANI

Exploring Prometheus Code - part 1 (personal notes)

I recently started learning the Go language, and part of the process is exploring the code of popular open-source projects. Prometheus is one of the projects I selected: it's a popular project with 848 contributors and more than 51k stars.

The objectives of this article:

  • understand how the code is organized in the filesystem,

  • explore some of the popular packages used in this project,

  • understand how the different modules are loaded,

  • understand how the communication between the modules is implemented.

I started with the /cmd folder, as it stores the main function that will be called when we run Prometheus from the CLI.

Before reading that function, it'll be useful to learn about the following packages:

  1. kingpin: to parse the command-line arguments

  2. run: "a universal mechanism to manage goroutine lifecycles" (from the oklog/run repository description); a minimal sketch of how it fits with the other two packages follows this list

  3. context: to propagate cancellation and share state across functions/goroutines
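Since these three packages drive the whole main function, here is a minimal, self-contained sketch (my own toy code, not Prometheus's) of how they are typically combined. The application name, flag, and worker logic are made up for illustration, and the kingpin import path may differ depending on the version (e.g. gopkg.in/alecthomas/kingpin.v2):

package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/alecthomas/kingpin/v2"
    "github.com/oklog/run"
)

func main() {
    // 1. kingpin: declare and parse command-line flags into variables.
    app := kingpin.New("demo", "Toy example of kingpin + run + context.")
    configFile := app.Flag("config.file", "Configuration file path.").
        Default("demo.yml").String()
    kingpin.MustParse(app.Parse(os.Args[1:]))

    // 2. context: a cancellable context shared by the goroutines below.
    ctx, cancel := context.WithCancel(context.Background())

    // 3. run.Group: each Add registers an "execute" and an "interrupt" function.
    // As soon as one execute function returns, every interrupt is called,
    // so all actors shut down together.
    var g run.Group
    g.Add(
        func() error {
            fmt.Println("worker started with config:", *configFile)
            <-ctx.Done() // block until the group is interrupted
            return ctx.Err()
        },
        func(error) {
            cancel() // unblock the execute function above
        },
    )
    g.Add(
        func() error {
            time.Sleep(2 * time.Second) // a short-lived actor
            return nil                  // returning here shuts the whole group down
        },
        func(error) {},
    )
    fmt.Println("group exited:", g.Run())
}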

The main function does the following things:

  1. parse the arguments, load the configuration, and populate the flagConfig data structure (using the kingpin package)
    a.Flag("config.file", "Prometheus configuration file path.").
        Default("prometheus.yml").StringVar(&cfg.configFile)

    a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
        Default("0.0.0.0:9090").StringVar(&cfg.web.ListenAddress)
  2. check the consistency of the configuration (e.g., rejecting server-only flags in agent mode)
    if agentMode && len(serverOnlyFlags) > 0 {
        fmt.Fprintf(os.Stderr, "The following flag(s) can not be used in agent mode: %q", serverOnlyFlags)
        os.Exit(3)
    }
  3. launch the different modules (using the run package)
var g run.Group
...
    {
        // Scrape manager.
        g.Add(
            func() error {
                <-reloadReady.C
                err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
                level.Info(logger).Log("msg", "Scrape manager stopped")
                return err
            },
            func(err error) {
                level.Info(logger).Log("msg", "Stopping scrape manager...")
                scrapeManager.Stop()
            },
        )
    }
...

The project is composed of several modules; each module is stored in a folder at the root level of the project (see the internal architecture documentation).

I started with two of them: discovery and scrape.

discovery implements the service discovery manager (ScrapeDiscoveryManager) that collects the target groups and sends them to scrape.

scrape implements the scrape manager (ScrapeManager) that controls the scraper pools (check the internal architecture for details).

Each of these two managers runs in a separate goroutine.

The ScrapeDiscoveryManager sends the target groups to the ScrapeManager through a sync channel (SyncCh). This channel is passed in at creation time in the main function.

        g.Add(
            func() error {
                // When the scrape manager receives a new targets list
                // it needs to read a valid config for each job.
                // It depends on the config being in sync with the discovery manager so
                // we wait until the config is fully loaded.
                <-reloadReady.C

                err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
                return err
            },
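Note the <-reloadReady.C line: both the scrape manager and the discovery manager block on it until the configuration has been loaded. The underlying idiom is "close a channel once to broadcast readiness to every waiter". A minimal sketch of that pattern, with made-up names (the actual implementation in main.go differs in detail):

package main

import (
    "fmt"
    "sync"
)

// readySignal is a toy version of the "close once to broadcast" pattern.
type readySignal struct {
    C    chan struct{}
    once sync.Once
}

func (r *readySignal) Close() {
    // Closing the channel wakes up every goroutine blocked on <-r.C;
    // sync.Once makes it safe to call Close more than once.
    r.once.Do(func() { close(r.C) })
}

func main() {
    ready := &readySignal{C: make(chan struct{})}
    done := make(chan struct{})

    go func() {
        <-ready.C // wait until the "config" is loaded
        fmt.Println("scrape manager can start now")
        close(done)
    }()

    ready.Close() // broadcast readiness to every waiter
    <-done
}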

ScrapeManager#Run runs a loop that watches for updates from the ScrapeDiscoveryManager and updates the scraper pools accordingly.

// tsets is the channel returned by discoveryManagerScrape.SyncCh()
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets: // New Updates!!
            m.updateTsets(ts)

            select {
            case m.triggerReload <- struct{}{}: // trigger reload
            default:
            }

        case <-m.graceShut:
            return nil
        }
    }
}
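The inner select with an empty default case is a common Go idiom: the send on m.triggerReload is non-blocking, so a burst of updates collapses into a single pending reload instead of queueing one reload per update. A stripped-down illustration (toy code, not from Prometheus):

package main

import "fmt"

func main() {
    // Buffered with capacity 1: at most one reload can be pending at a time.
    triggerReload := make(chan struct{}, 1)

    requestReload := func() {
        select {
        case triggerReload <- struct{}{}: // a reload is now pending
        default: // one is already pending; drop this extra request
        }
    }

    // Many bursts of updates collapse into a single pending reload.
    for i := 0; i < 1000; i++ {
        requestReload()
    }
    fmt.Println("pending reloads:", len(triggerReload)) // prints 1
}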

The reloader runs on a ticker; when a reload has been triggered, it creates/updates the scraper pools (the ticker throttles how often a reload can happen):

func (m *Manager) reloader() {
...
    ticker := time.NewTicker(time.Duration(reloadIntervalDuration))

    defer ticker.Stop()

    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload() // Update the scraper pools
            case <-m.graceShut:
                return
            }
        }
    }
}

Now, let's return to the ScrapeDiscoveryManager. To understand how it works, we need to explore its data structures (some details are omitted).

//discovery/manager.go
type Manager struct {
    targets //  contains the list of all the targets
    providers []*Provider 
    syncCh // the communication channel to send targets to ScrapeManager
}

type Provider struct {
    name   string
    d      Discoverer
//....
}
type Discoverer interface {
    // Run hands a channel to the discovery provider (Consul, DNS, etc.) through which
    // it can send updated target groups.
    Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

The manager has a list of providers; each one wraps a discoverer that periodically creates lists of target groups and sends them to the discovery manager.

We can see in the ScrapeDiscoveryManager's startProvider that an updates channel is created and passed to both the discoverer and the updater.

func (m *Manager) startProvider(ctx context.Context, p *Provider) {
    level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
    ctx, cancel := context.WithCancel(ctx)
    updates := make(chan []*targetgroup.Group)

    p.cancel = cancel

    go p.d.Run(ctx, updates)
    go m.updater(ctx, p, updates)
}
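Both goroutines receive the same derived context, so calling p.cancel later is enough to stop the discoverer and the updater together. A minimal illustration of that cancellation fan-out (toy code):

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    var wg sync.WaitGroup
    for _, name := range []string{"discoverer", "updater"} {
        wg.Add(1)
        go func(name string) {
            defer wg.Done()
            <-ctx.Done() // both goroutines watch the same context
            fmt.Println(name, "stopped:", ctx.Err())
        }(name)
    }

    cancel() // one call stops every goroutine derived from this context
    wg.Wait()
}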

Each discoverer implements a refresh function that collects the targets from its backend (Consul, DNS, etc.) and sends them. (Many discoverers are built on the discovery/refresh helper, which wraps such a refresh function in a periodic loop.)

The discoverer's logic can be summarized as follows:

 func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    ticker := time.NewTicker(d.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            tgs, err := d.refresh(ctx)
            if err != nil {
                if !errors.Is(ctx.Err(), context.Canceled) {
                    level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
                }
                continue
            }

            select {
            case ch <- tgs:
            case <-ctx.Done():
                return
            }
        case <-ctx.Done():
            return
        }
    }
}
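To make the refresh step concrete, here is what a refresh implementation could look like for a hypothetical "static list" discoverer (illustration only, not an actual Prometheus discoverer); it simply builds target groups from hard-coded addresses:

// Hypothetical refresh implementation (illustration only). It assumes the
// usual imports used by discoverers:
//   "github.com/prometheus/common/model"
//   "github.com/prometheus/prometheus/discovery/targetgroup"
func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
    tg := &targetgroup.Group{
        Source: "static-example",
        Targets: []model.LabelSet{
            {model.AddressLabel: "10.0.0.1:9100"},
            {model.AddressLabel: "10.0.0.2:9100"},
        },
        Labels: model.LabelSet{"env": "demo"},
    }
    return []*targetgroup.Group{tg}, nil
}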

Now, on the ScrapeDiscoveryManager side, when the updater receives new updates from a discoverer, it will:

  1. update the statistics,

  2. update the list of targets,

  3. trigger a send: unblock the sender loop (see the ScrapeDiscoveryManager's Run below) so it forwards the update to the ScrapeManager.

func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
    // Ensure targets from this provider are cleaned up.
    defer m.cleaner(p)
    for {
        select {
        case <-ctx.Done():
            return
        case tgs, ok := <-updates:
            receivedUpdates.WithLabelValues(m.name).Inc()
            //...
            p.mu.RLock()
            for s := range p.subs {
                m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
            }
            p.mu.RUnlock()

            select {
            case m.triggerSend <- struct{}{}:
            default:
            }
        }
    }
}

The ScrapeDiscoveryManager's Run method starts the sender function, which is responsible for sending updates to the ScrapeManager:

func (m *Manager) Run() error {
    go m.sender() // send updates to ScrapeManager
    <-m.ctx.Done()
    m.cancelDiscoverers()
    return m.ctx.Err()
}

func (m *Manager) sender() {
    ticker := time.NewTicker(m.updatert)
    defer ticker.Stop()

    for {
        select {
        case <-m.ctx.Done():
            return
        case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
            select {
            case <-m.triggerSend:
                sentUpdates.WithLabelValues(m.name).Inc()
                select {
                case m.syncCh <- m.allGroups():
                default:
                    //... the receiver's channel is full; log and retry on the next tick
                }
            default:
            }
        }
    }
}

The diagram below summarizes the communication flow from the discoverer to the scrape manager.

[Diagram: Communication between ScrapeDiscoveryManager and ScrapeManager]
