/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.indices.replication.checkpoint;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.UploadListener;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.RemoteStoreUploader;
import org.opensearch.index.shard.RemoteStoreUploaderService;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.AbstractPublishCheckpointAction;
import org.opensearch.indices.replication.checkpoint.MergedSegmentCheckpoint;
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
import org.opensearch.indices.replication.checkpoint.RemoteStoreMergedSegmentCheckpoint;
import org.opensearch.indices.replication.checkpoint.RemoteStorePublishMergedSegmentRequest;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/**
 * Transport replication action that uploads the files of a merged segment to the remote store
 * and then publishes a {@link RemoteStoreMergedSegmentCheckpoint} describing them.
 *
 * <p>{@link #publish} uploads the files listed in the checkpoint's metadata map and hands the
 * resulting local-to-remote filename mapping to the inherited {@code doPublish}.
 * {@link #doReplicaOperation} is the receiving side: it marks the files as pending download on
 * the replica's remote directory and notifies the {@link SegmentReplicationTargetService}.
 */
public class RemoteStorePublishMergedSegmentAction
extends AbstractPublishCheckpointAction<RemoteStorePublishMergedSegmentRequest, RemoteStorePublishMergedSegmentRequest>
implements MergedSegmentPublisher.PublishAction {
    public static final String ACTION_NAME = "indices:admin/remote_publish_merged_segment";
    private static final Logger logger = LogManager.getLogger(RemoteStorePublishMergedSegmentAction.class);
    private final SegmentReplicationTargetService replicationService;

    /**
     * Creates the action; all collaborators except {@code targetService} are forwarded to the
     * superclass constructor.
     *
     * @param targetService service notified on replicas when a merged segment checkpoint arrives
     */
    @Inject
    public RemoteStorePublishMergedSegmentAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, SegmentReplicationTargetService targetService) {
        super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, RemoteStorePublishMergedSegmentRequest::new, RemoteStorePublishMergedSegmentRequest::new, "generic", logger);
        this.replicationService = targetService;
    }

    /**
     * Handles a merged segment checkpoint on a replica.
     *
     * <p>If the checkpoint's shard id matches the replica's, the files are marked as pending
     * download on the replica's remote directory, the replication service is notified, and the
     * elapsed wall-clock time is added to the shard's receive-time metric. A checkpoint for a
     * different shard is logged and ignored.
     *
     * @param shardRequest request carrying the merged segment checkpoint
     * @param replica      replica shard the request was routed to
     */
    @Override
    protected void doReplicaOperation(RemoteStorePublishMergedSegmentRequest shardRequest, IndexShard replica) {
        RemoteStoreMergedSegmentCheckpoint checkpoint = shardRequest.getMergedSegment();
        if (checkpoint.getShardId().equals(replica.shardId()) == false) {
            logger.warn(() -> new ParameterizedMessage("Received merged segment checkpoint for shard {} on replica shard {}, ignoring checkpoint", checkpoint.getShardId(), replica.shardId()));
            return;
        }
        long startTime = System.currentTimeMillis();
        replica.getRemoteDirectory().markMergedSegmentsPendingDownload(checkpoint.getLocalToRemoteSegmentFilenameMap());
        this.replicationService.onNewMergedSegmentCheckpoint(checkpoint, replica);
        replica.mergedSegmentTransferTracker().addTotalReceiveTimeMillis(System.currentTimeMillis() - startTime);
    }

    /**
     * Performs no work on the primary: the listener is completed with a result that wraps the
     * incoming request and a fresh {@link ReplicationResponse}.
     */
    @Override
    protected void shardOperationOnPrimary(RemoteStorePublishMergedSegmentRequest shardRequest, IndexShard primary, ActionListener<TransportReplicationAction.PrimaryResult<RemoteStorePublishMergedSegmentRequest, ReplicationResponse>> listener) {
        ActionListener.completeWith(listener, () -> new TransportReplicationAction.PrimaryResult<RemoteStorePublishMergedSegmentRequest, ReplicationResponse>(shardRequest, new ReplicationResponse()));
    }

    /**
     * Uploads the merged segment files to the remote store and, if any of the configured merged
     * segment replication timeout is left afterwards, publishes the checkpoint with the remaining
     * time as its budget.
     *
     * <p>The time spent uploading is always added to the shard's send-time metric. When no time
     * is left, or when the publish listener reports a failure, the warm failure counter is
     * incremented.
     *
     * @param indexShard shard that produced the merged segment
     * @param checkpoint checkpoint describing the merged segment files
     */
    @Override
    public final void publish(IndexShard indexShard, MergedSegmentCheckpoint checkpoint) {
        long startTimeMillis = System.currentTimeMillis();
        // NOTE(review): the returned map only contains files whose upload succeeded; if the upload
        // fails quickly the checkpoint below may be published with a partial map - confirm that
        // replicas tolerate this.
        Map<String, String> localToRemoteStoreFilenames = this.uploadMergedSegmentsToRemoteStore(indexShard, checkpoint);
        long elapsedTimeMillis = System.currentTimeMillis() - startTimeMillis;
        long timeoutMillis = indexShard.getRecoverySettings().getMergedSegmentReplicationTimeout().millis();
        long timeLeftMillis = Math.max(0L, timeoutMillis - elapsedTimeMillis);
        indexShard.mergedSegmentTransferTracker().addTotalSendTimeMillis(elapsedTimeMillis);

        if (timeLeftMillis <= 0L) {
            indexShard.mergedSegmentTransferTracker().incrementTotalWarmFailureCount();
            // The placeholder is followed by a literal "ms", so pass the configured timeout in milliseconds.
            logger.warn(() -> new ParameterizedMessage("Unable to confirm upload of merged segment {} to remote store. Timeout of {}ms exceeded. Skipping pre-copy.", checkpoint, timeoutMillis));
            return;
        }

        RemoteStoreMergedSegmentCheckpoint remoteStoreMergedSegmentCheckpoint = new RemoteStoreMergedSegmentCheckpoint(checkpoint, localToRemoteStoreFilenames);
        ActionListener<Void> publishListener = new ActionListener<Void>() {
            @Override
            public void onResponse(Void unused) {
                // Nothing to record on success.
            }

            @Override
            public void onFailure(Exception e) {
                indexShard.mergedSegmentTransferTracker().incrementTotalWarmFailureCount();
            }
        };
        // Only the time not already consumed by the upload is handed to doPublish.
        this.doPublish(indexShard, remoteStoreMergedSegmentCheckpoint, new RemoteStorePublishMergedSegmentRequest(remoteStoreMergedSegmentCheckpoint), "segrep_remote_publish_merged_segment", true, TimeValue.timeValueMillis(timeLeftMillis), publishListener);
    }

    /**
     * Uploads every file in the checkpoint's metadata map to the remote store and blocks until
     * the upload listener fires, the merged segment replication timeout elapses, or the calling
     * thread is interrupted.
     *
     * @param indexShard shard whose store and remote directory are used for the upload
     * @param checkpoint checkpoint whose metadata map names the files and their lengths
     * @return mapping from local filename to remote filename for each file whose upload was
     *         reported successful; may be incomplete after a failure, timeout or interruption
     */
    private Map<String, String> uploadMergedSegmentsToRemoteStore(IndexShard indexShard, MergedSegmentCheckpoint checkpoint) {
        Set<String> segmentsToUpload = checkpoint.getMetadataMap().keySet();
        // Written from upload callbacks, hence a concurrent map.
        Map<String, String> localToRemoteStoreFilenames = new ConcurrentHashMap<>();
        Map<String, Long> segmentsSizeMap = checkpoint.getMetadataMap()
            .entrySet()
            .stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().length()));
        // Released by the overall upload listener on success as well as on failure.
        CountDownLatch latch = new CountDownLatch(1);

        ActionListener<Void> uploadListener = new ActionListener<Void>() {
            @Override
            public void onResponse(Void unused) {
                logger.trace(() -> new ParameterizedMessage("Successfully uploaded segments {} to remote store", segmentsToUpload));
                latch.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                logger.warn(() -> new ParameterizedMessage("Failed to upload segments {} to remote store. {}", segmentsToUpload, e));
                latch.countDown();
            }
        };

        this.getRemoteStoreUploaderService(indexShard).uploadSegments(segmentsToUpload, segmentsSizeMap, uploadListener, unused -> new UploadListener() {
            @Override
            public void beforeUpload(String file) {
            }

            @Override
            public void onSuccess(String file) {
                localToRemoteStoreFilenames.put(file, indexShard.getRemoteDirectory().getExistingRemoteFilename(file));
                indexShard.mergedSegmentTransferTracker().addTotalBytesSent(checkpoint.getMetadataMap().get(file).length());
            }

            @Override
            public void onFailure(String file) {
                logger.warn("Unable to upload segments during merge. Continuing.");
            }
        }, true);

        try {
            // Wait with millisecond precision so this wait and the time budget computed in
            // publish() use the same unit; truncating to whole seconds could end the wait early
            // and leave publish() believing there is still time left.
            long timeoutMillis = indexShard.getRecoverySettings().getMergedSegmentReplicationTimeout().millis();
            if (latch.await(timeoutMillis, TimeUnit.MILLISECONDS) == false) {
                long timeoutSeconds = TimeUnit.MILLISECONDS.toSeconds(timeoutMillis);
                // NOTE(review): the message text speaks of replica downloads, but this wait is on the upload listener.
                logger.warn(() -> new ParameterizedMessage("Timeout exceeded {}s: Could not verify merge segment downloads were completed by replicas. Continuing.", timeoutSeconds));
            }
        } catch (InterruptedException e) {
            logger.warn(() -> new ParameterizedMessage("Unable to confirm successful merge segment downloads by replicas due to interruption. Continuing. \nException - {}", e));
            // Restore the interrupt flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
        return localToRemoteStoreFilenames;
    }

    /**
     * Builds a new uploader over the shard's local store directory and its remote directory.
     */
    private RemoteStoreUploader getRemoteStoreUploaderService(IndexShard indexShard) {
        return new RemoteStoreUploaderService(indexShard, indexShard.store().directory(), indexShard.getRemoteDirectory());
    }
}

