/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.metrics;

import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;

public class StreamsMetricsImpl
implements StreamsMetrics {
    // Registry that every sensor and metric handled by this class is added to and removed from.
    private final Metrics metrics;
    // NOTE(review): populated and read outside the section visible here; purpose not confirmed.
    private final Map<Sensor, Sensor> parentSensors;
    // Values written under the client-id / process-id tags by clientLevelTagMap().
    private final String clientId;
    private final String processId;
    // Set to Version.LATEST by the constructor.
    private final Version version;
    // Client-level bookkeeping: names of registered metrics and sensors, kept so they can be
    // removed in bulk. Each deque is also the monitor that guards its own accesses.
    private final Deque<MetricName> clientLevelMetrics = new LinkedList<MetricName>();
    private final Deque<String> clientLevelSensors = new LinkedList<String>();
    // Per-level bookkeeping keyed by the sensor prefix of the owning thread/task/node/topic/cache.
    // These are plain HashMaps; the methods that touch them synchronize on the map itself.
    private final Map<String, Deque<MetricName>> threadLevelMetrics = new HashMap<String, Deque<MetricName>>();
    private final Map<String, Deque<String>> threadLevelSensors = new HashMap<String, Deque<String>>();
    private final Map<String, Deque<String>> taskLevelSensors = new HashMap<String, Deque<String>>();
    private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<String, Deque<String>>();
    private final Map<String, Deque<String>> topicLevelSensors = new HashMap<String, Deque<String>>();
    private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<String, Deque<String>>();
    // Store-level bookkeeping keyed by the store sensor prefix (which embeds the thread name).
    // Unlike the maps above these are concurrent maps and are accessed without a synchronized block.
    private final ConcurrentMap<String, Deque<String>> storeLevelSensors = new ConcurrentHashMap<String, Deque<String>>();
    private final ConcurrentMap<String, Deque<MetricName>> storeLevelMetrics = new ConcurrentHashMap<String, Deque<MetricName>>();
    // Created in the constructor from the supplied Time and exposed through its accessor.
    private final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger;
    // Delimiters and labels for internally generated sensor names. The same text appears as
    // inlined literals (".task.", ".node.", "internal.", ...) in the *SensorPrefix helpers.
    private static final String SENSOR_PREFIX_DELIMITER = ".";
    private static final String SENSOR_NAME_DELIMITER = ".s.";
    private static final String SENSOR_TASK_LABEL = "task";
    private static final String SENSOR_NODE_LABEL = "node";
    private static final String SENSOR_TOPIC_LABEL = "topic";
    private static final String SENSOR_CACHE_LABEL = "cache";
    private static final String SENSOR_STORE_LABEL = "store";
    private static final String SENSOR_ENTITY_LABEL = "entity";
    private static final String SENSOR_EXTERNAL_LABEL = "external";
    private static final String SENSOR_INTERNAL_LABEL = "internal";
    // Tag keys used by the *TagMap methods.
    public static final String CLIENT_ID_TAG = "client-id";
    public static final String PROCESS_ID_TAG = "process-id";
    public static final String THREAD_ID_TAG = "thread-id";
    public static final String TASK_ID_TAG = "task-id";
    public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
    public static final String TOPIC_NAME_TAG = "topic";
    public static final String STORE_ID_TAG = "state-id";
    public static final String RECORD_CACHE_ID_TAG = "record-cache-id";
    public static final String ROLLUP_VALUE = "all";
    // Suffixes appended to an operation or gauge name to form a metric name.
    public static final String LATENCY_SUFFIX = "-latency";
    public static final String RECORDS_SUFFIX = "-records";
    public static final String AVG_SUFFIX = "-avg";
    public static final String MAX_SUFFIX = "-max";
    public static final String MIN_SUFFIX = "-min";
    public static final String RATE_SUFFIX = "-rate";
    public static final String TOTAL_SUFFIX = "-total";
    public static final String RATIO_SUFFIX = "-ratio";
    // Metric group names, one per metric level.
    public static final String GROUP_PREFIX_WO_DELIMITER = "stream";
    public static final String GROUP_PREFIX = "stream-";
    public static final String GROUP_SUFFIX = "-metrics";
    public static final String CLIENT_LEVEL_GROUP = "stream-metrics";
    public static final String THREAD_LEVEL_GROUP = "stream-thread-metrics";
    public static final String TASK_LEVEL_GROUP = "stream-task-metrics";
    public static final String PROCESSOR_NODE_LEVEL_GROUP = "stream-processor-node-metrics";
    public static final String TOPIC_LEVEL_GROUP = "stream-topic-metrics";
    public static final String STATE_STORE_LEVEL_GROUP = "stream-state-metrics";
    public static final String CACHE_LEVEL_GROUP = "stream-record-cache-metrics";
    // Fragments used to assemble human-readable metric descriptions.
    public static final String OPERATIONS = " operations";
    public static final String TOTAL_DESCRIPTION = "The total number of ";
    public static final String RATE_DESCRIPTION = "The average per-second number of ";
    public static final String RATIO_DESCRIPTION = "The fraction of time the thread spent on ";
    public static final String AVG_LATENCY_DESCRIPTION = "The average latency of ";
    public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of ";
    public static final String LATENCY_DESCRIPTION_SUFFIX = " in milliseconds";
    public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
    public static final String RATE_DESCRIPTION_SUFFIX = " per second";
    // Name and descriptions of the record end-to-end latency metrics.
    public static final String RECORD_E2E_LATENCY = "record-e2e-latency";
    public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX = "end-to-end latency of a record, measuring by comparing the record timestamp with the system time when it has been fully processed by the node";
    public static final String RECORD_E2E_LATENCY_AVG_DESCRIPTION = "The average end-to-end latency of a record, measuring by comparing the record timestamp with the system time when it has been fully processed by the node";
    public static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum end-to-end latency of a record, measuring by comparing the record timestamp with the system time when it has been fully processed by the node";
    public static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the system time when it has been fully processed by the node";

    /**
     * Builds the Streams metrics facade on top of an existing metrics registry.
     *
     * @param metrics   registry that backs every sensor and metric; must not be {@code null}
     * @param clientId  value later written under the {@code client-id} tag
     * @param processId value later written under the {@code process-id} tag
     * @param time      clock passed to the {@link RocksDBMetricsRecordingTrigger} created here
     * @throws NullPointerException if {@code metrics} is {@code null}
     */
    public StreamsMetricsImpl(Metrics metrics, String clientId, String processId, Time time) {
        this.metrics = Objects.requireNonNull(metrics, "Metrics cannot be null");
        this.clientId = clientId;
        this.processId = processId;
        this.version = Version.LATEST;
        this.rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
        this.parentSensors = new HashMap<>();
    }

    /**
     * @return the metrics version fixed when this instance was constructed
     */
    public Version version() {
        return version;
    }

    /**
     * @return the underlying metrics registry supplied to the constructor
     */
    public Metrics metricsRegistry() {
        return metrics;
    }

    /**
     * @return the RocksDB metrics recording trigger created by the constructor
     */
    public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
        return rocksDBMetricsRecordingTrigger;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a client-level metric whose value is wrapped in an {@code ImmutableMetricValue}.
     * The metric name is remembered so it can be removed together with all other
     * client-level metrics.
     *
     * @param name           metric name
     * @param description    metric description
     * @param recordingLevel recording level put into the metric's config
     * @param value          value handed to the {@code ImmutableMetricValue}
     */
    public <T> void addClientLevelImmutableMetric(String name, String description, Sensor.RecordingLevel recordingLevel, T value) {
        final MetricName clientMetricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
        final MetricConfig config = new MetricConfig().recordLevel(recordingLevel);
        // The deque doubles as the lock guarding registration and bookkeeping.
        synchronized (clientLevelMetrics) {
            metrics.addMetric(clientMetricName, config, new ImmutableMetricValue<>(value));
            clientLevelMetrics.push(clientMetricName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a client-level metric whose value is supplied by a gauge. The metric name is
     * remembered so it can be removed together with all other client-level metrics.
     *
     * @param name           metric name
     * @param description    metric description
     * @param recordingLevel recording level put into the metric's config
     * @param valueProvider  gauge registered as the metric's value provider
     */
    public <T> void addClientLevelMutableMetric(String name, String description, Sensor.RecordingLevel recordingLevel, Gauge<T> valueProvider) {
        final MetricName clientMetricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
        final MetricConfig config = new MetricConfig().recordLevel(recordingLevel);
        // The deque doubles as the lock guarding registration and bookkeeping.
        synchronized (clientLevelMetrics) {
            metrics.addMetric(clientMetricName, config, valueProvider);
            clientLevelMetrics.push(clientMetricName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a thread-level metric whose value is wrapped in an {@code ImmutableMetricValue}.
     * The metric name is recorded under the thread's sensor prefix before the metric is added,
     * so {@link #removeAllThreadLevelMetrics(String)} can remove it later.
     *
     * @param name        metric name
     * @param description metric description
     * @param threadId    thread the metric belongs to; used for the tag and the bookkeeping key
     * @param value       value handed to the {@code ImmutableMetricValue}
     */
    public <T> void addThreadLevelImmutableMetric(String name, String description, String threadId, T value) {
        final MetricName metricName = metrics.metricName(name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
        synchronized (threadLevelMetrics) {
            // Diamond instead of a raw LinkedList keeps the deque typed as Deque<MetricName>.
            threadLevelMetrics.computeIfAbsent(threadSensorPrefix(threadId), tid -> new LinkedList<>()).add(metricName);
            metrics.addMetric(metricName, new ImmutableMetricValue<>(value));
        }
    }

    /**
     * Registers a gauge-backed thread-level metric without additional tags; forwards to the
     * overload that accepts extra tags, passing an empty map.
     *
     * @param name          metric name
     * @param description   metric description
     * @param threadId      thread the metric belongs to
     * @param valueProvider gauge registered as the metric's value provider
     */
    public <T> void addThreadLevelMutableMetric(String name, String description, String threadId, Gauge<T> valueProvider) {
        addThreadLevelMutableMetric(name, description, threadId, Collections.emptyMap(), valueProvider);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a gauge-backed thread-level metric. The metric name is recorded under the
     * thread's sensor prefix before the metric is added, so
     * {@link #removeAllThreadLevelMetrics(String)} can remove it later.
     *
     * @param name           metric name
     * @param description    metric description
     * @param threadId       thread the metric belongs to; used for the tag and the bookkeeping key
     * @param additionalTags extra tags merged into the thread-level tag map
     * @param valueProvider  gauge registered as the metric's value provider
     */
    public <T> void addThreadLevelMutableMetric(String name, String description, String threadId, Map<String, String> additionalTags, Gauge<T> valueProvider) {
        final MetricName metricName = metrics.metricName(name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId, additionalTags));
        synchronized (threadLevelMetrics) {
            // Diamond instead of a raw LinkedList keeps the deque typed as Deque<MetricName>.
            threadLevelMetrics.computeIfAbsent(threadSensorPrefix(threadId), tid -> new LinkedList<>()).add(metricName);
            metrics.addMetric(metricName, valueProvider);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the client-level sensor with the given name, creating and recording it on first use.
     *
     * @param sensorName     name appended to the client-level sensor prefix
     * @param recordingLevel recording level used when the sensor has to be created
     * @param parents        parent sensors used when the sensor has to be created
     * @return the already registered sensor, or the newly created one
     */
    public final Sensor clientLevelSensor(String sensorName, Sensor.RecordingLevel recordingLevel, Sensor ... parents) {
        synchronized (clientLevelSensors) {
            final String qualifiedName = "stream-metrics.s." + sensorName;
            final Sensor existing = metrics.getSensor(qualifiedName);
            if (existing != null) {
                return existing;
            }
            // Remember the name so removeAllClientLevelSensors() can unregister it later.
            clientLevelSensors.push(qualifiedName);
            return metrics.sensor(qualifiedName, recordingLevel, parents);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Obtains a thread-level sensor by delegating to {@code getSensors} with the thread's
     * sensor prefix, while holding the lock on the thread-level sensor bookkeeping map.
     *
     * @param threadId       thread the sensor belongs to
     * @param sensorSuffix   suffix identifying the sensor within the thread
     * @param recordingLevel recording level forwarded to {@code getSensors}
     * @param parents        parent sensors forwarded to {@code getSensors}
     * @return the sensor returned by {@code getSensors}
     */
    public final Sensor threadLevelSensor(String threadId, String sensorSuffix, Sensor.RecordingLevel recordingLevel, Sensor ... parents) {
        final String prefix = threadSensorPrefix(threadId);
        synchronized (threadLevelSensors) {
            return getSensors(threadLevelSensors, sensorSuffix, prefix, recordingLevel, parents);
        }
    }

    /**
     * Builds the sensor-name prefix (and bookkeeping key) for a thread: {@code internal.<threadId>}.
     */
    private String threadSensorPrefix(String threadId) {
        final String internalPrefix = "internal.";
        return internalPrefix + threadId;
    }

    /**
     * Creates a fresh, insertion-ordered tag map for client-level metrics.
     *
     * @return a new mutable map holding the {@code client-id} and {@code process-id} tags, in that order
     */
    public Map<String, String> clientLevelTagMap() {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put(CLIENT_ID_TAG, clientId);
        tags.put(PROCESS_ID_TAG, processId);
        return tags;
    }

    /**
     * Creates the tag map for thread-level metrics without additional tags; forwards to the
     * two-argument overload with an empty map.
     *
     * @param threadId value of the {@code thread-id} tag
     * @return a new mutable tag map
     */
    public Map<String, String> threadLevelTagMap(String threadId) {
        return threadLevelTagMap(threadId, Collections.emptyMap());
    }

    /**
     * Creates the tag map for thread-level metrics: a copy of {@code additionalTags} with the
     * {@code thread-id} tag written afterwards, so it replaces any value already present under that key.
     *
     * @param threadId       value of the {@code thread-id} tag
     * @param additionalTags tags copied into the result first
     * @return a new mutable, insertion-ordered tag map
     */
    public Map<String, String> threadLevelTagMap(String threadId, Map<String, String> additionalTags) {
        final Map<String, String> tags = new LinkedHashMap<>(additionalTags);
        tags.put(THREAD_ID_TAG, threadId);
        return tags;
    }

    /**
     * Removes every recorded client-level sensor first, then every recorded client-level metric.
     */
    public final void removeAllClientLevelSensorsAndMetrics() {
        removeAllClientLevelSensors();
        removeAllClientLevelMetrics();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drains the client-level metric bookkeeping, removing each recorded metric from the registry.
     */
    private void removeAllClientLevelMetrics() {
        synchronized (clientLevelMetrics) {
            while (!clientLevelMetrics.isEmpty()) {
                final MetricName next = clientLevelMetrics.pop();
                metrics.removeMetric(next);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drains the client-level sensor bookkeeping, removing each recorded sensor from the registry.
     */
    private void removeAllClientLevelSensors() {
        synchronized (clientLevelSensors) {
            while (!clientLevelSensors.isEmpty()) {
                final String next = clientLevelSensors.pop();
                metrics.removeSensor(next);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every sensor recorded for the given thread and drops the thread's bookkeeping entry.
     * Does nothing if no sensors were recorded for the thread.
     *
     * @param threadId thread whose sensors are removed
     */
    public final void removeAllThreadLevelSensors(String threadId) {
        final String prefix = threadSensorPrefix(threadId);
        synchronized (threadLevelSensors) {
            final Deque<String> recorded = threadLevelSensors.remove(prefix);
            if (recorded == null) {
                return;
            }
            while (!recorded.isEmpty()) {
                metrics.removeSensor(recorded.pop());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every metric recorded for the given thread and drops the thread's bookkeeping entry.
     * Does nothing if no metrics were recorded for the thread.
     *
     * @param threadId thread whose metrics are removed
     */
    public final void removeAllThreadLevelMetrics(String threadId) {
        synchronized (threadLevelMetrics) {
            final Deque<MetricName> recorded = threadLevelMetrics.remove(threadSensorPrefix(threadId));
            if (recorded == null) {
                return;
            }
            while (!recorded.isEmpty()) {
                metrics.removeMetric(recorded.pop());
            }
        }
    }

    /**
     * Removes a single metric from the registry. No bookkeeping collection of this class is updated.
     *
     * @param metricName name of the metric to remove
     */
    public void removeMetric(MetricName metricName) {
        metrics.removeMetric(metricName);
    }

    /**
     * Removes a store-level metric from the registry and from the store-level bookkeeping.
     *
     * <p>The bookkeeping key is rebuilt from the metric's tags: the {@code thread-id} tag, the
     * {@code task-id} tag and the single remaining tag, whose value is used as the store name.
     * The metric is removed from the registry before the tags are validated.
     *
     * @param metricName name of the metric to remove
     * @throws IllegalStateException if the metric does not carry exactly one tag besides
     *                               {@code thread-id} and {@code task-id}
     */
    public void removeStoreLevelMetric(MetricName metricName) {
        metrics.removeMetric(metricName);
        final Map<String, String> tags = metricName.tags();
        // Properly typed collections replace the raw List/Deque and the unchecked casts.
        final List<String> metricsScopeCandidates = tags.keySet().stream()
            .filter(tag -> !tag.equals(THREAD_ID_TAG) && !tag.equals(TASK_ID_TAG))
            .collect(Collectors.toList());
        if (metricsScopeCandidates.size() != 1) {
            throw new IllegalStateException("Expected exactly one metric scope tag, but found " + metricsScopeCandidates);
        }
        final String key = storeSensorPrefix(tags.get(THREAD_ID_TAG), tags.get(TASK_ID_TAG), tags.get(metricsScopeCandidates.get(0)));
        final Deque<MetricName> metricsForStore = storeLevelMetrics.get(key);
        if (metricsForStore != null) {
            metricsForStore.remove(metricName);
        }
    }

    /**
     * Creates a fresh, insertion-ordered tag map for task-level metrics.
     *
     * @param threadId value of the {@code thread-id} tag
     * @param taskId   value of the {@code task-id} tag
     * @return a new mutable tag map
     */
    public Map<String, String> taskLevelTagMap(String threadId, String taskId) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put(THREAD_ID_TAG, threadId);
        tags.put(TASK_ID_TAG, taskId);
        return tags;
    }

    /**
     * Creates a fresh, insertion-ordered tag map for processor-node-level metrics.
     *
     * @param threadId          value of the {@code thread-id} tag
     * @param taskId            value of the {@code task-id} tag
     * @param processorNodeName value of the {@code processor-node-id} tag
     * @return a new mutable tag map
     */
    public Map<String, String> nodeLevelTagMap(String threadId, String taskId, String processorNodeName) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put(THREAD_ID_TAG, threadId);
        tags.put(TASK_ID_TAG, taskId);
        tags.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
        return tags;
    }

    /**
     * Creates a fresh, insertion-ordered tag map for topic-level metrics.
     *
     * @param threadId          value of the {@code thread-id} tag
     * @param taskId            value of the {@code task-id} tag
     * @param processorNodeName value of the {@code processor-node-id} tag
     * @param topicName         value of the {@code topic} tag
     * @return a new mutable tag map
     */
    public Map<String, String> topicLevelTagMap(String threadId, String taskId, String processorNodeName, String topicName) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put(THREAD_ID_TAG, threadId);
        tags.put(TASK_ID_TAG, taskId);
        tags.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
        tags.put("topic", topicName);
        return tags;
    }

    /**
     * Creates a fresh, insertion-ordered tag map for state-store-level metrics. The
     * {@code thread-id} tag is taken from the name of the calling thread.
     *
     * @param taskId    value of the {@code task-id} tag
     * @param storeType prefix of the store tag key, which becomes {@code <storeType>-state-id}
     * @param storeName value of the store tag
     * @return a new mutable tag map
     */
    public Map<String, String> storeLevelTagMap(String taskId, String storeType, String storeName) {
        final String callingThread = Thread.currentThread().getName();
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put(THREAD_ID_TAG, callingThread);
        tags.put(TASK_ID_TAG, taskId);
        tags.put(storeType + "-state-id", storeName);
        return tags;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Obtains a task-level sensor by delegating to {@code getSensors} with the task's sensor
     * prefix, while holding the lock on the task-level sensor bookkeeping map.
     *
     * @param threadId       thread the task runs on
     * @param taskId         task the sensor belongs to
     * @param sensorSuffix   suffix identifying the sensor within the task
     * @param recordingLevel recording level forwarded to {@code getSensors}
     * @param parents        parent sensors forwarded to {@code getSensors}
     * @return the sensor returned by {@code getSensors}
     */
    public final Sensor taskLevelSensor(String threadId, String taskId, String sensorSuffix, Sensor.RecordingLevel recordingLevel, Sensor ... parents) {
        final String prefix = taskSensorPrefix(threadId, taskId);
        synchronized (taskLevelSensors) {
            return getSensors(taskLevelSensors, sensorSuffix, prefix, recordingLevel, parents);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every sensor recorded for the given task and drops the task's bookkeeping entry.
     * Does nothing if no sensors were recorded for the task.
     *
     * @param threadId thread the task runs on
     * @param taskId   task whose sensors are removed
     */
    public final void removeAllTaskLevelSensors(String threadId, String taskId) {
        final String prefix = taskSensorPrefix(threadId, taskId);
        synchronized (taskLevelSensors) {
            final Deque<String> recorded = taskLevelSensors.remove(prefix);
            if (recorded == null) {
                return;
            }
            while (!recorded.isEmpty()) {
                metrics.removeSensor(recorded.pop());
            }
        }
    }

    /**
     * Builds the sensor-name prefix for a task: the thread prefix followed by {@code .task.<taskId>}.
     */
    private String taskSensorPrefix(String threadId, String taskId) {
        final String threadPrefix = threadSensorPrefix(threadId);
        return threadPrefix + ".task." + taskId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Obtains a processor-node-level sensor by delegating to {@code getSensors} with the node's
     * sensor prefix, while holding the lock on the node-level sensor bookkeeping map.
     *
     * @param threadId          thread the task runs on
     * @param taskId            task the node belongs to
     * @param processorNodeName processor node the sensor belongs to
     * @param sensorSuffix      suffix identifying the sensor within the node
     * @param recordingLevel    recording level forwarded to {@code getSensors}
     * @param parents           parent sensors forwarded to {@code getSensors}
     * @return the sensor returned by {@code getSensors}
     */
    public Sensor nodeLevelSensor(String threadId, String taskId, String processorNodeName, String sensorSuffix, Sensor.RecordingLevel recordingLevel, Sensor ... parents) {
        final String prefix = nodeSensorPrefix(threadId, taskId, processorNodeName);
        synchronized (nodeLevelSensors) {
            return getSensors(nodeLevelSensors, sensorSuffix, prefix, recordingLevel, parents);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every sensor recorded for the given processor node and drops its bookkeeping entry.
     * Does nothing if no sensors were recorded for the node.
     *
     * @param threadId          thread the task runs on
     * @param taskId            task the node belongs to
     * @param processorNodeName processor node whose sensors are removed
     */
    public final void removeAllNodeLevelSensors(String threadId, String taskId, String processorNodeName) {
        final String prefix = nodeSensorPrefix(threadId, taskId, processorNodeName);
        synchronized (nodeLevelSensors) {
            final Deque<String> recorded = nodeLevelSensors.remove(prefix);
            if (recorded == null) {
                return;
            }
            while (!recorded.isEmpty()) {
                metrics.removeSensor(recorded.pop());
            }
        }
    }

    /**
     * Builds the sensor-name prefix for a processor node: the task prefix followed by
     * {@code .node.<processorNodeName>}.
     */
    private String nodeSensorPrefix(String threadId, String taskId, String processorNodeName) {
        final String taskPrefix = taskSensorPrefix(threadId, taskId);
        return taskPrefix + ".node." + processorNodeName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Obtains a topic-level sensor by delegating to {@code getSensors} with the topic's sensor
     * prefix, while holding the lock on the topic-level sensor bookkeeping map.
     *
     * @param threadId          thread the task runs on
     * @param taskId            task the node belongs to
     * @param processorNodeName processor node the topic is attached to
     * @param topicName         topic the sensor belongs to
     * @param sensorSuffix      suffix identifying the sensor within the topic
     * @param recordingLevel    recording level forwarded to {@code getSensors}
     * @param parents           parent sensors forwarded to {@code getSensors}
     * @return the sensor returned by {@code getSensors}
     */
    public Sensor topicLevelSensor(String threadId, String taskId, String processorNodeName, String topicName, String sensorSuffix, Sensor.RecordingLevel recordingLevel, Sensor ... parents) {
        final String prefix = topicSensorPrefix(threadId, taskId, processorNodeName, topicName);
        synchronized (topicLevelSensors) {
            return getSensors(topicLevelSensors, sensorSuffix, prefix, recordingLevel, parents);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every sensor recorded for the given topic and drops its bookkeeping entry.
     * Does nothing if no sensors were recorded for the topic.
     *
     * @param threadId          thread the task runs on
     * @param taskId            task the node belongs to
     * @param processorNodeName processor node the topic is attached to
     * @param topicName         topic whose sensors are removed
     */
    public final void removeAllTopicLevelSensors(String threadId, String taskId, String processorNodeName, String topicName) {
        final String prefix = topicSensorPrefix(threadId, taskId, processorNodeName, topicName);
        synchronized (topicLevelSensors) {
            final Deque<String> recorded = topicLevelSensors.remove(prefix);
            if (recorded == null) {
                return;
            }
            while (!recorded.isEmpty()) {
                metrics.removeSensor(recorded.pop());
            }
        }
    }

    /**
     * Builds the sensor-name prefix for a topic: the node prefix followed by {@code .topic.<topicName>}.
     */
    private String topicSensorPrefix(String threadId, String taskId, String processorNodeName, String topicName) {
        final String nodePrefix = nodeSensorPrefix(threadId, taskId, processorNodeName);
        return nodePrefix + ".topic." + topicName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Obtains a record-cache-level sensor by delegating to {@code getSensors} with the cache's
     * sensor prefix, while holding the lock on the cache-level sensor bookkeeping map.
     *
     * @param threadId       thread the task runs on
     * @param taskName       task the cache belongs to
     * @param storeName      name used as the cache name in the sensor prefix
     * @param ratioName      passed to {@code getSensors} as the sensor suffix
     * @param recordingLevel recording level forwarded to {@code getSensors}
     * @param parents        parent sensors forwarded to {@code getSensors}
     * @return the sensor returned by {@code getSensors}
     */
    public Sensor cacheLevelSensor(String threadId, String taskName, String storeName, String ratioName, Sensor.RecordingLevel recordingLevel, Sensor ... parents) {
        final String prefix = cacheSensorPrefix(threadId, taskName, storeName);
        synchronized (cacheLevelSensors) {
            return getSensors(cacheLevelSensors, ratioName, prefix, recordingLevel, parents);
        }
    }

    /**
     * Creates a fresh, insertion-ordered tag map for record-cache-level metrics.
     *
     * @param threadId  value of the {@code thread-id} tag
     * @param taskId    value of the {@code task-id} tag
     * @param storeName value of the {@code record-cache-id} tag
     * @return a new mutable tag map
     */
    public Map<String, String> cacheLevelTagMap(String threadId, String taskId, String storeName) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put(THREAD_ID_TAG, threadId);
        tags.put(TASK_ID_TAG, taskId);
        tags.put(RECORD_CACHE_ID_TAG, storeName);
        return tags;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every sensor recorded for the given record cache and drops its bookkeeping entry.
     * Does nothing if no sensors were recorded for the cache.
     *
     * @param threadId  thread the task runs on
     * @param taskId    task the cache belongs to
     * @param cacheName cache whose sensors are removed
     */
    public final void removeAllCacheLevelSensors(String threadId, String taskId, String cacheName) {
        final String prefix = cacheSensorPrefix(threadId, taskId, cacheName);
        synchronized (cacheLevelSensors) {
            final Deque<String> recorded = cacheLevelSensors.remove(prefix);
            if (recorded == null) {
                return;
            }
            while (!recorded.isEmpty()) {
                metrics.removeSensor(recorded.pop());
            }
        }
    }

    /**
     * Builds the sensor-name prefix for a record cache: the task prefix followed by
     * {@code .cache.<cacheName>}.
     */
    private String cacheSensorPrefix(String threadId, String taskId, String cacheName) {
        final String taskPrefix = taskSensorPrefix(threadId, taskId);
        return taskPrefix + ".cache." + cacheName;
    }

    /**
     * Obtains a state-store-level sensor by delegating to {@code getSensors}. The sensor prefix
     * is built from the name of the calling thread; no synchronized block is used here.
     *
     * @param taskId         task the store belongs to
     * @param storeName      store the sensor belongs to
     * @param sensorSuffix   suffix identifying the sensor within the store
     * @param recordingLevel recording level forwarded to {@code getSensors}
     * @param parents        parent sensors forwarded to {@code getSensors}
     * @return the sensor returned by {@code getSensors}
     */
    public final Sensor storeLevelSensor(String taskId, String storeName, String sensorSuffix, Sensor.RecordingLevel recordingLevel, Sensor ... parents) {
        final String callingThread = Thread.currentThread().getName();
        final String prefix = storeSensorPrefix(callingThread, taskId, storeName);
        return getSensors(storeLevelSensors, sensorSuffix, prefix, recordingLevel, parents);
    }

    /**
     * Registers a gauge-backed state-store-level metric unless a metric with the same name is
     * already present in the registry. On registration the name is recorded under the store's
     * sensor prefix, which is built from the name of the calling thread.
     *
     * @param taskId         task the store belongs to
     * @param metricsScope   store type used as the prefix of the store tag key
     * @param storeName      store the metric belongs to
     * @param name           metric name
     * @param description    metric description
     * @param recordingLevel recording level put into the metric's config
     * @param valueProvider  gauge registered as the metric's value provider
     * @return the metric name, whether or not the metric was newly registered
     */
    public <T> MetricName addStoreLevelMutableMetric(String taskId, String metricsScope, String storeName, String name, String description, Sensor.RecordingLevel recordingLevel, Gauge<T> valueProvider) {
        final MetricName metricName = metrics.metricName(name, STATE_STORE_LEVEL_GROUP, description, storeLevelTagMap(taskId, metricsScope, storeName));
        if (metrics.metric(metricName) == null) {
            metrics.addMetricIfAbsent(metricName, new MetricConfig().recordLevel(recordingLevel), valueProvider);
            final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
            // Diamond instead of a raw LinkedList keeps the deque typed as Deque<MetricName>.
            storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
        }
        return metricName;
    }

    /**
     * Removes every sensor, then every metric, recorded for the given store under the name of
     * the calling thread.
     *
     * @param taskId    task the store belongs to
     * @param storeName store whose sensors and metrics are removed
     */
    public final void removeAllStoreLevelSensorsAndMetrics(String taskId, String storeName) {
        final String callingThread = Thread.currentThread().getName();
        removeAllStoreLevelSensors(callingThread, taskId, storeName);
        removeAllStoreLevelMetrics(callingThread, taskId, storeName);
    }

    /**
     * Removes every sensor recorded for the given store and drops the store's bookkeeping entry.
     * Does nothing if no sensors were recorded for the store.
     *
     * @param threadId  thread name that is part of the bookkeeping key
     * @param taskId    task the store belongs to
     * @param storeName store whose sensors are removed
     */
    private void removeAllStoreLevelSensors(String threadId, String taskId, String storeName) {
        final String key = storeSensorPrefix(threadId, taskId, storeName);
        // Typed deque replaces the raw Deque and the per-element (String) cast.
        final Deque<String> sensors = storeLevelSensors.remove(key);
        while (sensors != null && !sensors.isEmpty()) {
            metrics.removeSensor(sensors.pop());
        }
    }

    /**
     * Removes every metric recorded for the given store and drops the store's bookkeeping entry.
     * Does nothing if no metrics were recorded for the store.
     *
     * @param threadId  thread name that is part of the bookkeeping key
     * @param taskId    task the store belongs to
     * @param storeName store whose metrics are removed
     */
    private void removeAllStoreLevelMetrics(String threadId, String taskId, String storeName) {
        final String key = storeSensorPrefix(threadId, taskId, storeName);
        // Typed deque replaces the raw Deque and the per-element (MetricName) cast.
        final Deque<MetricName> metricNames = storeLevelMetrics.remove(key);
        while (metricNames != null && !metricNames.isEmpty()) {
            metrics.removeMetric(metricNames.pop());
        }
    }

    /**
     * Builds the sensor-name prefix for a state store: the task prefix followed by
     * {@code .store.<storeName>}.
     */
    private String storeSensorPrefix(String threadId, String taskId, String storeName) {
        final String taskPrefix = taskSensorPrefix(threadId, taskId);
        return taskPrefix + ".store." + storeName;
    }

    /**
     * Gets or creates a sensor directly on the underlying registry. The sensor name is used
     * as given and is not recorded in any bookkeeping collection of this class.
     *
     * @param name           sensor name
     * @param recordingLevel recording level of the sensor
     * @return the sensor returned by the registry
     */
    @Override
    public Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel) {
        return metrics.sensor(name, recordingLevel);
    }

    /**
     * Gets or creates a sensor with parent sensors directly on the underlying registry. The
     * sensor name is used as given and is not recorded in any bookkeeping collection of this class.
     *
     * @param name           sensor name
     * @param recordingLevel recording level of the sensor
     * @param parents        parent sensors forwarded to the registry
     * @return the sensor returned by the registry
     */
    @Override
    public Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel, Sensor ... parents) {
        return metrics.sensor(name, recordingLevel, parents);
    }

    /**
     * @return an unmodifiable view of all metrics currently held by the underlying registry
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(metrics.metrics());
    }

    /**
     * Builds the tag map for a user-defined sensor: the thread-level tags, a
     * {@code <scopeName>-id} tag holding the entity name, and any extra key/value pairs.
     *
     * @param threadId   value of the {@code thread-id} tag
     * @param scopeName  prefix of the entity tag key
     * @param entityName value of the entity tag
     * @param tags       alternating keys and values; may be {@code null}
     * @return a new mutable tag map
     * @throws IllegalArgumentException if {@code tags} has an odd number of elements
     */
    private Map<String, String> customizedTags(String threadId, String scopeName, String entityName, String ... tags) {
        final Map<String, String> result = threadLevelTagMap(threadId);
        result.put(scopeName + "-id", entityName);
        if (tags == null) {
            return result;
        }
        if (tags.length % 2 != 0) {
            throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
        }
        // Consume the array pairwise: even index is the key, the following index its value.
        int keyIndex = 0;
        while (keyIndex < tags.length) {
            result.put(tags[keyIndex], tags[keyIndex + 1]);
            keyIndex += 2;
        }
        return result;
    }

    /**
     * Gets or creates the externally named sensor for a user-defined operation and adds the
     * invocation rate and total count metrics to it.
     *
     * @param threadId       thread name used in the sensor name
     * @param groupName      metric group of the added metrics
     * @param entityName     entity name used in the sensor name
     * @param operationName  operation name used in the sensor name, metric names and descriptions
     * @param tags           tags of the added metrics
     * @param recordingLevel recording level of the sensor
     * @return the sensor with rate and total metrics added
     */
    private Sensor customInvocationRateAndCountSensor(String threadId, String groupName, String entityName, String operationName, Map<String, String> tags, Sensor.RecordingLevel recordingLevel) {
        final String sensorName = externalChildSensorName(threadId, operationName, entityName);
        final Sensor sensor = metrics.sensor(sensorName, recordingLevel);
        final String rateDescription = RATE_DESCRIPTION_PREFIX + operationName + " operations per second";
        final String totalDescription = TOTAL_DESCRIPTION + operationName + OPERATIONS;
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(sensor, groupName, tags, operationName, rateDescription, totalDescription);
        return sensor;
    }

    /**
     * Creates a user-defined sensor that reports invocation rate, total count, and average and
     * maximum latency for an operation. The name of the calling thread is used for the
     * {@code thread-id} tag and the sensor name.
     *
     * @param scopeName      scope used to derive the metric group and the entity tag key
     * @param entityName     entity the operation belongs to
     * @param operationName  operation being measured
     * @param recordingLevel recording level of the sensor
     * @param tags           additional alternating tag keys and values
     * @return the configured sensor
     * @throws IllegalArgumentException if {@code tags} has an odd number of elements
     */
    @Override
    public Sensor addLatencyRateTotalSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String ... tags) {
        final String callingThread = Thread.currentThread().getName();
        final String metricGroup = StreamsMetricsImpl.groupNameFromScope(scopeName);
        final Map<String, String> sensorTags = customizedTags(callingThread, scopeName, entityName, tags);
        final Sensor latencySensor = customInvocationRateAndCountSensor(callingThread, metricGroup, entityName, operationName, sensorTags, recordingLevel);
        final String latencyMetricPrefix = operationName + LATENCY_SUFFIX;
        StreamsMetricsImpl.addAvgAndMaxToSensor(latencySensor, metricGroup, sensorTags, latencyMetricPrefix, AVG_LATENCY_DESCRIPTION + operationName, MAX_LATENCY_DESCRIPTION + operationName);
        return latencySensor;
    }

    /**
     * Creates a user-defined sensor that reports invocation rate and total count for an
     * operation. The name of the calling thread is used for the {@code thread-id} tag and the
     * sensor name.
     *
     * @param scopeName      scope used to derive the metric group and the entity tag key
     * @param entityName     entity the operation belongs to
     * @param operationName  operation being measured
     * @param recordingLevel recording level of the sensor
     * @param tags           additional alternating tag keys and values
     * @return the configured sensor
     * @throws IllegalArgumentException if {@code tags} has an odd number of elements
     */
    @Override
    public Sensor addRateTotalSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String ... tags) {
        final String callingThread = Thread.currentThread().getName();
        final Map<String, String> sensorTags = customizedTags(callingThread, scopeName, entityName, tags);
        final String metricGroup = StreamsMetricsImpl.groupNameFromScope(scopeName);
        return customInvocationRateAndCountSensor(callingThread, metricGroup, entityName, operationName, sensorTags, recordingLevel);
    }

    /**
     * Builds the name of a user-defined sensor:
     * {@code external.<threadId>.entity.<entityName>}, the sensor-name delimiter, then the operation name.
     */
    private String externalChildSensorName(String threadId, String operationName, String entityName) {
        final String entityPrefix = "external." + threadId + ".entity." + entityName;
        return entityPrefix + SENSOR_NAME_DELIMITER + operationName;
    }

    /**
     * Adds an {@code Avg} metric named {@code <gaugeName>-avg} and a {@code Max} metric named
     * {@code <gaugeName>-max} to the sensor, in that order.
     *
     * @param sensor           sensor the metrics are added to
     * @param group            metric group
     * @param tags             metric tags
     * @param gaugeName        prefix of both metric names
     * @param descriptionOfAvg description of the average metric
     * @param descriptionOfMax description of the maximum metric
     */
    public static void addAvgAndMaxToSensor(Sensor sensor, String group, Map<String, String> tags, String gaugeName, String descriptionOfAvg, String descriptionOfMax) {
        final MetricName avgName = new MetricName(gaugeName + AVG_SUFFIX, group, descriptionOfAvg, tags);
        sensor.add(avgName, new Avg());
        final MetricName maxName = new MetricName(gaugeName + MAX_SUFFIX, group, descriptionOfMax, tags);
        sensor.add(maxName, new Max());
    }

    /**
     * Adds a {@code Min} metric named {@code <operation>-min} and a {@code Max} metric named
     * {@code <operation>-max} to the sensor, in that order.
     *
     * @param sensor           sensor the metrics are added to
     * @param group            metric group
     * @param tags             metric tags
     * @param operation        prefix of both metric names
     * @param descriptionOfMin description of the minimum metric
     * @param descriptionOfMax description of the maximum metric
     */
    public static void addMinAndMaxToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfMin, String descriptionOfMax) {
        final MetricName minName = new MetricName(operation + MIN_SUFFIX, group, descriptionOfMin, tags);
        sensor.add(minName, new Min());
        final MetricName maxName = new MetricName(operation + MAX_SUFFIX, group, descriptionOfMax, tags);
        sensor.add(maxName, new Max());
    }

    /**
     * Adds {@code <operation>-latency-avg} ({@code Avg}) and {@code <operation>-latency-max}
     * ({@code Max}) metrics to the sensor, with descriptions generated from the operation name.
     *
     * @param sensor    sensor the metrics are added to
     * @param group     metric group
     * @param tags      metric tags
     * @param operation operation name used in metric names and descriptions
     */
    public static void addAvgAndMaxLatencyToSensor(Sensor sensor, String group, Map<String, String> tags, String operation) {
        final String avgDescription = AVG_LATENCY_DESCRIPTION + operation + " operation.";
        sensor.add(new MetricName(operation + "-latency-avg", group, avgDescription, tags), new Avg());
        final String maxDescription = MAX_LATENCY_DESCRIPTION + operation + " operation.";
        sensor.add(new MetricName(operation + "-latency-max", group, maxDescription, tags), new Max());
    }

    /**
     * Adds average and maximum metrics via {@code addAvgAndMaxToSensor}, then a {@code Min}
     * metric named {@code <gaugeName>-min}.
     *
     * @param sensor           sensor the metrics are added to
     * @param group            metric group
     * @param tags             metric tags
     * @param gaugeName        prefix of all three metric names
     * @param descriptionOfAvg description of the average metric
     * @param descriptionOfMin description of the minimum metric
     * @param descriptionOfMax description of the maximum metric
     */
    public static void addAvgAndMinAndMaxToSensor(Sensor sensor, String group, Map<String, String> tags, String gaugeName, String descriptionOfAvg, String descriptionOfMin, String descriptionOfMax) {
        StreamsMetricsImpl.addAvgAndMaxToSensor(sensor, group, tags, gaugeName, descriptionOfAvg, descriptionOfMax);
        final MetricName minName = new MetricName(gaugeName + MIN_SUFFIX, group, descriptionOfMin, tags);
        sensor.add(minName, new Min());
    }

    /**
     * Adds an invocation-rate metric via {@code addInvocationRateToSensor}, then a
     * {@code CumulativeCount} metric named {@code <operation>-total}.
     *
     * @param sensor             sensor the metrics are added to
     * @param group              metric group
     * @param tags               metric tags
     * @param operation          prefix of both metric names
     * @param descriptionOfRate  description of the rate metric
     * @param descriptionOfCount description of the total-count metric
     */
    public static void addInvocationRateAndCountToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfRate, String descriptionOfCount) {
        StreamsMetricsImpl.addInvocationRateToSensor(sensor, group, tags, operation, descriptionOfRate);
        final MetricName totalName = new MetricName(operation + TOTAL_SUFFIX, group, descriptionOfCount, tags);
        sensor.add(totalName, new CumulativeCount());
    }

    /**
     * Adds a per-second {@code Rate} over a {@code WindowedCount}, named {@code <operation>-rate}.
     *
     * @param sensor            sensor the metric is added to
     * @param group             metric group
     * @param tags              metric tags
     * @param operation         prefix of the metric name
     * @param descriptionOfRate description of the rate metric
     */
    public static void addInvocationRateToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfRate) {
        final MetricName rateName = new MetricName(operation + RATE_SUFFIX, group, descriptionOfRate, tags);
        sensor.add(rateName, new Rate(TimeUnit.SECONDS, new WindowedCount()));
    }

    /**
     * Adds a rate-of-sum metric and then a cumulative-sum metric for the operation by calling
     * {@code addRateOfSumMetricToSensor} followed by {@code addSumMetricToSensor}.
     *
     * @param sensor             sensor the metrics are added to
     * @param group              metric group
     * @param tags               metric tags
     * @param operation          prefix of both metric names
     * @param descriptionOfRate  description of the rate metric
     * @param descriptionOfTotal description of the total metric
     */
    public static void addRateOfSumAndSumMetricsToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String descriptionOfRate, String descriptionOfTotal) {
        // Rate first, total second: the order in which the metrics are attached to the sensor.
        StreamsMetricsImpl.addRateOfSumMetricToSensor(sensor, group, tags, operation, descriptionOfRate);
        StreamsMetricsImpl.addSumMetricToSensor(sensor, group, tags, operation, descriptionOfTotal);
    }

    /**
     * Adds a per-second {@code Rate} over a {@code WindowedSum}, named {@code <operation>-rate}.
     *
     * @param sensor      sensor the metric is added to
     * @param group       metric group
     * @param tags        metric tags
     * @param operation   prefix of the metric name
     * @param description description of the rate metric
     */
    public static void addRateOfSumMetricToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String description) {
        final MetricName rateName = new MetricName(operation + RATE_SUFFIX, group, description, tags);
        sensor.add(rateName, new Rate(TimeUnit.SECONDS, new WindowedSum()));
    }

    /**
     * Adds a cumulative-sum metric named {@code <operation>-total}; forwards to the overload
     * with {@code withSuffix} set to {@code true}.
     *
     * @param sensor      sensor the metric is added to
     * @param group       metric group
     * @param tags        metric tags
     * @param operation   prefix of the metric name
     * @param description description of the metric
     */
    public static void addSumMetricToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, String description) {
        final boolean appendTotalSuffix = true;
        StreamsMetricsImpl.addSumMetricToSensor(sensor, group, tags, operation, appendTotalSuffix, description);
    }

    /**
     * Adds a {@code CumulativeSum} metric to the sensor.
     *
     * @param sensor      sensor the metric is added to
     * @param group       metric group
     * @param tags        metric tags
     * @param operation   metric name, or its prefix when {@code withSuffix} is {@code true}
     * @param withSuffix  whether {@code -total} is appended to {@code operation}
     * @param description description of the metric
     */
    public static void addSumMetricToSensor(Sensor sensor, String group, Map<String, String> tags, String operation, boolean withSuffix, String description) {
        final String metricName;
        if (withSuffix) {
            metricName = operation + TOTAL_SUFFIX;
        } else {
            metricName = operation;
        }
        sensor.add(new MetricName(metricName, group, description, tags), new CumulativeSum());
    }

    /**
     * Adds a metric backed by a {@link Value} stat to a sensor.
     *
     * @param sensor      sensor that receives the metric
     * @param group       metric group of the metric name
     * @param tags        tags attached to the metric name
     * @param name        metric name, used as-is (no suffix is appended)
     * @param description description of the metric
     */
    public static void addValueMetricToSensor(Sensor sensor, String group, Map<String, String> tags, String name, String description) {
        final MetricName valueMetricName = new MetricName(name, group, description, tags);
        sensor.add(valueMetricName, new Value());
    }

    /**
     * Adds an average metric and a cumulative-sum metric to a sensor.
     *
     * <p>The average metric is named {@code metricNamePrefix + AVG_SUFFIX} and backed by
     * {@link Avg}; the sum metric is named {@code metricNamePrefix + TOTAL_SUFFIX} and backed by
     * {@link CumulativeSum}. The average metric is added before the sum metric.
     *
     * @param sensor             sensor that receives both metrics
     * @param group              metric group used for both metric names
     * @param tags               tags attached to both metric names
     * @param metricNamePrefix   prefix shared by both metric names
     * @param descriptionOfAvg   description of the average metric
     * @param descriptionOfTotal description of the sum metric
     */
    public static void addAvgAndSumMetricsToSensor(Sensor sensor, String group, Map<String, String> tags, String metricNamePrefix, String descriptionOfAvg, String descriptionOfTotal) {
        final MetricName avgMetricName = new MetricName(metricNamePrefix + AVG_SUFFIX, group, descriptionOfAvg, tags);
        sensor.add(avgMetricName, new Avg());

        // The second name is built only after the first metric has been added, as in the original.
        final MetricName totalMetricName = new MetricName(metricNamePrefix + TOTAL_SUFFIX, group, descriptionOfTotal, tags);
        sensor.add(totalMetricName, new CumulativeSum());
    }

    /**
     * Adds a cumulative-count metric and a cumulative-sum metric to a sensor.
     *
     * <p>The count metric is named {@code countMetricNamePrefix + TOTAL_SUFFIX} and backed by
     * {@link CumulativeCount}; the sum metric is named {@code sumMetricNamePrefix + TOTAL_SUFFIX}
     * and backed by {@link CumulativeSum}. The count metric is added before the sum metric.
     *
     * @param sensor                sensor that receives both metrics
     * @param group                 metric group used for both metric names
     * @param tags                  tags attached to both metric names
     * @param countMetricNamePrefix prefix of the count metric name
     * @param sumMetricNamePrefix   prefix of the sum metric name
     * @param descriptionOfCount    description of the count metric
     * @param descriptionOfTotal    description of the sum metric
     */
    public static void addTotalCountAndSumMetricsToSensor(Sensor sensor, String group, Map<String, String> tags, String countMetricNamePrefix, String sumMetricNamePrefix, String descriptionOfCount, String descriptionOfTotal) {
        final MetricName countMetricName = new MetricName(countMetricNamePrefix + TOTAL_SUFFIX, group, descriptionOfCount, tags);
        sensor.add(countMetricName, new CumulativeCount());

        // The second name is built only after the first metric has been added, as in the original.
        final MetricName sumMetricName = new MetricName(sumMetricNamePrefix + TOTAL_SUFFIX, group, descriptionOfTotal, tags);
        sensor.add(sumMetricName, new CumulativeSum());
    }

    /**
     * Records a value on the sensor only if the sensor should record and has metrics.
     *
     * <p>When recording, the timestamp passed to the sensor is {@code time.milliseconds()}.
     * {@code time} is not consulted when nothing is recorded.
     *
     * @param value  value to record
     * @param time   time source providing the recording timestamp
     * @param sensor sensor to record on
     */
    public static void maybeRecordSensor(double value, Time time, Sensor sensor) {
        if (!sensor.shouldRecord() || !sensor.hasMetrics()) {
            return;
        }
        final long nowMs = time.milliseconds();
        sensor.record(value, nowMs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs an action, recording its latency on the sensor when the sensor is active.
     *
     * <p>If the sensor should record and has metrics, the elapsed time in nanoseconds (as reported
     * by {@code time.nanoseconds()}) is recorded after the action finishes. Recording happens in a
     * {@code finally} block, so it also happens when the action throws. Otherwise the action is
     * simply run without touching {@code time}.
     *
     * @param actionToMeasure action to run
     * @param time            time source used to measure the elapsed nanoseconds
     * @param sensor          sensor that receives the latency measurement
     */
    public static void maybeMeasureLatency(Runnable actionToMeasure, Time time, Sensor sensor) {
        final boolean recording = sensor.shouldRecord() && sensor.hasMetrics();
        if (!recording) {
            actionToMeasure.run();
            return;
        }
        final long startNs = time.nanoseconds();
        try {
            actionToMeasure.run();
        } finally {
            final long elapsedNs = time.nanoseconds() - startNs;
            sensor.record((double) elapsedNs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Evaluates a supplier and returns its result, recording the latency on the sensor when the
     * sensor is active.
     *
     * <p>If the sensor should record and has metrics, the elapsed time in nanoseconds (as reported
     * by {@code time.nanoseconds()}) is recorded after the supplier finishes. Recording happens in
     * a {@code finally} block, so it also happens when the supplier throws. Otherwise the supplier
     * is simply evaluated without touching {@code time}.
     *
     * @param actionToMeasure supplier to evaluate
     * @param time            time source used to measure the elapsed nanoseconds
     * @param sensor          sensor that receives the latency measurement
     * @param <T>             type of the supplied result
     * @return the value produced by {@code actionToMeasure}
     */
    public static <T> T maybeMeasureLatency(Supplier<T> actionToMeasure, Time time, Sensor sensor) {
        if (!sensor.shouldRecord() || !sensor.hasMetrics()) {
            return actionToMeasure.get();
        }
        final long startNs = time.nanoseconds();
        try {
            return actionToMeasure.get();
        } finally {
            final long elapsedNs = time.nanoseconds() - startNs;
            sensor.record((double) elapsedNs);
        }
    }

    /**
     * Returns the sensor named {@code sensorPrefix + SENSOR_NAME_DELIMITER + sensorSuffix},
     * creating and tracking it if it is not registered yet.
     *
     * <p>When no sensor with the full name exists in {@code metrics}, the full name is pushed onto
     * the deque stored in {@code sensors} under {@code sensorPrefix} (the deque is created on
     * first use) and a new sensor is created with the given recording level and parents. When the
     * sensor already exists it is returned as-is and {@code sensors} is left untouched.
     *
     * @param sensors        per-prefix bookkeeping of full sensor names; mutated on creation
     * @param sensorSuffix   suffix part of the full sensor name
     * @param sensorPrefix   prefix part of the full sensor name and key into {@code sensors}
     * @param recordingLevel recording level used when a new sensor is created
     * @param parents        parent sensors used when a new sensor is created
     * @return the existing or newly created sensor
     */
    private Sensor getSensors(Map<String, Deque<String>> sensors, String sensorSuffix, String sensorPrefix, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        final String fullSensorName = sensorPrefix + SENSOR_NAME_DELIMITER + sensorSuffix;
        final Sensor existing = this.metrics.getSensor(fullSensorName);
        if (existing != null) {
            return existing;
        }
        // Parameterized instead of a raw LinkedList so the map value type is checked by the compiler.
        sensors.computeIfAbsent(sensorPrefix, ignored -> new LinkedList<String>()).push(fullSensorName);
        return this.metrics.sensor(fullSensorName, recordingLevel, parents);
    }

    /**
     * Removes a sensor from the metrics registry, together with its tracked parent sensor if any.
     *
     * <p>The sensor is removed by name first. Its entry in {@code parentSensors} is then removed,
     * and if that entry held a parent, the parent is removed from the registry by name as well.
     *
     * @param sensor sensor to remove; must not be {@code null}
     * @throws NullPointerException if {@code sensor} is {@code null}
     */
    @Override
    public void removeSensor(Sensor sensor) {
        Objects.requireNonNull(sensor, "Sensor is null");
        final String sensorName = sensor.name();
        this.metrics.removeSensor(sensorName);

        final Sensor parent = this.parentSensors.remove(sensor);
        if (parent == null) {
            return;
        }
        this.metrics.removeSensor(parent.name());
    }

    /**
     * Exposes the sensor-to-parent mapping as a read-only view.
     *
     * @return an unmodifiable view of {@code parentSensors}
     */
    Map<Sensor, Sensor> parentSensors() {
        final Map<Sensor, Sensor> readOnlyView = Collections.unmodifiableMap(this.parentSensors);
        return readOnlyView;
    }

    /**
     * Builds a metric group name by wrapping a scope name in the group prefix and suffix.
     *
     * @param scopeName scope name to wrap
     * @return {@code GROUP_PREFIX + scopeName + GROUP_SUFFIX}
     */
    private static String groupNameFromScope(String scopeName) {
        final String prefixedScope = GROUP_PREFIX + scopeName;
        return prefixedScope + GROUP_SUFFIX;
    }

    /**
     * Version selector for the metrics; {@code LATEST} is the only constant defined here.
     *
     * <p>A nested enum is implicitly static, so the redundant {@code static} modifier is omitted.
     */
    public enum Version {
        LATEST
    }

    /**
     * A {@link Gauge} that always reports the fixed value it was constructed with.
     *
     * <p>Two instances are equal when they are of the same class and hold equal values.
     *
     * @param <T> type of the reported value
     */
    static class ImmutableMetricValue<T>
    implements Gauge<T> {
        private final T value;

        /**
         * Creates a gauge that reports {@code value}.
         *
         * @param value value to report; stored as-is
         */
        public ImmutableMetricValue(T value) {
            this.value = value;
        }

        /**
         * Returns the stored value; both arguments are ignored.
         *
         * @param config ignored
         * @param now    ignored
         * @return the value passed to the constructor
         */
        public T value(MetricConfig config, long now) {
            return this.value;
        }

        /**
         * Compares by exact class and by the stored value.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            // Wildcard instead of a raw type: only the value is compared, its type argument is irrelevant.
            final ImmutableMetricValue<?> that = (ImmutableMetricValue<?>) o;
            return Objects.equals(this.value, that.value);
        }

        /**
         * Hash code derived from the stored value, consistent with {@link #equals(Object)}.
         */
        @Override
        public int hashCode() {
            return Objects.hash(this.value);
        }
    }
}

