package org.apache.hadoop.hbase.mob.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/mob/mapreduce/SweepJob.class */
public class SweepJob {
    private final FileSystem fs;
    private final Configuration conf;
    private static final Log LOG = LogFactory.getLog(SweepJob.class);
    static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id";
    static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername";
    static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node";
    static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir";
    static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file";
    static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir";
    static final String WORKING_ALLNAMES_DIR = "all";
    static final String WORKING_VISITED_DIR = "visited";
    public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
    public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
    protected static final long ONE_DAY = 86400000;
    private long compactionStartTime = EnvironmentEdgeManager.currentTime();
    public static final String CREDENTIALS_LOCATION = "credentials_location";
    private CacheConfig cacheConfig;
    static final int SCAN_CACHING = 10000;
    private TableLockManager tableLockManager;

    /* loaded from: input_file:org/apache/hadoop/hbase/mob/mapreduce/SweepJob$DummyMobAbortable.class */
    public static class DummyMobAbortable implements Abortable {
        private boolean abort = false;

        @Override // org.apache.hadoop.hbase.Abortable
        public void abort(String str, Throwable th) {
            this.abort = true;
        }

        @Override // org.apache.hadoop.hbase.Abortable
        public boolean isAborted() {
            return this.abort;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/mob/mapreduce/SweepJob$IndexedResult.class */
    public static class IndexedResult implements Comparable<IndexedResult> {
        private int index;
        private String value;

        public IndexedResult(int i, String str) {
            this.index = i;
            this.value = str;
        }

        public int getIndex() {
            return this.index;
        }

        public String getValue() {
            return this.value;
        }

        @Override // java.lang.Comparable
        public int compareTo(IndexedResult indexedResult) {
            if (this.value == null && indexedResult.getValue() == null) {
                return 0;
            }
            if (indexedResult.value == null) {
                return 1;
            }
            if (this.value == null) {
                return -1;
            }
            return this.value.compareTo(indexedResult.value);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && (obj instanceof IndexedResult) && compareTo((IndexedResult) obj) == 0;
        }

        public int hashCode() {
            return this.value.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/mob/mapreduce/SweepJob$MergeSortReader.class */
    public static class MergeSortReader {
        private List<SequenceFile.Reader> readers = new ArrayList();
        private PriorityQueue<IndexedResult> results = new PriorityQueue<>();

        public MergeSortReader(FileSystem fileSystem, Configuration configuration, Path path) throws IOException {
            if (fileSystem.exists(path)) {
                int i = 0;
                for (FileStatus fileStatus : fileSystem.listStatus(path)) {
                    if (fileStatus.isFile()) {
                        SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, fileStatus.getPath(), configuration);
                        String str = (String) reader.next((String) null);
                        if (str != null) {
                            this.results.add(new IndexedResult(i, str));
                            this.readers.add(reader);
                            i++;
                        }
                    }
                }
            }
        }

        public String next() throws IOException {
            IndexedResult poll = this.results.poll();
            if (poll == null) {
                return null;
            }
            String str = (String) this.readers.get(poll.getIndex()).next((String) null);
            if (str != null) {
                this.results.add(new IndexedResult(poll.getIndex(), str));
            }
            return poll.getValue();
        }

        public void close() {
            for (SequenceFile.Reader reader : this.readers) {
                if (reader != null) {
                    IOUtils.closeStream(reader);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/mob/mapreduce/SweepJob$SweepCounter.class */
    public enum SweepCounter {
        INPUT_FILE_COUNT,
        FILE_TO_BE_MERGE_OR_CLEAN,
        FILE_AFTER_MERGE_OR_CLEAN,
        RECORDS_UPDATED
    }

    public SweepJob(Configuration configuration, FileSystem fileSystem) {
        this.conf = configuration;
        this.fs = fileSystem;
        Configuration configuration2 = new Configuration(configuration);
        configuration2.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
        this.cacheConfig = new CacheConfig(configuration2);
    }

    static ServerName getCurrentServerName(Configuration configuration) throws IOException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(configuration.get("hbase.regionserver.ipc.address", Strings.domainNamePointerToHostName(DNS.getDefaultHost(configuration.get("hbase.regionserver.dns.interface", "default"), configuration.get("hbase.regionserver.dns.nameserver", "default")))), configuration.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT));
        if (inetSocketAddress.getAddress() == null) {
            throw new IllegalArgumentException("Failed resolve of " + inetSocketAddress);
        }
        return ServerName.valueOf(inetSocketAddress.getHostName(), inetSocketAddress.getPort(), EnvironmentEdgeManager.currentTime());
    }

    /* JADX WARN: Finally extract failed */
    public int sweep(TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException, ClassNotFoundException, InterruptedException, KeeperException {
        Configuration configuration = new Configuration(this.conf);
        String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
        FileStatus[] listStatus = this.fs.listStatus(new Path(configuration.get(HConstants.HBASE_DIR)));
        if (listStatus.length <= 0) {
            LOG.error("The target HBase doesn't exist");
            throw new IOException("The target HBase doesn't exist");
        }
        if (!listStatus[0].getOwner().equals(shortUserName)) {
            String str = "The current user[" + shortUserName + "] doesn't have hbase root credentials. Please make sure the user is the root of the target HBase";
            LOG.error(str);
            throw new IOException(str);
        }
        String nameAsString = hColumnDescriptor.getNameAsString();
        String str2 = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
        ZooKeeperWatcher zooKeeperWatcher = new ZooKeeperWatcher(configuration, str2, new DummyMobAbortable());
        try {
            ServerName currentServerName = getCurrentServerName(configuration);
            this.tableLockManager = TableLockManager.createTableLockManager(configuration, zooKeeperWatcher, currentServerName);
            TableName tableLockName = MobUtils.getTableLockName(tableName);
            TableLockManager.TableLock writeLock = this.tableLockManager.writeLock(tableLockName, "Run sweep tool");
            String nameAsString2 = tableName.getNameAsString();
            try {
                writeLock.acquire();
                try {
                    Scan scan = new Scan();
                    scan.addFamily(hColumnDescriptor.getName());
                    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE.booleanValue()));
                    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE.booleanValue()));
                    scan.setCaching(10000);
                    scan.setCacheBlocks(false);
                    scan.setMaxVersions(hColumnDescriptor.getMaxVersions());
                    configuration.set("io.serializations", JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
                    configuration.set(SWEEP_JOB_ID, str2);
                    configuration.set(SWEEP_JOB_SERVERNAME, currentServerName.toString());
                    configuration.set(SWEEP_JOB_TABLE_NODE, ZKUtil.joinZNode(zooKeeperWatcher.tableLockZNode, tableLockName.getNameAsString()));
                    Job prepareJob = prepareJob(tableName, nameAsString, scan, configuration);
                    prepareJob.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, nameAsString);
                    prepareJob.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, this.compactionStartTime);
                    prepareJob.setPartitionerClass(MobFilePathHashPartitioner.class);
                    submit(prepareJob, tableName, nameAsString);
                    if (!prepareJob.waitForCompletion(true)) {
                        System.err.println("Job was not successful");
                        try {
                            cleanup(prepareJob, tableName, nameAsString);
                            try {
                                writeLock.release();
                            } catch (IOException e) {
                                LOG.error("Failed to release the table lock " + nameAsString2, e);
                            }
                            zooKeeperWatcher.close();
                            return 4;
                        } catch (Throwable th) {
                            throw th;
                        }
                    }
                    removeUnusedFiles(prepareJob, tableName, hColumnDescriptor);
                    try {
                        cleanup(prepareJob, tableName, nameAsString);
                        try {
                            writeLock.release();
                        } catch (IOException e2) {
                            LOG.error("Failed to release the table lock " + nameAsString2, e2);
                        }
                        zooKeeperWatcher.close();
                        return 0;
                    } finally {
                        try {
                            writeLock.release();
                        } catch (IOException e3) {
                            LOG.error("Failed to release the table lock " + nameAsString2, e3);
                        }
                    }
                } catch (Throwable th2) {
                    try {
                        cleanup(null, tableName, nameAsString);
                        try {
                            writeLock.release();
                        } catch (IOException e4) {
                            LOG.error("Failed to release the table lock " + nameAsString2, e4);
                        }
                        throw th2;
                    } finally {
                        try {
                            writeLock.release();
                        } catch (IOException e5) {
                            LOG.error("Failed to release the table lock " + nameAsString2, e5);
                        }
                    }
                }
            } catch (Exception e6) {
                LOG.warn("Can not lock the table " + nameAsString2 + ". The major compaction in HBase may be in-progress or another sweep job is running. Please re-run the job.");
                zooKeeperWatcher.close();
                return 3;
            }
        } catch (Throwable th3) {
            zooKeeperWatcher.close();
            throw th3;
        }
        zooKeeperWatcher.close();
        throw th3;
    }

