/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.asyncquery;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;

/**
 * {@link AsyncQueryJobMetadataStorageService} implementation that keeps async query job
 * metadata as documents in the {@value #JOB_METADATA_INDEX} OpenSearch index.
 *
 * <p>The index is created on demand, using the mapping and settings YAML files bundled on the
 * classpath, the first time a store or lookup finds that it does not exist yet.
 *
 * <p>Every client call is issued while the thread context is stashed.
 * NOTE(review): presumably so the request runs with the plugin's own privileges instead of the
 * calling user's — confirm against the security model before changing this.
 */
public class OpensearchAsyncQueryJobMetadataStorageService
        implements AsyncQueryJobMetadataStorageService {
    public static final String JOB_METADATA_INDEX = ".ql-job-metadata";
    private static final String JOB_METADATA_INDEX_MAPPING_FILE_NAME = "job-metadata-index-mapping.yml";
    private static final String JOB_METADATA_INDEX_SETTINGS_FILE_NAME = "job-metadata-index-settings.yml";
    private static final Logger LOG = LogManager.getLogger();
    private final Client client;
    private final ClusterService clusterService;

    /**
     * Creates the storage service.
     *
     * @param client client used to create the index and to index/search documents
     * @param clusterService cluster service whose routing table is consulted to check whether
     *     the job metadata index exists
     */
    public OpensearchAsyncQueryJobMetadataStorageService(Client client, ClusterService clusterService) {
        this.client = client;
        this.clusterService = clusterService;
    }

    /**
     * Stores the given job metadata as a new document whose id is the job id.
     *
     * <p>The document is written with op type {@code CREATE} and an immediate refresh policy.
     * The job metadata index is created first if the routing table does not contain it.
     *
     * @param asyncQueryJobMetadata metadata to persist
     * @throws RuntimeException if index creation fails, if serializing or indexing the document
     *     throws (the original exception is the cause), or if the index response result is not
     *     {@code CREATED}
     */
    @Override
    public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        if (!jobMetadataIndexExists()) {
            createJobMetadataIndex();
        }
        IndexRequest indexRequest = new IndexRequest(JOB_METADATA_INDEX);
        indexRequest.id(asyncQueryJobMetadata.getJobId());
        indexRequest.opType(DocWriteRequest.OpType.CREATE);
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

        IndexResponse indexResponse;
        try (ThreadContext.StoredContext ignored =
                client.threadPool().getThreadContext().stashContext()) {
            indexRequest.source(AsyncQueryJobMetadata.convertToXContent(asyncQueryJobMetadata));
            ActionFuture<IndexResponse> indexResponseFuture = client.index(indexRequest);
            indexResponse = indexResponseFuture.actionGet();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        if (indexResponse.getResult() != DocWriteResponse.Result.CREATED) {
            throw new RuntimeException("Saving job metadata information failed with result : " + indexResponse.getResult().getLowercase());
        }
        LOG.debug("JobMetadata   : {}  successfully created", asyncQueryJobMetadata.getJobId());
    }

    /**
     * Looks up the metadata stored for a job id.
     *
     * <p>If the job metadata index does not exist yet, it is created and an empty result is
     * returned without searching.
     *
     * @param jobId id of the job to look up; matched with a term query on {@code jobId.keyword}
     * @return the first matching metadata document, or empty if none matches
     * @throws RuntimeException if index creation or the search fails
     */
    @Override
    public Optional<AsyncQueryJobMetadata> getJobMetadata(String jobId) {
        if (!jobMetadataIndexExists()) {
            createJobMetadataIndex();
            return Optional.empty();
        }
        return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId.keyword", jobId))
                .stream()
                .findFirst();
    }

    /** Returns whether the cluster routing table currently contains the job metadata index. */
    private boolean jobMetadataIndexExists() {
        return clusterService.state().routingTable().hasIndex(JOB_METADATA_INDEX);
    }

    /**
     * Creates the job metadata index from the bundled mapping and settings YAML resources.
     *
     * @throws RuntimeException if a resource is missing or unreadable, if the create request
     *     fails, or if the creation is not acknowledged; the underlying exception is attached
     *     as the cause
     */
    private void createJobMetadataIndex() {
        try {
            String mapping = readClasspathResource(JOB_METADATA_INDEX_MAPPING_FILE_NAME);
            String settings = readClasspathResource(JOB_METADATA_INDEX_SETTINGS_FILE_NAME);
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(JOB_METADATA_INDEX);
            createIndexRequest.mapping(mapping, XContentType.YAML).settings(settings, XContentType.YAML);

            // Only the submission happens under the stashed context; the wait is done after
            // the context has been restored.
            ActionFuture<CreateIndexResponse> createIndexResponseFuture;
            try (ThreadContext.StoredContext ignored =
                    client.threadPool().getThreadContext().stashContext()) {
                createIndexResponseFuture = client.admin().indices().create(createIndexRequest);
            }
            CreateIndexResponse createIndexResponse = createIndexResponseFuture.actionGet();
            if (!createIndexResponse.isAcknowledged()) {
                throw new RuntimeException("Index creation is not acknowledged.");
            }
            LOG.info("Index: {} creation Acknowledged", JOB_METADATA_INDEX);
        } catch (Exception e) {
            // Keep the historical message text, but attach the cause so the stack trace of the
            // real failure is not lost. JVM Errors are deliberately left to propagate unwrapped.
            throw new RuntimeException(
                    "Internal server error while creating" + JOB_METADATA_INDEX + " index:: " + e.getMessage(), e);
        }
    }

    /**
     * Reads a classpath resource fully as UTF-8 text and closes the underlying stream.
     *
     * @param resourceName resource name resolved through this class's class loader
     * @return the resource content
     * @throws IOException if reading the resource fails
     * @throws IllegalStateException if the resource cannot be found on the classpath
     */
    private static String readClasspathResource(String resourceName) throws IOException {
        try (InputStream stream =
                OpensearchAsyncQueryJobMetadataStorageService.class
                        .getClassLoader()
                        .getResourceAsStream(resourceName)) {
            if (stream == null) {
                // getResourceAsStream signals "not found" with null; fail with a clear message
                // instead of an anonymous NullPointerException further down.
                throw new IllegalStateException("Classpath resource not found: " + resourceName);
            }
            return IOUtils.toString(stream, StandardCharsets.UTF_8);
        }
    }

    /**
     * Runs the query against the job metadata index and parses the hits.
     *
     * <p>The search requests at most one hit and uses the {@code _primary_first} preference.
     *
     * @param query query to execute
     * @return the parsed hits, in the order returned by the search; possibly empty
     * @throws RuntimeException if the response status is not 200 or a hit's source cannot be
     *     parsed into job metadata
     */
    private List<AsyncQueryJobMetadata> searchInJobMetadataIndex(QueryBuilder query) {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(query);
        searchSourceBuilder.size(1);

        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(JOB_METADATA_INDEX);
        searchRequest.source(searchSourceBuilder);
        searchRequest.preference("_primary_first");

        // Only the submission happens under the stashed context; the wait is done after the
        // context has been restored.
        ActionFuture<SearchResponse> searchResponseFuture;
        try (ThreadContext.StoredContext ignored =
                client.threadPool().getThreadContext().stashContext()) {
            searchResponseFuture = client.search(searchRequest);
        }
        SearchResponse searchResponse = searchResponseFuture.actionGet();
        if (searchResponse.status().getStatus() != 200) {
            throw new RuntimeException("Fetching job metadata information failed with status : " + searchResponse.status());
        }

        List<AsyncQueryJobMetadata> jobMetadataList = new ArrayList<>();
        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
            try {
                jobMetadataList.add(AsyncQueryJobMetadata.toJobMetadata(searchHit.getSourceAsString()));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return jobMetadataList;
    }
}

