h

hening1

V1

2022/08/25阅读:36主题:默认主题

各种ByteBuffer探索

各种ByteBuffer

最近在看rocketmq源码的时候经常能看到ByteBuffer,刚开始看到的时候一脸懵逼,后来研究一段时间之后终于豁然开朗,在此总结分享一下。

Buffer

Buffer是一个抽象类,顾名思义是一个数据的缓存,ByteBuffer是字节缓冲区,扩展了Buffer,同样是一个抽象类,Buffer不只有ByteBuffer一个抽象类,还有比如IntBuffer,LongBuffer等,继承结构如下图,本文主要介绍ByteBuffer。

image-20211126152912377

Buffer中维护了几个私有变量

// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
//下一个读或者写的位置
private int position = 0;
//不能读或者写的边界索引
private int limit;
//buffer的容量
private int capacity;

可以看到JDK javadoc中就写清楚了mark <= position <= limit <= capacity这个关系。

重点介绍一下Buffer中提供的几个方法。

  1. flip()方法,后面我会给出这个方法的使用案例。

public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
}

将limit置为position,position置为0,mark置为0。用作读写转换,读完之后调用flip进行写。使用案例

  1. remaining()方法,返回limit和postion之差。
public final int remaining() {
        int rem = limit - position;
        return rem > 0 ? rem : 0;
    }
  1. hasRemaining()方法
public final boolean hasRemaining() {
        return position < limit;
    }
byteBuffer
byteBuffer

ByteBuffer

ByteBuffer有堆内和堆外两种实现:

  1. HeapByteBuffer是堆内的实现。
  2. DirectByteBuffer是堆外内存(直接内存)的实现,DirectByteBuffer扩展了MappedByteBuffer,MappedByteBuffer扩展了ByteBuffer,MappedByteBuffer也是在直接内存中。

堆内的HeapByteBuffer是通过一个字节数组来保存数据的:

public abstract class ByteBuffer {
  ...
  final byte[] hb;                  // Non-null only for heap buffers
  ...
}

堆外的DirectByteBuffer是通过在直接内存中申请一段内存来保存数据的。

public abstract class Buffer {
  ...
  // Used only by direct buffers
    // NOTE: hoisted here for speed in JNI GetDirectBufferAddress
    long address;
  ...
}

如果是mappedByteBuffer类型,可以通过以下方式获取到这个内存地址。

((DirectBuffer) mappedByteBuffer).address()

ByteBuffer是一个抽象类,没法实例化,需要实例化其子类。可以通过下面的静态方法创建HeapByteBuffer和DirectByteBuffer。

ByteBuffer heapByteBuffer = ByteBuffer.allocate(10);
ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(10);

ByteBuffer还提供了将字节数组包装成ByteBuffer的静态方法,输入字节数组,输出HeapByteBuffer。

public static ByteBuffer wrap(byte[] array,
                                    int offset, int length)

    
{
        try {
            return new HeapByteBuffer(array, offset, length);
        } catch (IllegalArgumentException x) {
            throw new IndexOutOfBoundsException();
        }
    }

现在我们来看一下ByteBuffer的使用案例,着重看一下flip的使用。

public static void main(String[] args) throws IOException {
        FileChannel fileChannel = new RandomAccessFile(new File("./hehe.txt"), "rw").getChannel();
        final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
        byteBuffer.put("hehe".getBytes(StandardCharsets.UTF_8));
        System.out.println("调用flip之前:" + byteBuffer.remaining());
        byteBuffer.flip();
        System.out.println("调用flip之后:" + byteBuffer.remaining());
        fileChannel.write(byteBuffer);
     final ByteBuffer allocate = ByteBuffer.allocate(4);
        fileChannel.read(allocate, 0);
        System.out.println("使用FileChannel读结果:" + new String(allocate.array()));
    }

输出结果

调用flip之前:1020
调用flip之后:4
使用FileChannel读结果:hehe

注意我的代码中使用了flip()方法,如果不flip是写不到文件数据的。先大概解释一下,我创建了一个1024字节的堆外缓存,这时候limit是1024, capacity是1024, position是0,写入了4个字节之后,position变成了4,所以调用remaining的结果返回1024-4=1020,调用了flip之后,position置为0,limit置为了position之前的值也就是4,所以remaining返回的结果是4-0=4。起始可以看到,在调用flip之前,remaining方法返回的是当前byteBuffer还有多少剩余空间可以存放数据,调用flip之后,返回的结果是byteBuffer存储了多少数据

这里我们可以从源码层面一探究竟为什么调用flip。主要是看以下两个方法:

byteBuffer.put()和fileChannel.write(byteBuffer)方法,先看一下byteBuffer.put()方法。

public ByteBuffer put(byte[] src, int offset, int length) {
        checkBounds(offset, length, src.length);
        if (length > remaining())
            throw new BufferOverflowException();
        int end = offset + length;
        for (int i = offset; i < end; i++)
            this.put(src[i]);
        return this;
    }

