package org.apache.zeppelin.scheduler;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.scheduler.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/scheduler/RemoteScheduler.class */
public class RemoteScheduler implements Scheduler {
    private ExecutorService executor;
    private SchedulerListener listener;
    private String name;
    private int maxConcurrency;
    private final String noteId;
    private RemoteInterpreterProcess interpreterProcess;
    Logger logger = LoggerFactory.getLogger((Class<?>) RemoteScheduler.class);
    List<Job> queue = new LinkedList();
    List<Job> running = new LinkedList();
    boolean terminate = false;

    /* loaded from: input_file:org/apache/zeppelin/scheduler/RemoteScheduler$JobRunner.class */
    private class JobRunner implements Runnable, JobListener {
        private Scheduler scheduler;
        private Job job;
        private boolean jobExecuted = false;
        boolean jobSubmittedRemotely = false;

        public JobRunner(Scheduler scheduler, Job job) {
            this.scheduler = scheduler;
            this.job = job;
        }

        public boolean isJobSubmittedInRemote() {
            return this.jobSubmittedRemotely;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.job.isAborted()) {
                this.job.setStatus(Job.Status.ABORT);
                this.job.aborted = false;
                synchronized (RemoteScheduler.this.queue) {
                    RemoteScheduler.this.running.remove(this.job);
                    RemoteScheduler.this.queue.notify();
                }
                this.jobSubmittedRemotely = true;
                return;
            }
            JobStatusPoller jobStatusPoller = new JobStatusPoller(1500L, 100L, 500L, this.job, this);
            jobStatusPoller.start();
            if (RemoteScheduler.this.listener != null) {
                RemoteScheduler.this.listener.jobStarted(this.scheduler, this.job);
            }
            this.job.run();
            this.jobExecuted = true;
            this.jobSubmittedRemotely = true;
            jobStatusPoller.shutdown();
            try {
                jobStatusPoller.join();
            } catch (InterruptedException e) {
                RemoteScheduler.this.logger.error("JobStatusPoller interrupted", (Throwable) e);
            }
            Job.Status status = jobStatusPoller.getStatus();
            Object obj = this.job.getReturn();
            if (obj != null && (obj instanceof InterpreterResult) && ((InterpreterResult) obj).code() == InterpreterResult.Code.ERROR) {
                status = Job.Status.ERROR;
            }
            if (this.job.getException() != null) {
                status = Job.Status.ERROR;
            }
            this.job.setStatus(status);
            if (RemoteScheduler.this.listener != null) {
                RemoteScheduler.this.listener.jobFinished(this.scheduler, this.job);
            }
            this.job.aborted = false;
            synchronized (RemoteScheduler.this.queue) {
                RemoteScheduler.this.running.remove(this.job);
                RemoteScheduler.this.queue.notify();
            }
        }

        @Override // org.apache.zeppelin.scheduler.JobListener
        public void onProgressUpdate(Job job, int i) {
        }

        @Override // org.apache.zeppelin.scheduler.JobListener
        public void beforeStatusChange(Job job, Job.Status status, Job.Status status2) {
        }