    private Job prepareJob(TableName tableName, String str, Scan scan, Configuration configuration) throws IOException {
        Job job = Job.getInstance(configuration);
        job.setJarByClass(SweepMapper.class);
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan, (Class<? extends TableMapper>) SweepMapper.class, (Class<?>) Text.class, (Class<?>) Writable.class, job);
        job.setInputFormatClass(TableInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setReducerClass(SweepReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setJobName(getCustomJobName(getClass().getSimpleName(), tableName, str));
        if (StringUtils.isNotEmpty(configuration.get("credentials_location"))) {
            job.getCredentials().addAll(Credentials.readTokenStorageFile(new File(configuration.get("credentials_location")), configuration));
        }
        return job;
    }

    private static String getCustomJobName(String str, TableName tableName, String str2) {
        StringBuilder sb = new StringBuilder();
        sb.append(str);
        sb.append('-').append(SweepMapper.class.getSimpleName());
        sb.append('-').append(SweepReducer.class.getSimpleName());
        sb.append('-').append(tableName.getNamespaceAsString());
        sb.append('-').append(tableName.getQualifierAsString());
        sb.append('-').append(str2);
        return sb.toString();
    }

    private void submit(Job job, TableName tableName, String str) throws IOException {
        Path compactionWorkingPath = MobUtils.getCompactionWorkingPath(new Path(new Path(MobUtils.getMobHome(job.getConfiguration()), ".tmp"), MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME), job.getJobName());
        job.getConfiguration().set(WORKING_DIR_KEY, compactionWorkingPath.toString());
        this.fs.delete(compactionWorkingPath, true);
        this.fs.mkdirs(compactionWorkingPath);
        Path path = new Path(compactionWorkingPath, "files");
        Path path2 = new Path(compactionWorkingPath, "names");
        job.getConfiguration().set(WORKING_FILES_DIR_KEY, path.toString());
        Path path3 = new Path(path2, "all");
        job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, path3.toString());
        Path path4 = new Path(path2, WORKING_VISITED_DIR);
        job.getConfiguration().set(WORKING_VISITED_DIR_KEY, path4.toString());
        this.fs.mkdirs(path4);
        FileStatus[] listStatus = this.fs.listStatus(MobUtils.getMobFamilyPath(job.getConfiguration(), tableName, str));
        TreeSet treeSet = new TreeSet();
        long j = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, 86400000L);
        for (FileStatus fileStatus : listStatus) {
            if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath()) && this.compactionStartTime - fileStatus.getModificationTime() > j) {
                treeSet.add(fileStatus.getPath().getName());
            }
        }
        FSDataOutputStream fSDataOutputStream = null;
        SequenceFile.Writer writer = null;
        try {
            fSDataOutputStream = this.fs.create(path3, true);
            writer = SequenceFile.createWriter(job.getConfiguration(), fSDataOutputStream, String.class, String.class, SequenceFile.CompressionType.NONE, (CompressionCodec) null);
            Iterator it2 = treeSet.iterator();
            while (it2.hasNext()) {
                writer.append((String) it2.next(), "");
            }
            writer.hflush();
            if (writer != null) {
                IOUtils.closeStream(writer);
            }
            if (fSDataOutputStream != null) {
                IOUtils.closeStream(fSDataOutputStream);
            }
        } catch (Throwable th) {
            if (writer != null) {
                IOUtils.closeStream(writer);
            }
            if (fSDataOutputStream != null) {
                IOUtils.closeStream(fSDataOutputStream);
            }
            throw th;
        }
    }

    List<String> getUnusedFiles(Configuration configuration) throws IOException {
        Path path = new Path(configuration.get(WORKING_ALLNAMES_FILE_KEY));
        SequenceFile.Reader reader = null;
        MergeSortReader mergeSortReader = null;
        ArrayList arrayList = new ArrayList();
        try {
            reader = new SequenceFile.Reader(this.fs, path, configuration);
            mergeSortReader = new MergeSortReader(this.fs, configuration, new Path(configuration.get(WORKING_VISITED_DIR_KEY)));
            String str = (String) reader.next((String) null);
            String next = mergeSortReader.next();
            while (str != null) {
                if (next != null) {
                    int compareTo = str.compareTo(next);
                    if (compareTo < 0) {
                        arrayList.add(str);
                        str = (String) reader.next((String) null);
                    } else if (compareTo > 0) {
                        next = mergeSortReader.next();
                    } else {
                        str = (String) reader.next((String) null);
                        next = mergeSortReader.next();
                    }
                } else {
                    arrayList.add(str);
                    str = (String) reader.next((String) null);
                }
                if (str == null && next == null) {
                    break;
                }
            }
            if (reader != null) {
                IOUtils.closeStream(reader);
            }
            if (mergeSortReader != null) {
                mergeSortReader.close();
            }
            return arrayList;
        } catch (Throwable th) {
            if (reader != null) {
                IOUtils.closeStream(reader);
            }
            if (mergeSortReader != null) {
                mergeSortReader.close();
            }
            throw th;
        }
    }

    private void removeUnusedFiles(Job job, TableName tableName, HColumnDescriptor hColumnDescriptor) throws IOException {
        ArrayList arrayList = new ArrayList();
        List<String> unusedFiles = getUnusedFiles(job.getConfiguration());
        Path mobFamilyPath = MobUtils.getMobFamilyPath(job.getConfiguration(), tableName, hColumnDescriptor.getNameAsString());
        Iterator<String> it2 = unusedFiles.iterator();
        while (it2.hasNext()) {
            arrayList.add(new StoreFile(this.fs, new Path(mobFamilyPath, it2.next()), job.getConfiguration(), this.cacheConfig, BloomType.NONE));
        }
        if (arrayList.isEmpty()) {
            return;
        }
        try {
            MobUtils.removeMobFiles(job.getConfiguration(), this.fs, tableName, FSUtils.getTableDir(MobUtils.getMobHome(this.conf), tableName), hColumnDescriptor.getName(), arrayList);
            LOG.info(arrayList.size() + " unused MOB files are removed");
        } catch (Exception e) {
            LOG.error("Failed to archive the store files " + arrayList, e);
        }
    }

    private void cleanup(Job job, TableName tableName, String str) {
        if (job != null) {
            try {
                this.fs.delete(new Path(job.getConfiguration().get(WORKING_DIR_KEY)), true);
            } catch (IOException e) {
                LOG.warn("Failed to delete the working directory after sweeping store " + str + " in the table " + tableName.getNameAsString(), e);
            }
        }
    }
}