public ByteBuffer put(byte x) {
        unsafe.putByte(ix(nextPutIndex()), ((x)));
        return this;
    }

private long ix(int i) {
        return address + ((long)i << 0);
    }

final int nextPutIndex() {                          // package-private
        int p = position;
        if (p >= limit)
            throw new BufferOverflowException();
        position = p + 1;
        return p;
    }

上面的代码比较简单,上面我们put了"hehe"字符串,就是一个for循环,从分配的堆外内存地址address+position开始存储元素,position不断的+1更新,另外我们新创建的byteBuffer的position是0,所以就是从堆外内存地址address开始,一个字节一个字节的存储数据。

接下来我们再看一下fileChannel.write的实现,sun.nio.ch.FileChannelImpl#write()这个方法。

//sun.nio.ch.FileChannelImpl#write()
public int write(ByteBuffer var1) throws IOException {
             ...
                    if (this.isOpen()) {
                        do {
                            var3 = IOUtil.write(this.fd, var1, -1Lthis.nd);
                        } while(var3 == -3 && this.isOpen());
         ...

                return var5;
            }
        }
    }

//sun.nio.ch.IOUtil#write()
 static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {
            ...
        }
    }


//sun.nio.ch.IOUtil#writeFromNativeBuffer()
    private static int writeFromNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
      //position,前面的例子中调用flip之后是0
        int var5 = var1.position();
      //limit, 前面的例子中调用flip之后是4
        int var6 = var1.limit();

        assert var5 <= var6;

      //remaining, 前面的例子中调用flip之后4
        int var7 = var5 <= var6 ? var6 - var5 : 0;
        boolean var8 = false;
        if (var7 == 0) {
            return 0;
        } else {
            int var9;
            if (var2 != -1L) {
                var9 = var4.pwrite(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);
            } else {
              //这步sun.nio.ch.NativeDispatcher#write,真正的写入数据,第一个参数是内存中的偏移量,第二个参数是写入数据的大小
                var9 = var4.write(var0, ((DirectBuffer)var1).address() + (long)var5, var7);
            }

            if (var9 > 0) {
                var1.position(var5 + var9);
            }

            return var9;
        }
    }

从上面的代码45行看,最终就是调用到了sun.nio.ch.NativeDispatcher#write方法,在调用flip之后var5是0,var7是4,此时就是在堆外分配的内存地址address+0的基础上偏移4个字节,正好是"hehe"这个字符串,而在调用flip之前,var5是4, var7是1020,调用write方法是是在address+4的基础上再偏移1020个字节,结合上面put的解释,这段内存空间根本就没有数据。这就解释了为什么调用fileChannel.write之前要掉用flip方法。

值得注意的是这里一般推荐使用直接内存DirectByteBuffer,原因是使用HeapByteBuffer在写入文件时需要多一步从java堆空间到直接内存的复制。我们再来看一下sun.nio.ch.FileChannelImpl#write()这个方法的代码,最终调用到sun.nio.ch.IOUtil#write()这个方法:

static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
          //如果是DirectByteBuffer,走这个分支,调用writeFromNativeBuffer将数据写入到直接内存中
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {
          //这个分支是非直接内存,即HeapByteBuffer会走这个分支
            int var5 = var1.position();
            int var6 = var1.limit();

            assert var5 <= var6;

            int var7 = var5 <= var6 ? var6 - var5 : 0;
           //这里会创建一个临时的堆外内存的缓存
            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);

            int var10;
            try {
                var8.put(var1);
                var8.flip();
                var1.position(var5);
              //再用堆外内调用writeFromNativeBuffer写数据
                int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                if (var9 > 0) {
                    var1.position(var5 + var9);
                }

                var10 = var9;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var8);
            }

            return var10;
        }
    }

从上面的代码可以看出,如果是HeapByteBuffer,就会创建一个临时的堆外内存缓存,然后将数据拷贝到这个堆外缓存上,在写入到文件。至于为什么这样操作,可以看一下R大的答案

MappedByteBuffer

MappedByteBuffer从jdk的javadoc上就能找到合适的介绍:

A direct byte buffer whose content is a memory-mapped region of a file.
 
