Skip to content

Commit 00d0ab9

Browse files
authored
[GOBBLIN-2239] Introduce pushMessagesSync for sync GTE emission (#4156)
* introduce pushMessagesSync for GTE emission
* add missing java doc
* some refactoring
* updated java doc
1 parent 9ad7ad2 commit 00d0ab9

File tree

7 files changed: 104 additions (+104) and 11 deletions (-11).

gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,29 @@ public void report() {
163163
reportEventQueue(this.reportingQueue);
164164
}
165165

166+
/**
167+
* Report all {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s in the queue synchronously.
168+
*/
169+
public void reportSynchronously() {
170+
reportEventQueueSynchronously(this.reportingQueue);
171+
}
172+
166173
/**
167174
* Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue.
168175
* @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
169176
*/
170177
public abstract void reportEventQueue(Queue<GobblinTrackingEvent> queue);
171178

179+
/**
180+
* Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue synchronously.
181+
* Default implementation provides no synchronous guarantees and just calls {@link #reportEventQueue(Queue)}.
182+
* Subclasses should override this method to implement true synchronous reporting behavior.
183+
* @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
184+
*/
185+
public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue) {
186+
reportEventQueue(queue);
187+
}
188+
172189
/**
173190
* NOOP because {@link com.codahale.metrics.ScheduledReporter} requires this method implemented.
174191
*/

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,18 @@ public Timer getTimer(String prefix, String... suffixes) {
350350
return this.metricContext.timer(MetricRegistry.name(prefix, suffixes));
351351
}
352352

