|
| 1 | +package convex.lattice.queue; |
| 2 | + |
| 3 | +import convex.core.data.ACell; |
| 4 | +import convex.core.data.AHashMap; |
| 5 | +import convex.core.data.AVector; |
| 6 | +import convex.core.data.Index; |
| 7 | +import convex.core.data.Keyword; |
| 8 | +import convex.core.data.prim.CVMLong; |
| 9 | +import convex.lattice.cursor.ALatticeCursor; |
| 10 | + |
| 11 | +/** |
| 12 | + * Represents a single topic in the lattice message queue system. |
| 13 | + * |
| 14 | + * <p>A topic contains one or more <b>partitions</b>, each an independent |
| 15 | + * {@link LatticeQueue} (append-only log), plus topic-level <b>metadata</b> |
| 16 | + * (e.g. partition count, retention policy, ownership).</p> |
| 17 | + * |
| 18 | + * <p>The topic state is a {@link TopicLattice} value: an {@code Index<Keyword, ACell>} |
| 19 | + * with {@code :partitions} (partition map) and {@code :meta} (metadata map).</p> |
| 20 | + * |
| 21 | + * <h2>Partitioning</h2> |
| 22 | + * <p>Producers can write to a specific partition or use {@link #offer(ACell, ACell)} |
| 23 | + * to auto-partition by key hash. The number of partitions is stored in topic |
| 24 | + * metadata under {@code :num-partitions} and must be set before auto-partitioning.</p> |
| 25 | + * |
| 26 | + * <h2>Usage</h2> |
| 27 | + * <pre>{@code |
| 28 | + * LatticeTopic topic = mq.topic("user-events"); |
| 29 | + * |
| 30 | + * // Configure partitions |
| 31 | + * topic.setNumPartitions(4); |
| 32 | + * |
| 33 | + * // Write to a specific partition |
| 34 | + * topic.partition(0).offer(Strings.create("event-1")); |
| 35 | + * |
| 36 | + * // Auto-partition by key hash |
| 37 | + * topic.offer(Strings.create("user-42"), Strings.create("login")); |
| 38 | + * }</pre> |
| 39 | + */ |
| 40 | +public class LatticeTopic { |
| 41 | + |
| 42 | + public static final Keyword KEY_NUM_PARTITIONS = Keyword.intern("num-partitions"); |
| 43 | + |
| 44 | + private final ALatticeCursor<?> cursor; |
| 45 | + |
| 46 | + /** |
| 47 | + * Wraps an existing cursor at the TopicLattice level. |
| 48 | + */ |
| 49 | + LatticeTopic(ALatticeCursor<?> cursor) { |
| 50 | + this.cursor = cursor; |
| 51 | + } |
| 52 | + |
| 53 | + /** |
| 54 | + * Returns a partition by integer ID. The partition is created implicitly on first write. |
| 55 | + * |
| 56 | + * @param id Partition number (0-based, like Kafka) |
| 57 | + * @return Queue for the specified partition |
| 58 | + */ |
| 59 | + public LatticeQueue partition(long id) { |
| 60 | + return partition(CVMLong.create(id)); |
| 61 | + } |
| 62 | + |
| 63 | + /** |
| 64 | + * Returns a partition by CVM key. |
| 65 | + * |
| 66 | + * <p>Ensures the topic state is initialised as an {@code Index} before |
| 67 | + * descending, so that {@code RT.assocIn} preserves the correct type on |
| 68 | + * writeback through the cursor hierarchy.</p> |
| 69 | + * |
| 70 | + * @param id Partition key (typically CVMLong) |
| 71 | + * @return Queue for the specified partition |
| 72 | + */ |
| 73 | + @SuppressWarnings("unchecked") |
| 74 | + public LatticeQueue partition(ACell id) { |
| 75 | + // Ensure topic state is Index before descended cursors write through it. |
| 76 | + // RT.assocIn creates MapLeaf for null intermediaries; by initialising to |
| 77 | + // Index.EMPTY first, subsequent assoc() calls preserve the Index type. |
| 78 | + ensureInitialised(); |
| 79 | + |
| 80 | + ALatticeCursor<AVector<ACell>> partCursor = cursor.descend(TopicLattice.KEY_PARTITIONS, id); |
| 81 | + return new LatticeQueue(partCursor); |
| 82 | + } |
| 83 | + |
| 84 | + // ===== Metadata ===== |
| 85 | + |
| 86 | + /** |
| 87 | + * Gets a topic metadata value by key. |
| 88 | + */ |
| 89 | + @SuppressWarnings("unchecked") |
| 90 | + public ACell getMeta(ACell key) { |
| 91 | + ALatticeCursor<Index<Keyword, ACell>> c = (ALatticeCursor<Index<Keyword, ACell>>) cursor; |
| 92 | + Index<Keyword, ACell> state = c.get(); |
| 93 | + AHashMap<ACell, ACell> meta = TopicLattice.getMeta(state); |
| 94 | + return meta.get(key); |
| 95 | + } |
| 96 | + |
| 97 | + /** |
| 98 | + * Sets a topic metadata value. |
| 99 | + */ |
| 100 | + @SuppressWarnings("unchecked") |
| 101 | + public void setMeta(ACell key, ACell value) { |
| 102 | + ALatticeCursor<Index<Keyword, ACell>> c = (ALatticeCursor<Index<Keyword, ACell>>) cursor; |
| 103 | + c.updateAndGet(state -> { |
| 104 | + if (state == null) state = TopicLattice.INSTANCE.zero(); |
| 105 | + AHashMap<ACell, ACell> meta = TopicLattice.getMeta(state); |
| 106 | + return state.assoc(TopicLattice.KEY_META, meta.assoc(key, value)); |
| 107 | + }); |
| 108 | + } |
| 109 | + |
| 110 | + /** |
| 111 | + * Returns the configured number of partitions, or 0 if not set. |
| 112 | + */ |
| 113 | + public long getNumPartitions() { |
| 114 | + ACell val = getMeta(KEY_NUM_PARTITIONS); |
| 115 | + if (val instanceof CVMLong l) return l.longValue(); |
| 116 | + return 0; |
| 117 | + } |
| 118 | + |
| 119 | + /** |
| 120 | + * Configures the number of partitions for this topic. |
| 121 | + */ |
| 122 | + public void setNumPartitions(long n) { |
| 123 | + setMeta(KEY_NUM_PARTITIONS, CVMLong.create(n)); |
| 124 | + } |
| 125 | + |
| 126 | + /** |
| 127 | + * Produces a keyed record to an auto-selected partition. |
| 128 | + * |
| 129 | + * <p>The partition is chosen by hashing the key modulo the configured |
| 130 | + * partition count. Records with the same key always go to the same |
| 131 | + * partition, preserving per-key ordering (the same guarantee Kafka provides).</p> |
| 132 | + * |
| 133 | + * <p>Requires {@link #setNumPartitions(long)} to have been called first.</p> |
| 134 | + * |
| 135 | + * @param key Record key (used for partition selection and stored in the entry) |
| 136 | + * @param value Record value |
| 137 | + * @return Absolute offset assigned within the selected partition |
| 138 | + * @throws IllegalStateException if partition count is not configured |
| 139 | + */ |
| 140 | + public long offer(ACell key, ACell value) { |
| 141 | + long numPartitions = getNumPartitions(); |
| 142 | + if (numPartitions <= 0) { |
| 143 | + throw new IllegalStateException( |
| 144 | + "Topic has no partitions configured. Call setNumPartitions() first."); |
| 145 | + } |
| 146 | + long partId = Math.abs(key.getHash().longValue()) % numPartitions; |
| 147 | + return partition(partId).offer(key, value); |
| 148 | + } |
| 149 | + |
| 150 | + /** |
| 151 | + * Creates a forked copy of this topic for independent operation. |
| 152 | + * All partitions within this topic are forked together. |
| 153 | + */ |
| 154 | + @SuppressWarnings("unchecked") |
| 155 | + public LatticeTopic fork() { |
| 156 | + return new LatticeTopic(((ALatticeCursor<ACell>) cursor).fork()); |
| 157 | + } |
| 158 | + |
| 159 | + /** |
| 160 | + * Syncs this forked topic back to its parent, merging all partition changes. |
| 161 | + */ |
| 162 | + @SuppressWarnings("unchecked") |
| 163 | + public void sync() { |
| 164 | + ((ALatticeCursor<ACell>) cursor).sync(); |
| 165 | + } |
| 166 | + |
| 167 | + /** |
| 168 | + * Returns the underlying lattice cursor. |
| 169 | + */ |
| 170 | + public ALatticeCursor<?> cursor() { |
| 171 | + return cursor; |
| 172 | + } |
| 173 | + |
| 174 | + /** |
| 175 | + * Ensures the topic cursor is initialised to an Index so that |
| 176 | + * descended cursors (which write back via RT.assocIn) preserve the type. |
| 177 | + */ |
| 178 | + @SuppressWarnings("unchecked") |
| 179 | + private void ensureInitialised() { |
| 180 | + ALatticeCursor<Index<Keyword, ACell>> c = (ALatticeCursor<Index<Keyword, ACell>>) cursor; |
| 181 | + c.updateAndGet(state -> { |
| 182 | + if (state == null) return TopicLattice.INSTANCE.zero(); |
| 183 | + return state; |
| 184 | + }); |
| 185 | + } |
| 186 | +} |
0 commit comments