Mapped byte buffers are created via the {@link
java.nio.channels.FileChannel#map FileChannel.map} method.  This class
extends the {@link ByteBuffer} class with operations that are specific to
memory-mapped file regions.

翻译过来其实就是一个文件的内存映射,通过调用FileChannel.map方法创建,扩展了ByteBuffer,其实mappedByteBuffer就可以理解为PageCache,如果不理解PageCache的概念可以看一下我之前写的从mmap到rocketmq存储设计这篇文章。

FileChannel fileChannel = new RandomAccessFile(new File("./hehe.txt"), "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 01024 * 1024);

FileChannel.map调用的是FileChannelImpl#map0的native方法,native方法的实现如下,最终调用的是操作系统的mmap系统调用。返回一个内存映射的索引地址。

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
                                     jint prot, jlong off, jlong len)

{
    void *mapAddress = 0;
    ...

    mapAddress = mmap64(
        0,                    /* Let OS decide location */
        len,                  /* Number of bytes to map */
        protections,          /* File permissions */
        flags,                /* Changes are shared */
        fd,                   /* File descriptor of mapped file */
        off);                 /* Offset into file */

    ...

    return ((jlong) (unsigned long) mapAddress);
}

mmap就是内存映射函数,可以将一个文件映射到进程的虚拟地址空间上返回一个逻辑地址的指针,使进程可以通过指针来操作文件,进程访问文件就像访问内存一样,而且没有用户态内核态的切换。前面也提到了,在Java中可以通过下面的方法获取到这个逻辑指针的地址。

((DirectBuffer) mappedByteBuffer).address()
mmap
mmap

刚才说了MappedByteBuffer可以理解为PageCache,是属于内存,内存是不可靠的,所以MappedByteBuffer中提供了force()方法,从javadoc上看可以看出来这个方法的作用,说白了就是将MappedByteBuffer中的数据持久化到被映射的物理文件上,通过主动调用force方法可以将数据持久化到磁盘,可以避免内存不可靠导致数据丢失的问题。

/**
     * Forces any changes made to this buffer's content to be written to the
     * storage device containing the mapped file.
     * ...
     * @return  This buffer
     */

    public final MappedByteBuffer force() {
        checkMapped();
        if ((address != 0) && (capacity() != 0)) {
            long offset = mappingOffset();
            force0(fd, mappingAddress(offset), mappingLength(offset));
        }
        return this;
    }

native方法的force0的JVM实现。

JNIEXPORT void JNICALL
Java_java_nio_MappedByteBuffer_force0(JNIEnv *env, jobject obj, jobject fdo,
                                      jlong address, jlong len)

{
    void* a = (void *)jlong_to_ptr(address);
    int result = msync(a, (size_t)len, MS_SYNC);
    if (result == -1) {
        JNU_ThrowIOExceptionWithLastError(env, "msync failed");
    }
}

force0的实现使用系统调用msync,msync的作用是将内存映射的的更改刷新回文件系统,flag参数为MS_SYNC,表示同步等待刷新成功之后返回。

int msync(void *addr, size_t length, int flags);

FileChannel和MappedByteBuffer

前面的案例中多次出现了FileChannel,包括mmap使用案例,mappedByteBuffer是通过fileChannel.map方法创建的。

FileChannel fileChannel = new RandomAccessFile(new File("./hehe.txt"), "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 01024 * 1024);

那么FileChannel和MappedByteBuffer有什么关联吗,先看一下fileChannel.map方法的javadoc:

 A mapping, once established, is not dependent upon the file channel
 that was used to create it.  Closing the channel, in particular, has no
 effect upon the validity of the mapping.

简单翻一下就是“一个映射一旦创建了就和FileChannel没有关系了,即使关闭了FileChannel,也不影响映射的有效性”。看似MappedByteBuffer只是由FileChannel创建的,创建之后二者就没什么关系了。我们来看一下下面的代码:

public static void main(String[] args) throws IOException {
        FileChannel fileChannel = new RandomAccessFile(new File("./hehe.txt"), "rw").getChannel();
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 01024);

        //使用FileChannel写,MappedByteBuffer读
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
        byteBuffer.put("hehe".getBytes(StandardCharsets.UTF_8));
        byteBuffer.flip();
        fileChannel.write(byteBuffer);
        byte[] bytes = new byte[4];
        mappedByteBuffer.get(bytes);
        mappedByteBuffer.position(0);
        System.out.println("使用FileChannel写,MappedByteBuffer读,结果:" + new String(bytes));

        //使用MappedByteBuffer写,使用FileChannel读
        mappedByteBuffer.position(4);
        mappedByteBuffer.put("haha".getBytes(StandardCharsets.UTF_8));
        final ByteBuffer allocate = ByteBuffer.allocate(4);
        fileChannel.read(allocate, 4);
        System.out.println("使用MappedByteBuffer写,使用FileChannel读:" + new String(allocate.array()));
    }

输出结果

使用FileChannel写,MappedByteBuffer读,结果:hehe
使用MappedByteBuffer写,使用FileChannel读:haha

可以看到FileChannle和MappedByteBuffer之间的数据都是互相可见的,其实这两个都可以理解为PageCache,在rocketmq中,就有一种模式是使用FileChannle写,使用MappedByteBuffer读,有兴趣的可以看一下我之前的文章。

总结

本文总结了一些令人懵逼的ByteBuffer的一些原理和常见的操作,希望你看完本文下次遇见就不懵逼了^_^。

分类:

后端

标签:

Java

作者介绍

h
hening1
V1