353+
/**
354+
* Get the first {@link ScheduledReporter} of the given class if it exists.
355+
* @param clazz the class of the ScheduledReporter to get
356+
* @return an Optional containing the first ScheduledReporter of the given class if it exists, else an absent Optional
357+
*/
358+
public <T extends com.codahale.metrics.ScheduledReporter> java.util.Optional<T> getScheduledReporter(Class<T> clazz) {
359+
if (clazz == null) {
360+
throw new IllegalArgumentException("Class argument cannot be null");
361+
}
362+
return this.codahaleScheduledReporters.stream().filter(clazz::isInstance).map(clazz::cast).findFirst();
363+
}
364+
353365
/**
354366
* Starts metric reporting and appends the given metrics file suffix to the current value of
355367
* {@link ConfigurationKeys#METRICS_FILE_SUFFIX}.

gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,29 @@ protected KafkaEventKeyValueReporter(Builder<?> builder) throws IOException {
5454

5555
@Override
5656
public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
57+
reportEventQueueInternal(queue, false);
58+
}
59+
60+
@Override
61+
public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue) {
62+
reportEventQueueInternal(queue, true);
63+
}
64+
65+
private void reportEventQueueInternal(Queue<GobblinTrackingEvent> queue, boolean sync) {
66+
List<Pair<String, byte[]>> events = getEventsFromQueue(queue);
67+
if (!events.isEmpty()) {
68+
log.info("Pushing {} Gobblin Tracking Events to Kafka", events.size());
69+
if (sync) {
70+
this.kafkaPusher.pushMessagesSync(events);
71+
} else {
72+
this.kafkaPusher.pushMessages(events);
73+
}
74+
} else {
75+
log.debug("No GTE to push.");
76+
}
77+
}
78+
79+
private List<Pair<String, byte[]>> getEventsFromQueue(Queue<GobblinTrackingEvent> queue) {
5780
GobblinTrackingEvent nextEvent;
5881
List<Pair<String, byte[]>> events = Lists.newArrayList();
5982

@@ -75,9 +98,7 @@ public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
7598
events.add(Pair.of(key, this.serializer.serializeRecord(nextEvent)));
7699
}
77100

78-
if (!events.isEmpty()) {
79-
this.kafkaPusher.pushMessages(events);
80-
}
101+
return events;
81102
}
82103

83104
private static class BuilderImpl extends Builder<BuilderImpl> {

gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,16 @@
2727
public interface Pusher<M> extends Closeable {
2828
/**
2929
* Push all byte array messages to the Kafka topic.
30-
* @param messages List of byte array messages to push to Kakfa.
30+
* @param messages List of byte array messages to push to Kafka.
3131
*/
3232
void pushMessages(List<M> messages);
33+
34+
/**
35+
* Synchronous version of {@link #pushMessages(List)}.
36+
* Default implementation just calls {@link #pushMessages(List)}.
37+
* @param messages List of byte array messages to push to Kafka.
38+
*/
39+
default void pushMessagesSync(List<M> messages) {
40+
pushMessages(messages);
41+
}
3342
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public interface GobblinTemporalConfigurationKeys {
4646
String GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = PREFIX + "work.dir.cleanup.enabled";
4747
String DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = "true";
4848

49+
String GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX = PREFIX + "container.metrics.";
50+
String GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME = GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX + "application.name";
51+
String GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID = GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX + "task.runner.id";
52+
4953
/**
5054
* Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing collisions with prod jobs
5155
* during testing

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,10 @@ private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) {
208208

209209
private Config saveConfigToFile(Config config)
210210
throws IOException {
211-
Config newConf = config
212-
.withValue(CLUSTER_APP_WORK_DIR, ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
211+
// Save the application name and task runner id in the config for getting ContainerMetrics instance later
212+
Config newConf = config.withValue(CLUSTER_APP_WORK_DIR, ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()))
213+
.withValue(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME, ConfigValueFactory.fromAnyRef(this.applicationName))
214+
.withValue(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID, ConfigValueFactory.fromAnyRef(this.taskRunnerId));
213215
ConfigUtils configUtils = new ConfigUtils(new FileUtils());
214216
configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
215217
return newConf;

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,50 @@
1919

2020
import java.util.Map;
2121

22-
import io.temporal.workflow.Workflow;
23-
24-
import org.slf4j.Logger;
22+
import java.util.Optional;
23+
import lombok.extern.slf4j.Slf4j;
24+
import com.typesafe.config.Config;
25+
import com.typesafe.config.ConfigFactory;
2526

27+
import org.apache.gobblin.cluster.ContainerMetrics;
2628
import org.apache.gobblin.metrics.event.EventSubmitter;
2729
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
2830
import org.apache.gobblin.metrics.event.TimingEvent;
31+
import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
32+
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
33+
import org.apache.gobblin.temporal.cluster.WorkerConfig;
34+
import org.apache.gobblin.util.ConfigUtils;
2935

3036

37+
@Slf4j
3138
public class SubmitGTEActivityImpl implements SubmitGTEActivity {
32-
private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
3339

3440
@Override
3541
public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
36-
log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
42+
log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
3743
eventSubmitterContext.create().submit(eventBuilder);
44+
Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
45+
if (config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID) &&
46+
config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME)) {
47+
String containerMetricsApplicationName = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
48+
String containerMetricsTaskRunnerId = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
49+
ContainerMetrics containerMetrics = ContainerMetrics.get(ConfigUtils.configToState(config), containerMetricsApplicationName, containerMetricsTaskRunnerId);
50+
log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics.toString(),
51+
containerMetricsTaskRunnerId, containerMetricsApplicationName);
52+
Optional<KafkaAvroEventKeyValueReporter> kafkaReporterOptional = containerMetrics.getScheduledReporter(KafkaAvroEventKeyValueReporter.class);
53+
if (kafkaReporterOptional.isPresent()) {
54+
log.info("Submitting GTE in synchronous manner to Kafka reporter");
55+
kafkaReporterOptional.get().reportSynchronously();
56+
log.info("Submitted GTE to Kafka reporter");
57+
} else {
58+
log.warn("No KafkaAvroEventKeyValueReporter found in container metrics for taskRunnerId: {} and applicationName: {}",
59+
containerMetricsTaskRunnerId, containerMetricsApplicationName);
60+
}
61+
} else {
62+
log.warn("Both {} and {} should be set to fetch container metrics instance for synchronous GTE submission",
63+
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID,
64+
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
65+
}
3866
}
3967

4068
private static String summarizeEventMetadataForLogging(GobblinEventBuilder eventBuilder) {

0 commit comments

Comments
 (0)