/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointTaskConfig;
import org.apache.kafka.connect.mirror.MirrorUtils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory store of {@link Checkpoint}s, keyed by consumer group and then by topic partition.
 *
 * <p>{@link #start()} populates the store by reading the checkpoints topic named by the task
 * configuration; only records whose consumer group is in the set given at construction are kept,
 * and a later record for the same group and partition replaces an earlier one. After that the
 * store is changed only through {@link #update(String, Map)}.
 *
 * <p>NOTE(review): the two status flags are {@code volatile}, but the checkpoint map itself is a
 * plain {@link HashMap}; callers presumably confine map access to one thread - confirm.
 */
public class CheckpointStore implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(CheckpointStore.class);

    // Role names handed to the task config when building the admin / consumer client configs.
    private static final String TARGET_ADMIN_ROLE = "checkpoint-target-admin";
    private static final String TARGET_CONSUMER_ROLE = "checkpoints-target-consumer";

    private final MirrorCheckpointTaskConfig config;
    private final Set<String> consumerGroups;
    private TopicAdmin cpAdmin = null;
    private KafkaBasedLog<byte[], byte[]> backingStore = null;
    // Assigned by start() (or by the map-taking constructor); null until then.
    Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
    private volatile boolean loadSuccess = false;
    private volatile boolean isInitialized = false;

    /**
     * Creates an empty, not yet initialized store.
     *
     * @param config         task configuration used by {@link #start()} to reach the checkpoints topic
     * @param consumerGroups groups whose checkpoints should be retained; copied defensively
     */
    public CheckpointStore(MirrorCheckpointTaskConfig config, Set<String> consumerGroups) {
        this.config = config;
        this.consumerGroups = new HashSet<>(consumerGroups);
    }

    /**
     * Creates a store that is already initialized with the given map (stored by reference, not
     * copied). No configuration or group filter is set, so {@link #start()} must not be called on
     * an instance built this way.
     *
     * @param checkpointsPerConsumerGroup pre-populated checkpoints per group
     */
    CheckpointStore(Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
        this.config = null;
        this.consumerGroups = null;
        this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
        this.isInitialized = true;
        this.loadSuccess = true;
    }

    /**
     * Loads previously emitted checkpoints and marks the store as initialized. The store is
     * marked initialized even when loading fails; in that case it holds whatever was collected
     * before the failure.
     *
     * @return {@code true} if the checkpoints topic was read without an exception
     */
    public boolean start() {
        checkpointsPerConsumerGroup = readCheckpoints();
        isInitialized = true;
        if (log.isTraceEnabled()) {
            log.trace("CheckpointStore started, load success={}, map={}", loadSuccess, checkpointsPerConsumerGroup);
        } else {
            log.debug("CheckpointStore started, load success={}, map.size={}", loadSuccess, checkpointsPerConsumerGroup.size());
        }
        return loadSuccess;
    }

    /**
     * @return {@code true} once {@link #start()} has completed (successfully or not)
     */
    public boolean isInitialized() {
        return isInitialized;
    }

    /**
     * Merges {@code newCheckpoints} into the checkpoints held for {@code group}, creating the
     * group's entry if needed. Existing entries for the same partitions are overwritten.
     *
     * @param group          consumer group id
     * @param newCheckpoints checkpoints to merge, keyed by topic partition
     */
    public void update(String group, Map<TopicPartition, Checkpoint> newCheckpoints) {
        Map<TopicPartition, Checkpoint> oldCheckpoints =
                checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
        oldCheckpoints.putAll(newCheckpoints);
    }

    /**
     * @param group consumer group id
     * @return an unmodifiable view of the group's checkpoints, or {@code null} if the store has no
     *         entry for the group
     */
    public Map<TopicPartition, Checkpoint> get(String group) {
        Map<TopicPartition, Checkpoint> result = checkpointsPerConsumerGroup.get(group);
        return result == null ? null : Collections.unmodifiableMap(result);
    }

    /**
     * Builds, for every group in the store, a new map from each checkpoint's
     * {@link Checkpoint#topicPartition()} to its {@link Checkpoint#offsetAndMetadata()}.
     *
     * @return a freshly allocated map; changes to it do not affect the store
     */
    public Map<String, Map<TopicPartition, OffsetAndMetadata>> computeConvertedUpstreamOffset() {
        Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
        for (Map.Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
            String consumerId = entry.getKey();
            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
            for (Checkpoint checkpoint : entry.getValue().values()) {
                convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
            }
            result.put(consumerId, convertedUpstreamOffset);
        }
        return result;
    }

    /** Releases the backing log and admin client, if any are still held. */
    @Override
    public void close() {
        releaseResources();
    }

    /** Stops the backing log, closes the admin client and clears both references. */
    private void releaseResources() {
        Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for previous Checkpoints");
        Utils.closeQuietly(cpAdmin, "admin client for previous Checkpoints");
        cpAdmin = null;
        backingStore = null;
    }

    /**
     * Reads the checkpoints topic and collects the checkpoints of the configured consumer groups.
     * Sets {@code loadSuccess} according to whether the read completed without an exception; a
     * failure is logged, never propagated.
     *
     * @return the collected checkpoints (possibly empty or partial on failure), never {@code null}
     */
    private Map<String, Map<TopicPartition, Checkpoint>> readCheckpoints() {
        Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new HashMap<>();
        Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, cpRecord) -> {
            if (error != null) {
                // Discard anything read so far and rethrow so the failure surfaces to the caller
                // of readCheckpointsImpl. NOTE(review): rethrowing presumably also stops the
                // KafkaBasedLog from retrying the read indefinitely - confirm.
                checkpoints.clear();
                if (error instanceof RuntimeException) {
                    throw (RuntimeException) error;
                }
                throw new RuntimeException(error);
            }
            try {
                Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
                if (consumerGroups.contains(cp.consumerGroupId())) {
                    Map<TopicPartition, Checkpoint> cps =
                            checkpoints.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>());
                    cps.put(cp.topicPartition(), cp);
                }
            } catch (SchemaException ex) {
                // A malformed record is skipped rather than failing the whole load.
                log.warn("Ignored invalid checkpoint record at offset {}", cpRecord.offset(), ex);
            }
        };

        try {
            long startTime = System.currentTimeMillis();
            readCheckpointsImpl(config, consumedCallback);
            log.debug("starting+stopping KafkaBasedLog took {}ms", System.currentTimeMillis() - startTime);
            loadSuccess = true;
        } catch (Exception error) {
            loadSuccess = false;
            // Exactly one log entry per failure: WARN for authorization problems, INFO otherwise.
            if (error instanceof AuthorizationException) {
                log.warn("Not authorized to access checkpoints topic {} - "
                        + "this may degrade offset translation as only checkpoints "
                        + "for offsets which were mirrored after the task started will be emitted",
                        config.checkpointsTopic(), error);
            } else {
                log.info("Exception encountered loading checkpoints topic {} - "
                        + "this may degrade offset translation as only checkpoints "
                        + "for offsets which were mirrored after the task started will be emitted",
                        config.checkpointsTopic(), error);
            }
        }
        return checkpoints;
    }

    /**
     * Creates the admin client and a consumer-only {@link KafkaBasedLog} over the checkpoints
     * topic (restricted to partition 0), starts and immediately stops it so that
     * {@code consumedCallback} is fed the existing records, and always releases both clients.
     *
     * @param config           task configuration supplying topic name and client configs
     * @param consumedCallback invoked by the log for consumed records and read errors
     */
    void readCheckpointsImpl(MirrorCheckpointTaskConfig config, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
        try {
            cpAdmin = new TopicAdmin(
                    config.targetAdminConfig(TARGET_ADMIN_ROLE),
                    config.forwardingAdmin(config.targetAdminConfig(TARGET_ADMIN_ROLE)));

            backingStore = KafkaBasedLog.withExistingClients(
                    config.checkpointsTopic(),
                    MirrorUtils.newConsumer(config.targetConsumerConfig(TARGET_CONSUMER_ROLE)),
                    null, // no producer: the log is only read here
                    cpAdmin,
                    consumedCallback,
                    Time.SYSTEM,
                    ignored -> { },
                    topicPartition -> topicPartition.partition() == 0);

            backingStore.start(true);
            backingStore.stop();
        } finally {
            releaseResources();
        }
    }
}

