@@ -258,6 +258,21 @@ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(Book
258258 }
259259 }
260260
261+ private Versioned <BookieServiceInfo > updateWritableBookieInfo (BookieId bookieId , boolean isReadonly ,
262+ byte [] bytes , Stat stat )
263+ throws IOException {
264+ BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo (bookieId , bytes );
265+ Versioned <BookieServiceInfo > result = new Versioned <>(bookieServiceInfo ,
266+ new LongVersion (stat .getCversion ()));
267+ log .info ("Update BookieInfoCache (writable bookie) {} -> {}" , bookieId , result .getValue ());
268+ if (isReadonly ) {
269+ readOnlyBookieInfo .put (bookieId , result );
270+ } else {
271+ writableBookieInfo .put (bookieId , result );
272+ }
273+ return result ;
274+ }
275+
261276 /**
262277 * Read BookieServiceInfo from ZooKeeper and updates the local cache.
263278 *
@@ -273,11 +288,8 @@ private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsy
273288 (int rc , String path , Object o , byte [] bytes , Stat stat ) -> {
274289 if (KeeperException .Code .OK .intValue () == rc ) {
275290 try {
276- BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo (bookieId , bytes );
277- Versioned <BookieServiceInfo > result = new Versioned <>(bookieServiceInfo ,
278- new LongVersion (stat .getCversion ()));
279- log .info ("Update BookieInfoCache (writable bookie) {} -> {}" , bookieId , result .getValue ());
280- writableBookieInfo .put (bookieId , result );
291+ Versioned <BookieServiceInfo > result =
292+ updateWritableBookieInfo (bookieId , false , bytes , stat );
281293 promise .complete (result );
282294 } catch (IOException ex ) {
283295 log .error ("Cannot update BookieInfo for " , ex );
@@ -291,14 +303,8 @@ private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsy
291303 (int rc2 , String path2 , Object o2 , byte [] bytes2 , Stat stat2 ) -> {
292304 if (KeeperException .Code .OK .intValue () == rc2 ) {
293305 try {
294- BookieServiceInfo bookieServiceInfo =
295- deserializeBookieServiceInfo (bookieId , bytes2 );
296306 Versioned <BookieServiceInfo > result =
297- new Versioned <>(bookieServiceInfo ,
298- new LongVersion (stat2 .getCversion ()));
299- log .info ("Update BookieInfoCache (readonly bookie) {} -> {}" ,
300- bookieId , result .getValue ());
301- readOnlyBookieInfo .put (bookieId , result );
307+ updateWritableBookieInfo (bookieId , true , bytes , stat );
302308 promise .complete (result );
303309 } catch (IOException ex ) {
304310 log .error ("Cannot update BookieInfo for " , ex );
@@ -329,18 +335,15 @@ private CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonly
329335 CompletableFuture <Versioned <BookieServiceInfo >> promise = new CompletableFuture <>();
330336 // not found, looking for a readonly bookie
331337 zk .getData (pathAsReadonly , bookieServiceInfoCacheInvalidation ,
332- (int rc2 , String path2 , Object o2 , byte [] bytes2 , Stat stat2 ) -> {
333- if (KeeperException .Code .OK .intValue () == rc2 ) {
338+ (int rc , String path , Object o , byte [] bytes , Stat stat ) -> {
339+ if (KeeperException .Code .OK .intValue () == rc ) {
334340 try {
335- BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo (bookieId , bytes2 );
336341 Versioned <BookieServiceInfo > result =
337- new Versioned <>(bookieServiceInfo , new LongVersion (stat2 .getCversion ()));
338- log .info ("Update BookieInfoCache (readonly bookie) {} -> {}" , bookieId , result .getValue ());
339- readOnlyBookieInfo .put (bookieId , result );
342+ updateWritableBookieInfo (bookieId , true , bytes , stat );
340343 promise .complete (result );
341344 } catch (IOException ex ) {
342345 log .error ("Cannot update BookieInfo for " , ex );
343- promise .completeExceptionally (KeeperException .create (KeeperException .Code .get (rc2 ), path2 )
346+ promise .completeExceptionally (KeeperException .create (KeeperException .Code .get (rc ), path )
344347 .initCause (ex ));
345348 return ;
346349 }
@@ -360,11 +363,8 @@ private CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsWritable
360363 (int rc , String path , Object o , byte [] bytes , Stat stat ) -> {
361364 if (KeeperException .Code .OK .intValue () == rc ) {
362365 try {
363- BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo (bookieId , bytes );
364- Versioned <BookieServiceInfo > result = new Versioned <>(bookieServiceInfo ,
365- new LongVersion (stat .getCversion ()));
366- log .info ("Update BookieInfoCache (writable bookie) {} -> {}" , bookieId , result .getValue ());
367- writableBookieInfo .put (bookieId , result );
366+ Versioned <BookieServiceInfo > result =
367+ updateWritableBookieInfo (bookieId , false , bytes , stat );
368368 promise .complete (result );
369369 } catch (IOException ex ) {
370370 log .error ("Cannot update BookieInfo for " , ex );
0 commit comments