Skip to content
This repository was archived by the owner on Aug 26, 2024. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Exporter struct {
asyncMetricsURI string
eventsURI string
partsURI string
replicasURI string
client *http.Client

scrapeFailures prometheus.Counter
Expand Down Expand Up @@ -53,11 +54,16 @@ func NewExporter(uri url.URL, insecure bool, user, password string) *Exporter {
q.Set("query", "select database, table, sum(bytes) as bytes, count() as parts, sum(rows) as rows from system.parts where active = 1 group by database, table")
partsURI.RawQuery = q.Encode()

replicasURI := uri
q.Set("query", "SELECT database,table,is_leader,is_readonly,is_session_expired,future_parts,parts_to_check,columns_version,queue_size,inserts_in_queue,merges_in_queue,log_max_index,log_pointer,total_replicas,active_replicas FROM system.replicas")
replicasURI.RawQuery = q.Encode()

return &Exporter{
metricsURI: metricsURI.String(),
asyncMetricsURI: asyncMetricsURI.String(),
eventsURI: eventsURI.String(),
partsURI: partsURI.String(),
replicasURI: replicasURI.String(),
scrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "exporter_scrape_failures_total",
Expand Down Expand Up @@ -172,6 +178,117 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) error {
newRowsMetric.Collect(ch)
}

replicas, err := e.parseReplicasResponse(e.replicasURI)
if err != nil {
return fmt.Errorf("Error scraping clickhouse url %v: %v", e.replicasURI, err)
}

for _, replica := range replicas {
newLeaderMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_leader",
Help: "Whether the replica is the leader",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newLeaderMetric.Set(float64(replica.isLeader))
newLeaderMetric.Collect(ch)

newReadonlyMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_readyonly",
Help: "Whether the replica is in read-only mode",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newReadonlyMetric.Set(float64(replica.isReadonly))
newReadonlyMetric.Collect(ch)

newSessionExpiredMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_session_expired",
Help: "Whether the session with ZK has expired",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newSessionExpiredMetric.Set(float64(replica.isSessionExpired))
newSessionExpiredMetric.Collect(ch)

newFuturePartsMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_future_parts",
Help: "The number of data parts that will appear as the result of INSERTs or merges that have not been done yet",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newFuturePartsMetric.Set(float64(replica.futureParts))
newFuturePartsMetric.Collect(ch)

newPartsToCheckMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_parts_to_check",
Help: "The number of data parts in the queue for verification",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newPartsToCheckMetric.Set(float64(replica.partsToCheck))
newPartsToCheckMetric.Collect(ch)

newColumnsVersionMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_columns_version",
Help: "Version number of the table structure",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newColumnsVersionMetric.Set(float64(replica.columnsVersion))
newColumnsVersionMetric.Collect(ch)

newQueueSizeMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_queue_size",
Help: "Size of the queue for operations waiting to be performed",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newQueueSizeMetric.Set(float64(replica.queueSize))
newQueueSizeMetric.Collect(ch)

newInsertsInQueueMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_inserts_in_queue",
Help: "Number of inserts of blocks of data that need to be made",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newInsertsInQueueMetric.Set(float64(replica.insertsInQueue))
newInsertsInQueueMetric.Collect(ch)

newMergesInQueueMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_merges_in_queue",
Help: "The number of merges waiting to be made",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newMergesInQueueMetric.Set(float64(replica.mergesInQueue))
newMergesInQueueMetric.Collect(ch)

newLogMaxIndexMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_log_max_index",
Help: "Maximum entry number in the log of general activity",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newLogMaxIndexMetric.Set(float64(replica.logMaxIndex))
newLogMaxIndexMetric.Collect(ch)

newLogPointerMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_log_pointer",
Help: "Maximum entry number from the log of general activity that the replica copied to its queue for execution, plus one",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newLogPointerMetric.Set(float64(replica.logPointer))
newLogPointerMetric.Collect(ch)

newTotalReplicasMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_total_replicas",
Help: "The total number of known replicas of this table",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newTotalReplicasMetric.Set(float64(replica.totalReplicas))
newTotalReplicasMetric.Collect(ch)

newActiveReplicasMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "table_replicas_active_replicas",
Help: "The number of replicas of this table that have a session in ZK",
}, []string{"database", "table"}).WithLabelValues(replica.database, replica.table)
newActiveReplicasMetric.Set(float64(replica.activeReplicas))
newActiveReplicasMetric.Collect(ch)
}

return nil
}

Expand Down Expand Up @@ -285,6 +402,116 @@ func (e *Exporter) parsePartsResponse(uri string) ([]partsResult, error) {
return results, nil
}

type replicasResult struct {
database string
table string
isLeader int
isReadonly int
isSessionExpired int
futureParts int
partsToCheck int
columnsVersion int
queueSize int
insertsInQueue int
mergesInQueue int
logMaxIndex int
logPointer int
totalReplicas int
activeReplicas int
}

func (e *Exporter) parseReplicasResponse(uri string) ([]replicasResult, error) {
data, err := e.handleResponse(uri)
if err != nil {
return nil, err
}

// Parsing results
lines := strings.Split(string(data), "\n")
var results = make([]replicasResult, 0)

for i, line := range lines {
replicas := strings.Fields(line)
if len(replicas) == 0 {
continue
}
if len(replicas) != 5 {
return nil, fmt.Errorf("parsePartsResponse: unexpected %d line: %s", i, line)
}
database := strings.TrimSpace(replicas[0])
table := strings.TrimSpace(replicas[1])

isLeader, err := strconv.Atoi(strings.TrimSpace(replicas[2]))
if err != nil {
return nil, err
}

isReadonly, err := strconv.Atoi(strings.TrimSpace(replicas[3]))
if err != nil {
return nil, err
}

isSessionExpired, err := strconv.Atoi(strings.TrimSpace(replicas[4]))
if err != nil {
return nil, err
}

futureParts, err := strconv.Atoi(strings.TrimSpace(replicas[5]))
if err != nil {
return nil, err
}

partsToCheck, err := strconv.Atoi(strings.TrimSpace(replicas[6]))
if err != nil {
return nil, err
}

columnsVersion, err := strconv.Atoi(strings.TrimSpace(replicas[7]))
if err != nil {
return nil, err
}

queueSize, err := strconv.Atoi(strings.TrimSpace(replicas[8]))
if err != nil {
return nil, err
}

insertsInQueue, err := strconv.Atoi(strings.TrimSpace(replicas[9]))
if err != nil {
return nil, err
}

mergesInQueue, err := strconv.Atoi(strings.TrimSpace(replicas[10]))
if err != nil {
return nil, err
}

logMaxIndex, err := strconv.Atoi(strings.TrimSpace(replicas[11]))
if err != nil {
return nil, err
}

logPointer, err := strconv.Atoi(strings.TrimSpace(replicas[12]))
if err != nil {
return nil, err
}

totalReplicas, err := strconv.Atoi(strings.TrimSpace(replicas[13]))
if err != nil {
return nil, err
}

activeReplicas, err := strconv.Atoi(strings.TrimSpace(replicas[14]))
if err != nil {
return nil, err
}

results = append(results, replicasResult{database, table, isLeader, isReadonly, isSessionExpired, futureParts, partsToCheck, columnsVersion, queueSize, insertsInQueue, mergesInQueue, logMaxIndex, logPointer, totalReplicas, activeReplicas})
}

return results, nil
}

// Collect fetches the stats from configured clickhouse location and delivers them
// as Prometheus metrics. It implements prometheus.Collector.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
Expand Down