package nstream.adapter.http;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import nstream.adapter.common.ext.HttpIngressSettings;
import nstream.adapter.common.ingress.ExchangeRelay;
import nstream.adapter.common.ingress.IngestorAgent;
import nstream.adapter.common.schedule.DeferrableException;
import swim.concurrent.TimerRef;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/http/HttpIngestingAgent.class */
public abstract class HttpIngestingAgent extends IngestorAgent<HttpIngressSettings, HttpResponse<InputStream>> implements ExchangeRelay<HttpIngressSettings, HttpRequest, HttpResponse<InputStream>> {
    protected TimerRef exchangeTimer;

    protected TimerRef exchangeTimer() {
        return this.exchangeTimer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseIngressSettings, reason: merged with bridge method [inline-methods] */
    public HttpIngressSettings m1parseIngressSettings(Value value) {
        return HttpAdapterUtils.ingressSettingsFromProp(value);
    }

    protected HttpClient httpClient() {
        return HttpAdapterUtils.defaultHttpClient();
    }

    protected void fetchAndRelay() throws DeferrableException {
        relayResponse(executeRequest(prepareRequest((HttpIngressSettings) this.ingressSettings)));
    }

    public HttpRequest prepareRequest(HttpIngressSettings httpIngressSettings) {
        try {
            return HttpAdapterUtils.buildHttpRequest("GET", httpIngressSettings.endpointUrl(), httpIngressSettings.headers(), HttpRequest.BodyPublishers.noBody(), httpIngressSettings.timeoutMillis());
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    public HttpResponse<InputStream> executeRequest(HttpRequest httpRequest) throws DeferrableException {
        try {
            return HttpAdapterUtils.executeHttpRequest(httpClient(), httpRequest);
        } catch (IOException | InterruptedException e) {
            throw new DeferrableException("Failed to execute request (timer remains active)", e);
        }
    }

    protected boolean responseIsHealthy(HttpResponse<?> httpResponse) {
        return httpResponse.statusCode() / 100 == 2;
    }

    public void relayResponse(HttpResponse<InputStream> httpResponse) throws DeferrableException {
        if (!responseIsHealthy(httpResponse)) {
            throw new DeferrableException(nodeUri() + ": response saw unhealthy status code " + httpResponse.statusCode() + " (timer remains active). Headers: " + httpResponse.headers());
        }
        ingest(httpResponse);
    }

    protected void stageReception() {
        loadSettings("httpIngressConf");
        this.exchangeTimer = scheduleAtFixedRate(this::exchangeTimer, this.ingressSettings.firstPollDelayMillis(), this.ingressSettings.pollIntervalMillis(), this::fetchAndRelay);
    }

    public void didStart() {
        System.out.println(nodeUri() + ": didStart");
        stageReception();
    }
}
