package org.apache.mahout.cf.taste.impl.similarity.precompute;

import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.ItemBasedRecommender;
import org.apache.mahout.cf.taste.similarity.precompute.BatchItemSimilarities;
import org.apache.mahout.cf.taste.similarity.precompute.SimilarItems;
import org.apache.mahout.cf.taste.similarity.precompute.SimilarItemsWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/mahout/cf/taste/impl/similarity/precompute/MultithreadedBatchItemSimilarities.class */
/**
 * Precomputes the most similar items of every item in the data model using several threads.
 *
 * <p>The item IDs are split into batches that are placed on a shared queue. A configurable number
 * of worker threads drain that queue, ask the recommender for the most similar items of each item
 * and hand the per-batch results to a single output thread, which passes them to the
 * {@link SimilarItemsWriter}.
 */
public class MultithreadedBatchItemSimilarities extends BatchItemSimilarities {
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final Logger log = LoggerFactory.getLogger(MultithreadedBatchItemSimilarities.class);

    /** Number of item IDs handed to a worker at once. */
    private final int batchSize;

    /**
     * Consumer side of the pipeline: takes finished batches from the result queue and feeds them to
     * the writer. It is the only thread that touches the writer while the computation runs.
     */
    private static class Output implements Runnable {
        private final BlockingQueue<List<SimilarItems>> results;
        private final SimilarItemsWriter writer;
        private final AtomicInteger numActiveWorkers;
        // Written only by the output thread; the caller reads it after the task has completed.
        private int numSimilaritiesProcessed = 0;

        Output(BlockingQueue<List<SimilarItems>> results, SimilarItemsWriter writer, AtomicInteger numActiveWorkers) {
            this.results = results;
            this.writer = writer;
            this.numActiveWorkers = numActiveWorkers;
        }

        /** Returns the sum of {@code numSimilarItems()} over everything passed to the writer so far. */
        private int getNumSimilaritiesProcessed() {
            return numSimilaritiesProcessed;
        }

