Skip to content

Commit

Permalink
refactor: better chunk loading
Browse files Browse the repository at this point in the history
  • Loading branch information
smartcmd committed May 18, 2024
1 parent 272b4d4 commit 622683d
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.allaymc.api.utils;

/**
* Allay Project 2024/5/18
*
* @author daoge_cmd
*/
public class AllayComputeThread extends Thread {

public static final String ALLAY_COMPUTATION_THREAD_PREFIX = "allay-compute-thread-";

public static boolean isAllayComputeThread(Thread thread) {
return thread instanceof AllayComputeThread;
}

public AllayComputeThread(Runnable task) {
super(task);
setName(ALLAY_COMPUTATION_THREAD_PREFIX + threadId());
}
}
7 changes: 3 additions & 4 deletions Allay-API/src/main/java/org/allaymc/api/world/Dimension.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import org.allaymc.api.block.data.BlockFace;
import org.allaymc.api.block.data.BlockStateWithPos;
import org.allaymc.api.block.property.type.BlockPropertyType;
import org.allaymc.api.block.type.BlockState;
import org.allaymc.api.blockentity.BlockEntity;
import org.allaymc.api.entity.Entity;
Expand Down Expand Up @@ -138,7 +137,7 @@ default void setBlockState(Vector3ic pos, BlockState blockState, int layer, bool
default void setBlockState(int x, int y, int z, BlockState blockState, int layer, boolean send, boolean update) {
var chunk = getChunkService().getChunkByLevelPos(x, z);
if (chunk == null) {
chunk = getChunkService().getChunkImmediately(x >> 4, z >> 4);
chunk = getChunkService().getOrLoadChunkSynchronously(x >> 4, z >> 4);
}
int xIndex = x & 15;
int zIndex = z & 15;
Expand Down Expand Up @@ -178,7 +177,7 @@ default BlockState getBlockState(int x, int y, int z, int layer) {
return AIR_TYPE.getDefaultState();
var chunk = getChunkService().getChunkByLevelPos(x, z);
if (chunk == null) {
chunk = getChunkService().getChunkImmediately(x >> 4, z >> 4);
chunk = getChunkService().getOrLoadChunkSynchronously(x >> 4, z >> 4);
}
return chunk.getBlockState(x & 15, y, z & 15, layer);
}
Expand Down Expand Up @@ -359,7 +358,7 @@ default boolean isAABBInWorld(AABBfc aabb) {
default BlockEntity getBlockEntity(int x, int y, int z) {
var chunk = getChunkService().getChunkByLevelPos(x, z);
if (chunk == null) {
chunk = getChunkService().getChunkImmediately(x >> 4, z >> 4);
chunk = getChunkService().getOrLoadChunkSynchronously(x >> 4, z >> 4);
}
return chunk.getBlockEntity(x & 15, y, z & 15);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface ChunkService extends ChunkAccessible {
CompletableFuture<Chunk> loadChunk(int x, int z);

@SlowOperation
Chunk getChunkImmediately(int x, int z);
Chunk getOrLoadChunkSynchronously(int x, int z);

void unloadChunk(int x, int z);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ default void tick(long currentTick) {}

CompletableFuture<Chunk> readChunk(int chunkX, int chunkZ, DimensionInfo dimensionInfo) throws WorldStorageException;

Chunk readChunkSynchronously(int chunkX, int chunkZ, DimensionInfo dimensionInfo) throws WorldStorageException;

CompletableFuture<Void> writeChunk(Chunk chunk) throws WorldStorageException;

boolean containChunk(int chunkX, int chunkZ, DimensionInfo dimensionInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.allaymc.api.server.BanInfo;
import org.allaymc.api.server.Server;
import org.allaymc.api.server.Whitelist;
import org.allaymc.api.utils.AllayComputeThread;
import org.allaymc.api.utils.AllayStringUtils;
import org.allaymc.api.utils.GameLoop;
import org.allaymc.api.world.DimensionInfo;
Expand Down Expand Up @@ -93,11 +94,7 @@ public final class AllayServer implements Server {
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
r -> {
Thread thread = new Thread(r);
thread.setName("computation-thread-" + thread.threadId());
return thread;
});
r -> new AllayComputeThread(r));
// Thread pool for executing I/O-intensive tasks
@Getter
private final ExecutorService virtualThreadPool = Executors.newVirtualThreadPerTaskExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ protected void teleportInDimension(Location3fc target) {
protected void teleportOverDimension(Location3fc target) {
// Teleporting to another dimension, there will be more works to be done
this.location.dimension().getEntityService().removeEntity(thisEntity, () -> {
target.dimension().getChunkService().getChunkImmediately((int) target.x() >> 4, (int) target.z() >> 4);
target.dimension().getChunkService().getOrLoadChunkSynchronously((int) target.x() >> 4, (int) target.z() >> 4);
setLocation(target, false);
target.dimension().getEntityService().addEntity(thisEntity);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ protected void teleportOverDimension(Location3fc target) {
networkComponent.sendPacket(targetDim.getWorld().getWorldData().getGameRules().buildPacket());
}
this.location.dimension().removePlayer(thisEntity, () -> {
targetDim.getChunkService().getChunkImmediately((int) target.x() >> 4, (int) target.z() >> 4);
targetDim.getChunkService().getOrLoadChunkSynchronously((int) target.x() >> 4, (int) target.z() >> 4);
setLocation(target, false);
sendLocationToSelf();
if (currentDim.getDimensionInfo().dimensionId() != targetDim.getDimensionInfo().dimensionId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.allaymc.api.block.registry.BlockTypeRegistry;
import org.allaymc.api.entity.component.player.EntityPlayerContainerHolderComponent;
import org.allaymc.api.utils.Identifier;
import org.allaymc.api.client.data.LoginData;
import org.allaymc.api.client.storage.PlayerData;
Expand Down Expand Up @@ -277,7 +276,7 @@ public void initializePlayer() {
// Validate and set spawn point
validateAndSetSpawnPoint(playerData);
// Load the current point chunk firstly so that we can add player entity into the chunk
dimension.getChunkService().getChunkImmediately(
dimension.getChunkService().getOrLoadChunkSynchronously(
(int) currentPos.x() >> 4,
(int) currentPos.z() >> 4
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public PacketSignal handleAsync(EntityPlayer player, PlayerActionPacket packet)
case RESPAWN -> {
var spawnPoint = player.getSpawnPoint();
var dimension = spawnPoint.dimension();
dimension.getChunkService().getChunkImmediately(spawnPoint.x() >> 4, spawnPoint.z() >> 4);
dimension.getChunkService().getOrLoadChunkSynchronously(spawnPoint.x() >> 4, spawnPoint.z() >> 4);
dimension.addPlayer(player, () -> {
player.teleport(new Location3f(spawnPoint.x(), spawnPoint.y(), spawnPoint.z(), dimension));
player.setSprinting(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package org.allaymc.server.world.service;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
import it.unimi.dsi.fastutil.longs.LongComparator;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.allaymc.api.annotation.SlowOperation;
import org.allaymc.api.datastruct.collections.nb.Long2ObjectNonBlockingMap;
import org.allaymc.api.server.Server;
import org.allaymc.api.utils.AllayComputeThread;
import org.allaymc.api.utils.HashUtils;
import org.allaymc.api.utils.MathUtils;
import org.allaymc.api.world.Dimension;
Expand Down Expand Up @@ -50,7 +53,7 @@ public class AllayChunkService implements ChunkService {

private final Map<Long, Chunk> loadedChunks = new Long2ObjectNonBlockingMap<>();
private final Map<Long, CompletableFuture<Chunk>> loadingChunks = new Long2ObjectNonBlockingMap<>();
private final Map<ChunkLoader, ChunkLoaderManager> chunkLoaderManagers = new Object2ObjectArrayMap<>(Server.SETTINGS.genericSettings().maxClientCount());
private final Map<ChunkLoader, ChunkLoaderManager> chunkLoaderManagers = new Object2ObjectOpenHashMap<>();
private final Dimension dimension;
private final WorldStorage worldStorage;
private final Map<Long, Integer> unusedChunkClearCountDown = new Long2IntOpenHashMap();
Expand Down Expand Up @@ -136,8 +139,12 @@ public Chunk getChunk(long chunkHash) {

@SlowOperation
@Override
public synchronized Chunk getChunkImmediately(int x, int z) {
return getOrLoadChunk(x, z).join();
public Chunk getOrLoadChunkSynchronously(int x, int z) {
var chunk = getChunk(x, z);
if (chunk != null) {
return chunk;
}
return loadChunkSynchronously(x, z, null);
}

@Override
Expand Down Expand Up @@ -176,9 +183,21 @@ public CompletableFuture<Chunk> loadChunk(int x, int z) {
if (isChunkLoaded(hashXZ)) {
throw new IllegalStateException("Chunk is already loaded");
}
CompletableFuture<Chunk> future;
// Prevent multiple threads from putting the same chunk into loadingChunks at the same time and wasting computing resources
var presentValue = loadingChunks.putIfAbsent(hashXZ, future = worldStorage.readChunk(x, z, DimensionInfo.OVERWORLD)
var future = new CompletableFuture<Chunk>();
// 只有一个线程可以成功向loadingChunks中写入future,其他线程将获取到写入成功线程的future
var presentValue = loadingChunks.putIfAbsent(hashXZ, future);
if (presentValue != null) {
return presentValue;
}
if (AllayComputeThread.isAllayComputeThread(Thread.currentThread())) {
// 若当前线程已经为计算线程,则直接在此线程上加载区块
// 否则会出现一个计算线程等待另外一个计算线程的情况,造成线程资源的浪费
// If the current thread is already a computing thread, load the block directly on this thread
// Otherwise, one computing thread will wait for another computing thread, resulting in a waste of thread resources
loadChunkSynchronously(x, z, future);
return future;
}
worldStorage.readChunk(x, z, DimensionInfo.OVERWORLD)
.exceptionally(t -> {
log.error("Error while reading chunk ({},{}) !", x, z, t);
return AllayUnsafeChunk.builder().emptyChunk(x, z, dimension.getDimensionInfo()).toSafeChunk();
Expand All @@ -192,15 +211,40 @@ public CompletableFuture<Chunk> loadChunk(int x, int z) {
prepareChunk.beforeSetChunk(dimension);
setChunk(x, z, prepareChunk);
prepareChunk.afterSetChunk(dimension);
future.complete(prepareChunk);
loadingChunks.remove(hashXZ);
return prepareChunk;
})
);
if (presentValue == null) {
return future;
} else {
return presentValue;
});
return future;
}

@SneakyThrows
protected Chunk loadChunkSynchronously(int x, int z, CompletableFuture<Chunk> futureAlreadyExists) {
var hash = HashUtils.hashXZ(x, z);
var synchronizedFuture = futureAlreadyExists;
if (futureAlreadyExists == null) {
synchronizedFuture = new CompletableFuture<>();
loadingChunks.put(hash, synchronizedFuture);
}
Chunk chunk;
try {
chunk = worldStorage.readChunkSynchronously(x, z, DimensionInfo.OVERWORLD);
} catch (Throwable t) {
log.error("Error while reading chunk ({},{}) !", x, z, t);
chunk = AllayUnsafeChunk.builder().emptyChunk(x, z, dimension.getDimensionInfo()).toSafeChunk();
}
try {
generateChunkIfNeed(chunk);
} catch (Throwable t) {
log.error("Error while generating chunk ({},{}) !", x, z, t);
chunk = AllayUnsafeChunk.builder().emptyChunk(x, z, dimension.getDimensionInfo()).toSafeChunk();
}
chunk.beforeSetChunk(dimension);
setChunk(x, z, chunk);
chunk.afterSetChunk(dimension);
synchronizedFuture.complete(chunk);
loadingChunks.remove(hash);
return chunk;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,30 @@ private void initWorldData(String worldName) {

@Override
public CompletableFuture<Chunk> readChunk(int x, int z, DimensionInfo dimensionInfo) throws WorldStorageException {
return CompletableFuture.supplyAsync(() -> {
AllayUnsafeChunk.Builder builder = AllayUnsafeChunk.builder()
.chunkX(x)
.chunkZ(z)
.dimensionInfo(dimensionInfo);
byte[] versionValue = this.db.get(LevelDBKeyUtils.VERSION.getKey(x, z, dimensionInfo));
if (versionValue == null) {
versionValue = this.db.get(LevelDBKeyUtils.LEGACY_VERSION.getKey(x, z, dimensionInfo));
}
if (versionValue == null) {
return builder.build().toSafeChunk();
}
byte[] finalized = this.db.get(LevelDBKeyUtils.CHUNK_FINALIZED_STATE.getKey(x, z, dimensionInfo));
if (finalized == null) {
builder.state(ChunkState.FINISHED);
} else {
builder.state(ChunkState.values()[Unpooled.wrappedBuffer(finalized).readIntLE() + 1]);
}
LevelDBChunkSerializer.INSTANCE.deserialize(this.db, builder);
return CompletableFuture.supplyAsync(() -> readChunkSynchronously(x, z, dimensionInfo), Server.getInstance().getVirtualThreadPool());
}

@Override
public Chunk readChunkSynchronously(int x, int z, DimensionInfo dimensionInfo) throws WorldStorageException {
AllayUnsafeChunk.Builder builder = AllayUnsafeChunk.builder()
.chunkX(x)
.chunkZ(z)
.dimensionInfo(dimensionInfo);
byte[] versionValue = this.db.get(LevelDBKeyUtils.VERSION.getKey(x, z, dimensionInfo));
if (versionValue == null) {
versionValue = this.db.get(LevelDBKeyUtils.LEGACY_VERSION.getKey(x, z, dimensionInfo));
}
if (versionValue == null) {
return builder.build().toSafeChunk();
}, Server.getInstance().getVirtualThreadPool());
}
byte[] finalized = this.db.get(LevelDBKeyUtils.CHUNK_FINALIZED_STATE.getKey(x, z, dimensionInfo));
if (finalized == null) {
builder.state(ChunkState.FINISHED);
} else {
builder.state(ChunkState.values()[Unpooled.wrappedBuffer(finalized).readIntLE() + 1]);
}
LevelDBChunkSerializer.INSTANCE.deserialize(this.db, builder);
return builder.build().toSafeChunk();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public AllayNonPersistentWorldStorage() {

@Override
public CompletableFuture<Chunk> readChunk(int x, int z, DimensionInfo dimensionInfo) {
return CompletableFuture.completedFuture(readChunkSynchronously(x, z, dimensionInfo));
}

@Override
public Chunk readChunkSynchronously(int x, int z, DimensionInfo dimensionInfo) throws WorldStorageException {
Dimension dimension = Server.getInstance().getWorldPool().getWorld(worldData.getName()).getDimension(dimensionInfo.dimensionId());
long l = HashUtils.hashXZ(x, z);
var chunk = chunks.get(l);
Expand All @@ -51,7 +56,7 @@ public CompletableFuture<Chunk> readChunk(int x, int z, DimensionInfo dimensionI
}
readEntities(l).stream().map(nbt -> EntityHelper.fromNBT(dimension, nbt)).forEach(e -> dimension.getEntityService().addEntity(e));
readBlockEntities(l).stream().map(nbt -> BlockEntityHelper.fromNBT(dimension, nbt)).forEach(chunk::addBlockEntity);
return CompletableFuture.completedFuture(chunk);
return chunk;
}

@Override
Expand Down

0 comments on commit 622683d

Please sign in to comment.