/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.encoding.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.buffer.api.CompositeBuffer;
import io.servicetalk.buffer.api.ReadOnlyBufferAllocators;
import io.servicetalk.buffer.netty.BufferUtils;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.encoding.api.BufferEncodingException;
import io.servicetalk.encoding.netty.NettyCompressionSerializer;
import io.servicetalk.serializer.api.StreamingSerializerDeserializer;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Supplier;
import javax.annotation.Nullable;

final class NettyCompressionStreamingSerializer
implements StreamingSerializerDeserializer<Buffer> {
    private static final Buffer END_OF_STREAM = ReadOnlyBufferAllocators.DEFAULT_RO_ALLOCATOR.fromAscii((CharSequence)" ");
    private static final int MAX_SIZE_FOR_MERGED_BUFFER = 65536;
    private final Supplier<MessageToByteEncoder<ByteBuf>> encoderSupplier;
    private final Supplier<ByteToMessageDecoder> decoderSupplier;

    NettyCompressionStreamingSerializer(Supplier<MessageToByteEncoder<ByteBuf>> encoderSupplier, Supplier<ByteToMessageDecoder> decoderSupplier) {
        this.encoderSupplier = Objects.requireNonNull(encoderSupplier);
        this.decoderSupplier = Objects.requireNonNull(decoderSupplier);
    }

    public Publisher<Buffer> deserialize(Publisher<Buffer> serializedData, final BufferAllocator allocator) {
        return serializedData.liftSync(subscriber -> new PublisherSource.Subscriber<Buffer>(){
            private final ByteToMessageDecoder decoder;
            private final EmbeddedChannel channel;
            @Nullable
            private PublisherSource.Subscription subscription;
            {
                this.decoder = (ByteToMessageDecoder)NettyCompressionStreamingSerializer.this.decoderSupplier.get();
                this.channel = NettyCompressionStreamingSerializer.newEmbeddedChannel((ChannelHandler)this.decoder, allocator);
            }

            public void onSubscribe(PublisherSource.Subscription subscription) {
                this.subscription = ConcurrentSubscription.wrap((PublisherSource.Subscription)subscription);
                subscriber.onSubscribe(this.subscription);
            }

            public void onNext(@Nullable Buffer next) {
                assert (this.subscription != null);
                if (next == null) {
                    subscriber.onNext(null);
                    return;
                }
                try {
                    NettyCompressionSerializer.writeAndUpdateIndex(this.channel, next, true);
                    Buffer buffer = NettyCompressionStreamingSerializer.drainChannelQueueToSingleBuffer(this.channel.inboundMessages(), allocator);
                    if (buffer != null && buffer.readableBytes() > 0) {
                        subscriber.onNext((Object)buffer);
                    } else {
                        this.subscription.request(1L);
                    }
                }
                catch (Throwable t) {
                    throw new BufferEncodingException("Unexpected exception during decoding", t);
                }
            }

            public void onError(Throwable t) {
                NettyCompressionSerializer.safeCleanup(this.channel);
                subscriber.onError(t);
            }

            public void onComplete() {
                try {
                    NettyCompressionSerializer.cleanup(this.channel);
                }
                catch (Throwable t) {
                    subscriber.onError((Throwable)new BufferEncodingException("Unexpected exception during decoding", t));
                    return;
                }
                subscriber.onComplete();
            }
        });
    }

    public Publisher<Buffer> serialize(Publisher<Buffer> toSerialize, final BufferAllocator allocator) {
        return toSerialize.concat(Single.succeeded((Object)END_OF_STREAM)).liftSync(subscriber -> new PublisherSource.Subscriber<Buffer>(){
            private final EmbeddedChannel channel;
            @Nullable
            private PublisherSource.Subscription subscription;
            private final MessageToByteEncoder encoder;
            {
                this.encoder = (MessageToByteEncoder)NettyCompressionStreamingSerializer.this.encoderSupplier.get();
                this.channel = NettyCompressionStreamingSerializer.newEmbeddedChannel((ChannelHandler)this.encoder, allocator);
            }

            public void onSubscribe(PublisherSource.Subscription subscription) {
                this.subscription = ConcurrentSubscription.wrap((PublisherSource.Subscription)subscription);
                subscriber.onSubscribe(this.subscription);
            }

            public void onNext(@Nullable Buffer next) {
                assert (this.subscription != null);
                if (next == null) {
                    subscriber.onNext(null);
                    return;
                }
                try {
                    if (next == END_OF_STREAM) {
                        NettyCompressionSerializer.preparePendingData(this.channel);
                        Buffer buffer = NettyCompressionStreamingSerializer.drainChannelQueueToSingleBuffer(this.channel.outboundMessages(), allocator);
                        if (buffer != null) {
                            subscriber.onNext((Object)buffer);
                        }
                    } else {
                        NettyCompressionSerializer.writeAndUpdateIndex(this.channel, next, false);
                        Buffer buffer = NettyCompressionStreamingSerializer.drainChannelQueueToSingleBuffer(this.channel.outboundMessages(), allocator);
                        if (buffer != null && buffer.readableBytes() > 0) {
                            subscriber.onNext((Object)buffer);
                        } else {
                            this.subscription.request(1L);
                        }
                    }
                }
                catch (Throwable t) {
                    throw new BufferEncodingException("Unexpected exception during encoding", t);
                }
            }

            public void onError(Throwable t) {
                NettyCompressionSerializer.safeCleanup(this.channel);
                subscriber.onError(t);
            }

            public void onComplete() {
                try {
                    NettyCompressionSerializer.cleanup(this.channel);
                }
                catch (Throwable t) {
                    subscriber.onError((Throwable)new BufferEncodingException("Unexpected exception during encoding", t));
                    return;
                }
                subscriber.onComplete();
            }
        });
    }

    private static EmbeddedChannel newEmbeddedChannel(ChannelHandler handler, BufferAllocator allocator) {
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        channel.config().setAllocator(BufferUtils.getByteBufAllocator((BufferAllocator)allocator));
        return channel;
    }

    @Nullable
    private static Buffer drainChannelQueueToSingleBuffer(Queue<Object> queue, BufferAllocator allocator) {
        ByteBuf part;
        if (queue.isEmpty()) {
            return null;
        }
        if (queue.size() == 1) {
            return BufferUtils.newBufferFrom((ByteBuf)((ByteBuf)queue.poll()));
        }
        int accumulateSize = 0;
        for (Object e : queue) {
            accumulateSize += ((ByteBuf)e).readableBytes();
        }
        if (accumulateSize <= 65536) {
            Buffer buffer = allocator.newBuffer();
            while ((part = (ByteBuf)queue.poll()) != null) {
                buffer.writeBytes(BufferUtils.newBufferFrom((ByteBuf)part));
            }
            return buffer;
        }
        CompositeBuffer compositeBuffer = allocator.newCompositeBuffer(Integer.MAX_VALUE);
        while ((part = (ByteBuf)queue.poll()) != null) {
            compositeBuffer.addBuffer(BufferUtils.newBufferFrom((ByteBuf)part));
        }
        return compositeBuffer;
    }
}