        @Override
        public void run() {
            // Keep going while workers may still produce results OR results are still queued.
            // Checking only the worker count would drop batches that were offered just before the
            // last worker finished. The worker count must be read first: a worker offers its result
            // before it decrements the counter, so once the counter reads 0 every result is queued.
            while (numActiveWorkers.get() != 0 || !results.isEmpty()) {
                try {
                    // Short timed poll so that the loop condition is re-evaluated regularly.
                    List<SimilarItems> similarItemsOfBatch = results.poll(10L, TimeUnit.MILLISECONDS);
                    if (similarItemsOfBatch != null) {
                        for (SimilarItems similarItems : similarItemsOfBatch) {
                            writer.add(similarItems);
                            numSimilaritiesProcessed += similarItems.numSimilarItems();
                        }
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Producer side of the pipeline: repeatedly takes a batch of item IDs, computes the most similar
     * items for each ID in the batch and offers the batch result to the result queue.
     */
    private class SimilarItemsWorker implements Runnable {
        private final int number;
        private final BlockingQueue<long[]> itemIDBatches;
        private final BlockingQueue<List<SimilarItems>> results;
        private final AtomicInteger numActiveWorkers;

        SimilarItemsWorker(int number, BlockingQueue<long[]> itemIDBatches,
                BlockingQueue<List<SimilarItems>> results, AtomicInteger numActiveWorkers) {
            this.number = number;
            this.itemIDBatches = itemIDBatches;
            this.results = results;
            this.numActiveWorkers = numActiveWorkers;
        }

        @Override
        public void run() {
            int numBatchesProcessed = 0;
            try {
                long[] itemIDBatch;
                // Non-blocking poll() instead of isEmpty() followed by take(): with several workers
                // another thread can grab the last batch between the two calls, which would leave
                // this worker blocked in take() forever. The queue is fully populated before any
                // worker starts, so a null result reliably means "no work left".
                // The interrupt check lets a cancelled computation stop after the current batch.
                while (!Thread.currentThread().isInterrupted() && (itemIDBatch = itemIDBatches.poll()) != null) {
                    List<SimilarItems> similarItemsOfBatch = Lists.newArrayListWithCapacity(itemIDBatch.length);
                    for (long itemID : itemIDBatch) {
                        similarItemsOfBatch.add(new SimilarItems(itemID,
                                getRecommender().mostSimilarItems(itemID, getSimilarItemsPerItem())));
                    }
                    results.offer(similarItemsOfBatch);
                    numBatchesProcessed++;
                    if (numBatchesProcessed % 5 == 0) {
                        log.info("worker {} processed {} batches", number, numBatchesProcessed);
                    }
                }
                log.info("worker {} processed {} batches. done.", number, numBatchesProcessed);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // Always sign off, even on failure; otherwise the output thread would wait for this
                // worker until the overall timeout expires.
                numActiveWorkers.decrementAndGet();
            }
        }
    }

    /**
     * Creates an instance that uses the default batch size of {@value #DEFAULT_BATCH_SIZE} items.
     *
     * @param recommender recommender used to find the most similar items
     * @param similarItemsPerItem value forwarded to the superclass constructor
     */
    public MultithreadedBatchItemSimilarities(ItemBasedRecommender recommender, int similarItemsPerItem) {
        this(recommender, similarItemsPerItem, DEFAULT_BATCH_SIZE);
    }

    /**
     * Creates an instance with an explicit batch size.
     *
     * @param recommender recommender used to find the most similar items
     * @param similarItemsPerItem value forwarded to the superclass constructor
     * @param batchSize number of item IDs a worker processes per batch
     */
    public MultithreadedBatchItemSimilarities(ItemBasedRecommender recommender, int similarItemsPerItem,
            int batchSize) {
        super(recommender, similarItemsPerItem);
        this.batchSize = batchSize;
    }

    /**
     * Computes the similar items of all items in the data model and passes them to {@code writer}.
     *
     * <p>The writer is opened at the start and closed before this method returns, whether it
     * succeeds or fails.
     *
     * @param degreeOfParallelism number of worker threads; one additional thread writes the output
     * @param maxDurationInHours maximum time to wait for the computation to finish
     * @param writer destination of the computed similarities
     * @return the total number of similar items that were passed to the writer
     * @throws IOException if opening, writing or closing fails, if any worker or the output task
     *         fails, if the computation does not finish in time, or if the calling thread is
     *         interrupted while waiting
     */
    @Override // org.apache.mahout.cf.taste.similarity.precompute.BatchItemSimilarities
    public int computeItemSimilarities(int degreeOfParallelism, int maxDurationInHours, SimilarItemsWriter writer)
            throws IOException {
        ExecutorService executorService = Executors.newFixedThreadPool(degreeOfParallelism + 1);
        boolean succeeded = false;
        try {
            writer.open();

            DataModel dataModel = getRecommender().getDataModel();
            BlockingQueue<long[]> itemIDBatches = queueItemIDsInBatches(dataModel, batchSize, degreeOfParallelism);
            BlockingQueue<List<SimilarItems>> results = new LinkedBlockingQueue<List<SimilarItems>>();
            AtomicInteger numActiveWorkers = new AtomicInteger(degreeOfParallelism);

            // submit() instead of execute(): the futures capture task failures so they can be
            // reported to the caller instead of silently killing a pool thread.
            List<Future<?>> tasks = new ArrayList<Future<?>>(degreeOfParallelism + 1);
            for (int n = 0; n < degreeOfParallelism; n++) {
                tasks.add(executorService.submit(new SimilarItemsWorker(n, itemIDBatches, results, numActiveWorkers)));
            }
            Output output = new Output(results, writer, numActiveWorkers);
            tasks.add(executorService.submit(output));

            executorService.shutdown();
            if (!executorService.awaitTermination(maxDurationInHours, TimeUnit.HOURS)) {
                throw new IOException("Unable to complete the computation in " + maxDurationInHours + " hours!");
            }

            // All tasks are finished here, so get() returns immediately; it rethrows any failure.
            for (Future<?> task : tasks) {
                task.get();
            }

            int numSimilaritiesProcessed = output.getNumSimilaritiesProcessed();
            succeeded = true;
            return numSimilaritiesProcessed;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            // On timeout, interruption or setup failure tasks may still be queued or running;
            // cancel them instead of leaving pool threads behind.
            if (!executorService.isTerminated()) {
                executorService.shutdownNow();
            }
            // Only let a close() failure propagate if nothing else went wrong, so that it cannot
            // mask the original exception.
            Closeables.close(writer, !succeeded);
        }
    }

    /**
     * Reads all item IDs of the data model and groups them into arrays of at most
     * {@code batchSize} IDs each; the last batch may be shorter.
     *
     * @param dataModel source of the item IDs
     * @param batchSize maximum number of item IDs per batch, must be positive
     * @param degreeOfParallelism number of workers that will consume the batches
     * @return a queue holding every batch
     * @throws TasteException if the data model cannot be read
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     * @throws IllegalStateException if there are fewer batches than workers
     */
    private static BlockingQueue<long[]> queueItemIDsInBatches(DataModel dataModel, int batchSize,
            int degreeOfParallelism) throws TasteException {
        if (batchSize < 1) {
            throw new IllegalArgumentException("Batch size must be positive, but was [" + batchSize + "].");
        }

        LongPrimitiveIterator itemIDs = dataModel.getItemIDs();
        int numItems = dataModel.getNumItems();

        BlockingQueue<long[]> itemIDBatches = new LinkedBlockingQueue<long[]>((numItems / batchSize) + 1);

        long[] batch = new long[batchSize];
        int pos = 0;
        while (itemIDs.hasNext()) {
            batch[pos] = itemIDs.nextLong();
            pos++;
            if (pos == batchSize) {
                // The buffer is reused for the next batch, so enqueue a copy.
                itemIDBatches.add(batch.clone());
                pos = 0;
            }
        }

        // Remaining IDs that did not fill a complete batch.
        if (pos > 0) {
            long[] lastBatch = new long[pos];
            System.arraycopy(batch, 0, lastBatch, 0, pos);
            itemIDBatches.add(lastBatch);
        }

        if (itemIDBatches.size() < degreeOfParallelism) {
            throw new IllegalStateException("Degree of parallelism [" + degreeOfParallelism
                    + "]  is larger than number of batches [" + itemIDBatches.size() + "].");
        }

        log.info("Queued {} items in {} batches", numItems, itemIDBatches.size());
        return itemIDBatches;
    }
}
