/*
 * Decompiled with CFR 0.152.
 */
package io.deltastream.flink.connector.snowflake.sink.internal;

import io.deltastream.flink.connector.snowflake.sink.config.SnowflakeChannelConfig;
import io.deltastream.flink.connector.snowflake.sink.config.SnowflakeWriterConfig;
import io.deltastream.flink.connector.snowflake.sink.internal.SnowflakeSinkServiceImpl;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.FakeSnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.FakeSnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class SnowflakeSinkServiceImplTest {
    SnowflakeSinkServiceImplTest() {
    }

    @Test
    void testSuccessfulInsert() throws IOException {
        FakeSnowflakeSinkServiceImpl sinkService = new FakeSnowflakeSinkServiceImpl("appId", 0, new Properties(), SnowflakeWriterConfig.builder().build(), SnowflakeChannelConfig.builder().build("FAKE_DB", "FAKE_SCHEMA", "FAKE_TABLE"), new FakeSinkWriterMetricGroup());
        Assertions.assertEquals((long)0L, (long)sinkService.getLatestCommittedOffsetFromSnowflakeIngestChannel());
        sinkService.insert(Map.of("field_1", "val_1"));
        Assertions.assertEquals((long)1L, (long)sinkService.getLatestCommittedOffsetFromSnowflakeIngestChannel());
    }

    @Test
    void testInsertExceptionHandling() throws IOException {
        FakeSnowflakeSinkServiceImpl sinkService = new FakeSnowflakeSinkServiceImpl("appId", 0, new Properties(), SnowflakeWriterConfig.builder().build(), SnowflakeChannelConfig.builder().build("FAKE_DB", "FAKE_SCHEMA", "FAKE_TABLE"), new FakeSinkWriterMetricGroup()){

            public SnowflakeStreamingIngestChannel getChannel() {
                return new FakeSnowflakeStreamingIngestChannel(this.getChannelName(), this.getChannelConfig().getDatabaseName(), this.getChannelConfig().getSchemaName(), this.getChannelConfig().getTableName()){

                    @Override
                    public InsertValidationResponse insertRow(Map<String, Object> row, String offsetToken) {
                        throw new SFException(ErrorCode.INTERNAL_ERROR, new Object[]{"test"});
                    }
                };
            }
        };
        IOException e = (IOException)Assertions.assertThrows(IOException.class, () -> sinkService.insert(Map.of("field_1", "val_1")));
        Assertions.assertTrue((boolean)e.getMessage().contains("Failed to insert row with Snowflake sink service"));
    }

    @Test
    void testInsertErrornHandling() throws IOException {
        FakeSnowflakeSinkServiceImpl sinkService = new FakeSnowflakeSinkServiceImpl("appId", 0, new Properties(), SnowflakeWriterConfig.builder().build(), SnowflakeChannelConfig.builder().build("FAKE_DB", "FAKE_SCHEMA", "FAKE_TABLE"), new FakeSinkWriterMetricGroup()){

            public SnowflakeStreamingIngestChannel getChannel() {
                return new FakeSnowflakeStreamingIngestChannel(this.getChannelName(), this.getChannelConfig().getDatabaseName(), this.getChannelConfig().getSchemaName(), this.getChannelConfig().getTableName()){

                    @Override
                    public InsertValidationResponse insertRow(Map<String, Object> row, String offsetToken) {
                        InsertValidationResponse res = new InsertValidationResponse();
                        InsertValidationResponse.InsertError insertError = new InsertValidationResponse.InsertError(row, Long.parseLong(offsetToken));
                        insertError.setException(new SFException(ErrorCode.INTERNAL_ERROR, new Object[]{"test"}));
                        res.addError(insertError);
                        return res;
                    }
                };
            }
        };
        IOException e = (IOException)Assertions.assertThrows(IOException.class, () -> sinkService.insert(Map.of("field_1", "val_1")));
        Assertions.assertTrue((boolean)e.getMessage().contains("Encountered errors while ingesting rows into Snowflake: Ingest client internal error: test."));
    }

    @Test
    void testFetchOffsetTokenErrorHandling() {
        FlinkRuntimeException e = (FlinkRuntimeException)Assertions.assertThrows(FlinkRuntimeException.class, () -> new FakeSnowflakeSinkServiceImpl("appId", 0, new Properties(), SnowflakeWriterConfig.builder().build(), SnowflakeChannelConfig.builder().build("FAKE_DB", "FAKE_SCHEMA", "FAKE_TABLE"), new FakeSinkWriterMetricGroup()){

            public SnowflakeStreamingIngestClient getClient() {
                return new FakeSnowflakeStreamingIngestClient(this.getChannelName()){

                    @Override
                    public Map<String, String> getLatestCommittedOffsetTokens(List<SnowflakeStreamingIngestChannel> channels) {
                        HashMap<String, String> offsetTokens = new HashMap<String, String>();
                        channels.forEach(c -> {
                            String fqn = c.getFullyQualifiedName();
                            String token = "invalid_token";
                            offsetTokens.put(fqn, token);
                        });
                        return offsetTokens;
                    }
                };
            }
        });
        Assertions.assertTrue((boolean)e.getMessage().contains(String.format("The offsetToken '%s' cannot be parsed as a long for channel", "invalid_token")));
        Assertions.assertTrue((boolean)(e.getCause() instanceof NumberFormatException));
    }

    private class FakeSnowflakeSinkServiceImpl
    extends SnowflakeSinkServiceImpl {
        public FakeSnowflakeSinkServiceImpl(String appId, int taskId, Properties connectionConfig, SnowflakeWriterConfig writerConfig, SnowflakeChannelConfig channelConfig, SinkWriterMetricGroup metricGroup) {
            super(appId, taskId, connectionConfig, writerConfig, channelConfig, metricGroup);
        }

        SnowflakeStreamingIngestClient createClientFromConfig(String appId, Properties connectionConfig) {
            return new FakeSnowflakeStreamingIngestClient(this.getChannelName());
        }
    }

    private class FakeSinkWriterMetricGroup
    implements SinkWriterMetricGroup {
        private FakeSinkWriterMetricGroup() {
        }

        public Counter getNumRecordsOutErrorsCounter() {
            throw new UnsupportedOperationException();
        }

        public Counter getNumRecordsSendErrorsCounter() {
            return new SimpleCounter();
        }

        public Counter getNumRecordsSendCounter() {
            return new SimpleCounter();
        }

        public Counter getNumBytesSendCounter() {
            throw new UnsupportedOperationException();
        }

        public void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge) {
            throw new UnsupportedOperationException();
        }

        public OperatorIOMetricGroup getIOMetricGroup() {
            throw new UnsupportedOperationException();
        }

        public Counter counter(String name) {
            throw new UnsupportedOperationException();
        }

        public <C extends Counter> C counter(String name, C counter) {
            throw new UnsupportedOperationException();
        }

        public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
            throw new UnsupportedOperationException();
        }

        public <H extends Histogram> H histogram(String name, H histogram) {
            throw new UnsupportedOperationException();
        }

        public <M extends Meter> M meter(String name, M meter) {
            throw new UnsupportedOperationException();
        }

        public MetricGroup addGroup(String name) {
            throw new UnsupportedOperationException();
        }

        public MetricGroup addGroup(String key, String value) {
            throw new UnsupportedOperationException();
        }

        public String[] getScopeComponents() {
            throw new UnsupportedOperationException();
        }

        public Map<String, String> getAllVariables() {
            throw new UnsupportedOperationException();
        }

        public String getMetricIdentifier(String metricName) {
            throw new UnsupportedOperationException();
        }

        public String getMetricIdentifier(String metricName, CharacterFilter filter) {
            throw new UnsupportedOperationException();
        }
    }
}

