/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.encoding.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.internal.PlatformDependent;
import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.buffer.api.CompositeBuffer;
import io.servicetalk.buffer.api.ReadOnlyBufferAllocators;
import io.servicetalk.buffer.netty.BufferUtils;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.encoding.api.CodecDecodingException;
import io.servicetalk.encoding.api.CodecEncodingException;
import io.servicetalk.encoding.api.ContentCodec;
import io.servicetalk.encoding.netty.AbstractContentCodec;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class NettyChannelContentCodec
extends AbstractContentCodec {
    // Logger used by safeCleanup to report failures raised while tearing down an embedded channel.
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyChannelContentCodec.class);
    // Sentinel item appended to the source in encode(Publisher, BufferAllocator). It is recognised by
    // reference identity (==) in that method's onNext and triggers closing the channel and draining
    // whatever output is still pending.
    private static final Buffer END_OF_STREAM = ReadOnlyBufferAllocators.DEFAULT_RO_ALLOCATOR.fromAscii((CharSequence)" ");
    // Size threshold in bytes. drainChannelQueueToSingleBuffer applies this same value (1024) when choosing
    // between copying the drained chunks into one buffer and wrapping them in a composite buffer.
    private static final int MAX_SIZE_FOR_MERGED_BUFFER = 1024;
    // Invoked once per encode call / per streaming subscriber to obtain the Netty encoder handler.
    private final Supplier<MessageToByteEncoder<ByteBuf>> encoderSupplier;
    // Invoked once per decode call / per streaming subscriber to obtain the Netty decoder handler.
    private final Supplier<ByteToMessageDecoder> decoderSupplier;

    /**
     * Creates a codec that delegates the actual (de)compression work to Netty channel handlers.
     *
     * @param name the codec name, forwarded to the {@link AbstractContentCodec} constructor
     * @param encoderSupplier supplies the Netty encoder used by the {@code encode} methods
     * @param decoderSupplier supplies the Netty decoder used by the {@code decode} methods
     */
    NettyChannelContentCodec(final CharSequence name,
                             final Supplier<MessageToByteEncoder<ByteBuf>> encoderSupplier,
                             final Supplier<ByteToMessageDecoder> decoderSupplier) {
        super(name);
        this.decoderSupplier = decoderSupplier;
        this.encoderSupplier = encoderSupplier;
    }

    /**
     * Encodes all readable bytes of {@code src} in a single pass.
     *
     * @param src the buffer to encode; must have at least one readable byte
     * @param allocator the allocator used for the embedded channel and for the resulting buffer
     * @return the encoded bytes
     * @throws NullPointerException if {@code allocator} is {@code null}
     * @throws CodecEncodingException if {@code src} has no readable bytes, if the encoder produced no
     * output, or if any other failure occurs while encoding (wrapped as the cause)
     */
    public Buffer encode(Buffer src, BufferAllocator allocator) {
        Objects.requireNonNull(allocator);
        if (src.readableBytes() == 0) {
            throw new CodecEncodingException(this, "No data to encode.");
        }

        final MessageToByteEncoder<ByteBuf> handler = encoderSupplier.get();
        final EmbeddedChannel embedded = newEmbeddedChannel(handler, allocator);
        try {
            embedded.writeOutbound(BufferUtils.extractByteBufOrCreate(src));
            // Closes the channel before draining so output produced on close is part of the result.
            preparePendingData(embedded);

            final Buffer encoded = drainChannelQueueToSingleBuffer(embedded.outboundMessages(), allocator);
            if (encoded == null) {
                throw new CodecEncodingException(this, "Not enough data to produce an encoded output");
            }

            cleanup(embedded);
            return encoded;
        } catch (CodecEncodingException alreadyTyped) {
            // Already the right exception type; do not wrap it a second time.
            throw alreadyTyped;
        } catch (Throwable cause) {
            throw wrapEncodingException(this, cause);
        } finally {
            safeCleanup(embedded);
        }
    }

    /**
     * Encodes {@code length} bytes of {@code src}, starting {@code offset} bytes past its reader index.
     * The reader index of {@code src} is advanced by {@code offset + length}.
     *
     * @param src the buffer holding the bytes to encode
     * @param offset number of bytes to skip, relative to the current reader index
     * @param length number of bytes to encode
     * @param allocator the allocator used for the resulting buffer
     * @return the encoded bytes
     */
    public Buffer encode(Buffer src, int offset, int length, BufferAllocator allocator) {
        final int start = src.readerIndex() + offset;
        final Buffer region = src.slice(start, length);
        // Mark the skipped prefix and the sliced region as consumed on the caller's buffer.
        src.skipBytes(offset + length);
        return encode(region, allocator);
    }

    /**
     * Returns a publisher that encodes every buffer emitted by {@code from}.
     * <p>
     * The {@code END_OF_STREAM} sentinel is concatenated after the source so that the subscriber below
     * can close the channel and emit any remaining output before completion is signalled.
     *
     * @param from the source of buffers to encode
     * @param allocator the allocator used for the embedded channel and for emitted buffers
     * @return a publisher of encoded buffers
     * @throws NullPointerException if {@code from} or {@code allocator} is {@code null}
     */
    public Publisher<Buffer> encode(Publisher<Buffer> from, final BufferAllocator allocator) {
        Objects.requireNonNull(from);
        Objects.requireNonNull(allocator);
        return from.concat(Single.succeeded((Object)END_OF_STREAM)).liftSync(subscriber -> new PublisherSource.Subscriber<Buffer>(){
            private final EmbeddedChannel channel;
            @Nullable
            PublisherSource.Subscription subscription;
            private final MessageToByteEncoder encoder;
            {
                // One encoder and one embedded channel are created per subscriber.
                this.encoder = (MessageToByteEncoder)NettyChannelContentCodec.this.encoderSupplier.get();
                this.channel = NettyChannelContentCodec.this.newEmbeddedChannel((ChannelHandler)this.encoder, allocator);
            }

            public void onSubscribe(PublisherSource.Subscription subscription) {
                // Keep the subscription so onNext can request another item when nothing was emitted.
                // NOTE(review): the subscription is handed downstream unchanged, so no channel cleanup
                // is visible here for the cancellation path - confirm whether that is intended.
                this.subscription = subscription;
                subscriber.onSubscribe(subscription);
            }

            public void onNext(Buffer next) {
                assert (this.subscription != null);
                // The channel is closed once the sentinel has been processed (see preparePendingData).
                if (!this.channel.isOpen()) {
                    throw new IllegalStateException("Stream encoder previously closed but more input arrived");
                }
                if (next == null) {
                    throw new CodecEncodingException((ContentCodec)NettyChannelContentCodec.this, "Cannot encode null values");
                }
                try {
                    // Identity comparison: only the sentinel instance appended above ends the stream.
                    if (next == END_OF_STREAM) {
                        NettyChannelContentCodec.preparePendingData(this.channel);
                        Buffer buffer = NettyChannelContentCodec.drainChannelQueueToSingleBuffer(this.channel.outboundMessages(), allocator);
                        if (buffer != null) {
                            subscriber.onNext((Object)buffer);
                        }
                        return;
                    }
                    this.channel.writeOutbound(new Object[]{BufferUtils.extractByteBufOrCreate((Buffer)next)});
                    Buffer buffer = NettyChannelContentCodec.drainChannelQueueToSingleBuffer(this.channel.outboundMessages(), allocator);
                    if (buffer != null && buffer.readableBytes() > 0) {
                        subscriber.onNext((Object)buffer);
                    } else {
                        // Nothing was delivered downstream for this item, so ask upstream for one more.
                        this.subscription.request(1L);
                    }
                }
                catch (Throwable t) {
                    // Anything thrown inside the try, including by the downstream onNext calls, is wrapped.
                    throw NettyChannelContentCodec.wrapEncodingException(NettyChannelContentCodec.this, t);
                }
            }

            public void onError(Throwable t) {
                // Best-effort cleanup: failures are logged by safeCleanup so the original error is still delivered.
                NettyChannelContentCodec.safeCleanup(this.channel);
                subscriber.onError(t);
            }

            public void onComplete() {
                try {
                    NettyChannelContentCodec.cleanup(this.channel);
                }
                catch (Throwable t) {
                    // A failed cleanup terminates the stream with an error instead of completion.
                    subscriber.onError((Throwable)NettyChannelContentCodec.wrapEncodingException(NettyChannelContentCodec.this, t));
                    return;
                }
                subscriber.onComplete();
            }
        });
    }

    /**
     * Decodes all readable bytes of {@code src} in a single pass.
     *
     * @param src the buffer to decode; must have at least one readable byte
     * @param allocator the allocator used for the embedded channel and for the resulting buffer
     * @return the decoded bytes
     * @throws NullPointerException if {@code allocator} is {@code null}
     * @throws CodecDecodingException if {@code src} has no readable bytes, if the decoder produced no
     * output, or if any other failure occurs while decoding (wrapped as the cause)
     */
    public Buffer decode(Buffer src, BufferAllocator allocator) {
        Objects.requireNonNull(allocator);
        if (src.readableBytes() == 0) {
            // Empty input is a decoding failure: report it with the decoding exception type and
            // message, consistent with every other failure raised by this method.
            throw new CodecDecodingException(this, "No data to decode.");
        }

        final ByteToMessageDecoder decoder = decoderSupplier.get();
        final EmbeddedChannel channel = newEmbeddedChannel(decoder, allocator);
        try {
            channel.writeInbound(BufferUtils.extractByteBufOrCreate(src));

            final Buffer decoded = drainChannelQueueToSingleBuffer(channel.inboundMessages(), allocator);
            if (decoded == null) {
                throw new CodecDecodingException(this, "Not enough data to decode.");
            }

            cleanup(channel);
            return decoded;
        } catch (CodecDecodingException e) {
            // Already the right exception type; do not wrap it a second time.
            throw e;
        } catch (Throwable e) {
            throw wrapDecodingException(this, e);
        } finally {
            safeCleanup(channel);
        }
    }

    /**
     * Decodes {@code length} bytes of {@code src}, starting {@code offset} bytes past its reader index.
     * The reader index of {@code src} is advanced by {@code offset + length}.
     *
     * @param src the buffer holding the bytes to decode
     * @param offset number of bytes to skip, relative to the current reader index
     * @param length number of bytes to decode
     * @param allocator the allocator used for the resulting buffer
     * @return the decoded bytes
     */
    public Buffer decode(Buffer src, int offset, int length, BufferAllocator allocator) {
        final int start = src.readerIndex() + offset;
        final Buffer region = src.slice(start, length);
        // Mark the skipped prefix and the sliced region as consumed on the caller's buffer.
        src.skipBytes(offset + length);
        return decode(region, allocator);
    }

    /**
     * Returns a publisher that decodes every buffer emitted by {@code from}.
     *
     * @param from the source of encoded buffers
     * @param allocator the allocator used for the embedded channel and for emitted buffers
     * @return a publisher of decoded buffers
     * @throws NullPointerException if {@code from} or {@code allocator} is {@code null}
     */
    public Publisher<Buffer> decode(Publisher<Buffer> from, final BufferAllocator allocator) {
        Objects.requireNonNull(from);
        Objects.requireNonNull(allocator);
        return from.liftSync(subscriber -> new PublisherSource.Subscriber<Buffer>(){
            private final ByteToMessageDecoder decoder;
            private final EmbeddedChannel channel;
            @Nullable
            PublisherSource.Subscription subscription;
            {
                // One decoder and one embedded channel are created per subscriber.
                this.decoder = (ByteToMessageDecoder)NettyChannelContentCodec.this.decoderSupplier.get();
                this.channel = NettyChannelContentCodec.this.newEmbeddedChannel((ChannelHandler)this.decoder, allocator);
            }

            public void onSubscribe(PublisherSource.Subscription subscription) {
                // Keep the subscription so onNext can request another item when nothing was emitted.
                // NOTE(review): the subscription is handed downstream unchanged, so no channel cleanup
                // is visible here for the cancellation path - confirm whether that is intended.
                this.subscription = subscription;
                subscriber.onSubscribe(subscription);
            }

            public void onNext(@Nullable Buffer src) {
                assert (this.subscription != null);
                if (!this.channel.isOpen()) {
                    throw new CodecDecodingException((ContentCodec)NettyChannelContentCodec.this, "Stream decoder previously closed but more input arrived");
                }
                if (src == null) {
                    throw new CodecDecodingException((ContentCodec)NettyChannelContentCodec.this, "Cannot decode null values");
                }
                try {
                    this.channel.writeInbound(new Object[]{BufferUtils.extractByteBufOrCreate((Buffer)src)});
                    Buffer buffer = NettyChannelContentCodec.drainChannelQueueToSingleBuffer(this.channel.inboundMessages(), allocator);
                    if (buffer != null && buffer.readableBytes() > 0) {
                        subscriber.onNext((Object)buffer);
                    } else {
                        // Nothing was delivered downstream for this item, so ask upstream for one more.
                        this.subscription.request(1L);
                    }
                }
                catch (Throwable e) {
                    // Anything thrown inside the try, including by the downstream onNext call, is wrapped.
                    throw NettyChannelContentCodec.wrapDecodingException(NettyChannelContentCodec.this, e);
                }
            }

            public void onError(Throwable t) {
                // Best-effort cleanup: failures are logged by safeCleanup so the original error is still delivered.
                NettyChannelContentCodec.safeCleanup(this.channel);
                subscriber.onError(t);
            }

            public void onComplete() {
                try {
                    NettyChannelContentCodec.cleanup(this.channel);
                }
                catch (Throwable t) {
                    // A failed cleanup terminates the stream with an error instead of completion.
                    subscriber.onError((Throwable)NettyChannelContentCodec.wrapDecodingException(NettyChannelContentCodec.this, t));
                    return;
                }
                subscriber.onComplete();
            }
        });
    }

    /**
     * Builds an {@link EmbeddedChannel} containing only {@code handler} and configures it to allocate
     * through the Netty allocator obtained from {@code allocator}.
     *
     * @param handler the encoder or decoder to install in the channel
     * @param allocator the ServiceTalk allocator to adapt for the channel
     * @return the configured channel
     */
    private EmbeddedChannel newEmbeddedChannel(ChannelHandler handler, BufferAllocator allocator) {
        final EmbeddedChannel embedded = new EmbeddedChannel(handler);
        embedded.config().setAllocator(BufferUtils.getByteBufAllocator(allocator));
        return embedded;
    }

    /**
     * Closes {@code channel}, waits for the close to finish, and rethrows any exception the channel
     * recorded while processing.
     * <p>
     * Callers drain the channel's outbound queue right after this call to collect whatever the handler
     * wrote while closing.
     *
     * @param channel the embedded channel to close
     */
    private static void preparePendingData(EmbeddedChannel channel) {
        try {
            channel.close().syncUninterruptibly().get();
            channel.checkException();
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so code further up the stack can still observe the
            // interruption, then propagate the exception as before.
            Thread.currentThread().interrupt();
            PlatformDependent.throwException(ex);
        } catch (ExecutionException ex) {
            PlatformDependent.throwException(ex);
        }
    }

    /**
     * Removes every message from {@code queue} and returns the content as one {@link Buffer}.
     * <ul>
     *   <li>empty queue: {@code null};</li>
     *   <li>exactly one message: that message wrapped as a {@link Buffer}, without copying here;</li>
     *   <li>several messages totalling at most {@code MAX_SIZE_FOR_MERGED_BUFFER} bytes: copied into
     *       one newly allocated buffer;</li>
     *   <li>otherwise: a composite buffer with one component per message.</li>
     * </ul>
     *
     * @param queue the channel queue to drain; every element is expected to be a {@link ByteBuf}
     * @param allocator the allocator used when more than one message has to be combined
     * @return the drained content, or {@code null} if the queue was empty
     */
    @Nullable
    private static Buffer drainChannelQueueToSingleBuffer(Queue<Object> queue, BufferAllocator allocator) {
        if (queue.isEmpty()) {
            return null;
        }
        if (queue.size() == 1) {
            return BufferUtils.newBufferFrom((ByteBuf) queue.poll());
        }

        // Accumulate in a long so a very large payload cannot overflow into a small or negative int
        // and be routed to the copy path by mistake.
        long totalBytes = 0;
        int components = 0;
        for (Object message : queue) {
            totalBytes += ((ByteBuf) message).readableBytes();
            ++components;
        }

        ByteBuf part;
        if (totalBytes <= MAX_SIZE_FOR_MERGED_BUFFER) {
            // The exact size is known, so allocate it up front instead of growing while copying.
            // The cast is safe: totalBytes is bounded by MAX_SIZE_FOR_MERGED_BUFFER in this branch.
            final Buffer merged = allocator.newBuffer((int) totalBytes);
            while ((part = (ByteBuf) queue.poll()) != null) {
                merged.writeBytes(BufferUtils.newBufferFrom(part));
            }
            return merged;
        }

        final CompositeBuffer composite = allocator.newCompositeBuffer(components);
        while ((part = (ByteBuf) queue.poll()) != null) {
            composite.addBuffer(BufferUtils.newBufferFrom(part));
        }
        return composite;
    }

    /**
     * Finishes {@code channel} and releases every message still queued in it. When assertions are
     * enabled, additionally asserts that nothing was left over.
     *
     * @param channel the embedded channel to finish
     */
    private static void cleanup(EmbeddedChannel channel) {
        // The call must stay outside the assert so that it also runs when assertions are disabled.
        final boolean hadLeftovers = channel.finishAndReleaseAll();
        assert !hadLeftovers;
    }

    /**
     * Runs {@link #cleanup(EmbeddedChannel)} and logs any failure instead of propagating it.
     * An {@link AssertionError} is the one exception to that: it is rethrown unchanged.
     *
     * @param channel the embedded channel to finish
     */
    private static void safeCleanup(EmbeddedChannel channel) {
        try {
            cleanup(channel);
        } catch (Throwable failure) {
            if (failure instanceof AssertionError) {
                throw (AssertionError) failure;
            }
            LOGGER.error("Error while closing embedded channel", failure);
        }
    }

    /**
     * Wraps {@code cause} in a {@link CodecEncodingException} attributed to {@code codec}.
     *
     * @param codec the codec that was encoding when the failure happened
     * @param cause the underlying failure
     * @return the new exception; the caller is responsible for throwing it
     */
    private static CodecEncodingException wrapEncodingException(ContentCodec codec, Throwable cause) {
        final String message = "Unexpected exception during encoding";
        return new CodecEncodingException(codec, message, cause);
    }

    /**
     * Wraps {@code cause} in a {@link CodecDecodingException} attributed to {@code codec}.
     *
     * @param codec the codec that was decoding when the failure happened
     * @param cause the underlying failure
     * @return the new exception; the caller is responsible for throwing it
     */
    private static CodecDecodingException wrapDecodingException(ContentCodec codec, Throwable cause) {
        final String message = "Unexpected exception during decoding";
        return new CodecDecodingException(codec, message, cause);
    }
}

