/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.transform.transforms;

import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerJobStats;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.BulkIndexingException;
import org.elasticsearch.xpack.transform.transforms.Function;
import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
import org.elasticsearch.xpack.transform.transforms.RetentionPolicyToDeleteByQueryRequestConverter;
import org.elasticsearch.xpack.transform.transforms.TransformContext;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

public abstract class TransformIndexer
extends AsyncTwoPhaseIndexer<TransformIndexerPosition, TransformIndexerStats> {
    /** Timeout, in seconds, for waiting until the transform state has been persisted. */
    private static final int PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC = 5;
    /** NOTE(review): presumably the lower bound for the page size; it is not used in this part of the file. */
    public static final int MINIMUM_PAGE_SIZE = 10;
    private static final Logger logger = LogManager.getLogger(TransformIndexer.class);
    /** NOTE(review): not used in this part of the file; presumably consumed by the checkpoint cleanup. */
    private static final long NUMBER_OF_CHECKPOINTS_TO_KEEP = 10L;
    /** 864,000,000 ms equals 10 days. NOTE(review): presumably consumed by the checkpoint cleanup. */
    private static final long RETENTION_OF_CHECKPOINTS_MS = 864000000L;
    /** Number of checkpoints that have to pass before old checkpoints are cleaned up again. */
    private static final long CHECKPOINT_CLEANUP_INTERVAL = 100L;
    protected final TransformConfigManager transformsConfigManager;
    private final CheckpointProvider checkpointProvider;
    /** Throttling value; stays at -1 when the settings do not define docs per second. */
    private volatile float docsPerSecond = -1.0f;
    protected final TransformAuditor auditor;
    protected final TransformContext context;
    /** Current configuration; replaced when a continuous transform reloads its config on start. */
    protected volatile TransformConfig transformConfig;
    private volatile TransformProgress progress;
    /** Result of the last source-change check; true whenever no check was made or the check failed. */
    protected volatile boolean hasSourceChanged = true;
    /** Listeners waiting for the next state persist; {@code null} when nobody is waiting. */
    protected final AtomicReference<Collection<ActionListener<Void>>> saveStateListeners = new AtomicReference<>();
    private volatile Map<String, String> fieldMappings;
    private Function function;
    /** Only created for continuous transforms, otherwise {@code null}. */
    private Function.ChangeCollector changeCollector;
    private Map<String, Object> nextChangeCollectorBucketPosition = null;
    /** The max page search size taken from the settings, compared against when settings are updated. */
    private volatile Integer initialConfiguredPageSize;
    /** Page size in use; 0 means it has not been configured yet. */
    private volatile int pageSize = 0;
    private volatile long logEvery = 1L;
    private volatile long logCount = 0L;
    private volatile TransformCheckpoint lastCheckpoint;
    /** Checkpoint currently being worked on; set to {@code null} once a checkpoint has been finalized. */
    private volatile TransformCheckpoint nextCheckpoint;
    private volatile String lastAuditedExceptionMessage = null;
    private volatile RunState runState;
    /** Checkpoint number at which old checkpoints were cleaned up the last time. */
    private volatile long lastCheckpointCleanup = 0L;
    /** True while the indexer thread is shutting down; triggers are ignored during that time. */
    protected volatile boolean indexerThreadShuttingDown = false;
    /** Remembers that a state save was requested while the indexer thread was shutting down. */
    protected volatile boolean saveStateRequestedDuringIndexerThreadShutdown = false;

    public TransformIndexer(ThreadPool threadPool, TransformServices transformServices, CheckpointProvider checkpointProvider, TransformConfig transformConfig, AtomicReference<IndexerState> initialState, TransformIndexerPosition initialPosition, TransformIndexerStats jobStats, TransformProgress transformProgress, TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint, TransformContext context) {
        super(threadPool, initialState, (Object)initialPosition, (IndexerJobStats)jobStats);
        ExceptionsHelper.requireNonNull((Object)transformServices, (String)"transformServices");
        this.transformsConfigManager = transformServices.getConfigManager();
        this.checkpointProvider = (CheckpointProvider)ExceptionsHelper.requireNonNull((Object)checkpointProvider, (String)"checkpointProvider");
        this.auditor = transformServices.getAuditor();
        this.transformConfig = (TransformConfig)ExceptionsHelper.requireNonNull((Object)transformConfig, (String)"transformConfig");
        this.progress = transformProgress != null ? transformProgress : new TransformProgress();
        this.lastCheckpoint = (TransformCheckpoint)ExceptionsHelper.requireNonNull((Object)lastCheckpoint, (String)"lastCheckpoint");
        this.nextCheckpoint = (TransformCheckpoint)ExceptionsHelper.requireNonNull((Object)nextCheckpoint, (String)"nextCheckpoint");
        this.context = (TransformContext)ExceptionsHelper.requireNonNull((Object)context, (String)"context");
        this.runState = RunState.APPLY_RESULTS;
        if (transformConfig.getSettings() != null && transformConfig.getSettings().getDocsPerSecond() != null) {
            this.docsPerSecond = transformConfig.getSettings().getDocsPerSecond().floatValue();
        }
    }

    /**
     * Executes the search that determines the initial progress of the transform.
     * Called from {@code onStart} on the initial run of the first checkpoint.
     *
     * @param var1 the search request to execute
     * @param var2 listener receiving the search response or the failure
     */
    abstract void doGetInitialProgress(SearchRequest var1, ActionListener<SearchResponse> var2);

    /**
     * Retrieves the field mappings; {@code onStart} stores the result in {@code fieldMappings}.
     *
     * @param var1 listener receiving the mappings or the failure
     */
    abstract void doGetFieldMappings(ActionListener<Map<String, String>> var1);

    /**
     * Executes a delete by query request; used by the retention policy handling.
     *
     * @param var1 the delete by query request to execute
     * @param var2 listener receiving the response or the failure
     */
    abstract void doDeleteByQuery(DeleteByQueryRequest var1, ActionListener<BulkByScrollResponse> var2);

    /**
     * Refreshes the destination index; called from {@code onFinish} before a checkpoint is finalized.
     *
     * @param var1 listener receiving the refresh response or the failure
     */
    abstract void refreshDestinationIndex(ActionListener<RefreshResponse> var1);

    /**
     * Persists the given transform state.
     *
     * @param var1 the state to persist
     * @param var2 listener notified once persisting succeeded or failed
     */
    abstract void persistState(TransformState var1, ActionListener<Void> var2);

    /**
     * @return the page size currently in use; 0 as long as it has not been configured
     */
    public int getPageSize() {
        return pageSize;
    }

    /**
     * @return the id of the current transform configuration
     */
    protected String getJobId() {
        return transformConfig.getId();
    }

    /**
     * @return the docs per second value taken from the settings, or -1 if the settings define none
     */
    protected float getMaxDocsPerSecond() {
        return docsPerSecond;
    }

    /**
     * Decides whether the state should be saved.
     *
     * @return true if save state listeners are waiting, otherwise whatever the base indexer decides
     */
    protected boolean triggerSaveState() {
        // somebody waits for the state to be persisted, so do not delay the save
        if (saveStateListeners.get() != null) {
            return true;
        }
        return super.triggerSaveState();
    }

    /**
     * @return the transform configuration currently in use
     */
    public TransformConfig getConfig() {
        return transformConfig;
    }

    /**
     * @return true if the current configuration has a sync config
     */
    public boolean isContinuous() {
        final boolean hasSyncConfig = getConfig().getSyncConfig() != null;
        return hasSyncConfig;
    }

    /**
     * @return the field mappings loaded during start, or null if none have been loaded yet
     */
    public Map<String, String> getFieldMappings() {
        return fieldMappings;
    }

    /**
     * @return the progress object of this indexer
     */
    public TransformProgress getProgress() {
        return progress;
    }

    /**
     * @return the last checkpoint
     */
    public TransformCheckpoint getLastCheckpoint() {
        return lastCheckpoint;
    }

    /**
     * @return the next checkpoint; null after a checkpoint has been finalized and before a new one is created
     */
    public TransformCheckpoint getNextCheckpoint() {
        return nextCheckpoint;
    }

    /**
     * @return the checkpoint provider given at construction time
     */
    public CheckpointProvider getCheckpointProvider() {
        return checkpointProvider;
    }

    /**
     * Creates the next checkpoint based on the last one and stores it via the config manager.
     *
     * @param listener receives the stored checkpoint, or a {@link RuntimeException} wrapping the cause
     *                 if creating or storing the checkpoint failed
     */
    protected void createCheckpoint(ActionListener<TransformCheckpoint> listener) {
        final ActionListener<TransformCheckpoint> onCheckpointCreated = ActionListener.wrap(checkpoint -> {
            // second step: store the freshly created checkpoint
            final ActionListener<Boolean> onCheckpointStored = ActionListener.wrap(
                putCheckPointResponse -> listener.onResponse(checkpoint),
                storeFailure -> {
                    logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", getJobId()), storeFailure);
                    listener.onFailure(
                        new RuntimeException("Failed to create checkpoint due to: " + storeFailure.getMessage(), storeFailure)
                    );
                }
            );
            transformsConfigManager.putTransformCheckpoint(checkpoint, onCheckpointStored);
        }, createFailure -> {
            logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", getJobId()), createFailure);
            listener.onFailure(
                new RuntimeException("Failed to retrieve checkpoint due to: " + createFailure.getMessage(), createFailure)
            );
        });

        // first step: let the provider create the next checkpoint
        checkpointProvider.createNextCheckpoint(getLastCheckpoint(), onCheckpointCreated);
    }

    /**
     * Prepares an indexer run and reports through {@code listener} whether there is anything to do.
     *
     * The listeners below are declared in reverse order of execution. A run goes through:
     * source change check (only if a checkpoint exists and no position is set), config reload
     * (continuous transforms only), field mapping reload, function initialization, checkpoint
     * creation and initial progress lookup (only if no position is set), and finally page size
     * and run state setup.
     *
     * @param now trigger time in epoch milliseconds, recorded as last search time in the context
     * @param listener receives true if the run should proceed, false if the source has not changed,
     *                 or the failure that prevented the start
     */
    protected void onStart(long now, ActionListener<Boolean> listener) {
        // a failed task must not start
        if (this.context.getTaskState() == TransformTaskState.FAILED) {
            logger.debug("[{}] attempted to start while failed.", (Object)this.getJobId());
            listener.onFailure((Exception)((Object)new ElasticsearchException("Attempted to start a failed transform [{}].", new Object[]{this.getJobId()})));
            return;
        }
        // last step: configure the page size if that has not happened yet, pick the run state, report "run"
        ActionListener finalListener = ActionListener.wrap(r -> {
            try {
                if (this.pageSize == 0) {
                    this.configurePageSize(this.getConfig().getSettings().getMaxPageSearchSize());
                }
                this.runState = this.determineRunStateAtStart();
                listener.onResponse((Object)true);
            }
            catch (Exception e) {
                listener.onFailure(e);
                return;
            }
        }, arg_0 -> listener.onFailure(arg_0));
        // runs once config and field mappings are in place
        ActionListener configurationReadyListener = ActionListener.wrap(r -> {
            this.initializeFunction();
            if (this.initialRun()) {
                this.createCheckpoint((ActionListener<TransformCheckpoint>)ActionListener.wrap(cp -> {
                    this.nextCheckpoint = cp;
                    // for any checkpoint after the first one the progress starts from scratch without a search
                    if (this.nextCheckpoint.getCheckpoint() > 1L) {
                        this.progress = new TransformProgress(null, Long.valueOf(0L), Long.valueOf(0L));
                        finalListener.onResponse(null);
                        return;
                    }
                    // first checkpoint: ask the function for a progress query, restricted by the filter query
                    SearchRequest request = new SearchRequest(this.transformConfig.getSource().getIndex());
                    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                    this.function.buildSearchQueryForInitialProgress(searchSourceBuilder);
                    searchSourceBuilder.query((QueryBuilder)QueryBuilders.boolQuery().filter(this.buildFilterQuery()).filter(searchSourceBuilder.query()));
                    request.allowPartialSearchResults(false).source(searchSourceBuilder);
                    // failing to load the progress is not fatal: log it, use an empty progress and continue
                    this.doGetInitialProgress(request, (ActionListener<SearchResponse>)ActionListener.wrap(response -> this.function.getInitialProgressFromResponse((SearchResponse)response, (ActionListener<TransformProgress>)ActionListener.wrap(newProgress -> {
                        logger.trace("[{}] reset the progress from [{}] to [{}].", (Object)this.getJobId(), (Object)this.progress, newProgress);
                        this.progress = newProgress != null ? newProgress : new TransformProgress();
                        finalListener.onResponse(null);
                    }, failure -> {
                        this.progress = new TransformProgress();
                        logger.warn((Message)new ParameterizedMessage("[{}] unable to load progress information for task.", (Object)this.getJobId()), (Throwable)failure);
                        finalListener.onResponse(null);
                    })), failure -> {
                        this.progress = new TransformProgress();
                        logger.warn((Message)new ParameterizedMessage("[{}] unable to load progress information for task.", (Object)this.getJobId()), (Throwable)failure);
                        finalListener.onResponse(null);
                    }));
                }, arg_0 -> ((ActionListener)listener).onFailure(arg_0)));
            } else {
                // a position exists, so the run continues an already started checkpoint
                finalListener.onResponse(null);
            }
        }, arg_0 -> listener.onFailure(arg_0));
        // stores the loaded field mappings and continues
        ActionListener fieldMappingsListener = ActionListener.wrap(mappings -> {
            this.fieldMappings = mappings;
            configurationReadyListener.onResponse(null);
        }, arg_0 -> listener.onFailure(arg_0));
        // triggers loading of the field mappings
        ActionListener reLoadFieldMappingsListener = ActionListener.wrap(updateConfigResponse -> this.doGetFieldMappings((ActionListener<Map<String, String>>)fieldMappingsListener), arg_0 -> listener.onFailure(arg_0));
        // runs when the source has changed (or no check was necessary)
        ActionListener changedSourceListener = ActionListener.wrap(r -> {
            if (this.isContinuous()) {
                // continuous transforms re-read their config from the config manager on every start
                this.transformsConfigManager.getTransformConfiguration(this.getJobId(), (ActionListener<TransformConfig>)ActionListener.wrap(config -> {
                    // the field mappings are only reused if the config is unchanged and mappings are already loaded
                    if (this.transformConfig.equals(config) && this.fieldMappings != null) {
                        logger.trace("[{}] transform config has not changed.", (Object)this.getJobId());
                        configurationReadyListener.onResponse(null);
                    } else {
                        this.transformConfig = config;
                        logger.debug("[{}] successfully refreshed transform config from index.", (Object)this.getJobId());
                        reLoadFieldMappingsListener.onResponse(null);
                    }
                }, failure -> {
                    String msg = TransformMessages.getMessage((String)"Failed to reload transform configuration for transform [{0}]", (Object[])new Object[]{this.getJobId()});
                    logger.error(msg, (Throwable)failure);
                    // a missing config fails the start; any other reload failure is audited and the run continues
                    if (failure instanceof ResourceNotFoundException) {
                        reLoadFieldMappingsListener.onFailure((Exception)((Object)new TransformConfigLostOnReloadException(msg, (Throwable)failure, new Object[0])));
                    } else {
                        this.auditor.warning(this.getJobId(), msg);
                        reLoadFieldMappingsListener.onResponse(null);
                    }
                }));
            } else {
                reLoadFieldMappingsListener.onResponse(null);
            }
        }, arg_0 -> listener.onFailure(arg_0));
        Instant instantOfTrigger = Instant.ofEpochMilli(now);
        // entry point: the source is only checked for changes if a checkpoint exists and no position is set
        if (this.context.getCheckpoint() > 0L && this.initialRun()) {
            this.sourceHasChanged((ActionListener<Boolean>)ActionListener.wrap(hasChanged -> {
                this.context.setLastSearchTime(instantOfTrigger);
                this.hasSourceChanged = hasChanged;
                if (hasChanged.booleanValue()) {
                    this.context.setChangesLastDetectedAt(instantOfTrigger);
                    logger.debug("[{}] source has changed, triggering new indexer run.", (Object)this.getJobId());
                    changedSourceListener.onResponse(null);
                } else {
                    logger.trace("[{}] source has not changed, finish indexer early.", (Object)this.getJobId());
                    // false tells the caller that there is nothing to do in this run
                    listener.onResponse((Object)false);
                }
            }, failure -> {
                // a failed check counts as "changed"
                this.hasSourceChanged = true;
                listener.onFailure(failure);
            }));
        } else {
            this.hasSourceChanged = true;
            this.context.setLastSearchTime(instantOfTrigger);
            this.context.setChangesLastDetectedAt(instantOfTrigger);
            changedSourceListener.onResponse(null);
        }
    }

    /**
     * Creates the function for the current configuration and, for continuous transforms,
     * the change collector based on the field of the sync config.
     */
    protected void initializeFunction() {
        function = FunctionFactory.create(getConfig());

        if (isContinuous() == false) {
            return;
        }
        changeCollector = function.buildChangeCollector(getConfig().getSyncConfig().getField());
    }

    /**
     * @return true if the indexer has no position
     */
    protected boolean initialRun() {
        return getPosition() == null;
    }

    /**
     * Finishes an indexer run: refreshes the destination index, applies the retention policy
     * if one is configured and finalizes the checkpoint.
     *
     * @param listener notified once the run has been wrapped up, or with the failure
     */
    protected void onFinish(ActionListener<Void> listener) {
        startIndexerThreadShutdown();

        // nothing was indexed, so there is no checkpoint to finalize
        if (hasSourceChanged == false) {
            if (context.shouldStopAtCheckpoint()) {
                stop();
            }
            listener.onResponse(null);
            return;
        }

        // failures are run through the failure handling before they reach the caller
        final ActionListener<Void> failureHandlingListener = ActionListener.wrap(listener::onResponse, failure -> {
            handleFailure(failure);
            listener.onFailure(failure);
        });

        try {
            refreshDestinationIndex(ActionListener.wrap(refreshResponse -> {
                if (refreshResponse.getFailedShards() > 0) {
                    logger.warn(
                        "[{}] failed to refresh transform destination index, not all data might be available after checkpoint.",
                        getJobId()
                    );
                }

                if (transformConfig.getRetentionPolicyConfig() == null) {
                    finalizeCheckpoint(failureHandlingListener);
                } else {
                    executeRetentionPolicy(failureHandlingListener);
                }
            }, failureHandlingListener::onFailure));
        } catch (Exception e) {
            failureHandlingListener.onFailure(e);
        }
    }

    /**
     * Deletes documents from the destination according to the retention policy and finalizes the
     * checkpoint afterwards. If the policy translates to no delete request, the checkpoint is
     * finalized right away.
     *
     * @param listener notified after the checkpoint has been finalized; receives a failure if the
     *                 delete by query fails, reports version conflicts or reports bulk/search failures
     */
    private void executeRetentionPolicy(ActionListener<Void> listener) {
        DeleteByQueryRequest deleteByQuery = RetentionPolicyToDeleteByQueryRequestConverter.buildDeleteByQueryRequest(
            this.transformConfig.getRetentionPolicyConfig(),
            this.transformConfig.getSettings(),
            this.transformConfig.getDestination(),
            this.nextCheckpoint
        );

        // the converter returned no request, so there is nothing to delete
        if (deleteByQuery == null) {
            this.finalizeCheckpoint(listener);
            return;
        }

        logger.debug(
            () -> new ParameterizedMessage(
                "[{}] Run delete based on retention policy using dbq [{}] with query: [{}]",
                new Object[] { this.getJobId(), deleteByQuery, deleteByQuery.getSearchRequest() }
            )
        );
        this.getStats().markStartDelete();

        this.doDeleteByQuery(deleteByQuery, ActionListener.wrap(bulkByScrollResponse -> {
            logger.trace(() -> new ParameterizedMessage("[{}] dbq response: [{}]", this.getJobId(), bulkByScrollResponse));

            this.getStats().markEndDelete();
            this.getStats().incrementNumDeletedDocuments(bulkByScrollResponse.getDeleted());

            logger.debug(
                "[{}] deleted [{}] documents as part of the retention policy.",
                this.getJobId(),
                bulkByScrollResponse.getDeleted()
            );

            if (bulkByScrollResponse.getVersionConflicts() > 0L) {
                // report the number of conflicts, not the number of deleted documents
                listener.onFailure(
                    new RetentionPolicyToDeleteByQueryRequestConverter.RetentionPolicyException(
                        "found [{}] version conflicts when deleting documents as part of the retention policy.",
                        bulkByScrollResponse.getVersionConflicts()
                    )
                );
                return;
            }

            if (bulkByScrollResponse.getBulkFailures().size() > 0 || bulkByScrollResponse.getSearchFailures().size() > 0) {
                // unexpected: trips in test runs with assertions enabled, reported as failure otherwise
                assert false : "delete by query failed unexpectedly" + bulkByScrollResponse;
                listener.onFailure(
                    new RetentionPolicyToDeleteByQueryRequestConverter.RetentionPolicyException(
                        "found failures when deleting documents as part of the retention policy. Response: [{}]",
                        bulkByScrollResponse
                    )
                );
                return;
            }

            this.finalizeCheckpoint(listener);
        }, listener::onFailure));
    }

    /**
     * Completes the current checkpoint: resets per-checkpoint state, advances the checkpoint
     * counter, updates progress and statistics, audits and logs the completion, stops the
     * indexer if a stop at checkpoint was requested and periodically cleans up old checkpoints.
     *
     * @param listener notified when done (after the cleanup, if one is due); receives any
     *                 exception thrown along the way
     */
    private void finalizeCheckpoint(ActionListener<Void> listener) {
        try {
            // Reset the page size for the next checkpoint. An explicitly configured page size
            // wins; only without one the default of the function is used.
            final Integer configuredPageSize = this.initialConfiguredPageSize;
            if (configuredPageSize != null && configuredPageSize > 0) {
                this.pageSize = configuredPageSize;
            } else {
                this.pageSize = this.function.getInitialPageSize();
            }

            if (this.changeCollector != null) {
                this.changeCollector.clear();
            }

            final long checkpoint = this.context.incrementAndGetCheckpoint();
            this.lastCheckpoint = this.getNextCheckpoint();
            this.nextCheckpoint = null;
            this.context.resetReasonAndFailureCounter();

            // the checkpoint is complete, so account for documents the progress did not count
            if (this.progress.getPercentComplete() != null && this.progress.getPercentComplete() < 100.0) {
                this.progress.incrementDocsProcessed(this.progress.getTotalDocs() - this.progress.getDocumentsProcessed());
            }

            if (this.lastCheckpoint != null) {
                long docsIndexed = this.progress.getDocumentsIndexed();
                long docsProcessed = this.progress.getDocumentsProcessed();
                long durationMs = System.currentTimeMillis() - this.lastCheckpoint.getTimestamp();
                // a negative duration is reported as 0
                this.getStats().incrementCheckpointExponentialAverages(Math.max(0L, durationMs), docsIndexed, docsProcessed);
            }

            if (this.shouldAuditOnFinish(checkpoint)) {
                this.auditor.info(this.getJobId(), "Finished indexing for transform checkpoint [" + checkpoint + "].");
            }
            logger.debug("[{}] finished indexing for transform checkpoint [{}].", this.getJobId(), checkpoint);

            if (this.context.shouldStopAtCheckpoint()) {
                this.stop();
            }

            if (checkpoint - this.lastCheckpointCleanup > CHECKPOINT_CLEANUP_INTERVAL) {
                this.cleanupOldCheckpoints(listener);
            } else {
                listener.onResponse(null);
            }
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    /**
     * Completes the indexer thread shutdown that {@code onFinish} or {@code onFailure} started.
     */
    protected void afterFinishOrFailure() {
        finishIndexerThreadShutdown();
    }

    /**
     * Dispatches a search response to the handler that matches the current run state.
     *
     * @param searchResponse the response of the last search
     * @return the iteration result produced by the handler
     * @throws IllegalStateException if the run state is neither APPLY_RESULTS nor IDENTIFY_CHANGES
     */
    protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse searchResponse) {
        switch (this.runState) {
            case APPLY_RESULTS:
                return processBuckets(searchResponse);
            case IDENTIFY_CHANGES:
                return processChangedBuckets(searchResponse);
            default:
                logger.warn("[{}] Encountered unexpected run state [{}]", getJobId(), this.runState);
                throw new IllegalStateException("Transform indexer job encountered an illegal state [" + this.runState + "]");
        }
    }

    /**
     * Triggers an indexer run unless the task is failed, the indexer is indexing or stopping,
     * or the indexer thread is shutting down.
     *
     * @param now trigger time passed on to the base indexer
     * @return false if the trigger was ignored, otherwise the result of the base indexer
     */
    public synchronized boolean maybeTriggerAsyncJob(long now) {
        final boolean taskFailed = context.getTaskState() == TransformTaskState.FAILED;
        if (taskFailed) {
            logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId());
            return false;
        }

        final IndexerState currentState = getState();
        final boolean busy = currentState == IndexerState.INDEXING || currentState == IndexerState.STOPPING;
        if (busy) {
            logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), currentState);
            return false;
        }

        if (indexerThreadShuttingDown) {
            logger.debug("[{}] indexer thread is shutting down. Ignoring trigger.", getJobId());
            return false;
        }

        return super.maybeTriggerAsyncJob(now);
    }

    /**
     * Applies updated settings: takes over the docs per second value (-1 if not set),
     * reconfigures the page size if the max page search size differs from the configured one
     * and rethrottles the indexer.
     *
     * @param newSettings the updated settings
     */
    public void applyNewSettings(SettingsConfig newSettings) {
        auditor.info(transformConfig.getId(), "Transform settings have been updated.");
        logger.info("[{}] transform settings have been updated.", transformConfig.getId());

        if (newSettings.getDocsPerSecond() != null) {
            docsPerSecond = newSettings.getDocsPerSecond().floatValue();
        } else {
            docsPerSecond = -1.0f;
        }

        final boolean pageSizeUnchanged = Objects.equals(newSettings.getMaxPageSearchSize(), initialConfiguredPageSize);
        if (pageSizeUnchanged == false) {
            configurePageSize(newSettings.getMaxPageSearchSize());
        }

        rethrottle();
    }

    /**
     * Starts the indexer thread shutdown and hands the failure to the failure handling.
     * Exceptions thrown by the failure handling itself are logged and not propagated.
     *
     * @param exc the failure reported by the indexer
     */
    protected void onFailure(Exception exc) {
        startIndexerThreadShutdown();
        try {
            handleFailure(exc);
        } catch (Exception unexpected) {
            logger.error(
                new ParameterizedMessage("[{}] transform encountered an unexpected internal exception: ", getJobId()),
                unexpected
            );
        }
    }

    /**
     * Audits and logs that the transform has stopped.
     */
    protected void onStop() {
        final String transformId = transformConfig.getId();
        auditor.info(transformId, "Transform has stopped.");
        logger.info("[{}] transform has stopped.", transformId);
    }

    /**
     * Audits and logs the abort request and shuts down the task context.
     */
    protected void onAbort() {
        final String transformId = transformConfig.getId();
        auditor.info(transformId, "Received abort request, stopping transform.");
        logger.info("[{}] transform received abort request. Stopping indexer.", transformId);
        context.shutdown();
    }

    /**
     * Persists the state of the transform and notifies waiting save state listeners.
     * {@code next} is run exactly once on every path through this method.
     *
     * Nothing is persisted if the task is failed, if the indexer is aborting, or if the source
     * has not changed and the indexer is not about to stop.
     *
     * @param indexerState the indexer state to persist; may be turned into STOPPED, see below
     * @param position the position to persist
     * @param next continuation to run after the state handling
     */
    protected void doSaveState(IndexerState indexerState, TransformIndexerPosition position, Runnable next) {
        // a failed task does not persist state
        if (this.context.getTaskState() == TransformTaskState.FAILED) {
            logger.debug("[{}] attempted to save state and stats while failed.", (Object)this.getJobId());
            next.run();
            return;
        }
        // neither does an aborting indexer
        if (indexerState.equals((Object)IndexerState.ABORTING)) {
            next.run();
            return;
        }
        // take over the listeners registered so far; listeners added later wait for the next save
        Collection saveStateListenersAtTheMomentOfCalling = this.saveStateListeners.getAndSet(null);
        boolean shouldStopAtCheckpoint = this.context.shouldStopAtCheckpoint();
        // a requested stop at checkpoint takes effect once the indexer is STARTED without a position
        if (shouldStopAtCheckpoint && this.initialRun() && indexerState.equals((Object)IndexerState.STARTED)) {
            indexerState = IndexerState.STOPPED;
            this.auditor.info(this.transformConfig.getId(), "Transform is no longer in the middle of a checkpoint, initiating stop.");
            logger.info("[{}] transform is no longer in the middle of a checkpoint, initiating stop.", (Object)this.transformConfig.getId());
        }
        // unchanged source and no stop: nothing to persist, just answer the listeners
        if (!this.hasSourceChanged && !indexerState.equals((Object)IndexerState.STOPPED)) {
            if (saveStateListenersAtTheMomentOfCalling != null) {
                ActionListener.onResponse((Iterable)saveStateListenersAtTheMomentOfCalling, null);
            }
            next.run();
            return;
        }
        TransformTaskState taskState = this.context.getTaskState();
        // a non-continuous transform is done once it is STARTED again with the checkpoint counter at 1
        if (indexerState.equals((Object)IndexerState.STARTED) && this.context.getCheckpoint() == 1L && !this.isContinuous()) {
            indexerState = IndexerState.STOPPED;
            this.auditor.info(this.transformConfig.getId(), "Transform finished indexing all data, initiating stop");
            logger.info("[{}] transform finished indexing all data, initiating stop.", (Object)this.transformConfig.getId());
        }
        // a stopped indexer is persisted with a stopped task and a cleared stop at checkpoint flag
        if (indexerState.equals((Object)IndexerState.STOPPED)) {
            shouldStopAtCheckpoint = false;
            taskState = TransformTaskState.STOPPED;
        }
        TransformState state = new TransformState(taskState, indexerState, position, this.context.getCheckpoint(), this.context.getStateReason(), this.getProgress(), null, shouldStopAtCheckpoint);
        logger.debug("[{}] updating persistent state of transform to [{}].", (Object)this.transformConfig.getId(), (Object)state.toString());
        // listener notification problems are logged and ignored; next runs in any case
        this.persistStateWithAutoStop(state, (ActionListener<Void>)ActionListener.wrap(r -> {
            try {
                if (saveStateListenersAtTheMomentOfCalling != null) {
                    ActionListener.onResponse((Iterable)saveStateListenersAtTheMomentOfCalling, (Object)r);
                }
            }
            catch (Exception onResponseException) {
                String msg = LoggerMessageFormat.format((String)"[{}] failed notifying saveState listeners, ignoring.", (String)this.getJobId(), (Object[])new Object[0]);
                logger.warn(msg, (Throwable)onResponseException);
            }
            finally {
                next.run();
            }
        }, e -> {
            try {
                if (saveStateListenersAtTheMomentOfCalling != null) {
                    ActionListener.onFailure((Iterable)saveStateListenersAtTheMomentOfCalling, (Exception)e);
                }
            }
            catch (Exception onFailureException) {
                String msg = LoggerMessageFormat.format((String)"[{}] failed notifying saveState listeners, ignoring.", (String)this.getJobId(), (Object[])new Object[0]);
                logger.warn(msg, (Throwable)onFailureException);
            }
            finally {
                next.run();
            }
        }));
    }

    /**
     * Persists the state and shuts down the task context before the listener is notified
     * if the persisted task state is STOPPED.
     *
     * @param state the state to persist
     * @param listener notified after persisting succeeded or failed
     */
    private void persistStateWithAutoStop(TransformState state, ActionListener<Void> listener) {
        final ActionListener<Void> shutdownThenNotify = ActionListener.runBefore(listener, () -> {
            final boolean taskStopped = state.getTaskState().equals(TransformTaskState.STOPPED);
            if (taskStopped) {
                context.shutdown();
            }
        });
        persistState(state, shutdownThenNotify);
    }

    /**
     * Sets or clears the stop at checkpoint flag and notifies the listener once the change is in place.
     *
     * If the listener could be registered as a save state listener, it is notified later when the
     * state is saved; otherwise it is answered right away. Must be called on a thread whose name
     * contains "generic" (asserted), because the call may block while the state is persisted.
     *
     * @param shouldStopAtCheckpoint the new value of the flag
     * @param shouldStopAtCheckpointListener notified on completion or failure
     */
    final void setStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener<Void> shouldStopAtCheckpointListener) {
        assert Thread.currentThread().getName().contains("generic");
        try {
            if (this.addSetStopAtCheckpointListener(shouldStopAtCheckpoint, shouldStopAtCheckpointListener) == false) {
                shouldStopAtCheckpointListener.onResponse(null);
            }
        } catch (InterruptedException e) {
            logger.error(
                new ParameterizedMessage(
                    "[{}] Interrupt waiting ({}s) for transform state to be stored.",
                    this.getJobId(),
                    PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC
                ),
                e
            );
            try {
                shouldStopAtCheckpointListener.onFailure(
                    new RuntimeException(
                        "Timed out (" + PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC + "s) waiting for transform state to be stored.",
                        e
                    )
                );
            } finally {
                // catching InterruptedException cleared the flag; restore it so that the owner of
                // the thread can still observe the interrupt
                Thread.currentThread().interrupt();
            }
        } catch (Exception e) {
            logger.error(new ParameterizedMessage("[{}] failed to persist transform state.", this.getJobId()), e);
            shouldStopAtCheckpointListener.onFailure(e);
        }
    }

    /**
     * Applies a stop at checkpoint request depending on the state of the indexer.
     *
     * <ul>
     * <li>Indexer thread shutting down: the flag is set on the context and a state save is requested.</li>
     * <li>STARTED with a changed flag: the new state is persisted while blocking for up to
     *     {@link #PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC} seconds; if the transform has no position,
     *     it is stopped instead of flagged.</li>
     * <li>INDEXING: the listener is registered as save state listener and a search is run immediately.</li>
     * <li>Any other state: nothing is done.</li>
     * </ul>
     *
     * @param shouldStopAtCheckpoint the requested value of the flag
     * @param shouldStopAtCheckpointListener listener to register if the indexer is indexing
     * @return true if the listener was registered and will be notified when the state is saved,
     *         false if the caller has to answer the listener itself
     * @throws InterruptedException if interrupted while waiting for the state to be persisted
     */
    private synchronized boolean addSetStopAtCheckpointListener(
        boolean shouldStopAtCheckpoint,
        ActionListener<Void> shouldStopAtCheckpointListener
    ) throws InterruptedException {
        if (this.indexerThreadShuttingDown) {
            this.context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
            this.saveStateRequestedDuringIndexerThreadShutdown = true;
            return false;
        }

        IndexerState state = this.getState();

        if (state == IndexerState.STARTED && this.context.shouldStopAtCheckpoint() != shouldStopAtCheckpoint) {
            IndexerState newIndexerState = IndexerState.STARTED;
            TransformTaskState newtaskState = this.context.getTaskState();

            // without a position the transform is at a checkpoint and can be stopped directly,
            // otherwise only the flag is set
            if (shouldStopAtCheckpoint && this.initialRun()) {
                newIndexerState = IndexerState.STOPPED;
                newtaskState = TransformTaskState.STOPPED;
                logger.debug("[{}] transform is at a checkpoint, initiating stop.", this.transformConfig.getId());
            } else {
                this.context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
            }

            TransformState newTransformState = new TransformState(
                newtaskState,
                newIndexerState,
                this.getPosition(),
                this.context.getCheckpoint(),
                this.context.getStateReason(),
                this.getProgress(),
                null,
                newIndexerState == IndexerState.STARTED
            );

            // Persisting is asynchronous, so wait for it. This method holds the monitor of this
            // object while waiting, which keeps the synchronized maybeTriggerAsyncJob from
            // starting a run in the meantime.
            CountDownLatch latch = new CountDownLatch(1);
            logger.debug("[{}] persisting stop at checkpoint", this.getJobId());

            // the latch is released on success as well as on failure
            this.persistState(newTransformState, ActionListener.wrap(() -> latch.countDown()));

            if (latch.await(PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC, TimeUnit.SECONDS) == false) {
                // a timeout is only logged; processing continues
                logger.error(
                    new ParameterizedMessage(
                        "[{}] Timed out ({}s) waiting for transform state to be stored.",
                        this.getJobId(),
                        PERSIST_STOP_AT_CHECKPOINT_TIMEOUT_SEC
                    )
                );
            }

            if (newtaskState.equals(TransformTaskState.STOPPED)) {
                this.context.shutdown();
            }
            return false;
        }

        if (state != IndexerState.INDEXING) {
            return false;
        }

        // register the listener atomically; a null result means it was not registered
        if (this.saveStateListeners.updateAndGet(currentListeners -> {
            // the state might have changed since it was read above
            if (this.getState() != IndexerState.INDEXING) {
                return null;
            }
            if (currentListeners == null) {
                // the flag already has the requested value and nobody else waits
                if (this.context.shouldStopAtCheckpoint() == shouldStopAtCheckpoint) {
                    return null;
                }
                return Collections.singletonList(shouldStopAtCheckpointListener);
            }
            return CollectionUtils.appendToCopy(currentListeners, shouldStopAtCheckpointListener);
        }) == null) {
            return false;
        }

        this.context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
        // run a search right away; the registered listeners are answered when the state is saved
        this.runSearchImmediately();
        return true;
    }

    /**
     * Runs the stop hook, stops the indexer and persists the {@code STOPPED} state if the indexer reached it.
     *
     * While the indexer thread is shutting down the state is not saved here; instead a marker is set so
     * that {@code finishIndexerThreadShutdown()} performs the save.
     */
    synchronized void stopAndMaybeSaveState() {
        this.onStop();
        final IndexerState stateAfterStop = this.stop();

        if (this.indexerThreadShuttingDown) {
            // Defer persisting until the indexer thread has finished shutting down.
            this.saveStateRequestedDuringIndexerThreadShutdown = true;
            return;
        }

        if (stateAfterStop == IndexerState.STOPPED) {
            this.doSaveState(IndexerState.STOPPED, (TransformIndexerPosition)this.getPosition(), () -> {});
        }
    }

    /**
     * Central failure handler: classifies the root cause of {@code e} and either adapts, fails the
     * indexer, or records the failure so that the transform is retried.
     *
     * <ul>
     *   <li>{@link CircuitBreakingException}: delegated to {@code handleCircuitBreakingException}.</li>
     *   <li>{@link ScriptException}: delegated to {@code handleScriptException}.</li>
     *   <li>Irrecoverable {@code BulkIndexingException}: delegated to
     *       {@code handleIrrecoverableBulkIndexingException}.</li>
     *   <li>{@link ElasticsearchException} whose status is in
     *       {@code ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES}, or an
     *       {@link IllegalArgumentException}: the indexer is failed.</li>
     *   <li>Otherwise the failure counter is incremented; if its previous value exceeded the configured
     *       number of retries the indexer is failed, else an audit warning is written.</li>
     * </ul>
     *
     * The audit warning is de-duplicated: it is written only if the detailed message differs from the last
     * audited one. The comparison uses the same string that is stored in
     * {@code lastAuditedExceptionMessage} and is null-safe.
     *
     * @param e the exception raised by the indexer
     */
    void handleFailure(Exception e) {
        logger.warn((Message)new ParameterizedMessage("[{}] transform encountered an exception: ", (Object)this.getJobId()), (Throwable)e);
        Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause((Throwable)e);
        if (unwrappedException instanceof CircuitBreakingException) {
            this.handleCircuitBreakingException((CircuitBreakingException)unwrappedException);
            return;
        }
        if (unwrappedException instanceof ScriptException) {
            this.handleScriptException((ScriptException)unwrappedException);
            return;
        }
        if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException)((Object)unwrappedException)).isIrrecoverable()) {
            this.handleIrrecoverableBulkIndexingException((BulkIndexingException)((Object)unwrappedException));
            return;
        }
        if (unwrappedException instanceof ElasticsearchException) {
            ElasticsearchException elasticsearchException = (ElasticsearchException)unwrappedException;
            if (ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
                this.failIndexer("task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage());
                return;
            }
        }
        if (unwrappedException instanceof IllegalArgumentException) {
            this.failIndexer("task encountered irrecoverable failure: " + e.getMessage());
            return;
        }
        if (this.context.getAndIncrementFailureCount() > this.context.getNumFailureRetries()) {
            this.failIndexer("task encountered more than " + this.context.getNumFailureRetries() + " failures; latest failure: " + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException));
            return;
        }
        // Compare against the value that is actually remembered below. Comparing e.getMessage() instead
        // (as before) never matches when the detailed root-cause message differs from the wrapper's
        // message, and throws a NullPointerException when the wrapper has no message at all.
        String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
        if (!Objects.equals(message, this.lastAuditedExceptionMessage)) {
            this.auditor.warning(this.getJobId(), "Transform encountered an exception: " + message + "; Will attempt again at next scheduled trigger.");
            this.lastAuditedExceptionMessage = message;
        }
    }

    /**
     * Asks the config manager to delete outdated checkpoints and then notifies {@code listener}.
     *
     * Two lower bounds are passed to {@code deleteOldCheckpoints}: the current checkpoint number minus 10,
     * and a time bound of "now" minus 864000000 (10 days expressed in milliseconds). If either bound is
     * not positive no deletion is attempted. The listener always receives {@code onResponse(null)};
     * a failed deletion is only logged and audited.
     *
     * NOTE(review): "now" is computed as {@code getTimeNanos() * 1000L} while the derived bound is named
     * {@code lowerBoundEpochMs}; the units look inconsistent — confirm what {@code getTimeNanos()} returns
     * and which unit {@code deleteOldCheckpoints} expects.
     *
     * @param listener notified once the cleanup attempt has finished, regardless of its outcome
     */
    private void cleanupOldCheckpoints(ActionListener<Void> listener) {
        final long now = this.getTimeNanos() * 1000L;
        final long checkpointLowerBound = this.context.getCheckpoint() - 10L;
        final long lowerBoundEpochMs = now - 864000000L;

        if (checkpointLowerBound <= 0L || lowerBoundEpochMs <= 0L) {
            // Nothing can be old enough to delete yet.
            logger.debug("[{}] checked for outdated checkpoints", (Object)this.getJobId());
            listener.onResponse(null);
            return;
        }

        this.transformsConfigManager.deleteOldCheckpoints(
            this.transformConfig.getId(),
            checkpointLowerBound,
            lowerBoundEpochMs,
            (ActionListener<Long>)ActionListener.wrap(deletedCount -> {
                logger.debug("[{}] deleted [{}] outdated checkpoints", (Object)this.getJobId(), deletedCount);
                listener.onResponse(null);
                // Remember at which checkpoint the last successful cleanup happened.
                this.lastCheckpointCleanup = this.context.getCheckpoint();
            }, failure -> {
                logger.warn((Message)new ParameterizedMessage("[{}] failed to cleanup old checkpoints, retrying after next checkpoint", (Object)this.getJobId()), (Throwable)failure);
                this.auditor.warning(this.getJobId(), "Failed to cleanup old checkpoints, retrying after next checkpoint. Exception: " + failure.getMessage());
                // Cleanup is best effort: report completion, not failure.
                listener.onResponse(null);
            })
        );
    }

    /**
     * Asks the checkpoint provider whether the source changed since the last checkpoint.
     *
     * A failure of the check is logged and audited, and is reported to the listener as "no change"
     * ({@code false}) rather than as an error.
     *
     * @param hasChangedListener receives the result of the check; {@code onFailure} is never invoked from here
     */
    private void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
        this.checkpointProvider.sourceHasChanged(
            this.getLastCheckpoint(),
            (ActionListener<Boolean>)ActionListener.wrap(changed -> {
                logger.trace("[{}] change detected [{}].", (Object)this.getJobId(), changed);
                hasChangedListener.onResponse(changed);
            }, failure -> {
                logger.warn((Message)new ParameterizedMessage("[{}] failed to detect changes for transform. Skipping update till next check.", (Object)this.getJobId()), (Throwable)failure);
                this.auditor.warning(this.getJobId(), "Failed to detect changes for transform, skipping update till next check. Exception: " + failure.getMessage());
                // Treat an undeterminable result as "unchanged".
                hasChangedListener.onResponse(false);
            })
        );
    }

    /**
     * Turns a search response of the "apply results" phase into index requests and the next position.
     *
     * If the function produced no result (null tuple or null stream), the iteration is either finished
     * — first checkpoint, non-continuous transform, or a change collector that does not query for
     * changes — or the run switches back to {@code IDENTIFY_CHANGES} to collect the next page of changes.
     *
     * @param searchResponse the response of the search built for the {@code APPLY_RESULTS} run state
     * @return the index requests to execute together with the new position and the "done" flag
     */
    private IterationResult<TransformIndexerPosition> processBuckets(SearchResponse searchResponse) {
        Tuple<Stream<IndexRequest>, Map<String, Object>> indexRequestStreamAndCursor = this.function.processSearchResponse(searchResponse, this.getConfig().getDestination().getIndex(), this.getConfig().getDestination().getPipeline(), this.getFieldMappings(), (TransformIndexerStats)this.getStats(), this.progress);
        if (indexRequestStreamAndCursor == null || indexRequestStreamAndCursor.v1() == null) {
            if (this.nextCheckpoint.getCheckpoint() == 1L || !this.isContinuous() || !this.changeCollector.queryForChanges()) {
                // Nothing left to process and no further changes to look for: this run is done.
                return new IterationResult<>(Stream.empty(), null, true);
            }
            // The current set of changed buckets is exhausted; go back and identify the next set.
            this.changeCollector.clear();
            this.runState = RunState.IDENTIFY_CHANGES;
            return new IterationResult<>(Stream.empty(), new TransformIndexerPosition(null, this.nextChangeCollectorBucketPosition), false);
        }
        Stream<IndexRequest> indexRequestStream = indexRequestStreamAndCursor.v1();
        // Use the snapshot that was null-checked instead of reading the position a second time.
        TransformIndexerPosition oldPosition = (TransformIndexerPosition)this.getPosition();
        TransformIndexerPosition newPosition = new TransformIndexerPosition(indexRequestStreamAndCursor.v2(), oldPosition != null ? oldPosition.getBucketsPosition() : null);
        return new IterationResult<>(indexRequestStream, newPosition, false);
    }

    /**
     * Processes a search response of the "identify changes" phase.
     *
     * The change collector's result is stored as the next change-collector bucket position. A non-null
     * result switches the run state to {@code APPLY_RESULTS} and keeps the current position; a null
     * result clears the collector and marks the iteration as done.
     *
     * @param searchResponse the response of the search built for the {@code IDENTIFY_CHANGES} run state
     * @return an iteration result without index requests
     */
    private IterationResult<TransformIndexerPosition> processChangedBuckets(SearchResponse searchResponse) {
        this.nextChangeCollectorBucketPosition = this.changeCollector.processSearchResponse(searchResponse);

        if (this.nextChangeCollectorBucketPosition != null) {
            // Changes were found: apply them in the next step, position stays where it is.
            this.runState = RunState.APPLY_RESULTS;
            return new IterationResult<>(Stream.empty(), (TransformIndexerPosition)this.getPosition(), false);
        }

        // No (more) changes: reset the collector and finish.
        this.changeCollector.clear();
        return new IterationResult<>(Stream.empty(), null, true);
    }

    /**
     * Builds the query that selects the source documents for the current run.
     *
     * For a non-continuous transform this is the configured source query as is. For a continuous
     * transform the source query is combined (as bool filters) with the sync config's range query:
     * bounded by last and next checkpoint if a last checkpoint exists, otherwise by the next checkpoint only.
     *
     * @return the query to run against the source
     */
    protected QueryBuilder buildFilterQuery() {
        assert (this.nextCheckpoint != null);
        final QueryBuilder sourceQuery = this.getConfig().getSource().getQueryConfig().getQuery();
        final TransformConfig config = this.getConfig();

        if (!this.isContinuous()) {
            return sourceQuery;
        }

        final BoolQueryBuilder bounded = new BoolQueryBuilder().filter(sourceQuery);
        if (this.lastCheckpoint == null) {
            bounded.filter(config.getSyncConfig().getRangeQuery(this.nextCheckpoint));
        } else {
            bounded.filter(config.getSyncConfig().getRangeQuery(this.lastCheckpoint, this.nextCheckpoint));
        }
        return bounded;
    }

    /**
     * Builds the search request for the current run state, paired with a name identifying that state.
     *
     * @return {@code ("apply_results", request)} or {@code ("identify_changes", request)}
     * @throws IllegalStateException if the run state is neither of the two known values
     */
    protected Tuple<String, SearchRequest> buildSearchRequest() {
        assert (this.nextCheckpoint != null);
        switch (this.runState) {
            case APPLY_RESULTS:
                return new Tuple<>("apply_results", this.buildQueryToUpdateDestinationIndex());
            case IDENTIFY_CHANGES:
                return new Tuple<>("identify_changes", this.buildQueryToFindChanges());
            default:
                // Should be unreachable with the current set of run states.
                logger.warn("Encountered unexpected run state [" + this.runState + "]");
                throw new IllegalStateException("Transform indexer job encountered an illegal state [" + this.runState + "]");
        }
    }

    /**
     * Builds the search request used in the {@code IDENTIFY_CHANGES} run state.
     *
     * The change collector contributes its part to the search source (paging from the current buckets
     * position, using the current page size); the query is the configured source query filtered by the
     * sync config's range between last and next checkpoint. Partial search results are disallowed.
     *
     * @return the request against the configured source indices
     */
    private SearchRequest buildQueryToFindChanges() {
        assert (this.isContinuous());
        final TransformIndexerPosition currentPosition = (TransformIndexerPosition)this.getPosition();
        final SearchSourceBuilder changesSource = new SearchSourceBuilder().runtimeMappings(this.getConfig().getSource().getRuntimeMappings());
        final SearchRequest changesRequest = new SearchRequest(this.getConfig().getSource().getIndex());
        changesRequest.allowPartialSearchResults(false).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

        this.changeCollector.buildChangesQuery(changesSource, currentPosition != null ? currentPosition.getBucketsPosition() : null, this.pageSize);

        final QueryBuilder sourceQuery = this.getConfig().getSource().getQueryConfig().getQuery();
        final TransformConfig config = this.getConfig();
        final QueryBuilder checkpointRange = config.getSyncConfig().getRangeQuery(this.lastCheckpoint, this.nextCheckpoint);
        final BoolQueryBuilder bounded = new BoolQueryBuilder().filter(sourceQuery).filter(checkpointRange);
        changesSource.query((QueryBuilder)bounded);

        logger.debug("[{}] Querying {} for changes: {}", (Object)this.getJobId(), (Object)changesRequest.indices(), (Object)changesSource);
        return changesRequest.source(changesSource);
    }

    /**
     * Builds the search request used in the {@code APPLY_RESULTS} run state.
     *
     * The function contributes its part to the search source (paging from the current indexer position,
     * using the current page size). For a continuous transform the source query is additionally filtered
     * by the sync config's range query for the next checkpoint and, after the first checkpoint and if a
     * change collector exists, by the collector's filter (when it provides one). The request always
     * targets the configured source indices and disallows partial search results.
     *
     * @return the request against the configured source indices
     */
    private SearchRequest buildQueryToUpdateDestinationIndex() {
        final TransformIndexerPosition currentPosition = (TransformIndexerPosition)this.getPosition();
        final TransformConfig config = this.getConfig();
        final SearchSourceBuilder dataSource = new SearchSourceBuilder().runtimeMappings(this.getConfig().getSource().getRuntimeMappings());
        this.function.buildSearchQuery(dataSource, currentPosition != null ? currentPosition.getIndexerPosition() : null, this.pageSize);

        final SearchRequest dataRequest = new SearchRequest();
        QueryBuilder effectiveQuery = config.getSource().getQueryConfig().getQuery();

        if (this.isContinuous()) {
            final BoolQueryBuilder bounded = new BoolQueryBuilder().filter(effectiveQuery).filter(config.getSyncConfig().getRangeQuery(this.nextCheckpoint));
            final boolean narrowToChanges = this.nextCheckpoint.getCheckpoint() > 1L && this.changeCollector != null;
            if (narrowToChanges) {
                final QueryBuilder changesFilter = this.changeCollector.buildFilterQuery(this.lastCheckpoint, this.nextCheckpoint);
                if (changesFilter != null) {
                    bounded.filter(changesFilter);
                }
            }
            effectiveQuery = bounded;
        }

        // Every path targets the same source indices.
        dataRequest.indices(this.getConfig().getSource().getIndex());
        dataSource.query(effectiveQuery);

        logger.debug("[{}] Querying {} for data: {}", (Object)this.getJobId(), (Object)dataRequest.indices(), (Object)dataSource);
        return dataRequest.source(dataSource).allowPartialSearchResults(false).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
    }

    /**
     * Reacts to a circuit breaker trip by shrinking the page size, or fails the indexer if it cannot
     * shrink any further.
     *
     * The reduction factor is the smaller of {@code byteLimit / bytesWanted} and
     * {@code 1 - log10(pageSize) * 0.1}. If the resulting page size is below 10 the indexer is failed;
     * otherwise the reduction is audited, logged and applied.
     *
     * @param circuitBreakingException the exception carrying the breaker's limit and the requested bytes
     */
    private void handleCircuitBreakingException(CircuitBreakingException circuitBreakingException) {
        final double limitRatio = (double)circuitBreakingException.getByteLimit() / (double)circuitBreakingException.getBytesWanted();
        final double logarithmicBackoff = 1.0 - Math.log10(this.pageSize) * 0.1;
        final double reducingFactor = Math.min(limitRatio, logarithmicBackoff);
        final int newPageSize = (int)Math.round(reducingFactor * (double)this.pageSize);

        if (newPageSize >= 10) {
            final String reductionNotice = TransformMessages.getMessage((String)"Insufficient memory for search, reducing number of buckets per search from [{0}] to [{1}]", (Object[])new Object[]{this.pageSize, newPageSize});
            this.auditor.info(this.getJobId(), reductionNotice);
            logger.info("[{}] {}", (Object)this.getJobId(), (Object)reductionNotice);
            this.pageSize = newPageSize;
            return;
        }

        // Page size cannot be reduced any further: give up.
        final String failureNotice = TransformMessages.getMessage((String)"Insufficient memory for search after repeated page size reductions to [{0}], unable to continue pivot, please simplify job or increase heap size on data nodes.", (Object[])new Object[]{this.pageSize});
        this.failIndexer(failureNotice);
    }

    /**
     * Fails the indexer with a message built from the script exception's detailed message and script stack.
     *
     * @param scriptException the script failure found as root cause
     */
    private void handleScriptException(ScriptException scriptException) {
        final Object[] messageArgs = new Object[]{scriptException.getDetailedMessage(), scriptException.getScriptStack()};
        this.failIndexer(TransformMessages.getMessage((String)"Failed to execute script with error: [{0}], stack trace: {1}", (Object[])messageArgs));
    }

    /**
     * Fails the indexer with a message built from the bulk indexing exception's detailed message.
     *
     * @param bulkIndexingException the irrecoverable bulk indexing failure found as root cause
     */
    private void handleIrrecoverableBulkIndexingException(BulkIndexingException bulkIndexingException) {
        final Object[] messageArgs = new Object[]{bulkIndexingException.getDetailedMessage()};
        this.failIndexer(TransformMessages.getMessage((String)"Failed to index documents into destination index due to permanent error: [{0}]", (Object[])messageArgs));
    }

    /**
     * Marks the transform as failed on its context.
     *
     * @param failureMessage the reason passed on to {@code context.markAsFailed}
     */
    protected void failIndexer(String failureMessage) {
        context.markAsFailed(failureMessage);
    }

    /**
     * Decides whether finishing the given checkpoint should be audited, thinning out audits as the
     * checkpoint number grows.
     *
     * Every call increments an internal counter; only calls where the counter is a multiple of the
     * current interval are audited. On an audited call (other than for checkpoint 0) the interval is
     * recomputed as {@code 10^floor(log10(completedCheckpoint))}, capped at 1000, and the counter is reset.
     *
     * @param completedCheckpoint the number of the checkpoint that just completed
     * @return {@code true} if this completion should be audited
     */
    protected boolean shouldAuditOnFinish(long completedCheckpoint) {
        final long callsSinceReset = ++this.logCount;
        if (callsSinceReset % this.logEvery != 0L) {
            return false;
        }

        if (completedCheckpoint != 0L) {
            final int magnitude = (int)Math.floor(Math.log10(completedCheckpoint));
            if (magnitude >= 3) {
                this.logEvery = 1000L;
            } else {
                this.logEvery = (long)((int)Math.pow(10.0, magnitude));
            }
            this.logCount = 0L;
        }
        return true;
    }

    /**
     * Chooses the run state a new run starts in.
     *
     * {@code IDENTIFY_CHANGES} is chosen only if this is not the first checkpoint, the transform is
     * continuous, a change collector exists and that collector queries for changes; in every other
     * case the run starts in {@code APPLY_RESULTS}.
     *
     * @return the initial run state
     */
    private RunState determineRunStateAtStart() {
        final boolean identifyChangesFirst = this.nextCheckpoint.getCheckpoint() != 1L
            && this.isContinuous()
            && this.changeCollector != null
            && this.changeCollector.queryForChanges();
        return identifyChangesFirst ? RunState.IDENTIFY_CHANGES : RunState.APPLY_RESULTS;
    }

    /**
     * Stores the configured page size and derives the effective page size from it.
     *
     * A non-null, positive configured value is used as is; otherwise the function's initial page size is used.
     *
     * @param newPageSize the configured page size, may be {@code null}
     */
    private void configurePageSize(Integer newPageSize) {
        this.initialConfiguredPageSize = newPageSize;
        if (this.initialConfiguredPageSize != null && this.initialConfiguredPageSize > 0) {
            this.pageSize = this.initialConfiguredPageSize.intValue();
        } else {
            this.pageSize = this.function.getInitialPageSize();
        }
    }

    /**
     * Marks the beginning of the indexer thread shutdown and resets the "save state requested" marker,
     * so that only requests arriving during this shutdown are recorded.
     */
    private synchronized void startIndexerThreadShutdown() {
        indexerThreadShuttingDown = true;
        saveStateRequestedDuringIndexerThreadShutdown = false;
    }

    /**
     * Marks the end of the indexer thread shutdown and performs a state save if one was requested
     * while the shutdown was in progress.
     *
     * Before saving, the indexer is stopped if the context asks to stop at the checkpoint and there is
     * no next checkpoint.
     */
    private synchronized void finishIndexerThreadShutdown() {
        this.indexerThreadShuttingDown = false;
        if (!this.saveStateRequestedDuringIndexerThreadShutdown) {
            return;
        }

        final boolean stopNow = this.context.shouldStopAtCheckpoint() && this.nextCheckpoint == null;
        if (stopNow) {
            this.stop();
        }
        this.doSaveState(this.getState(), (TransformIndexerPosition)this.getPosition(), () -> {});
    }

    private static enum RunState {
        APPLY_RESULTS,
        IDENTIFY_CHANGES;

    }

    /**
     * A {@link ResourceNotFoundException} subtype; judging by its name it signals that the transform
     * configuration was no longer available when it was reloaded (usage is outside this section — verify).
     */
    static class TransformConfigLostOnReloadException extends ResourceNotFoundException {
        /**
         * @param msg the message, passed through to the superclass together with {@code args}
         * @param cause the underlying cause
         * @param args arguments passed through to the superclass constructor
         */
        TransformConfigLostOnReloadException(String msg, Throwable cause, Object... args) {
            super(msg, cause, args);
        }
    }
}

