Conversation
.../org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java
Show resolved
Hide resolved
| try { | ||
| var sft = SimpleFeatureTypeLoader.sftForName(sfName); | ||
| if (sft.isDefined()) { | ||
| ds.createSchema(sft.get()); |
There was a problem hiding this comment.
you might want to verify here that the sft matches any existing schema in the store - it's a bit tricky to do that well though :/
There was a problem hiding this comment.
I thought createSchema created it if it was missing and did nothing if it already existed.
There was a problem hiding this comment.
right, but you can have issues if the schema exists in the store, but it doesn't match the one loaded by the SimpleFeatureTypeLoader. maybe in this case it doesn't actually matter...
| if (sft.isDefined()) { | ||
| ds.createSchema(sft.get()); | ||
| } else { | ||
| logger.warn("Could not find a local version of {}, hoping the KDS is already defined...", sfName); |
There was a problem hiding this comment.
you could call ds.getSchema to verify
| private final DataStore ds; | ||
| FeatureWriter<SimpleFeatureType, SimpleFeature> writer; |
There was a problem hiding this comment.
is there a close lifecycle where you can dispose/close these?
| SimpleFeatureStore fs = null; | ||
| while (fs == null) { | ||
| try { | ||
| this.ds = dsFactory.get(); |
There was a problem hiding this comment.
we should dispose of this ds if we fail getting the feature source - i think you had trouble getting the metadata to refresh, hence the need to get a new ds each time, is that right?
| public void doStart() { | ||
| SimpleFeatureStore fs = getSimpleFeatureStore(); | ||
|
|
||
| fs.addFeatureListener(featureEvent -> { |
There was a problem hiding this comment.
so feature listeners don't guarantee at-least-once delivery - if you want that, you need to use a GeoMessageProcessor with a stable group id: https://github.com/locationtech/geomesa/blob/main/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala#L124
I'm realizing that method is not in the docs... I'll open a ticket to add it there.
geomesa-spring/pom.xml
Outdated
| <artifactId>geomesa-spring</artifactId> | ||
| <packaging>pom</packaging> | ||
| <modules> | ||
| <module>geomesa-spring-cloud-stream-binder-kafka-datastore</module> |
There was a problem hiding this comment.
i think i'd prefer to move this module to geomesa-kafka/geomesa-kafka-spring-cloud-stream(-binder?)
9d9cf2e to
a8ecb36
Compare
No description provided.