        @Override // org.apache.zeppelin.scheduler.JobListener
        public void afterStatusChange(Job job, Job.Status status, Job.Status status2) {
            if (status2 != null) {
                if (this.jobExecuted) {
                    this.jobSubmittedRemotely = true;
                } else {
                    if (status2 == Job.Status.FINISHED || status2 == Job.Status.ABORT || status2 == Job.Status.ERROR) {
                        return;
                    }
                    if (status2 == Job.Status.RUNNING) {
                        this.jobSubmittedRemotely = true;
                    }
                }
                if (job.getStatus() != status2) {
                    job.setStatus(status2);
                    return;
                }
                return;
            }
            if (this.jobExecuted) {
                this.jobSubmittedRemotely = true;
                Object obj = job.getReturn();
                if (job.isAborted()) {
                    job.setStatus(Job.Status.ABORT);
                    return;
                }
                if (job.getException() != null) {
                    job.setStatus(Job.Status.ERROR);
                } else if (obj != null && (obj instanceof InterpreterResult) && ((InterpreterResult) obj).code() == InterpreterResult.Code.ERROR) {
                    job.setStatus(Job.Status.ERROR);
                } else {
                    job.setStatus(Job.Status.FINISHED);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/zeppelin/scheduler/RemoteScheduler$JobStatusPoller.class */
    private class JobStatusPoller extends Thread {
        private long initialPeriodMsec;
        private long initialPeriodCheckIntervalMsec;
        private long checkIntervalMsec;
        private boolean terminate = false;
        private JobListener listener;
        private Job job;
        Job.Status lastStatus;

        public JobStatusPoller(long j, long j2, long j3, Job job, JobListener jobListener) {
            this.initialPeriodMsec = j;
            this.initialPeriodCheckIntervalMsec = j2;
            this.checkIntervalMsec = j3;
            this.job = job;
            this.listener = jobListener;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Job.Status status;
            long currentTimeMillis = System.currentTimeMillis();
            while (!this.terminate) {
                long j = System.currentTimeMillis() - currentTimeMillis < this.initialPeriodMsec ? this.initialPeriodCheckIntervalMsec : this.checkIntervalMsec;
                synchronized (this) {
                    try {
                        wait(j);
                    } catch (InterruptedException e) {
                        RemoteScheduler.this.logger.error("Exception in RemoteScheduler while run this.wait", (Throwable) e);
                    }
                }
                if (this.terminate || ((status = getStatus()) != null && status != Job.Status.READY && status != Job.Status.PENDING)) {
                    break;
                }
            }
            this.terminate = true;
        }

        public void shutdown() {
            this.terminate = true;
            synchronized (this) {
                notify();
            }
        }

        private Job.Status getLastStatus() {
            if (!this.terminate) {
                return this.lastStatus == null ? Job.Status.FINISHED : this.lastStatus;
            }
            if ((this.lastStatus == Job.Status.FINISHED || this.lastStatus == Job.Status.ERROR || this.lastStatus == Job.Status.ABORT) && this.lastStatus != null) {
                return this.lastStatus;
            }
            return Job.Status.FINISHED;
        }

        public synchronized Job.Status getStatus() {
            if (RemoteScheduler.this.interpreterProcess.referenceCount() <= 0) {
                return getLastStatus();
            }
            try {
                RemoteInterpreterService.Client client = RemoteScheduler.this.interpreterProcess.getClient();
                try {
                    try {
                        String status = client.getStatus(RemoteScheduler.this.noteId, this.job.getId());
                        if ("Unknown".equals(status)) {
                            this.listener.afterStatusChange(this.job, null, null);
                            Job.Status status2 = this.job.getStatus();
                            RemoteScheduler.this.interpreterProcess.releaseClient(client, false);
                            return status2;
                        }
                        Job.Status valueOf = Job.Status.valueOf(status);
                        this.lastStatus = valueOf;
                        this.listener.afterStatusChange(this.job, null, valueOf);
                        RemoteScheduler.this.interpreterProcess.releaseClient(client, false);
                        return valueOf;
                    } catch (TException e) {
                        RemoteScheduler.this.logger.error("Can't get status information", (Throwable) e);
                        this.lastStatus = Job.Status.ERROR;
                        Job.Status status3 = Job.Status.ERROR;
                        RemoteScheduler.this.interpreterProcess.releaseClient(client, true);
                        return status3;
                    } catch (Exception e2) {
                        RemoteScheduler.this.logger.error("Unknown status", (Throwable) e2);
                        this.lastStatus = Job.Status.ERROR;
                        Job.Status status4 = Job.Status.ERROR;
                        RemoteScheduler.this.interpreterProcess.releaseClient(client, false);
                        return status4;
                    }
                } catch (Throwable th) {
                    RemoteScheduler.this.interpreterProcess.releaseClient(client, false);
                    throw th;
                }
            } catch (Exception e3) {
                RemoteScheduler.this.logger.error("Can't get status information", (Throwable) e3);
                this.lastStatus = Job.Status.ERROR;
                return Job.Status.ERROR;
            }
        }
    }

    public RemoteScheduler(String str, ExecutorService executorService, String str2, RemoteInterpreterProcess remoteInterpreterProcess, SchedulerListener schedulerListener, int i) {
        this.name = str;
        this.executor = executorService;
        this.listener = schedulerListener;
        this.noteId = str2;
        this.interpreterProcess = remoteInterpreterProcess;
        this.maxConcurrency = i;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.terminate) {
            synchronized (this.queue) {
                if (this.running.size() >= this.maxConcurrency || this.queue.isEmpty()) {
                    try {
                        this.queue.wait(500L);
                    } catch (InterruptedException e) {
                        this.logger.error("Exception in RemoteScheduler while run queue.wait", (Throwable) e);
                    }
                } else {
                    Job remove = this.queue.remove(0);
                    this.running.add(remove);
                    JobRunner jobRunner = new JobRunner(this, remove);
                    this.executor.execute(jobRunner);
                    while (!jobRunner.isJobSubmittedInRemote()) {
                        synchronized (this.queue) {
                            try {
                                this.queue.wait(500L);
                            } catch (InterruptedException e2) {
                                this.logger.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote queue.wait", (Throwable) e2);
                            }
                        }
                    }
                }
            }
        }
    }

    @Override // org.apache.zeppelin.scheduler.Scheduler
    public String getName() {
        return this.name;
    }

    @Override // org.apache.zeppelin.scheduler.Scheduler
    public Collection<Job> getJobsWaiting() {
        LinkedList linkedList = new LinkedList();
        synchronized (this.queue) {
            Iterator<Job> it = this.queue.iterator();
            while (it.hasNext()) {
                linkedList.add(it.next());
            }
        }
        return linkedList;
    }

    @Override // org.apache.zeppelin.scheduler.Scheduler
    public Job removeFromWaitingQueue(String str) {
        synchronized (this.queue) {
            Iterator<Job> it = this.queue.iterator();
            while (it.hasNext()) {
                Job next = it.next();
                if (next.getId().equals(str)) {
                    it.remove();
                    return next;
                }
            }
            return null;
        }
    }

    @Override // org.apache.zeppelin.scheduler.Scheduler
    public Collection<Job> getJobsRunning() {
        LinkedList linkedList = new LinkedList();
        synchronized (this.queue) {
            Iterator<Job> it = this.running.iterator();
            while (it.hasNext()) {
                linkedList.add(it.next());
            }
        }
        return linkedList;
    }

    @Override // org.apache.zeppelin.scheduler.Scheduler
    public void submit(Job job) {
        if (this.terminate) {
            throw new RuntimeException("Scheduler already terminated");
        }
        job.setStatus(Job.Status.PENDING);
        synchronized (this.queue) {
            this.queue.add(job);
            this.queue.notify();
        }
    }

    public void setMaxConcurrency(int i) {
        this.maxConcurrency = i;
        synchronized (this.queue) {
            this.queue.notify();
        }
    }

    @Override // org.apache.zeppelin.scheduler.Scheduler
    public void stop() {
        this.terminate = true;
        synchronized (this.queue) {
            this.queue.notify();
        }
    }
}
