From 65f58e2a742205c9e8470b360bd439642a5c8211 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 7 Jan 2015 14:43:04 -0800 Subject: [PATCH 1/9] Implement container stats collection in daemon Signed-off-by: Michael Crosby --- api/server/server.go | 14 ++++++ daemon/container.go | 7 +++ daemon/daemon.go | 16 +++++++ daemon/execdriver/driver.go | 9 ++++ daemon/execdriver/lxc/driver.go | 5 +++ daemon/execdriver/native/driver.go | 18 ++++++++ daemon/start.go | 15 +++++++ daemon/stats_collector.go | 71 ++++++++++++++++++++++++++++++ 8 files changed, 155 insertions(+) create mode 100644 daemon/stats_collector.go diff --git a/api/server/server.go b/api/server/server.go index d2715f1bc..d5cdbd00c 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -411,6 +411,19 @@ func getContainersJSON(eng *engine.Engine, version version.Version, w http.Respo return nil } +func getContainersStats(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := parseForm(r); err != nil { + return err + } + if vars == nil { + return fmt.Errorf("Missing parameter") + } + name := vars["name"] + job := eng.Job("container_stats", name) + streamJSON(job, w, true) + return job.Run() +} + func getContainersLogs(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := parseForm(r); err != nil { return err @@ -1323,6 +1336,7 @@ func createRouter(eng *engine.Engine, logging, enableCors bool, dockerVersion st "/containers/{name:.*}/json": getContainersByName, "/containers/{name:.*}/top": getContainersTop, "/containers/{name:.*}/logs": getContainersLogs, + "/containers/{name:.*}/stats": getContainersStats, "/containers/{name:.*}/attach/ws": wsContainersAttach, "/exec/{id:.*}/json": getExecByID, }, diff --git a/daemon/container.go b/daemon/container.go index b0eaea03b..4a0232878 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -1414,3 +1414,10 @@ func (container *Container) getNetworkedContainer() (*Container, error) { return nil, fmt.Errorf("network mode not set to container") } } + +func (container *Container) Stats() (*execdriver.ResourceStats, error) { + if !container.IsRunning() { + return nil, fmt.Errorf("cannot collect stats on a non running container") + } + return container.daemon.Stats(container) +} diff --git a/daemon/daemon.go b/daemon/daemon.go index 1b2d13bb6..01d8245de 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -104,6 +104,7 @@ type Daemon struct { driver graphdriver.Driver execDriver execdriver.Driver trustStore *trust.TrustStore + statsCollector *statsCollector } // Install installs daemon capabilities to eng. @@ -116,6 +117,7 @@ func (daemon *Daemon) Install(eng *engine.Engine) error { "container_copy": daemon.ContainerCopy, "container_rename": daemon.ContainerRename, "container_inspect": daemon.ContainerInspect, + "container_stats": daemon.ContainerStats, "containers": daemon.Containers, "create": daemon.ContainerCreate, "rm": daemon.ContainerRm, @@ -982,6 +984,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine) (*Daemon, error) execDriver: ed, eng: eng, trustStore: t, + statsCollector: newStatsCollector(1 * time.Second), } if err := daemon.restore(); err != nil { return nil, err @@ -1092,6 +1095,19 @@ func (daemon *Daemon) Kill(c *Container, sig int) error { return daemon.execDriver.Kill(c.command, sig) } +func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) { + return daemon.execDriver.Stats(c.ID) +} + +func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver.ResourceStats, error) { + c := daemon.Get(name) + if c == nil { + return nil, fmt.Errorf("no such container") + } + ch := daemon.statsCollector.collect(c) + return ch, nil +} + // Nuke kills all containers then removes all content // from the content root, including images, volumes and // container filesystems. diff --git a/daemon/execdriver/driver.go b/daemon/execdriver/driver.go index fe99e062d..044a2ea0a 100644 --- a/daemon/execdriver/driver.go +++ b/daemon/execdriver/driver.go @@ -5,7 +5,9 @@ import ( "io" "os" "os/exec" + "time" + "github.com/docker/libcontainer" "github.com/docker/libcontainer/devices" ) @@ -61,6 +63,7 @@ type Driver interface { GetPidsForContainer(id string) ([]int, error) // Returns a list of pids for the given container. Terminate(c *Command) error // kill it with fire Clean(id string) error // clean all traces of container exec + Stats(id string) (*ResourceStats, error) // Get resource stats for a running container } // Network settings of the container @@ -101,6 +104,12 @@ type Resources struct { Cpuset string `json:"cpuset"` } +type ResourceStats struct { + *libcontainer.ContainerStats + Read time.Time `json:"read"` + ClockTicks int `json:"clock_ticks"` +} + type Mount struct { Source string `json:"source"` Destination string `json:"destination"` diff --git a/daemon/execdriver/lxc/driver.go b/daemon/execdriver/lxc/driver.go index c02ceae97..7dca19d76 100644 --- a/daemon/execdriver/lxc/driver.go +++ b/daemon/execdriver/lxc/driver.go @@ -524,3 +524,8 @@ func (t *TtyConsole) Close() error { func (d *driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessConfig, pipes *execdriver.Pipes, startCallback execdriver.StartCallback) (int, error) { return -1, ErrExec } + +func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { + return nil, fmt.Errorf("container stats are not support with LXC") + +} diff --git a/daemon/execdriver/native/driver.go b/daemon/execdriver/native/driver.go index f6099bd04..e82d784aa 100644 --- a/daemon/execdriver/native/driver.go +++ b/daemon/execdriver/native/driver.go @@ -13,6 +13,7 @@ import ( "strings" "sync" "syscall" + "time" log "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/execdriver" @@ -279,6 +280,23 @@ func (d *driver) Clean(id string) error { return os.RemoveAll(filepath.Join(d.root, id)) } +func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { + state, err := libcontainer.GetState(filepath.Join(d.root, id)) + if err != nil { + return nil, err + } + now := time.Now() + stats, err := libcontainer.GetStats(nil, state) + if err != nil { + return nil, err + } + return &execdriver.ResourceStats{ + ContainerStats: stats, + ClockTicks: system.GetClockTicks(), + Read: now, + }, nil +} + func getEnv(key string, env []string) string { for _, pair := range env { parts := strings.Split(pair, "=") diff --git a/daemon/start.go b/daemon/start.go index 363461080..89116a84d 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -1,6 +1,7 @@ package daemon import ( + "encoding/json" "fmt" "os" "strings" @@ -77,3 +78,17 @@ func (daemon *Daemon) setHostConfig(container *Container, hostConfig *runconfig. return nil } + +func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status { + stats, err := daemon.SubscribeToContainerStats(job.Args[0]) + if err != nil { + return job.Error(err) + } + enc := json.NewEncoder(job.Stdout) + for update := range stats { + if err := enc.Encode(update); err != nil { + return job.Error(err) + } + } + return engine.StatusOK +} diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go new file mode 100644 index 000000000..0d1059d8b --- /dev/null +++ b/daemon/stats_collector.go @@ -0,0 +1,71 @@ +package daemon + +import ( + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/execdriver" +) + +func newStatsCollector(interval time.Duration) *statsCollector { + s := &statsCollector{ + interval: interval, + containers: make(map[string]*statsCollectorData), + } + s.start() + return s +} + +type statsCollectorData struct { + c *Container + lastStats *execdriver.ResourceStats + subs []chan *execdriver.ResourceStats +} + +// statsCollector manages and provides container resource stats +type statsCollector struct { + m sync.Mutex + interval time.Duration + containers map[string]*statsCollectorData +} + +func (s *statsCollector) collect(c *Container) <-chan *execdriver.ResourceStats { + s.m.Lock() + ch := make(chan *execdriver.ResourceStats, 1024) + s.containers[c.ID] = &statsCollectorData{ + c: c, + subs: []chan *execdriver.ResourceStats{ + ch, + }, + } + s.m.Unlock() + return ch +} + +func (s *statsCollector) stopCollection(c *Container) { + s.m.Lock() + delete(s.containers, c.ID) + s.m.Unlock() +} + +func (s *statsCollector) start() { + go func() { + for _ = range time.Tick(s.interval) { + log.Debugf("starting collection of container stats") + s.m.Lock() + for id, d := range s.containers { + stats, err := d.c.Stats() + if err != nil { + // TODO: @crosbymichael evict container depending on error + log.Errorf("collecting stats for %s: %v", id, err) + continue + } + for _, sub := range s.containers[id].subs { + sub <- stats + } + } + s.m.Unlock() + } + }() +} From 2640a10bca29c4a4199c906a26f658aac8a68dc2 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 7 Jan 2015 16:22:42 -0800 Subject: [PATCH 2/9] Implement client side display for stats Signed-off-by: Michael Crosby --- api/client/commands.go | 105 +++++++++++++ daemon/execdriver/driver.go | 6 +- daemon/execdriver/execdrivers/execdrivers.go | 11 +- daemon/execdriver/native/driver.go | 12 +- daemon/start.go | 16 +- daemon/stats_collector.go | 44 ++++++ stats/stats.go | 156 +++++++++++++++++++ 7 files changed, 340 insertions(+), 10 deletions(-) create mode 100644 stats/stats.go diff --git a/api/client/commands.go b/api/client/commands.go index 4cfe97cdd..6c6595c24 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -16,6 +16,7 @@ import ( "path" "path/filepath" "runtime" + "sort" "strconv" "strings" "text/tabwriter" @@ -42,6 +43,7 @@ import ( "github.com/docker/docker/pkg/urlutil" "github.com/docker/docker/registry" "github.com/docker/docker/runconfig" + "github.com/docker/docker/stats" "github.com/docker/docker/utils" "github.com/docker/libtrust" ) @@ -2618,3 +2620,106 @@ func (cli *DockerCli) CmdExec(args ...string) error { return nil } + +type containerStats struct { + Name string + CpuPercentage float64 + Memory float64 + MemoryPercentage float64 + NetworkRx int + NetworkTx int +} + +type statSorter struct { + stats []containerStats +} + +func (s *statSorter) Len() int { + return len(s.stats) +} + +func (s *statSorter) Swap(i, j int) { + s.stats[i], s.stats[j] = s.stats[j], s.stats[i] +} + +func (s *statSorter) Less(i, j int) bool { + return s.stats[i].Name < s.stats[j].Name +} + +func (cli *DockerCli) CmdStats(args ...string) error { + cmd := cli.Subcmd("stats", "CONTAINER", "Stream the stats of a container", true) + cmd.Require(flag.Min, 1) + utils.ParseFlags(cmd, args, true) + + cStats := map[string]containerStats{} + for _, name := range cmd.Args() { + go cli.streamStats(name, cStats) + } + w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0) + for _ = range time.Tick(1000 * time.Millisecond) { + fmt.Fprint(cli.out, "\033[2J") + fmt.Fprint(cli.out, "\033[H") + fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM\tMEM %\tNET I/O") + sStats := []containerStats{} + for _, s := range cStats { + sStats = append(sStats, s) + } + sorter := &statSorter{sStats} + sort.Sort(sorter) + for _, s := range sStats { + fmt.Fprintf(w, "%s\t%f%%\t%s\t%f%%\t%d/%d\n", + s.Name, + s.CpuPercentage, + units.HumanSize(s.Memory), + s.MemoryPercentage, + s.NetworkRx, s.NetworkTx) + } + w.Flush() + } + return nil +} + +func (cli *DockerCli) streamStats(name string, data map[string]containerStats) error { + stream, _, err := cli.call("GET", "/containers/"+name+"/stats", nil, false) + if err != nil { + return err + } + + var ( + previousCpu uint64 + previousSystem uint64 + start = true + dec = json.NewDecoder(stream) + ) + for { + var v *stats.Stats + if err := dec.Decode(&v); err != nil { + return err + } + memPercent := float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 + cpuPercent := 0.0 + + if !start { + cpuDelta := float64(v.CpuStats.CpuUsage.TotalUsage) - float64(previousCpu) + systemDelta := float64(int(v.CpuStats.SystemUsage)/v.ClockTicks) - float64(int(previousSystem)/v.ClockTicks) + + if systemDelta > 0.0 { + cpuPercent = (cpuDelta / systemDelta) * float64(v.ClockTicks*len(v.CpuStats.CpuUsage.PercpuUsage)) + } + } + start = false + d := data[name] + d.Name = name + d.CpuPercentage = cpuPercent + d.Memory = float64(v.MemoryStats.Usage) + d.MemoryPercentage = memPercent + d.NetworkRx = int(v.Network.RxBytes) + d.NetworkTx = int(v.Network.TxBytes) + data[name] = d + + previousCpu = v.CpuStats.CpuUsage.TotalUsage + previousSystem = v.CpuStats.SystemUsage + } + return nil + +} diff --git a/daemon/execdriver/driver.go b/daemon/execdriver/driver.go index 044a2ea0a..f33f1671d 100644 --- a/daemon/execdriver/driver.go +++ b/daemon/execdriver/driver.go @@ -106,8 +106,10 @@ type Resources struct { type ResourceStats struct { *libcontainer.ContainerStats - Read time.Time `json:"read"` - ClockTicks int `json:"clock_ticks"` + Read time.Time `json:"read"` + ClockTicks int `json:"clock_ticks"` + MemoryLimit int64 `json:"memory_limit"` + SystemUsage uint64 `json:"system_usage"` } type Mount struct { diff --git a/daemon/execdriver/execdrivers/execdrivers.go b/daemon/execdriver/execdrivers/execdrivers.go index 2a050b483..b7dd98cf3 100644 --- a/daemon/execdriver/execdrivers/execdrivers.go +++ b/daemon/execdriver/execdrivers/execdrivers.go @@ -2,14 +2,21 @@ package execdrivers import ( "fmt" + "path" + "github.com/docker/docker/daemon/execdriver" "github.com/docker/docker/daemon/execdriver/lxc" "github.com/docker/docker/daemon/execdriver/native" "github.com/docker/docker/pkg/sysinfo" - "path" + "github.com/docker/docker/pkg/system" ) func NewDriver(name, root, initPath string, sysInfo *sysinfo.SysInfo) (execdriver.Driver, error) { + meminfo, err := system.ReadMemInfo() + if err != nil { + return nil, err + } + switch name { case "lxc": // we want to give the lxc driver the full docker root because it needs @@ -17,7 +24,7 @@ func NewDriver(name, root, initPath string, sysInfo *sysinfo.SysInfo) (execdrive // to be backwards compatible return lxc.NewDriver(root, initPath, sysInfo.AppArmor) case "native": - return native.NewDriver(path.Join(root, "execdriver", "native"), initPath) + return native.NewDriver(path.Join(root, "execdriver", "native"), initPath, meminfo.MemTotal/1000) } return nil, fmt.Errorf("unknown exec driver %s", name) } diff --git a/daemon/execdriver/native/driver.go b/daemon/execdriver/native/driver.go index e82d784aa..83e07f392 100644 --- a/daemon/execdriver/native/driver.go +++ b/daemon/execdriver/native/driver.go @@ -42,23 +42,23 @@ type driver struct { root string initPath string activeContainers map[string]*activeContainer + machineMemory int64 sync.Mutex } -func NewDriver(root, initPath string) (*driver, error) { +func NewDriver(root, initPath string, machineMemory int64) (*driver, error) { if err := os.MkdirAll(root, 0700); err != nil { return nil, err } - // native driver root is at docker_root/execdriver/native. Put apparmor at docker_root if err := apparmor.InstallDefaultProfile(); err != nil { return nil, err } - return &driver{ root: root, initPath: initPath, activeContainers: make(map[string]*activeContainer), + machineMemory: machineMemory, }, nil } @@ -281,6 +281,7 @@ func (d *driver) Clean(id string) error { } func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { + c := d.activeContainers[id] state, err := libcontainer.GetState(filepath.Join(d.root, id)) if err != nil { return nil, err @@ -290,10 +291,15 @@ func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { if err != nil { return nil, err } + memoryLimit := c.container.Cgroups.Memory + if memoryLimit == 0 { + memoryLimit = d.machineMemory + } return &execdriver.ResourceStats{ ContainerStats: stats, ClockTicks: system.GetClockTicks(), Read: now, + MemoryLimit: memoryLimit, }, nil } diff --git a/daemon/start.go b/daemon/start.go index 89116a84d..150a87f57 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -8,6 +8,7 @@ import ( "github.com/docker/docker/engine" "github.com/docker/docker/runconfig" + "github.com/docker/docker/stats" ) func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status { @@ -80,15 +81,24 @@ func (daemon *Daemon) setHostConfig(container *Container, hostConfig *runconfig. } func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status { - stats, err := daemon.SubscribeToContainerStats(job.Args[0]) + s, err := daemon.SubscribeToContainerStats(job.Args[0]) if err != nil { return job.Error(err) } enc := json.NewEncoder(job.Stdout) - for update := range stats { - if err := enc.Encode(update); err != nil { + for update := range s { + ss := stats.ToStats(update.ContainerStats) + ss.MemoryStats.Limit = uint64(update.MemoryLimit) + ss.Read = update.Read + ss.ClockTicks = update.ClockTicks + ss.CpuStats.SystemUsage = update.SystemUsage + if err := enc.Encode(ss); err != nil { return job.Error(err) } } return engine.StatusOK } + +func mapToAPIStats() { + +} diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go index 0d1059d8b..a21092a85 100644 --- a/daemon/stats_collector.go +++ b/daemon/stats_collector.go @@ -1,6 +1,11 @@ package daemon import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" "sync" "time" @@ -55,12 +60,18 @@ func (s *statsCollector) start() { log.Debugf("starting collection of container stats") s.m.Lock() for id, d := range s.containers { + systemUsage, err := getSystemCpuUsage() + if err != nil { + log.Errorf("collecting system cpu usage for %s: %v", id, err) + continue + } stats, err := d.c.Stats() if err != nil { // TODO: @crosbymichael evict container depending on error log.Errorf("collecting stats for %s: %v", id, err) continue } + stats.SystemUsage = systemUsage for _, sub := range s.containers[id].subs { sub <- stats } @@ -69,3 +80,36 @@ func (s *statsCollector) start() { } }() } + +// returns value in nanoseconds +func getSystemCpuUsage() (uint64, error) { + f, err := os.Open("/proc/stat") + if err != nil { + return 0, err + } + defer f.Close() + + sc := bufio.NewScanner(f) + for sc.Scan() { + parts := strings.Fields(sc.Text()) + switch parts[0] { + case "cpu": + if len(parts) < 8 { + return 0, fmt.Errorf("invalid number of cpu fields") + } + + var total uint64 + for _, i := range parts[1:8] { + v, err := strconv.ParseUint(i, 10, 64) + if err != nil { + return 0.0, fmt.Errorf("Unable to convert value %s to int: %s", i, err) + } + total += v + } + return total * 1000000000, nil + default: + continue + } + } + return 0, fmt.Errorf("invalid stat format") +} diff --git a/stats/stats.go b/stats/stats.go new file mode 100644 index 000000000..e151014f3 --- /dev/null +++ b/stats/stats.go @@ -0,0 +1,156 @@ +package stats + +import ( + "time" + + "github.com/docker/libcontainer" + "github.com/docker/libcontainer/cgroups" +) + +type ThrottlingData struct { + // Number of periods with throttling active + Periods uint64 `json:"periods,omitempty"` + // Number of periods when the container hit its throttling limit. + ThrottledPeriods uint64 `json:"throttled_periods,omitempty"` + // Aggregate time the container was throttled for in nanoseconds. + ThrottledTime uint64 `json:"throttled_time,omitempty"` +} + +// All CPU stats are aggregate since container inception. +type CpuUsage struct { + // Total CPU time consumed. + // Units: nanoseconds. + TotalUsage uint64 `json:"total_usage,omitempty"` + // Total CPU time consumed per core. + // Units: nanoseconds. + PercpuUsage []uint64 `json:"percpu_usage,omitempty"` + // Time spent by tasks of the cgroup in kernel mode. + // Units: nanoseconds. + UsageInKernelmode uint64 `json:"usage_in_kernelmode"` + // Time spent by tasks of the cgroup in user mode. + // Units: nanoseconds. + UsageInUsermode uint64 `json:"usage_in_usermode"` +} + +type CpuStats struct { + CpuUsage CpuUsage `json:"cpu_usage,omitempty"` + SystemUsage uint64 `json:"system_cpu_usage"` + ThrottlingData ThrottlingData `json:"throttling_data,omitempty"` +} + +type MemoryStats struct { + // current res_counter usage for memory + Usage uint64 `json:"usage,omitempty"` + // maximum usage ever recorded. + MaxUsage uint64 `json:"max_usage,omitempty"` + // TODO(vishh): Export these as stronger types. + // all the stats exported via memory.stat. + Stats map[string]uint64 `json:"stats,omitempty"` + // number of times memory usage hits limits. + Failcnt uint64 `json:"failcnt"` + Limit uint64 `json:"limit"` +} + +type BlkioStatEntry struct { + Major uint64 `json:"major,omitempty"` + Minor uint64 `json:"minor,omitempty"` + Op string `json:"op,omitempty"` + Value uint64 `json:"value,omitempty"` +} + +type BlkioStats struct { + // number of bytes tranferred to and from the block device + IoServiceBytesRecursive []BlkioStatEntry `json:"io_service_bytes_recursive,omitempty"` + IoServicedRecursive []BlkioStatEntry `json:"io_serviced_recursive,omitempty"` + IoQueuedRecursive []BlkioStatEntry `json:"io_queue_recursive,omitempty"` + IoServiceTimeRecursive []BlkioStatEntry `json:"io_service_time_recursive,omitempty"` + IoWaitTimeRecursive []BlkioStatEntry `json:"io_wait_time_recursive,omitempty"` + IoMergedRecursive []BlkioStatEntry `json:"io_merged_recursive,omitempty"` + IoTimeRecursive []BlkioStatEntry `json:"io_time_recursive,omitempty"` + SectorsRecursive []BlkioStatEntry `json:"sectors_recursive,omitempty"` +} + +type Network struct { + RxBytes uint64 `json:"rx_bytes"` + RxPackets uint64 `json:"rx_packets"` + RxErrors uint64 `json:"rx_errors"` + RxDropped uint64 `json:"rx_dropped"` + TxBytes uint64 `json:"tx_bytes"` + TxPackets uint64 `json:"tx_packets"` + TxErrors uint64 `json:"tx_errors"` + TxDropped uint64 `json:"tx_dropped"` +} + +type Stats struct { + Read time.Time `json:"read"` + ClockTicks int `json:"clock_ticks"` + Interval int `json:"interval"` // in ms + Network Network `json:"network,omitempty"` + CpuStats CpuStats `json:"cpu_stats,omitempty"` + MemoryStats MemoryStats `json:"memory_stats,omitempty"` + BlkioStats BlkioStats `json:"blkio_stats,omitempty"` +} + +func ToStats(ls *libcontainer.ContainerStats) *Stats { + s := &Stats{} + if ls.NetworkStats != nil { + s.Network = Network{ + RxBytes: ls.NetworkStats.RxBytes, + RxPackets: ls.NetworkStats.RxPackets, + RxErrors: ls.NetworkStats.RxErrors, + RxDropped: ls.NetworkStats.RxDropped, + TxBytes: ls.NetworkStats.TxBytes, + TxPackets: ls.NetworkStats.TxPackets, + TxErrors: ls.NetworkStats.TxErrors, + TxDropped: ls.NetworkStats.TxDropped, + } + } + cs := ls.CgroupStats + if cs != nil { + s.BlkioStats = BlkioStats{ + IoServiceBytesRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceBytesRecursive), + IoServicedRecursive: copyBlkioEntry(cs.BlkioStats.IoServicedRecursive), + IoQueuedRecursive: copyBlkioEntry(cs.BlkioStats.IoQueuedRecursive), + IoServiceTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceTimeRecursive), + IoWaitTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoWaitTimeRecursive), + IoMergedRecursive: copyBlkioEntry(cs.BlkioStats.IoMergedRecursive), + IoTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoTimeRecursive), + SectorsRecursive: copyBlkioEntry(cs.BlkioStats.SectorsRecursive), + } + cpu := cs.CpuStats + s.CpuStats = CpuStats{ + CpuUsage: CpuUsage{ + TotalUsage: cpu.CpuUsage.TotalUsage, + PercpuUsage: cpu.CpuUsage.PercpuUsage, + UsageInKernelmode: cpu.CpuUsage.UsageInKernelmode, + UsageInUsermode: cpu.CpuUsage.UsageInUsermode, + }, + ThrottlingData: ThrottlingData{ + Periods: cpu.ThrottlingData.Periods, + ThrottledPeriods: cpu.ThrottlingData.ThrottledPeriods, + ThrottledTime: cpu.ThrottlingData.ThrottledTime, + }, + } + mem := cs.MemoryStats + s.MemoryStats = MemoryStats{ + Usage: mem.Usage, + MaxUsage: mem.MaxUsage, + Stats: mem.Stats, + Failcnt: mem.Failcnt, + } + } + return s +} + +func copyBlkioEntry(entries []cgroups.BlkioStatEntry) []BlkioStatEntry { + out := make([]BlkioStatEntry, len(entries)) + for i, re := range entries { + out[i] = BlkioStatEntry{ + Major: re.Major, + Minor: re.Minor, + Op: re.Op, + Value: re.Value, + } + } + return out +} From 4f174aa79276c12a1b2b98df2f02d6bee36b7a93 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 7 Jan 2015 18:02:08 -0800 Subject: [PATCH 3/9] Evict stopped containers Signed-off-by: Michael Crosby --- api/client/commands.go | 97 ++++++++++---------- api/client/sort.go | 29 ++++++ {stats => api/stats}/stats.go | 4 +- daemon/container.go | 3 - daemon/daemon.go | 11 ++- daemon/delete.go | 3 + daemon/execdriver/driver.go | 2 +- daemon/execdriver/execdrivers/execdrivers.go | 2 +- daemon/execdriver/lxc/driver.go | 2 +- daemon/execdriver/native/driver.go | 7 +- daemon/start.go | 25 ----- daemon/stats.go | 29 ++++++ daemon/stats_collector.go | 72 ++++++++++++--- docker/flags.go | 1 + 14 files changed, 192 insertions(+), 95 deletions(-) create mode 100644 api/client/sort.go rename {stats => api/stats}/stats.go (96%) create mode 100644 daemon/stats.go diff --git a/api/client/commands.go b/api/client/commands.go index 6c6595c24..34ca32c29 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -16,15 +16,16 @@ import ( "path" "path/filepath" "runtime" - "sort" "strconv" "strings" + "sync" "text/tabwriter" "text/template" "time" log "github.com/Sirupsen/logrus" "github.com/docker/docker/api" + "github.com/docker/docker/api/stats" "github.com/docker/docker/dockerversion" "github.com/docker/docker/engine" "github.com/docker/docker/graph" @@ -43,7 +44,6 @@ import ( "github.com/docker/docker/pkg/urlutil" "github.com/docker/docker/registry" "github.com/docker/docker/runconfig" - "github.com/docker/docker/stats" "github.com/docker/docker/utils" "github.com/docker/libtrust" ) @@ -2625,25 +2625,10 @@ type containerStats struct { Name string CpuPercentage float64 Memory float64 + MemoryLimit float64 MemoryPercentage float64 - NetworkRx int - NetworkTx int -} - -type statSorter struct { - stats []containerStats -} - -func (s *statSorter) Len() int { - return len(s.stats) -} - -func (s *statSorter) Swap(i, j int) { - s.stats[i], s.stats[j] = s.stats[j], s.stats[i] -} - -func (s *statSorter) Less(i, j int) bool { - return s.stats[i].Name < s.stats[j].Name + NetworkRx float64 + NetworkTx float64 } func (cli *DockerCli) CmdStats(args ...string) error { @@ -2651,40 +2636,49 @@ func (cli *DockerCli) CmdStats(args ...string) error { cmd.Require(flag.Min, 1) utils.ParseFlags(cmd, args, true) + m := &sync.Mutex{} cStats := map[string]containerStats{} for _, name := range cmd.Args() { - go cli.streamStats(name, cStats) + go cli.streamStats(name, cStats, m) } w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0) - for _ = range time.Tick(1000 * time.Millisecond) { + for _ = range time.Tick(500 * time.Millisecond) { fmt.Fprint(cli.out, "\033[2J") fmt.Fprint(cli.out, "\033[H") - fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM\tMEM %\tNET I/O") - sStats := []containerStats{} - for _, s := range cStats { - sStats = append(sStats, s) - } - sorter := &statSorter{sStats} - sort.Sort(sorter) - for _, s := range sStats { - fmt.Fprintf(w, "%s\t%f%%\t%s\t%f%%\t%d/%d\n", + fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM USAGE/LIMIT\tMEM %\tNET I/O") + m.Lock() + ss := sortStatsByName(cStats) + m.Unlock() + for _, s := range ss { + fmt.Fprintf(w, "%s\t%.2f%%\t%s/%s\t%.2f%%\t%s/%s\n", s.Name, s.CpuPercentage, - units.HumanSize(s.Memory), + units.BytesSize(s.Memory), units.BytesSize(s.MemoryLimit), s.MemoryPercentage, - s.NetworkRx, s.NetworkTx) + units.BytesSize(s.NetworkRx), units.BytesSize(s.NetworkTx)) } w.Flush() } return nil } -func (cli *DockerCli) streamStats(name string, data map[string]containerStats) error { +func (cli *DockerCli) streamStats(name string, data map[string]containerStats, m *sync.Mutex) error { + m.Lock() + data[name] = containerStats{ + Name: name, + } + m.Unlock() + stream, _, err := cli.call("GET", "/containers/"+name+"/stats", nil, false) if err != nil { return err } - + defer func() { + stream.Close() + m.Lock() + delete(data, name) + m.Unlock() + }() var ( previousCpu uint64 previousSystem uint64 @@ -2696,30 +2690,37 @@ func (cli *DockerCli) streamStats(name string, data map[string]containerStats) e if err := dec.Decode(&v); err != nil { return err } - memPercent := float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 - cpuPercent := 0.0 - + var ( + memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 + cpuPercent = 0.0 + ) if !start { - cpuDelta := float64(v.CpuStats.CpuUsage.TotalUsage) - float64(previousCpu) - systemDelta := float64(int(v.CpuStats.SystemUsage)/v.ClockTicks) - float64(int(previousSystem)/v.ClockTicks) - - if systemDelta > 0.0 { - cpuPercent = (cpuDelta / systemDelta) * float64(v.ClockTicks*len(v.CpuStats.CpuUsage.PercpuUsage)) - } + cpuPercent = calcuateCpuPercent(previousCpu, previousSystem, v) } start = false + m.Lock() d := data[name] - d.Name = name d.CpuPercentage = cpuPercent d.Memory = float64(v.MemoryStats.Usage) + d.MemoryLimit = float64(v.MemoryStats.Limit) d.MemoryPercentage = memPercent - d.NetworkRx = int(v.Network.RxBytes) - d.NetworkTx = int(v.Network.TxBytes) + d.NetworkRx = float64(v.Network.RxBytes) + d.NetworkTx = float64(v.Network.TxBytes) data[name] = d + m.Unlock() previousCpu = v.CpuStats.CpuUsage.TotalUsage previousSystem = v.CpuStats.SystemUsage } return nil - +} + +func calcuateCpuPercent(previousCpu, previousSystem uint64, v *stats.Stats) float64 { + cpuPercent := 0.0 + cpuDelta := float64(v.CpuStats.CpuUsage.TotalUsage) - float64(previousCpu) + systemDelta := float64(int(v.CpuStats.SystemUsage)/v.ClockTicks) - float64(int(previousSystem)/v.ClockTicks) + if systemDelta > 0.0 { + cpuPercent = (cpuDelta / systemDelta) * float64(v.ClockTicks*len(v.CpuStats.CpuUsage.PercpuUsage)) + } + return cpuPercent } diff --git a/api/client/sort.go b/api/client/sort.go new file mode 100644 index 000000000..1b8232c3f --- /dev/null +++ b/api/client/sort.go @@ -0,0 +1,29 @@ +package client + +import "sort" + +func sortStatsByName(cStats map[string]containerStats) []containerStats { + sStats := []containerStats{} + for _, s := range cStats { + sStats = append(sStats, s) + } + sorter := &statSorter{sStats} + sort.Sort(sorter) + return sStats +} + +type statSorter struct { + stats []containerStats +} + +func (s *statSorter) Len() int { + return len(s.stats) +} + +func (s *statSorter) Swap(i, j int) { + s.stats[i], s.stats[j] = s.stats[j], s.stats[i] +} + +func (s *statSorter) Less(i, j int) bool { + return s.stats[i].Name < s.stats[j].Name +} diff --git a/stats/stats.go b/api/stats/stats.go similarity index 96% rename from stats/stats.go rename to api/stats/stats.go index e151014f3..b2820f243 100644 --- a/stats/stats.go +++ b/api/stats/stats.go @@ -16,7 +16,7 @@ type ThrottlingData struct { ThrottledTime uint64 `json:"throttled_time,omitempty"` } -// All CPU stats are aggregate since container inception. +// All CPU stats are aggregated since container inception. type CpuUsage struct { // Total CPU time consumed. // Units: nanoseconds. @@ -91,6 +91,8 @@ type Stats struct { BlkioStats BlkioStats `json:"blkio_stats,omitempty"` } +// ToStats converts the libcontainer.ContainerStats to the api specific +// structs. This is done to preserve API compatibility and versioning. func ToStats(ls *libcontainer.ContainerStats) *Stats { s := &Stats{} if ls.NetworkStats != nil { diff --git a/daemon/container.go b/daemon/container.go index 4a0232878..046ec71e8 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -1416,8 +1416,5 @@ func (container *Container) getNetworkedContainer() (*Container, error) { } func (container *Container) Stats() (*execdriver.ResourceStats, error) { - if !container.IsRunning() { - return nil, fmt.Errorf("cannot collect stats on a non running container") - } return container.daemon.Stats(container) } diff --git a/daemon/daemon.go b/daemon/daemon.go index 01d8245de..82d2bc757 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -1099,7 +1099,7 @@ func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) { return daemon.execDriver.Stats(c.ID) } -func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver.ResourceStats, error) { +func (daemon *Daemon) SubscribeToContainerStats(name string) (chan *execdriver.ResourceStats, error) { c := daemon.Get(name) if c == nil { return nil, fmt.Errorf("no such container") @@ -1108,6 +1108,15 @@ func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver return ch, nil } +func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan *execdriver.ResourceStats) error { + c := daemon.Get(name) + if c == nil { + return fmt.Errorf("no such container") + } + daemon.statsCollector.unsubscribe(c, ch) + return nil +} + // Nuke kills all containers then removes all content // from the content root, including images, volumes and // container filesystems. diff --git a/daemon/delete.go b/daemon/delete.go index 990e4b448..59c765178 100644 --- a/daemon/delete.go +++ b/daemon/delete.go @@ -49,6 +49,9 @@ func (daemon *Daemon) ContainerRm(job *engine.Job) engine.Status { } if container != nil { + // stop collection of stats for the container regardless + // if stats are currently getting collected. + daemon.statsCollector.stopCollection(container) if container.IsRunning() { if forceRemove { if err := container.Kill(); err != nil { diff --git a/daemon/execdriver/driver.go b/daemon/execdriver/driver.go index f33f1671d..f6e0ac728 100644 --- a/daemon/execdriver/driver.go +++ b/daemon/execdriver/driver.go @@ -16,7 +16,7 @@ import ( type Context map[string]string var ( - ErrNotRunning = errors.New("Process could not be started") + ErrNotRunning = errors.New("Container is not running") ErrWaitTimeoutReached = errors.New("Wait timeout reached") ErrDriverAlreadyRegistered = errors.New("A driver already registered this docker init function") ErrDriverNotFound = errors.New("The requested docker init has not been found") diff --git a/daemon/execdriver/execdrivers/execdrivers.go b/daemon/execdriver/execdrivers/execdrivers.go index b7dd98cf3..a665985d1 100644 --- a/daemon/execdriver/execdrivers/execdrivers.go +++ b/daemon/execdriver/execdrivers/execdrivers.go @@ -24,7 +24,7 @@ func NewDriver(name, root, initPath string, sysInfo *sysinfo.SysInfo) (execdrive // to be backwards compatible return lxc.NewDriver(root, initPath, sysInfo.AppArmor) case "native": - return native.NewDriver(path.Join(root, "execdriver", "native"), initPath, meminfo.MemTotal/1000) + return native.NewDriver(path.Join(root, "execdriver", "native"), initPath, meminfo.MemTotal) } return nil, fmt.Errorf("unknown exec driver %s", name) } diff --git a/daemon/execdriver/lxc/driver.go b/daemon/execdriver/lxc/driver.go index 7dca19d76..44942b1fe 100644 --- a/daemon/execdriver/lxc/driver.go +++ b/daemon/execdriver/lxc/driver.go @@ -526,6 +526,6 @@ func (d *driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo } func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { - return nil, fmt.Errorf("container stats are not support with LXC") + return nil, fmt.Errorf("container stats are not supported with LXC") } diff --git a/daemon/execdriver/native/driver.go b/daemon/execdriver/native/driver.go index 83e07f392..450d7e5f3 100644 --- a/daemon/execdriver/native/driver.go +++ b/daemon/execdriver/native/driver.go @@ -284,6 +284,9 @@ func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { c := d.activeContainers[id] state, err := libcontainer.GetState(filepath.Join(d.root, id)) if err != nil { + if os.IsNotExist(err) { + return nil, execdriver.ErrNotRunning + } return nil, err } now := time.Now() @@ -292,13 +295,15 @@ func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { return nil, err } memoryLimit := c.container.Cgroups.Memory + // if the container does not have any memory limit specified set the + // limit to the machines memory if memoryLimit == 0 { memoryLimit = d.machineMemory } return &execdriver.ResourceStats{ + Read: now, ContainerStats: stats, ClockTicks: system.GetClockTicks(), - Read: now, MemoryLimit: memoryLimit, }, nil } diff --git a/daemon/start.go b/daemon/start.go index 150a87f57..363461080 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -1,14 +1,12 @@ package daemon import ( - "encoding/json" "fmt" "os" "strings" "github.com/docker/docker/engine" "github.com/docker/docker/runconfig" - "github.com/docker/docker/stats" ) func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status { @@ -79,26 +77,3 @@ func (daemon *Daemon) setHostConfig(container *Container, hostConfig *runconfig. return nil } - -func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status { - s, err := daemon.SubscribeToContainerStats(job.Args[0]) - if err != nil { - return job.Error(err) - } - enc := json.NewEncoder(job.Stdout) - for update := range s { - ss := stats.ToStats(update.ContainerStats) - ss.MemoryStats.Limit = uint64(update.MemoryLimit) - ss.Read = update.Read - ss.ClockTicks = update.ClockTicks - ss.CpuStats.SystemUsage = update.SystemUsage - if err := enc.Encode(ss); err != nil { - return job.Error(err) - } - } - return engine.StatusOK -} - -func mapToAPIStats() { - -} diff --git a/daemon/stats.go b/daemon/stats.go new file mode 100644 index 000000000..5db1cf608 --- /dev/null +++ b/daemon/stats.go @@ -0,0 +1,29 @@ +package daemon + +import ( + "encoding/json" + + "github.com/docker/docker/api/stats" + "github.com/docker/docker/engine" +) + +func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status { + s, err := daemon.SubscribeToContainerStats(job.Args[0]) + if err != nil { + return job.Error(err) + } + enc := json.NewEncoder(job.Stdout) + for update := range s { + ss := stats.ToStats(update.ContainerStats) + ss.MemoryStats.Limit = uint64(update.MemoryLimit) + ss.Read = update.Read + ss.ClockTicks = update.ClockTicks + ss.CpuStats.SystemUsage = update.SystemUsage + if err := enc.Encode(ss); err != nil { + // TODO: handle the specific broken pipe + daemon.UnsubscribeToContainerStats(job.Args[0], s) + return job.Error(err) + } + } + return engine.StatusOK +} diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go index a21092a85..8b5662db1 100644 --- a/daemon/stats_collector.go +++ b/daemon/stats_collector.go @@ -13,16 +13,20 @@ import ( "github.com/docker/docker/daemon/execdriver" ) +// newStatsCollector returns a new statsCollector that collections +// network and cgroup stats for a registered container at the specified +// interval. The collector allows non-running containers to be added +// and will start processing stats when they are started. func newStatsCollector(interval time.Duration) *statsCollector { s := &statsCollector{ interval: interval, - containers: make(map[string]*statsCollectorData), + containers: make(map[string]*statsData), } s.start() return s } -type statsCollectorData struct { +type statsData struct { c *Container lastStats *execdriver.ResourceStats subs []chan *execdriver.ResourceStats @@ -32,43 +36,86 @@ type statsCollectorData struct { type statsCollector struct { m sync.Mutex interval time.Duration - containers map[string]*statsCollectorData + containers map[string]*statsData } -func (s *statsCollector) collect(c *Container) <-chan *execdriver.ResourceStats { +// collect registers the container with the collector and adds it to +// the event loop for collection on the specified interval returning +// a channel for the subscriber to receive on. +func (s *statsCollector) collect(c *Container) chan *execdriver.ResourceStats { s.m.Lock() + defer s.m.Unlock() ch := make(chan *execdriver.ResourceStats, 1024) - s.containers[c.ID] = &statsCollectorData{ + if _, exists := s.containers[c.ID]; exists { + s.containers[c.ID].subs = append(s.containers[c.ID].subs, ch) + return ch + } + s.containers[c.ID] = &statsData{ c: c, subs: []chan *execdriver.ResourceStats{ ch, }, } - s.m.Unlock() return ch } +// stopCollection closes the channels for all subscribers and removes +// the container from metrics collection. func (s *statsCollector) stopCollection(c *Container) { s.m.Lock() + defer s.m.Unlock() + d := s.containers[c.ID] + if d == nil { + return + } + for _, sub := range d.subs { + close(sub) + } delete(s.containers, c.ID) +} + +// unsubscribe removes a specific subscriber from receiving updates for a +// container's stats. +func (s *statsCollector) unsubscribe(c *Container, ch chan *execdriver.ResourceStats) { + s.m.Lock() + cd := s.containers[c.ID] + for i, sub := range cd.subs { + if ch == sub { + cd.subs = append(cd.subs[:i], cd.subs[i+1:]...) + close(ch) + } + } + // if there are no more subscribers then remove the entire container + // from collection. + if len(cd.subs) == 0 { + delete(s.containers, c.ID) + } s.m.Unlock() } func (s *statsCollector) start() { go func() { for _ = range time.Tick(s.interval) { - log.Debugf("starting collection of container stats") s.m.Lock() for id, d := range s.containers { - systemUsage, err := getSystemCpuUsage() + systemUsage, err := s.getSystemCpuUsage() if err != nil { log.Errorf("collecting system cpu usage for %s: %v", id, err) continue } stats, err := d.c.Stats() if err != nil { - // TODO: @crosbymichael evict container depending on error + if err == execdriver.ErrNotRunning { + continue + } + // if the error is not because the container is currently running then + // evict the container from the collector and close the channel for + // any subscribers currently waiting on changes. log.Errorf("collecting stats for %s: %v", id, err) + for _, sub := range s.containers[id].subs { + close(sub) + } + delete(s.containers, id) continue } stats.SystemUsage = systemUsage @@ -81,14 +128,14 @@ func (s *statsCollector) start() { }() } -// returns value in nanoseconds -func getSystemCpuUsage() (uint64, error) { +// getSystemdCpuUSage returns the host system's cpu usage +// in nanoseconds. +func (s *statsCollector) getSystemCpuUsage() (uint64, error) { f, err := os.Open("/proc/stat") if err != nil { return 0, err } defer f.Close() - sc := bufio.NewScanner(f) for sc.Scan() { parts := strings.Fields(sc.Text()) @@ -97,7 +144,6 @@ func getSystemCpuUsage() (uint64, error) { if len(parts) < 8 { return 0, fmt.Errorf("invalid number of cpu fields") } - var total uint64 for _, i := range parts[1:8] { v, err := strconv.ParseUint(i, 10, 64) diff --git a/docker/flags.go b/docker/flags.go index 719acbe93..8db636e5c 100644 --- a/docker/flags.go +++ b/docker/flags.go @@ -98,6 +98,7 @@ func init() { {"save", "Save an image to a tar archive"}, {"search", "Search for an image on the Docker Hub"}, {"start", "Start a stopped container"}, + {"stats", "Receive container stats"}, {"stop", "Stop a running container"}, {"tag", "Tag an image into a repository"}, {"top", "Lookup the running processes of a container"}, From cc658804c000b8f652750ccf3233a73cc6f03073 Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Mon, 19 Jan 2015 13:20:58 -0800 Subject: [PATCH 4/9] Refactor cli for stats Signed-off-by: Alexander Morozov --- api/client/commands.go | 119 +++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 59 deletions(-) diff --git a/api/client/commands.go b/api/client/commands.go index 34ca32c29..27dc62769 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -16,6 +16,7 @@ import ( "path" "path/filepath" "runtime" + "sort" "strconv" "strings" "sync" @@ -2629,56 +2630,12 @@ type containerStats struct { MemoryPercentage float64 NetworkRx float64 NetworkTx float64 + mu sync.RWMutex + err error } -func (cli *DockerCli) CmdStats(args ...string) error { - cmd := cli.Subcmd("stats", "CONTAINER", "Stream the stats of a container", true) - cmd.Require(flag.Min, 1) - utils.ParseFlags(cmd, args, true) - - m := &sync.Mutex{} - cStats := map[string]containerStats{} - for _, name := range cmd.Args() { - go cli.streamStats(name, cStats, m) - } - w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0) - for _ = range time.Tick(500 * time.Millisecond) { - fmt.Fprint(cli.out, "\033[2J") - fmt.Fprint(cli.out, "\033[H") - fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM USAGE/LIMIT\tMEM %\tNET I/O") - m.Lock() - ss := sortStatsByName(cStats) - m.Unlock() - for _, s := range ss { - fmt.Fprintf(w, "%s\t%.2f%%\t%s/%s\t%.2f%%\t%s/%s\n", - s.Name, - s.CpuPercentage, - units.BytesSize(s.Memory), units.BytesSize(s.MemoryLimit), - s.MemoryPercentage, - units.BytesSize(s.NetworkRx), units.BytesSize(s.NetworkTx)) - } - w.Flush() - } - return nil -} - -func (cli *DockerCli) streamStats(name string, data map[string]containerStats, m *sync.Mutex) error { - m.Lock() - data[name] = containerStats{ - Name: name, - } - m.Unlock() - - stream, _, err := cli.call("GET", "/containers/"+name+"/stats", nil, false) - if err != nil { - return err - } - defer func() { - stream.Close() - m.Lock() - delete(data, name) - m.Unlock() - }() +func (s *containerStats) Collect(stream io.ReadCloser) { + defer stream.Close() var ( previousCpu uint64 previousSystem uint64 @@ -2688,7 +2645,10 @@ func (cli *DockerCli) streamStats(name string, data map[string]containerStats, m for { var v *stats.Stats if err := dec.Decode(&v); err != nil { - return err + s.mu.Lock() + s.err = err + s.mu.Unlock() + return } var ( memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 @@ -2698,20 +2658,61 @@ func (cli *DockerCli) streamStats(name string, data map[string]containerStats, m cpuPercent = calcuateCpuPercent(previousCpu, previousSystem, v) } start = false - m.Lock() - d := data[name] - d.CpuPercentage = cpuPercent - d.Memory = float64(v.MemoryStats.Usage) - d.MemoryLimit = float64(v.MemoryStats.Limit) - d.MemoryPercentage = memPercent - d.NetworkRx = float64(v.Network.RxBytes) - d.NetworkTx = float64(v.Network.TxBytes) - data[name] = d - m.Unlock() + s.mu.Lock() + s.CpuPercentage = cpuPercent + s.Memory = float64(v.MemoryStats.Usage) + s.MemoryLimit = float64(v.MemoryStats.Limit) + s.MemoryPercentage = memPercent + s.NetworkRx = float64(v.Network.RxBytes) + s.NetworkTx = float64(v.Network.TxBytes) + s.mu.Unlock() previousCpu = v.CpuStats.CpuUsage.TotalUsage previousSystem = v.CpuStats.SystemUsage } +} + +func (s *containerStats) Display(w io.Writer) { + s.mu.RLock() + defer s.mu.RUnlock() + if s.err != nil { + return + } + fmt.Fprintf(w, "%s\t%.2f%%\t%s/%s\t%.2f%%\t%s/%s\n", + s.Name, + s.CpuPercentage, + units.BytesSize(s.Memory), units.BytesSize(s.MemoryLimit), + s.MemoryPercentage, + units.BytesSize(s.NetworkRx), units.BytesSize(s.NetworkTx)) +} + +func (cli *DockerCli) CmdStats(args ...string) error { + cmd := cli.Subcmd("stats", "CONTAINER", "Stream the stats of a container", true) + cmd.Require(flag.Min, 1) + utils.ParseFlags(cmd, args, true) + + names := cmd.Args() + sort.Strings(names) + var cStats []*containerStats + for _, n := range names { + s := &containerStats{Name: n} + cStats = append(cStats, s) + stream, _, err := cli.call("GET", "/containers/"+n+"/stats", nil, false) + if err != nil { + return err + } + go s.Collect(stream) + } + w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0) + for _ = range time.Tick(500 * time.Millisecond) { + fmt.Fprint(cli.out, "\033[2J") + fmt.Fprint(cli.out, "\033[H") + fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM USAGE/LIMIT\tMEM %\tNET I/O") + for _, s := range cStats { + s.Display(w) + } + w.Flush() + } return nil } From 2d4fc1de0560c8052b4480035bb364fb28525b39 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 19 Jan 2015 14:07:21 -0800 Subject: [PATCH 5/9] Refactor usage calc for CPU and system usage Signed-off-by: Michael Crosby --- api/client/commands.go | 15 ++++++---- api/client/sort.go | 29 -------------------- api/stats/stats.go | 2 -- daemon/execdriver/driver.go | 1 - daemon/execdriver/execdrivers/execdrivers.go | 8 +----- daemon/execdriver/native/driver.go | 11 ++++++-- daemon/stats.go | 1 - daemon/stats_collector.go | 19 +++++++------ 8 files changed, 30 insertions(+), 56 deletions(-) delete mode 100644 api/client/sort.go diff --git a/api/client/commands.go b/api/client/commands.go index 27dc62769..07554ed60 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -2717,11 +2717,16 @@ func (cli *DockerCli) CmdStats(args ...string) error { } func calcuateCpuPercent(previousCpu, previousSystem uint64, v *stats.Stats) float64 { - cpuPercent := 0.0 - cpuDelta := float64(v.CpuStats.CpuUsage.TotalUsage) - float64(previousCpu) - systemDelta := float64(int(v.CpuStats.SystemUsage)/v.ClockTicks) - float64(int(previousSystem)/v.ClockTicks) - if systemDelta > 0.0 { - cpuPercent = (cpuDelta / systemDelta) * float64(v.ClockTicks*len(v.CpuStats.CpuUsage.PercpuUsage)) + var ( + cpuPercent = 0.0 + // calculate the change for the cpu usage of the container in between readings + cpuDelta = float64(v.CpuStats.CpuUsage.TotalUsage - previousCpu) + // calculate the change for the entire system between readings + systemDelta = float64(v.CpuStats.SystemUsage - previousSystem) + ) + + if systemDelta > 0.0 && cpuDelta > 0.0 { + cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CpuStats.CpuUsage.PercpuUsage)) * 100.0 } return cpuPercent } diff --git a/api/client/sort.go b/api/client/sort.go deleted file mode 100644 index 1b8232c3f..000000000 --- a/api/client/sort.go +++ /dev/null @@ -1,29 +0,0 @@ -package client - -import "sort" - -func sortStatsByName(cStats map[string]containerStats) []containerStats { - sStats := []containerStats{} - for _, s := range cStats { - sStats = append(sStats, s) - } - sorter := &statSorter{sStats} - sort.Sort(sorter) - return sStats -} - -type statSorter struct { - stats []containerStats -} - -func (s *statSorter) Len() int { - return len(s.stats) -} - -func (s *statSorter) Swap(i, j int) { - s.stats[i], s.stats[j] = s.stats[j], s.stats[i] -} - -func (s *statSorter) Less(i, j int) bool { - return s.stats[i].Name < s.stats[j].Name -} diff --git a/api/stats/stats.go b/api/stats/stats.go index b2820f243..43146cf7b 100644 --- a/api/stats/stats.go +++ b/api/stats/stats.go @@ -83,8 +83,6 @@ type Network struct { type Stats struct { Read time.Time `json:"read"` - ClockTicks int `json:"clock_ticks"` - Interval int `json:"interval"` // in ms Network Network `json:"network,omitempty"` CpuStats CpuStats `json:"cpu_stats,omitempty"` MemoryStats MemoryStats `json:"memory_stats,omitempty"` diff --git a/daemon/execdriver/driver.go b/daemon/execdriver/driver.go index f6e0ac728..2215d03cf 100644 --- a/daemon/execdriver/driver.go +++ b/daemon/execdriver/driver.go @@ -107,7 +107,6 @@ type Resources struct { type ResourceStats struct { *libcontainer.ContainerStats Read time.Time `json:"read"` - ClockTicks int `json:"clock_ticks"` MemoryLimit int64 `json:"memory_limit"` SystemUsage uint64 `json:"system_usage"` } diff --git a/daemon/execdriver/execdrivers/execdrivers.go b/daemon/execdriver/execdrivers/execdrivers.go index a665985d1..be3222a8b 100644 --- a/daemon/execdriver/execdrivers/execdrivers.go +++ b/daemon/execdriver/execdrivers/execdrivers.go @@ -8,15 +8,9 @@ import ( "github.com/docker/docker/daemon/execdriver/lxc" "github.com/docker/docker/daemon/execdriver/native" "github.com/docker/docker/pkg/sysinfo" - "github.com/docker/docker/pkg/system" ) func NewDriver(name, root, initPath string, sysInfo *sysinfo.SysInfo) (execdriver.Driver, error) { - meminfo, err := system.ReadMemInfo() - if err != nil { - return nil, err - } - switch name { case "lxc": // we want to give the lxc driver the full docker root because it needs @@ -24,7 +18,7 @@ func NewDriver(name, root, initPath string, sysInfo *sysinfo.SysInfo) (execdrive // to be backwards compatible return lxc.NewDriver(root, initPath, sysInfo.AppArmor) case "native": - return native.NewDriver(path.Join(root, "execdriver", "native"), initPath, meminfo.MemTotal) + return native.NewDriver(path.Join(root, "execdriver", "native"), initPath) } return nil, fmt.Errorf("unknown exec driver %s", name) } diff --git a/daemon/execdriver/native/driver.go b/daemon/execdriver/native/driver.go index 450d7e5f3..533e6d61e 100644 --- a/daemon/execdriver/native/driver.go +++ b/daemon/execdriver/native/driver.go @@ -17,6 +17,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/execdriver" + sysinfo "github.com/docker/docker/pkg/system" "github.com/docker/docker/pkg/term" "github.com/docker/libcontainer" "github.com/docker/libcontainer/apparmor" @@ -46,7 +47,12 @@ type driver struct { sync.Mutex } -func NewDriver(root, initPath string, machineMemory int64) (*driver, error) { +func NewDriver(root, initPath string) (*driver, error) { + meminfo, err := sysinfo.ReadMemInfo() + if err != nil { + return nil, err + } + if err := os.MkdirAll(root, 0700); err != nil { return nil, err } @@ -58,7 +64,7 @@ func NewDriver(root, initPath string, machineMemory int64) (*driver, error) { root: root, initPath: initPath, activeContainers: make(map[string]*activeContainer), - machineMemory: machineMemory, + machineMemory: meminfo.MemTotal, }, nil } @@ -303,7 +309,6 @@ func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { return &execdriver.ResourceStats{ Read: now, ContainerStats: stats, - ClockTicks: system.GetClockTicks(), MemoryLimit: memoryLimit, }, nil } diff --git a/daemon/stats.go b/daemon/stats.go index 5db1cf608..22e7584ac 100644 --- a/daemon/stats.go +++ b/daemon/stats.go @@ -17,7 +17,6 @@ func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status { ss := stats.ToStats(update.ContainerStats) ss.MemoryStats.Limit = uint64(update.MemoryLimit) ss.Read = update.Read - ss.ClockTicks = update.ClockTicks ss.CpuStats.SystemUsage = update.SystemUsage if err := enc.Encode(ss); err != nil { // TODO: handle the specific broken pipe diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go index 8b5662db1..0fa1b4cae 100644 --- a/daemon/stats_collector.go +++ b/daemon/stats_collector.go @@ -11,6 +11,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/execdriver" + "github.com/docker/libcontainer/system" ) // newStatsCollector returns a new statsCollector that collections @@ -21,6 +22,7 @@ func newStatsCollector(interval time.Duration) *statsCollector { s := &statsCollector{ interval: interval, containers: make(map[string]*statsData), + clockTicks: uint64(system.GetClockTicks()), } s.start() return s @@ -36,6 +38,7 @@ type statsData struct { type statsCollector struct { m sync.Mutex interval time.Duration + clockTicks uint64 containers map[string]*statsData } @@ -128,8 +131,10 @@ func (s *statsCollector) start() { }() } -// getSystemdCpuUSage returns the host system's cpu usage -// in nanoseconds. +const nanoSeconds = 1e9 + +// getSystemdCpuUSage returns the host system's cpu usage in nanoseconds +// for the system to match the cgroup readings are returned in the same format. func (s *statsCollector) getSystemCpuUsage() (uint64, error) { f, err := os.Open("/proc/stat") if err != nil { @@ -144,17 +149,15 @@ func (s *statsCollector) getSystemCpuUsage() (uint64, error) { if len(parts) < 8 { return 0, fmt.Errorf("invalid number of cpu fields") } - var total uint64 + var sum uint64 for _, i := range parts[1:8] { v, err := strconv.ParseUint(i, 10, 64) if err != nil { - return 0.0, fmt.Errorf("Unable to convert value %s to int: %s", i, err) + return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err) } - total += v + sum += v } - return total * 1000000000, nil - default: - continue + return (sum * nanoSeconds) / s.clockTicks, nil } } return 0, fmt.Errorf("invalid stat format") From 2f46b7601a3f5e11359b79624d73075b69778fbb Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 19 Jan 2015 15:29:42 -0800 Subject: [PATCH 6/9] Add pubsub package to handle robust publisher Signed-off-by: Michael Crosby --- api/stats/stats.go | 75 +--------------------- daemon/daemon.go | 4 +- daemon/stats.go | 78 +++++++++++++++++++++-- daemon/stats_collector.go | 118 ++++++++++++----------------------- pkg/pubsub/publisher.go | 66 ++++++++++++++++++++ pkg/pubsub/publisher_test.go | 63 +++++++++++++++++++ 6 files changed, 248 insertions(+), 156 deletions(-) create mode 100644 pkg/pubsub/publisher.go create mode 100644 pkg/pubsub/publisher_test.go diff --git a/api/stats/stats.go b/api/stats/stats.go index 43146cf7b..d58fdd4f5 100644 --- a/api/stats/stats.go +++ b/api/stats/stats.go @@ -1,11 +1,8 @@ +// This package is used for API stability in the types and response to the +// consumers of the API stats endpoint. package stats -import ( - "time" - - "github.com/docker/libcontainer" - "github.com/docker/libcontainer/cgroups" -) +import "time" type ThrottlingData struct { // Number of periods with throttling active @@ -88,69 +85,3 @@ type Stats struct { MemoryStats MemoryStats `json:"memory_stats,omitempty"` BlkioStats BlkioStats `json:"blkio_stats,omitempty"` } - -// ToStats converts the libcontainer.ContainerStats to the api specific -// structs. This is done to preserve API compatibility and versioning. -func ToStats(ls *libcontainer.ContainerStats) *Stats { - s := &Stats{} - if ls.NetworkStats != nil { - s.Network = Network{ - RxBytes: ls.NetworkStats.RxBytes, - RxPackets: ls.NetworkStats.RxPackets, - RxErrors: ls.NetworkStats.RxErrors, - RxDropped: ls.NetworkStats.RxDropped, - TxBytes: ls.NetworkStats.TxBytes, - TxPackets: ls.NetworkStats.TxPackets, - TxErrors: ls.NetworkStats.TxErrors, - TxDropped: ls.NetworkStats.TxDropped, - } - } - cs := ls.CgroupStats - if cs != nil { - s.BlkioStats = BlkioStats{ - IoServiceBytesRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceBytesRecursive), - IoServicedRecursive: copyBlkioEntry(cs.BlkioStats.IoServicedRecursive), - IoQueuedRecursive: copyBlkioEntry(cs.BlkioStats.IoQueuedRecursive), - IoServiceTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceTimeRecursive), - IoWaitTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoWaitTimeRecursive), - IoMergedRecursive: copyBlkioEntry(cs.BlkioStats.IoMergedRecursive), - IoTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoTimeRecursive), - SectorsRecursive: copyBlkioEntry(cs.BlkioStats.SectorsRecursive), - } - cpu := cs.CpuStats - s.CpuStats = CpuStats{ - CpuUsage: CpuUsage{ - TotalUsage: cpu.CpuUsage.TotalUsage, - PercpuUsage: cpu.CpuUsage.PercpuUsage, - UsageInKernelmode: cpu.CpuUsage.UsageInKernelmode, - UsageInUsermode: cpu.CpuUsage.UsageInUsermode, - }, - ThrottlingData: ThrottlingData{ - Periods: cpu.ThrottlingData.Periods, - ThrottledPeriods: cpu.ThrottlingData.ThrottledPeriods, - ThrottledTime: cpu.ThrottlingData.ThrottledTime, - }, - } - mem := cs.MemoryStats - s.MemoryStats = MemoryStats{ - Usage: mem.Usage, - MaxUsage: mem.MaxUsage, - Stats: mem.Stats, - Failcnt: mem.Failcnt, - } - } - return s -} - -func copyBlkioEntry(entries []cgroups.BlkioStatEntry) []BlkioStatEntry { - out := make([]BlkioStatEntry, len(entries)) - for i, re := range entries { - out[i] = BlkioStatEntry{ - Major: re.Major, - Minor: re.Minor, - Op: re.Op, - Value: re.Value, - } - } - return out -} diff --git a/daemon/daemon.go b/daemon/daemon.go index 82d2bc757..c03e9d7aa 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -1099,7 +1099,7 @@ func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) { return daemon.execDriver.Stats(c.ID) } -func (daemon *Daemon) SubscribeToContainerStats(name string) (chan *execdriver.ResourceStats, error) { +func (daemon *Daemon) SubscribeToContainerStats(name string) (chan interface{}, error) { c := daemon.Get(name) if c == nil { return nil, fmt.Errorf("no such container") @@ -1108,7 +1108,7 @@ func (daemon *Daemon) SubscribeToContainerStats(name string) (chan *execdriver.R return ch, nil } -func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan *execdriver.ResourceStats) error { +func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan interface{}) error { c := daemon.Get(name) if c == nil { return fmt.Errorf("no such container") diff --git a/daemon/stats.go b/daemon/stats.go index 22e7584ac..e047497ec 100644 --- a/daemon/stats.go +++ b/daemon/stats.go @@ -4,25 +4,95 @@ import ( "encoding/json" "github.com/docker/docker/api/stats" + "github.com/docker/docker/daemon/execdriver" "github.com/docker/docker/engine" + "github.com/docker/libcontainer" + "github.com/docker/libcontainer/cgroups" ) func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status { - s, err := daemon.SubscribeToContainerStats(job.Args[0]) + updates, err := daemon.SubscribeToContainerStats(job.Args[0]) if err != nil { return job.Error(err) } enc := json.NewEncoder(job.Stdout) - for update := range s { - ss := stats.ToStats(update.ContainerStats) + for v := range updates { + update := v.(*execdriver.ResourceStats) + ss := convertToAPITypes(update.ContainerStats) ss.MemoryStats.Limit = uint64(update.MemoryLimit) ss.Read = update.Read ss.CpuStats.SystemUsage = update.SystemUsage if err := enc.Encode(ss); err != nil { // TODO: handle the specific broken pipe - daemon.UnsubscribeToContainerStats(job.Args[0], s) + daemon.UnsubscribeToContainerStats(job.Args[0], updates) return job.Error(err) } } return engine.StatusOK } + +// convertToAPITypes converts the libcontainer.ContainerStats to the api specific +// structs. This is done to preserve API compatibility and versioning. +func convertToAPITypes(ls *libcontainer.ContainerStats) *stats.Stats { + s := &stats.Stats{} + if ls.NetworkStats != nil { + s.Network = stats.Network{ + RxBytes: ls.NetworkStats.RxBytes, + RxPackets: ls.NetworkStats.RxPackets, + RxErrors: ls.NetworkStats.RxErrors, + RxDropped: ls.NetworkStats.RxDropped, + TxBytes: ls.NetworkStats.TxBytes, + TxPackets: ls.NetworkStats.TxPackets, + TxErrors: ls.NetworkStats.TxErrors, + TxDropped: ls.NetworkStats.TxDropped, + } + } + cs := ls.CgroupStats + if cs != nil { + s.BlkioStats = stats.BlkioStats{ + IoServiceBytesRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceBytesRecursive), + IoServicedRecursive: copyBlkioEntry(cs.BlkioStats.IoServicedRecursive), + IoQueuedRecursive: copyBlkioEntry(cs.BlkioStats.IoQueuedRecursive), + IoServiceTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceTimeRecursive), + IoWaitTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoWaitTimeRecursive), + IoMergedRecursive: copyBlkioEntry(cs.BlkioStats.IoMergedRecursive), + IoTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoTimeRecursive), + SectorsRecursive: copyBlkioEntry(cs.BlkioStats.SectorsRecursive), + } + cpu := cs.CpuStats + s.CpuStats = stats.CpuStats{ + CpuUsage: stats.CpuUsage{ + TotalUsage: cpu.CpuUsage.TotalUsage, + PercpuUsage: cpu.CpuUsage.PercpuUsage, + UsageInKernelmode: cpu.CpuUsage.UsageInKernelmode, + UsageInUsermode: cpu.CpuUsage.UsageInUsermode, + }, + ThrottlingData: stats.ThrottlingData{ + Periods: cpu.ThrottlingData.Periods, + ThrottledPeriods: cpu.ThrottlingData.ThrottledPeriods, + ThrottledTime: cpu.ThrottlingData.ThrottledTime, + }, + } + mem := cs.MemoryStats + s.MemoryStats = stats.MemoryStats{ + Usage: mem.Usage, + MaxUsage: mem.MaxUsage, + Stats: mem.Stats, + Failcnt: mem.Failcnt, + } + } + return s +} + +func copyBlkioEntry(entries []cgroups.BlkioStatEntry) []stats.BlkioStatEntry { + out := make([]stats.BlkioStatEntry, len(entries)) + for i, re := range entries { + out[i] = stats.BlkioStatEntry{ + Major: re.Major, + Minor: re.Minor, + Op: re.Op, + Value: re.Value, + } + } + return out +} diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go index 0fa1b4cae..fe0a1f763 100644 --- a/daemon/stats_collector.go +++ b/daemon/stats_collector.go @@ -11,6 +11,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/execdriver" + "github.com/docker/docker/pkg/pubsub" "github.com/docker/libcontainer/system" ) @@ -21,114 +22,75 @@ import ( func newStatsCollector(interval time.Duration) *statsCollector { s := &statsCollector{ interval: interval, - containers: make(map[string]*statsData), + publishers: make(map[*Container]*pubsub.Publisher), clockTicks: uint64(system.GetClockTicks()), } - s.start() + go s.run() return s } -type statsData struct { - c *Container - lastStats *execdriver.ResourceStats - subs []chan *execdriver.ResourceStats -} - // statsCollector manages and provides container resource stats type statsCollector struct { m sync.Mutex interval time.Duration clockTicks uint64 - containers map[string]*statsData + publishers map[*Container]*pubsub.Publisher } // collect registers the container with the collector and adds it to // the event loop for collection on the specified interval returning // a channel for the subscriber to receive on. -func (s *statsCollector) collect(c *Container) chan *execdriver.ResourceStats { +func (s *statsCollector) collect(c *Container) chan interface{} { s.m.Lock() defer s.m.Unlock() - ch := make(chan *execdriver.ResourceStats, 1024) - if _, exists := s.containers[c.ID]; exists { - s.containers[c.ID].subs = append(s.containers[c.ID].subs, ch) - return ch + publisher, exists := s.publishers[c] + if !exists { + publisher = pubsub.NewPublisher(100*time.Millisecond, 1024) + s.publishers[c] = publisher } - s.containers[c.ID] = &statsData{ - c: c, - subs: []chan *execdriver.ResourceStats{ - ch, - }, - } - return ch + return publisher.Subscribe() } // stopCollection closes the channels for all subscribers and removes // the container from metrics collection. func (s *statsCollector) stopCollection(c *Container) { s.m.Lock() - defer s.m.Unlock() - d := s.containers[c.ID] - if d == nil { - return - } - for _, sub := range d.subs { - close(sub) - } - delete(s.containers, c.ID) -} - -// unsubscribe removes a specific subscriber from receiving updates for a -// container's stats. -func (s *statsCollector) unsubscribe(c *Container, ch chan *execdriver.ResourceStats) { - s.m.Lock() - cd := s.containers[c.ID] - for i, sub := range cd.subs { - if ch == sub { - cd.subs = append(cd.subs[:i], cd.subs[i+1:]...) - close(ch) - } - } - // if there are no more subscribers then remove the entire container - // from collection. - if len(cd.subs) == 0 { - delete(s.containers, c.ID) + if publisher, exists := s.publishers[c]; exists { + publisher.Close() + delete(s.publishers, c) } s.m.Unlock() } -func (s *statsCollector) start() { - go func() { - for _ = range time.Tick(s.interval) { - s.m.Lock() - for id, d := range s.containers { - systemUsage, err := s.getSystemCpuUsage() - if err != nil { - log.Errorf("collecting system cpu usage for %s: %v", id, err) - continue - } - stats, err := d.c.Stats() - if err != nil { - if err == execdriver.ErrNotRunning { - continue - } - // if the error is not because the container is currently running then - // evict the container from the collector and close the channel for - // any subscribers currently waiting on changes. - log.Errorf("collecting stats for %s: %v", id, err) - for _, sub := range s.containers[id].subs { - close(sub) - } - delete(s.containers, id) - continue - } - stats.SystemUsage = systemUsage - for _, sub := range s.containers[id].subs { - sub <- stats - } +// unsubscribe removes a specific subscriber from receiving updates for a container's stats. +func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) { + s.m.Lock() + publisher := s.publishers[c] + if publisher != nil { + publisher.Evict(ch) + } + s.m.Unlock() +} + +func (s *statsCollector) run() { + for _ = range time.Tick(s.interval) { + for container, publisher := range s.publishers { + systemUsage, err := s.getSystemCpuUsage() + if err != nil { + log.Errorf("collecting system cpu usage for %s: %v", container.ID, err) + continue } - s.m.Unlock() + stats, err := container.Stats() + if err != nil { + if err != execdriver.ErrNotRunning { + log.Errorf("collecting stats for %s: %v", container.ID, err) + } + continue + } + stats.SystemUsage = systemUsage + publisher.Publish(stats) } - }() + } } const nanoSeconds = 1e9 diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go new file mode 100644 index 000000000..98d035687 --- /dev/null +++ b/pkg/pubsub/publisher.go @@ -0,0 +1,66 @@ +package pubsub + +import ( + "sync" + "time" +) + +// NewPublisher creates a new pub/sub publisher to broadcast messages. +// The duration is used as the send timeout as to not block the publisher publishing +// messages to other clients if one client is slow or unresponsive. +// The buffer is used when creating new channels for subscribers. +func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { + return &Publisher{ + buffer: buffer, + timeout: publishTimeout, + subscribers: make(map[subscriber]struct{}), + } +} + +type subscriber chan interface{} + +type Publisher struct { + m sync.RWMutex + buffer int + timeout time.Duration + subscribers map[subscriber]struct{} +} + +// Subscribe adds a new subscriber to the publisher returning the channel. +func (p *Publisher) Subscribe() chan interface{} { + ch := make(chan interface{}, p.buffer) + p.m.Lock() + p.subscribers[ch] = struct{}{} + p.m.Unlock() + return ch +} + +// Evict removes the specified subscriber from receiving any more messages. +func (p *Publisher) Evict(sub chan interface{}) { + p.m.Lock() + delete(p.subscribers, sub) + close(sub) + p.m.Unlock() +} + +// Publish sends the data in v to all subscribers currently registered with the publisher. +func (p *Publisher) Publish(v interface{}) { + p.m.RLock() + for sub := range p.subscribers { + // send under a select as to not block if the receiver is unavailable + select { + case sub <- v: + case <-time.After(p.timeout): + } + } + p.m.RUnlock() +} + +// Close closes the channels to all subscribers registered with the publisher. +func (p *Publisher) Close() { + p.m.Lock() + for sub := range p.subscribers { + close(sub) + } + p.m.Unlock() +} diff --git a/pkg/pubsub/publisher_test.go b/pkg/pubsub/publisher_test.go new file mode 100644 index 000000000..c19059a8a --- /dev/null +++ b/pkg/pubsub/publisher_test.go @@ -0,0 +1,63 @@ +package pubsub + +import ( + "testing" + "time" +) + +func TestSendToOneSub(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + c := p.Subscribe() + + p.Publish("hi") + + msg := <-c + if msg.(string) != "hi" { + t.Fatalf("expected message hi but received %v", msg) + } +} + +func TestSendToMultipleSubs(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + subs := []chan interface{}{} + subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe()) + + p.Publish("hi") + + for _, c := range subs { + msg := <-c + if msg.(string) != "hi" { + t.Fatalf("expected message hi but received %v", msg) + } + } +} + +func TestEvictOneSub(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + s1 := p.Subscribe() + s2 := p.Subscribe() + + p.Evict(s1) + p.Publish("hi") + if _, ok := <-s1; ok { + t.Fatal("expected s1 to not receive the published message") + } + + msg := <-s2 + if msg.(string) != "hi" { + t.Fatalf("expected message hi but received %v", msg) + } +} + +func TestClosePublisher(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + subs := []chan interface{}{} + subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe()) + p.Close() + + for _, c := range subs { + if _, ok := <-c; ok { + t.Fatal("expected all subscriber channels to be closed") + } + } +} From 76141a00779880368b15ef2a5ffd28a80e4637df Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 19 Jan 2015 16:10:26 -0800 Subject: [PATCH 7/9] Add documentation for stats feature Signed-off-by: Michael Crosby --- api/client/commands.go | 2 +- docker/flags.go | 2 +- docs/man/docker-stats.1.md | 32 +++++++ .../reference/api/docker_remote_api.md | 6 ++ .../reference/api/docker_remote_api_v1.17.md | 88 +++++++++++++++++++ docs/sources/reference/commandline/cli.md | 24 ++++- integration-cli/docker_api_containers_test.go | 30 +++++++ 7 files changed, 180 insertions(+), 4 deletions(-) create mode 100644 docs/man/docker-stats.1.md diff --git a/api/client/commands.go b/api/client/commands.go index 07554ed60..40de033d4 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -2687,7 +2687,7 @@ func (s *containerStats) Display(w io.Writer) { } func (cli *DockerCli) CmdStats(args ...string) error { - cmd := cli.Subcmd("stats", "CONTAINER", "Stream the stats of a container", true) + cmd := cli.Subcmd("stats", "CONTAINER", "Display live container stats based on resource usage", true) cmd.Require(flag.Min, 1) utils.ParseFlags(cmd, args, true) diff --git a/docker/flags.go b/docker/flags.go index 8db636e5c..525e8cbfd 100644 --- a/docker/flags.go +++ b/docker/flags.go @@ -98,7 +98,7 @@ func init() { {"save", "Save an image to a tar archive"}, {"search", "Search for an image on the Docker Hub"}, {"start", "Start a stopped container"}, - {"stats", "Receive container stats"}, + {"stats", "Display live container stats based on resource usage"}, {"stop", "Stop a running container"}, {"tag", "Tag an image into a repository"}, {"top", "Lookup the running processes of a container"}, diff --git a/docs/man/docker-stats.1.md b/docs/man/docker-stats.1.md new file mode 100644 index 000000000..991b3d9f1 --- /dev/null +++ b/docs/man/docker-stats.1.md @@ -0,0 +1,32 @@ +% DOCKER(1) Docker User Manuals +% Docker Community +% JUNE 2014 +# NAME +docker-stats - Display live container stats based on resource usage. + +# SYNOPSIS +**docker top** +[**--help**] +[CONTAINERS] + +# DESCRIPTION + +Display live container stats based on resource usage. + +# OPTIONS +**--help** + Print usage statement + +# EXAMPLES + +Run **docker stats** with multiple containers. + + $ sudo docker stats redis1 redis2 + CONTAINER CPU % MEM USAGE/LIMIT MEM % NET I/O + redis1 0.07% 796 KiB/64 MiB 1.21% 788 B/648 B + redis2 0.07% 2.746 MiB/64 MiB 4.29% 1.266 KiB/648 B + +# HISTORY +April 2014, Originally compiled by William Henry (whenry at redhat dot com) +based on docker.com source material and internal work. +June 2014, updated by Sven Dowideit diff --git a/docs/sources/reference/api/docker_remote_api.md b/docs/sources/reference/api/docker_remote_api.md index a36133795..2d52d53b8 100644 --- a/docs/sources/reference/api/docker_remote_api.md +++ b/docs/sources/reference/api/docker_remote_api.md @@ -68,6 +68,12 @@ New endpoint to rename a container `id` to a new name. (`ReadonlyRootfs`) can be passed in the host config to mount the container's root filesystem as read only. +`GET /containers/(id)/stats` + +**New!** +This endpoint returns a stream of container stats based on resource usage. + + ## v1.16 ### Full Documentation diff --git a/docs/sources/reference/api/docker_remote_api_v1.17.md b/docs/sources/reference/api/docker_remote_api_v1.17.md index a44dcbf3a..77e9a8713 100644 --- a/docs/sources/reference/api/docker_remote_api_v1.17.md +++ b/docs/sources/reference/api/docker_remote_api_v1.17.md @@ -514,6 +514,94 @@ Status Codes: - **404** – no such container - **500** – server error +### Get container stats based on resource usage + +`GET /containers/(id)/stats` + +Returns a stream of json objects of the container's stats + +**Example request**: + + GET /containers/redis1/stats HTTP/1.1 + +**Example response**: + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "read" : "2015-01-08T22:57:31.547920715Z", + "network" : { + "rx_dropped" : 0, + "rx_bytes" : 648, + "rx_errors" : 0, + "tx_packets" : 8, + "tx_dropped" : 0, + "rx_packets" : 8, + "tx_errors" : 0, + "tx_bytes" : 648 + }, + "memory_stats" : { + "stats" : { + "total_pgmajfault" : 0, + "cache" : 0, + "mapped_file" : 0, + "total_inactive_file" : 0, + "pgpgout" : 414, + "rss" : 6537216, + "total_mapped_file" : 0, + "writeback" : 0, + "unevictable" : 0, + "pgpgin" : 477, + "total_unevictable" : 0, + "pgmajfault" : 0, + "total_rss" : 6537216, + "total_rss_huge" : 6291456, + "total_writeback" : 0, + "total_inactive_anon" : 0, + "rss_huge" : 6291456, + "hierarchical_memory_limit" : 67108864, + "total_pgfault" : 964, + "total_active_file" : 0, + "active_anon" : 6537216, + "total_active_anon" : 6537216, + "total_pgpgout" : 414, + "total_cache" : 0, + "inactive_anon" : 0, + "active_file" : 0, + "pgfault" : 964, + "inactive_file" : 0, + "total_pgpgin" : 477 + }, + "max_usage" : 6651904, + "usage" : 6537216, + "failcnt" : 0, + "limit" : 67108864 + }, + "blkio_stats" : {}, + "cpu_stats" : { + "cpu_usage" : { + "percpu_usage" : [ + 16970827, + 1839451, + 7107380, + 10571290 + ], + "usage_in_usermode" : 10000000, + "total_usage" : 36488948, + "usage_in_kernelmode" : 20000000 + }, + "system_cpu_usage" : 20091722000000000, + "throttling_data" : {} + } + } + +Status Codes: + +- **200** – no error +- **404** – no such container +- **500** – server error + ### Resize a container TTY `POST /containers/(id)/resize?h=&w=` diff --git a/docs/sources/reference/commandline/cli.md b/docs/sources/reference/commandline/cli.md index 36d0b18cc..b8af02da3 100644 --- a/docs/sources/reference/commandline/cli.md +++ b/docs/sources/reference/commandline/cli.md @@ -2001,8 +2001,28 @@ more details on finding shared images from the command line. -a, --attach=false Attach container's STDOUT and STDERR and forward all signals to the process -i, --interactive=false Attach container's STDIN -When run on a container that has already been started, -takes no action and succeeds unconditionally. +## stats + + Usage: docker stats [CONTAINERS] + + Display live container stats based on resource usage + + --help=false Print usage + +Running `docker stats` on two redis containers + + $ sudo docker stats redis1 redis2 + CONTAINER CPU % MEM USAGE/LIMIT MEM % NET I/O + redis1 0.07% 796 KiB/64 MiB 1.21% 788 B/648 B + redis2 0.07% 2.746 MiB/64 MiB 4.29% 1.266 KiB/648 B + + +When run on running containers live container stats will be streamed +back and displayed to the client. Stopped containers will not +receive any updates to their stats unless the container is started again. + +> **Note:** +> If you want more in depth resource usage for a container use the API endpoint ## stop diff --git a/integration-cli/docker_api_containers_test.go b/integration-cli/docker_api_containers_test.go index 8b0b8fd69..43ae8edde 100644 --- a/integration-cli/docker_api_containers_test.go +++ b/integration-cli/docker_api_containers_test.go @@ -9,7 +9,9 @@ import ( "os/exec" "strings" "testing" + "time" + "github.com/docker/docker/api/stats" "github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar" ) @@ -251,3 +253,31 @@ func TestVolumesFromHasPriority(t *testing.T) { logDone("container REST API - check VolumesFrom has priority") } + +func TestGetContainerStats(t *testing.T) { + defer deleteAllContainers() + name := "statscontainer" + + runCmd := exec.Command(dockerBinary, "run", "-d", "--name", name, "busybox", "top") + out, _, err := runCommandWithOutput(runCmd) + if err != nil { + t.Fatalf("Error on container creation: %v, output: %q", err, out) + } + go func() { + time.Sleep(4 * time.Second) + runCommand(exec.Command(dockerBinary, "kill", name)) + runCommand(exec.Command(dockerBinary, "rm", name)) + }() + + body, err := sockRequest("GET", "/containers/"+name+"/stats", nil) + if err != nil { + t.Fatalf("GET containers/stats sockRequest failed: %v", err) + } + + var s *stats.Stats + if err := json.Unmarshal(body, &s); err != nil { + t.Fatal(err) + } + + logDone("container REST API - check GET containers/stats") +} From 217a2bd1b62788e53fd38810b30672db58a4efc5 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 20 Jan 2015 11:37:50 -0800 Subject: [PATCH 8/9] Remove publisher if no one is listening Signed-off-by: Michael Crosby --- daemon/stats_collector.go | 3 +++ integration-cli/docker_api_containers_test.go | 4 ++-- pkg/pubsub/publisher.go | 8 ++++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go index fe0a1f763..50ae6baf5 100644 --- a/daemon/stats_collector.go +++ b/daemon/stats_collector.go @@ -68,6 +68,9 @@ func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) { publisher := s.publishers[c] if publisher != nil { publisher.Evict(ch) + if publisher.Len() == 0 { + delete(s.publishers, c) + } } s.m.Unlock() } diff --git a/integration-cli/docker_api_containers_test.go b/integration-cli/docker_api_containers_test.go index 43ae8edde..34cc82aff 100644 --- a/integration-cli/docker_api_containers_test.go +++ b/integration-cli/docker_api_containers_test.go @@ -274,10 +274,10 @@ func TestGetContainerStats(t *testing.T) { t.Fatalf("GET containers/stats sockRequest failed: %v", err) } + dec := json.NewDecoder(bytes.NewBuffer(body)) var s *stats.Stats - if err := json.Unmarshal(body, &s); err != nil { + if err := dec.Decode(&s); err != nil { t.Fatal(err) } - logDone("container REST API - check GET containers/stats") } diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index 98d035687..f017262ae 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -26,6 +26,14 @@ type Publisher struct { subscribers map[subscriber]struct{} } +// Len returns the number of subscribers for the publisher +func (p *Publisher) Len() int { + p.m.RLock() + i := len(p.subscribers) + p.m.RUnlock() + return i +} + // Subscribe adds a new subscriber to the publisher returning the channel. func (p *Publisher) Subscribe() chan interface{} { ch := make(chan interface{}, p.buffer) From 4b173199fde99a2b275421ed070b0ec004730e35 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 20 Jan 2015 18:13:47 -0800 Subject: [PATCH 9/9] Exit cli when all containers when no more containers to monitor Signed-off-by: Michael Crosby --- api/client/commands.go | 18 ++++++++++++++---- daemon/stats_collector.go | 2 +- docs/man/docker-stats.1.md | 6 +----- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/api/client/commands.go b/api/client/commands.go index 40de033d4..f9cc10079 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -2672,11 +2672,11 @@ func (s *containerStats) Collect(stream io.ReadCloser) { } } -func (s *containerStats) Display(w io.Writer) { +func (s *containerStats) Display(w io.Writer) error { s.mu.RLock() defer s.mu.RUnlock() if s.err != nil { - return + return s.err } fmt.Fprintf(w, "%s\t%.2f%%\t%s/%s\t%.2f%%\t%s/%s\n", s.Name, @@ -2684,6 +2684,7 @@ func (s *containerStats) Display(w io.Writer) { units.BytesSize(s.Memory), units.BytesSize(s.MemoryLimit), s.MemoryPercentage, units.BytesSize(s.NetworkRx), units.BytesSize(s.NetworkTx)) + return nil } func (cli *DockerCli) CmdStats(args ...string) error { @@ -2708,8 +2709,17 @@ func (cli *DockerCli) CmdStats(args ...string) error { fmt.Fprint(cli.out, "\033[2J") fmt.Fprint(cli.out, "\033[H") fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM USAGE/LIMIT\tMEM %\tNET I/O") - for _, s := range cStats { - s.Display(w) + toRemove := []int{} + for i, s := range cStats { + if err := s.Display(w); err != nil { + toRemove = append(toRemove, i) + } + } + for _, i := range toRemove { + cStats = append(cStats[:i], cStats[i+1:]...) + } + if len(cStats) == 0 { + return nil } w.Flush() } diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go index 50ae6baf5..779bd1a59 100644 --- a/daemon/stats_collector.go +++ b/daemon/stats_collector.go @@ -98,7 +98,7 @@ func (s *statsCollector) run() { const nanoSeconds = 1e9 -// getSystemdCpuUSage returns the host system's cpu usage in nanoseconds +// getSystemCpuUSage returns the host system's cpu usage in nanoseconds // for the system to match the cgroup readings are returned in the same format. func (s *statsCollector) getSystemCpuUsage() (uint64, error) { f, err := os.Open("/proc/stat") diff --git a/docs/man/docker-stats.1.md b/docs/man/docker-stats.1.md index 991b3d9f1..fdad99719 100644 --- a/docs/man/docker-stats.1.md +++ b/docs/man/docker-stats.1.md @@ -5,7 +5,7 @@ docker-stats - Display live container stats based on resource usage. # SYNOPSIS -**docker top** +**docker stats** [**--help**] [CONTAINERS] @@ -26,7 +26,3 @@ Run **docker stats** with multiple containers. redis1 0.07% 796 KiB/64 MiB 1.21% 788 B/648 B redis2 0.07% 2.746 MiB/64 MiB 4.29% 1.266 KiB/648 B -# HISTORY -April 2014, Originally compiled by William Henry (whenry at redhat dot com) -based on docker.com source material and internal work. -June 2014, updated by Sven Dowideit