package com.taobao.api.internal.jushita.stream;

import com.baidu.android.pushservice.PushConstants;
import com.taobao.api.ApiException;
import com.taobao.api.internal.jushita.JushitaTaobaoClient;
import com.taobao.api.internal.jushita.stream.Message;
import com.taobao.api.internal.tmc.MessageFields;
import com.taobao.api.internal.util.StringUtils;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/* loaded from: classes.dex */
public class MessageDriver implements Runnable {
    private static final Logger logger = Logger.getLogger(MessageDriver.class);
    private String appKey;
    private JushitaTaobaoClient client;
    private MessageHandleThread[] consumerThreads;
    private Report lastFailedReport;
    private long lastReportTime;
    private MessageHandler messageHandler;
    private MessageCircleQueue queue;
    private ScheduledExecutorService reportSchedule;
    private String reportUrl;
    private String secret;
    private int reportCount = 100;
    private long reportInterval = 300000;
    private int consumerThreadCount = 10;
    private int timeoutSeconds = 10;

    /* loaded from: classes.dex */
    public static class MessageHandleThread extends Thread {
        private MessageHandler messageHandle;
        private MessageCircleQueue queue;
        private volatile boolean run;

        public MessageHandleThread(String str, MessageHandler messageHandler, MessageCircleQueue messageCircleQueue) {
            super(str);
            this.run = true;
            this.messageHandle = messageHandler;
            this.queue = messageCircleQueue;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.run) {
                try {
                    Message take = this.queue.take();
                    if (this.messageHandle.handle(take)) {
                        take.setState(Message.State.SUCCESS);
                    } else {
                        take.setState(Message.State.FAILED);
                    }
                } catch (InterruptedException e) {
                    MessageDriver.logger.warn("thread is interrupted");
                    return;
                } catch (Exception e2) {
                    MessageDriver.logger.warn("error happened when handle message:" + e2.getMessage(), e2);
                }
            }
        }

        public void shutdown() {
            this.run = false;
        }
    }

    /* loaded from: classes.dex */
    public interface MessageHandler {
        boolean handle(Message message);
    }

    public MessageDriver(String str, String str2) {
        if (StringUtils.isEmpty(str) || StringUtils.isEmpty(str2)) {
            throw new NullPointerException("parameters should not empty");
        }
        this.appKey = str;
        this.secret = str2;
    }

    public int getConsumerThreadCount() {
        return this.consumerThreadCount;
    }

    public MessageHandler getMessageHandler() {
        return this.messageHandler;
    }

    public int getReportCount() {
        return this.reportCount;
    }

    public long getReportInterval() {
        return this.reportInterval;
    }

    public String getReportUrl() {
        return this.reportUrl;
    }

    public int getTimeoutSeconds() {
        return this.timeoutSeconds;
    }

    public void pushMessage(String str) {
        try {
            this.queue.put(new Message(str));
        } catch (InterruptedException e) {
            logger.info("push message thread is interrupted");
        }
    }

    protected void report(Report report) {
        HashMap hashMap = new HashMap();
        if (logger.isInfoEnabled()) {
            logger.info("report happened:" + report.asJson());
        }
        hashMap.put("report", report.asJson());
        hashMap.put(PushConstants.EXTRA_USER_ID, "1");
        hashMap.put(MessageFields.DATA_TOPIC, MessageFields.DATA_TOPIC);
        try {
            this.client.execute("report_message", hashMap, MessageFields.DATA_INCOMING_USER_SESSION);
            this.lastFailedReport = null;
        } catch (ApiException e) {
            this.lastFailedReport = report;
            logger.error("report message error. if this happened many times, please contact us.", e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.lastFailedReport != null) {
            report(this.lastFailedReport);
            return;
        }
        int check = this.queue.check();
        if (check >= this.reportCount || (check > 0 && System.currentTimeMillis() - this.lastReportTime > this.reportInterval)) {
            report(this.queue.report());
            this.lastReportTime = System.currentTimeMillis();
        }
    }

    public void setConsumerThreadCount(int i) {
        this.consumerThreadCount = i;
    }

    public void setMessageHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    public void setReportCount(int i) {
        this.reportCount = i;
    }

    public void setReportInterval(long j) {
        this.reportInterval = j;
    }

    public void setReportUrl(String str) {
        this.reportUrl = str;
    }

    public void setTimeoutSeconds(int i) {
        this.timeoutSeconds = i;
    }

    public void start() {
        logger.info("driver start");
        if (this.messageHandler == null) {
            throw new NullPointerException("message handle is not be set");
        }
        if (StringUtils.isEmpty(this.reportUrl)) {
            throw new NullPointerException("report url is not be set");
        }
        this.client = new JushitaTaobaoClient(this.reportUrl, this.appKey, this.secret);
        this.queue = new MessageCircleQueue(this.reportCount * 20, this.timeoutSeconds);
        this.consumerThreads = new MessageHandleThread[this.consumerThreadCount];
        for (int i = 0; i < this.consumerThreadCount; i++) {
            this.consumerThreads[i] = new MessageHandleThread("msg-driver-thread-" + i, this.messageHandler, this.queue);
            this.consumerThreads[i].start();
        }
        this.lastReportTime = System.currentTimeMillis();
        this.reportSchedule = Executors.newScheduledThreadPool(1);
        this.reportSchedule.scheduleWithFixedDelay(this, 50L, 50L, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        logger.info("driver stop");
        for (int i = 0; i < this.consumerThreadCount; i++) {
            this.consumerThreads[i].shutdown();
        }
        this.reportSchedule.shutdown();
    }
}
