Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/websubhub-consolidator/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "websubhub"
version = "1.15.0"
version = "1.15.1"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "http"},
Expand Down
32 changes: 30 additions & 2 deletions components/websubhub-consolidator/consolidator_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ http:Service consolidatorService = service object {
topics: getTopics(),
subscriptions: getSubscriptions()
};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 1

Suggested change
};
};
log:printInfo("State snapshot retrieved successfully", topicCount = stateSnapshot.topics.length(), subscriptionCount = stateSnapshot.subscriptions.length());

log:printInfo("Request received to retrieve state-snapshot, hence responding with the current state-snapshot", state = stateSnapshot);
log:printDebug("Request received to retrieve state-snapshot, hence responding with the current state-snapshot", state = stateSnapshot);
return stateSnapshot;
}
};
Expand All @@ -39,46 +39,74 @@ isolated function consolidateSystemState() returns error? {
do {
while true {
kafka:BytesConsumerRecord[] records = check conn:websubEventConsumer->poll(config:kafka.consumer.pollingInterval);
log:printDebug("Polled Kafka records for state consolidation", recordCount = records.length(), pollingInterval = config:kafka.consumer.pollingInterval);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have any computation, it's better to use a function pointer, as it is only executed when we enable the debug level. Otherwise, it has unnecessary tasks.

log:printDebug("Polled Kafka records for state consolidation", 
                             recordCount = isolated function() returns int { return records.length();}, 
                             pollingInterval = config:kafka.consumer.pollingInterval);


if records.length() > 0 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 3

Suggested change
if records.length() > 0 {
if records.length() > 0 {
log:printInfo("Processing batch of Kafka records", batchSize = records.length());

log:printDebug("Processing batch of Kafka records", batchSize = records.length());
}

foreach kafka:BytesConsumerRecord currentRecord in records {
string lastPersistedData = check string:fromBytes(currentRecord.value);
int messageSize = lastPersistedData.length();
log:printDebug("Processing Kafka record for consolidation", messageSize = messageSize, offset = currentRecord.offset);
Comment on lines +50 to +51
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do the same as above without initializing the variable.

error? result = processPersistedData(lastPersistedData);
if result is error {
log:printError("Error occurred while processing received event ", 'error = result);
common:logError("Error occurred while processing received event ", result);
} else {
log:printDebug("Successfully processed Kafka record", offset = currentRecord.offset);
}
}
}
} on fail var e {
log:printDebug("Error in consolidation loop, closing Kafka consumer", gracePeriod = config:kafka.consumer.gracefulClosePeriod);
_ = check conn:websubEventConsumer->close(config:kafka.consumer.gracefulClosePeriod);
return e;
}
}

isolated function processPersistedData(string persistedData) returns error? {
log:printDebug("Starting persisted data processing", dataSize = persistedData.length());

json payload = check value:fromJsonString(persistedData);
string hubMode = check payload.hubMode;
log:printDebug("Processing event based on hub mode", hubMode = hubMode);

match hubMode {
"register" => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 4

Suggested change
"register" => {
match hubMode {
"register" => {
log:printInfo("Processing topic registration event");

log:printDebug("Processing topic registration event");
check processTopicRegistration(payload);
log:printDebug("Topic registration event processed successfully");
}
"deregister" => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 5

Suggested change
"deregister" => {
"deregister" => {
log:printInfo("Processing topic deregistration event");

log:printDebug("Processing topic deregistration event");
check processTopicDeregistration(payload);
log:printDebug("Topic deregistration event processed successfully");
}
"subscribe" => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 6

Suggested change
"subscribe" => {
"subscribe" => {
log:printInfo("Processing subscription event");

log:printDebug("Processing subscription event");
check processSubscription(payload);
log:printDebug("Subscription event processed successfully");
}
"unsubscribe" => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 7

Suggested change
"unsubscribe" => {
"unsubscribe" => {
log:printInfo("Processing unsubscription event");

log:printDebug("Processing unsubscription event");
check processUnsubscription(payload);
log:printDebug("Unsubscription event processed successfully");
}
_ => {
common:logError("Invalid hub mode received", hubMode = hubMode);
return error(string `Error occurred while deserializing subscriber events with invalid hubMode [${hubMode}]`);
}
}
log:printDebug("Completed processing persisted data", hubMode = hubMode);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 8

Suggested change
}
isolated function processStateUpdate() returns error? {
log:printInfo("Processing state update - gathering current system state");


isolated function processStateUpdate() returns error? {
log:printDebug("Processing state update - gathering current system state");
common:SystemStateSnapshot stateSnapshot = {
topics: getTopics(),
subscriptions: getSubscriptions()
};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 9

Suggested change
};
check persist:persistWebsubEventsSnapshot(stateSnapshot);
log:printInfo("State snapshot persisted successfully", topicCount = stateSnapshot.topics.length(), subscriptionCount = stateSnapshot.subscriptions.length());

log:printDebug("Created state snapshot for persistence", topicCount = stateSnapshot.topics.length(), subscriptionCount = stateSnapshot.subscriptions.length());
check persist:persistWebsubEventsSnapshot(stateSnapshot);
log:printDebug("State snapshot persisted successfully");
}
4 changes: 2 additions & 2 deletions components/websubhub-consolidator/init_consolidator.bal
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public function main() returns error? {
// Initialize consolidator-service state
error? stateSyncResult = syncSystemState();
if stateSyncResult is error {
common:logError("Error while syncing system state during startup", stateSyncResult, "FATAL");
common:logError("Error while syncing system state during startup", stateSyncResult, severity = "FATAL");
return;
}

Expand Down Expand Up @@ -67,7 +67,7 @@ isolated function syncSystemState() returns error? {
check persist:persistWebsubEventsSnapshot(lastStateSnapshot);
}
} on fail error kafkaError {
common:logError("Error occurred while syncing system-state", kafkaError, "FATAL");
common:logError("Error occurred while syncing system-state", kafkaError, severity = "FATAL");
error? result = check websubEventsSnapshotConsumer->close(config:kafka.consumer.gracefulClosePeriod);
if result is error {
common:logError("Error occurred while gracefully closing kafka:Consumer", result);
Expand Down
18 changes: 8 additions & 10 deletions components/websubhub-consolidator/modules/common/utils.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/log;
import ballerina/random;

Expand Down Expand Up @@ -51,15 +52,12 @@ public isolated function generateRandomString() returns string {

# Logs errors with proper details.
#
# + baseErrorMsg - Base error message
# + err - Current error
# + severity - Severity of the error
public isolated function logError(string baseErrorMsg, error err, string severity = "RECOVERABLE") {
string errorMsg = string `${baseErrorMsg}: ${err.message()}`;
error? cause = err.cause();
while cause is error {
errorMsg += string `: ${cause.message()}`;
cause = cause.cause();
# + msg - Base error message
# + error - Current error
# + keyValues - Additional key values to be logged
public isolated function logError(string msg, error? 'error = (), *log:KeyValues keyValues) {
if !keyValues.hasKey("severity") {
keyValues["severity"] = "RECOVERABLE";
}
log:printError(errorMsg, severity = severity, stackTrace = err.stackTrace());
log:printError(msg, 'error, keyValues = keyValues);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ import websubhub.consolidator.common;
import websubhub.consolidator.config;
import websubhub.consolidator.connections as conn;

import ballerina/log;

public isolated function persistWebsubEventsSnapshot(common:SystemStateSnapshot systemStateSnapshot) returns error? {
log:printDebug("Persisting system state snapshot", topicCount = systemStateSnapshot.topics.length(), subscriptionCount = systemStateSnapshot.subscriptions.length());
Comment on lines 23 to +24

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 11

Suggested change
public isolated function persistWebsubEventsSnapshot(common:SystemStateSnapshot systemStateSnapshot) returns error? {
log:printDebug("Persisting system state snapshot", topicCount = systemStateSnapshot.topics.length(), subscriptionCount = systemStateSnapshot.subscriptions.length());
public isolated function persistWebsubEventsSnapshot(common:SystemStateSnapshot systemStateSnapshot) returns error? {
log:printInfo("Persisting system state snapshot", topicCount = systemStateSnapshot.topics.length(), subscriptionCount = systemStateSnapshot.subscriptions.length());

json payload = systemStateSnapshot.toJson();
check produceKafkaMessage(config:state.snapshot.topic, payload);
log:printDebug("System state snapshot persisted successfully", targetTopic = config:state.snapshot.topic);
Comment on lines 26 to +27

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 12

Suggested change
check produceKafkaMessage(config:state.snapshot.topic, payload);
log:printDebug("System state snapshot persisted successfully", targetTopic = config:state.snapshot.topic);
check produceKafkaMessage(config:state.snapshot.topic, payload);
log:printInfo("System state snapshot persisted successfully", targetTopic = config:state.snapshot.topic);

}

isolated function produceKafkaMessage(string topicName, json payload) returns error? {
log:printDebug("Producing Kafka message", targetTopic = topicName);
Comment on lines 30 to +31

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 13

Suggested change
isolated function produceKafkaMessage(string topicName, json payload) returns error? {
log:printDebug("Producing Kafka message", targetTopic = topicName);
isolated function produceKafkaMessage(string topicName, json payload) returns error? {
if (log:isDebugEnabled()) {
log:printDebug("Producing Kafka message", targetTopic = topicName, payloadSize = payload.toJsonString().length());
}

byte[] serializedContent = payload.toJsonString().toBytes();
check conn:statePersistProducer->send({topic: topicName, value: serializedContent});
check conn:statePersistProducer->'flush();
log:printDebug("Message successfully sent to Kafka", topic = topicName);
}
35 changes: 22 additions & 13 deletions components/websubhub-consolidator/websub_subscribers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,63 @@

import websubhub.consolidator.common;

import ballerina/lang.value;
import ballerina/log;
import ballerina/websubhub;

isolated map<websubhub:VerifiedSubscription> subscribersCache = {};

isolated function deSerializeSubscribersMessage(string lastPersistedData) returns websubhub:VerifiedSubscription[]|error {
websubhub:VerifiedSubscription[] currentSubscriptions = [];
json[] payload = <json[]>check value:fromJsonString(lastPersistedData);
foreach var data in payload {
websubhub:VerifiedSubscription subscription = check data.cloneWithType(websubhub:VerifiedSubscription);
currentSubscriptions.push(subscription);
}
return currentSubscriptions;
}

isolated function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubscribers) {
log:printDebug("Refreshing subscribers cache from persisted data", persistedSubscriberCount = persistedSubscribers.length());
Comment on lines 24 to +25

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 14

Suggested change
isolated function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubscribers) {
log:printDebug("Refreshing subscribers cache from persisted data", persistedSubscriberCount = persistedSubscribers.length());
isolated function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubscribers) {
log:printInfo("Starting subscribers cache refresh", persistedSubscriberCount = persistedSubscribers.length());

foreach var subscriber in persistedSubscribers {
string groupName = common:generatedSubscriberId(subscriber.hubTopic, subscriber.hubCallback);
lock {
subscribersCache[groupName] = subscriber.cloneReadOnly();
}
log:printDebug("Added subscriber to cache during refresh", subscriberId = groupName, topic = subscriber.hubTopic, callback = subscriber.hubCallback);
}
log:printDebug("Subscribers cache refresh completed", totalCachedSubscribers = subscribersCache.length());
Comment on lines 32 to +33

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 15

Suggested change
}
log:printDebug("Subscribers cache refresh completed", totalCachedSubscribers = subscribersCache.length());
}
log:printInfo("Subscribers cache refresh completed", totalCachedSubscribers = subscribersCache.length());

}

isolated function processSubscription(json payload) returns error? {
log:printDebug("Processing subscription event");
websubhub:VerifiedSubscription subscription = check payload.cloneWithType(websubhub:VerifiedSubscription);
string subscriberId = common:generatedSubscriberId(subscription.hubTopic, subscription.hubCallback);
log:printDebug("Deserialized subscription", subscriberId = subscriberId, topic = subscription.hubTopic, callback = subscription.hubCallback);
boolean subscriptionAdded = false;
lock {
// add the subscriber if subscription event received
if !subscribersCache.hasKey(subscriberId) {
subscribersCache[subscriberId] = subscription.cloneReadOnly();
subscriptionAdded = true;
log:printDebug("Added new subscription to cache", subscriberId = subscriberId, totalSubscriptions = subscribersCache.length());
} else {
log:printDebug("Subscription already exists in cache, skipping", subscriberId = subscriberId);
}
}
check processStateUpdate();
log:printDebug("Subscription processing completed", subscriberId = subscriberId, wasAdded = subscriptionAdded);
Comment on lines 52 to +53

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 16

Suggested change
check processStateUpdate();
log:printDebug("Subscription processing completed", subscriberId = subscriberId, wasAdded = subscriptionAdded);
check processStateUpdate();
log:printInfo("Subscription processed successfully", subscriberId = subscriberId, wasAdded = subscriptionAdded);

}

isolated function processUnsubscription(json payload) returns error? {
log:printDebug("Processing unsubscription event");
websubhub:VerifiedUnsubscription unsubscription = check payload.cloneWithType(websubhub:VerifiedUnsubscription);
string subscriberId = common:generatedSubscriberId(unsubscription.hubTopic, unsubscription.hubCallback);
log:printDebug("Deserialized unsubscription", subscriberId = subscriberId, topic = unsubscription.hubTopic, callback = unsubscription.hubCallback);
lock {
// remove the subscriber if the unsubscription event received
_ = subscribersCache.removeIfHasKey(subscriberId);
websubhub:VerifiedSubscription? removedSubscription = subscribersCache.removeIfHasKey(subscriberId);
boolean subscriptionRemoved = removedSubscription is websubhub:VerifiedSubscription;
log:printDebug("Removed subscription from cache", subscriberId = subscriberId, wasRemoved = subscriptionRemoved, totalSubscriptions = subscribersCache.length());
Comment on lines +63 to +65
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

}
check processStateUpdate();
log:printDebug("Unsubscription processing completed", subscriberId = subscriberId);
Comment on lines 67 to +68

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 17

Suggested change
check processStateUpdate();
log:printDebug("Unsubscription processing completed", subscriberId = subscriberId);
check processStateUpdate();
log:printInfo("Unsubscription processed successfully", subscriberId = subscriberId);

}

isolated function getSubscriptions() returns websubhub:VerifiedSubscription[] {
websubhub:VerifiedSubscription[] subscriptions;
lock {
return subscribersCache.toArray().cloneReadOnly();
subscriptions = subscribersCache.toArray().cloneReadOnly();
}
log:printDebug("Retrieved subscriptions from cache", subscriptionCount = subscriptions.length());
return subscriptions;
}
21 changes: 19 additions & 2 deletions components/websubhub-consolidator/websub_topics.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,55 @@
// under the License.

import ballerina/lang.value;
import ballerina/log;
import ballerina/websubhub;

isolated map<websubhub:TopicRegistration> registeredTopicsCache = {};

isolated function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) {
log:printDebug("Refreshing topic cache from persisted data", persistedTopicCount = persistedTopics.length());
Comment on lines 23 to +24

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 19

Suggested change
isolated function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) {
log:printDebug("Refreshing topic cache from persisted data", persistedTopicCount = persistedTopics.length());
solated function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) {
if (log:isDebugEnabled()) {
log:printDebug("Refreshing topic cache from persisted data", persistedTopicCount = persistedTopics.length());
}

foreach var topic in persistedTopics.cloneReadOnly() {
lock {
registeredTopicsCache[topic.topic] = topic.cloneReadOnly();
}
log:printDebug("Added topic to cache during refresh", topicName = topic.topic);
}
Comment on lines 25 to 30

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 18

Suggested change
foreach var topic in persistedTopics.cloneReadOnly() {
lock {
registeredTopicsCache[topic.topic] = topic.cloneReadOnly();
}
log:printDebug("Added topic to cache during refresh", topicName = topic.topic);
}
foreach var topic in persistedTopics.cloneReadOnly() {
lock {
registeredTopicsCache[topic.topic] = topic.cloneReadOnly();
}
if (log:isDebugEnabled()) {
log:printDebug("Added topic to cache during refresh", topicName = topic.topic);
}

log:printDebug("Topic cache refresh completed", totalCachedTopics = registeredTopicsCache.length());
Comment on lines 30 to +31

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 20

Suggested change
}
log:printDebug("Topic cache refresh completed", totalCachedTopics = registeredTopicsCache.length());
}
if (log:isDebugEnabled()) {
log:printDebug("Topic cache refresh completed", totalCachedTopics = registeredTopicsCache.length());
}

}

isolated function processTopicRegistration(json payload) returns error? {
Comment on lines 33 to 34

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 21

Suggested change
isolated function processTopicRegistration(json payload) returns error? {
isolated function processTopicRegistration(json payload) returns error? {
if (log:isDebugEnabled()) {
log:printDebug("Processing topic registration event");
}

log:printDebug("Processing topic registration event");
websubhub:TopicRegistration registration = check value:cloneWithType(payload);
Comment on lines +35 to 36

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 22

Suggested change
log:printDebug("Processing topic registration event");
websubhub:TopicRegistration registration = check value:cloneWithType(payload);
websubhub:TopicRegistration registration = check value:cloneWithType(payload);
if (log:isDebugEnabled()) {
log:printDebug("Deserialized topic registration", topicName = registration.topic);
}

log:printDebug("Deserialized topic registration", topicName = registration.topic);
lock {
// add the topic if topic-registration event received
registeredTopicsCache[registration.topic] = registration.cloneReadOnly();
}
Comment on lines 40 to 41

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 23

Suggested change
registeredTopicsCache[registration.topic] = registration.cloneReadOnly();
}
}
if (log:isDebugEnabled()) {
log:printDebug("Added topic to cache", topicName = registration.topic, totalTopics = registeredTopicsCache.length());
}

log:printDebug("Added topic to cache", topicName = registration.topic, totalTopics = registeredTopicsCache.length());
check processStateUpdate();
Comment on lines +42 to 43

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 24

Suggested change
log:printDebug("Added topic to cache", topicName = registration.topic, totalTopics = registeredTopicsCache.length());
check processStateUpdate();
check processStateUpdate();
if (log:isDebugEnabled()) {
log:printDebug("Topic registration processing completed", topicName = registration.topic);
}

log:printDebug("Topic registration processing completed", topicName = registration.topic);
}

isolated function processTopicDeregistration(json payload) returns error? {
Comment on lines 46 to 47

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 25

Suggested change
isolated function processTopicDeregistration(json payload) returns error? {
isolated function processTopicDeregistration(json payload) returns error? {
if (log:isDebugEnabled()) {
log:printDebug("Processing topic deregistration event");
}

log:printDebug("Processing topic deregistration event");
websubhub:TopicDeregistration deregistration = check value:cloneWithType(payload);
Comment on lines +48 to 49

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 26

Suggested change
log:printDebug("Processing topic deregistration event");
websubhub:TopicDeregistration deregistration = check value:cloneWithType(payload);
websubhub:TopicDeregistration deregistration = check value:cloneWithType(payload);
if (log:isDebugEnabled()) {
log:printDebug("Deserialized topic deregistration", topicName = deregistration.topic);
}

log:printDebug("Deserialized topic deregistration", topicName = deregistration.topic);
boolean topicRemoved = false;
lock {
// remove the topic if topic-deregistration event received
_ = registeredTopicsCache.removeIfHasKey(deregistration.topic);
websubhub:TopicRegistration? removedTopic = registeredTopicsCache.removeIfHasKey(deregistration.topic);
topicRemoved = removedTopic is websubhub:TopicRegistration;
}
Comment on lines +55 to 56

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 27

Suggested change
topicRemoved = removedTopic is websubhub:TopicRegistration;
}
}
if (log:isDebugEnabled()) {
log:printDebug("Removed topic from cache", topicName = deregistration.topic, wasRemoved = topicRemoved, totalTopics = registeredTopicsCache.length());
}

log:printDebug("Removed topic from cache", topicName = deregistration.topic, wasRemoved = topicRemoved, totalTopics = registeredTopicsCache.length());
check processStateUpdate();
Comment on lines +57 to 58

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 28

Suggested change
log:printDebug("Removed topic from cache", topicName = deregistration.topic, wasRemoved = topicRemoved, totalTopics = registeredTopicsCache.length());
check processStateUpdate();
check processStateUpdate();
if (log:isDebugEnabled()) {
log:printDebug("Topic deregistration processing completed", topicName = deregistration.topic);
}

log:printDebug("Topic deregistration processing completed", topicName = deregistration.topic);
}

isolated function getTopics() returns websubhub:TopicRegistration[] {
websubhub:TopicRegistration[] topics;
lock {
return registeredTopicsCache.toArray().cloneReadOnly();
topics = registeredTopicsCache.toArray().cloneReadOnly();
}
Comment on lines +65 to 66

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log Improvement Suggestion No: 29

Suggested change
topics = registeredTopicsCache.toArray().cloneReadOnly();
}
}
if (log:isDebugEnabled()) {
log:printDebug("Retrieved topics from cache", topicCount = topics.length());
}

log:printDebug("Retrieved topics from cache", topicCount = topics.length());
return topics;
}
2 changes: 1 addition & 1 deletion components/websubhub/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "websubhub"
version = "1.15.0"
version = "1.15.1"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "http"},
Expand Down
Loading
Loading