Home / Function/ testBufferLifecycleCorrectlyHandled() — netty Function Reference

testBufferLifecycleCorrectlyHandled() — netty Function Reference

Architecture documentation for the testBufferLifecycleCorrectlyHandled() function in IoUringSocketSendSzSendmsgZcTest.java from the netty codebase.

Entity Profile

Dependency Diagram

graph TD
  4d512686_38c2_fd69_88d7_aea07a32ce28["testBufferLifecycleCorrectlyHandled()"]
  26135596_06d1_940f_cc3a_62ccf09121f7["IoUringSocketSendSzSendmsgZcTest"]
  4d512686_38c2_fd69_88d7_aea07a32ce28 -->|defined in| 26135596_06d1_940f_cc3a_62ccf09121f7
  2bd35cf6_9d79_442c_433f_5cbbd6556abc["testBufferLifecycleCorrectlyHandledUsingSendZc()"]
  2bd35cf6_9d79_442c_433f_5cbbd6556abc -->|calls| 4d512686_38c2_fd69_88d7_aea07a32ce28
  64b9f0df_91c4_e81b_1f5f_1db485751f4d["testBufferLifecycleCorrectlyHandledUsingSendmsgZc()"]
  64b9f0df_91c4_e81b_1f5f_1db485751f4d -->|calls| 4d512686_38c2_fd69_88d7_aea07a32ce28
  385129a3_476d_2ebd_5ab3_efd218f2e23f["testBufferLifecycleCorrectlyHandledUsingSendZcWhenRemoteClose()"]
  385129a3_476d_2ebd_5ab3_efd218f2e23f -->|calls| 4d512686_38c2_fd69_88d7_aea07a32ce28
  67f196e9_937c_928c_3ca6_55e74a4b7b95["testBufferLifecycleCorrectlyHandledUsingSendmsgZcWhenRemoteClose()"]
  67f196e9_937c_928c_3ca6_55e74a4b7b95 -->|calls| 4d512686_38c2_fd69_88d7_aea07a32ce28
  d7aa244c_b851_aed7_e98e_7237e2836d1b["testBufferLifecycleCorrectlyHandledUsingSendZcWhenLocalClose()"]
  d7aa244c_b851_aed7_e98e_7237e2836d1b -->|calls| 4d512686_38c2_fd69_88d7_aea07a32ce28
  cdbdaede_0dd7_59ae_29a1_b32d0990520a["testBufferLifecycleCorrectlyHandledUsingSendmsgZcWhenLocalClose()"]
  cdbdaede_0dd7_59ae_29a1_b32d0990520a -->|calls| 4d512686_38c2_fd69_88d7_aea07a32ce28
  style 4d512686_38c2_fd69_88d7_aea07a32ce28 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

transport-native-io_uring/src/test/java/io/netty/channel/uring/IoUringSocketSendSzSendmsgZcTest.java lines 126–225

    private static void testBufferLifecycleCorrectlyHandled(Bootstrap cb, boolean multiple, Close remoteClose)
            throws Throwable {
        cb.handler(new ChannelInboundHandlerAdapter());
        // Force to use send_zc / sendmsg_zc if supported.
        cb.option(IoUringChannelOption.IO_URING_WRITE_ZERO_COPY_THRESHOLD, 0);
        if (remoteClose == Close.LOCAL) {
            // Configure TCP_USER_TIMEOUT to a small number so the buffers can be returned quickly.
            // See:
            // - https://man7.org/linux/man-pages/man7/tcp.7.html
            // - https://github.com/torvalds/linux/blob/v6.16/include/uapi/linux/tcp.h#L111
            cb.option(new IntegerUnixChannelOption("TCP_USER_TIMEOUT", 6, 18), 1000);
        }

        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(0));
            ChannelFuture future = cb.connect(serverSocket.getLocalSocketAddress());
            final AtomicReference<Throwable> causeRef = new AtomicReference<>();

            try (Socket socket = serverSocket.accept()) {
                // We accept the socket but don't read data, this way we will not receive the second notification
                // for the send as we never see a TCP ack until we start reading.
                Channel channel = future.sync().channel();
                try {
                    final int numBuffers = multiple ? 2: 1;
                    CountDownLatch latch = new CountDownLatch(numBuffers);
                    int bufferSize = 1024 * 1024;
                    final ByteBuf buffer = channel.alloc().buffer(bufferSize);
                    future.addListener(f -> {
                        if (f.isSuccess()) {
                            ChannelFutureListener writeListener = f2 -> {
                                if (!f2.isSuccess()) {
                                    causeRef.compareAndSet(null, f2.cause());
                                }
                                latch.countDown();
                            };

                            buffer.writerIndex(buffer.capacity());
                            if (multiple) {
                                channel.write(buffer.readRetainedSlice(buffer.readableBytes() / 2))
                                        .addListener(writeListener);
                            }
                            channel.writeAndFlush(buffer)
                                    .addListener(writeListener);
                        } else {
                            buffer.release();
                            causeRef.set(f.cause());

                            for (int i = 0; i < numBuffers; i++) {
                                latch.countDown();
                            }
                        }
                    });
                    latch.await();
                    Throwable cause = causeRef.get();
                    if (cause != null) {
                        fail(cause);
                    }
                    // The buffer should still have a reference count of 1 as we did not receive the second notification
                    // yet as the remote peer did not start reading and did not close the socket yet.
                    if (multiple) {
                        assertEquals(numBuffers, buffer.refCnt());
                    } else {
                        assertEquals(numBuffers, buffer.refCnt());
                    }

                    switch (remoteClose) {
                        case REMOTE:
                            // Don't read any data but just close the socket. This should trigger the required
                            // notifications to release the buffers.
                            socket.close();
                            break;
                        case LOCAL:
                            // Don't read any data but just close the channel. Once we did not see an ack for the
                            // configured TCP_USER_TIMEOUT we will get the required notifications to release the buffers
                            channel.close().sync();
                            break;
                        case NONE:
                            // Let's read the bytes now so the buffer can be released again from the NIC.
                            try (InputStream stream = socket.getInputStream()) {
                                byte[] bytes = new byte[64 * 1024];
                                int r;

Domain

Subdomains

Frequently Asked Questions

What does testBufferLifecycleCorrectlyHandled() do?
testBufferLifecycleCorrectlyHandled() is a function in the netty codebase, defined in transport-native-io_uring/src/test/java/io/netty/channel/uring/IoUringSocketSendSzSendmsgZcTest.java.
Where is testBufferLifecycleCorrectlyHandled() defined?
testBufferLifecycleCorrectlyHandled() is defined in transport-native-io_uring/src/test/java/io/netty/channel/uring/IoUringSocketSendSzSendmsgZcTest.java at line 126.
What calls testBufferLifecycleCorrectlyHandled()?
testBufferLifecycleCorrectlyHandled() is called by 6 function(s): testBufferLifecycleCorrectlyHandledUsingSendZc, testBufferLifecycleCorrectlyHandledUsingSendZcWhenLocalClose, testBufferLifecycleCorrectlyHandledUsingSendZcWhenRemoteClose, testBufferLifecycleCorrectlyHandledUsingSendmsgZc, testBufferLifecycleCorrectlyHandledUsingSendmsgZcWhenLocalClose, testBufferLifecycleCorrectlyHandledUsingSendmsgZcWhenRemoteClose.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free