Skip to content

Commit 3fe492b

Browse files
add disseminate by original id and type
1 parent 881bb5c commit 3fe492b

File tree

6 files changed

+168
-10
lines changed

6 files changed

+168
-10
lines changed

accelerator_core/service_impls/mongo_accession.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,9 @@ def ingest(
131131
raise Exception("no technical metadata found, invalid record")
132132

133133
technical_metadata["created"] = get_time_now_iso()
134-
135-
technical_metadata["original_source"] = (
134+
technical_metadata["original_source_identifier"] = (
136135
ingest_payload.ingest_source_descriptor.ingest_item_id
137136
)
138-
139137
technical_metadata["original_source_link"] = (
140138
ingest_payload.ingest_source_descriptor.ingest_link
141139
)

accelerator_core/service_impls/mongo_dissemination.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ def disseminate_by_id(
5959
self, document_id: str, dissemination_request: DisseminationDescriptor
6060
) -> DisseminationPayload:
6161
"""
62-
Disseminate an individual document, identified by its type (parent collection) and its id
62+
Disseminate an individual document, identified by its type (parent collection) and its id in the accelerator
63+
database
6364
@param document_id: str with unique document id
6465
@param dissemination_request: DisseminationDescriptor that describes the type, version, and
6566
other information
@@ -94,6 +95,74 @@ def disseminate_by_id(
9495
dissemination_payload.dissemination_successful = True
9596
return dissemination_payload
9697

98+
def disseminate_by_original_source_and_id(
99+
self,
100+
original_source: str,
101+
original_document_identifier,
102+
dissemination_request: DisseminationDescriptor,
103+
) -> DisseminationPayload:
104+
"""
105+
This method is used for creating a dissemination payload by applying a filter
106+
that selects specific documents. The filtering mechanism is determined by the
107+
implementation and works on provided source and document identifiers. This is meant to return a single
108+
document based on its original source (e.g. "cedar") and the original identifier (e.g. the document DOI in cedar).
109+
110+
This is distinguised from the dissemination_by_id method which is meant to return a single document based on its
111+
accelerator database identifier.
112+
113+
Parameters:
114+
original_source: str
115+
The originating source of the document, used to identify the context or
116+
source system of the document.
117+
original_document_identifier
118+
The unique identifier of the document associated with the original source.
119+
dissemination_request: DisseminationDescriptor
120+
The object encapsulating metadata, rules, and instructions for how the document
121+
should be disseminated.
122+
123+
Returns:
124+
DisseminationPayload
125+
The resulting payload from the dissemination operation, containing the processed
126+
document and relevant dissemination metadata.
127+
"""
128+
129+
logger.info(
130+
f"Disseminating document based on original source: {original_source} and document identifier: {original_document_identifier} of type {dissemination_request.ingest_type} to target: {dissemination_request.dissemination_type}"
131+
)
132+
133+
doc = self.accel_database_utils.find_doc_by_original_source_identifier(
134+
dissemination_request.ingest_type,
135+
original_source,
136+
original_document_identifier,
137+
dissemination_request.temp_collection,
138+
)
139+
140+
if doc is None:
141+
logger.warning(
142+
f"Document not found for original source: {original_source} and document identifier: {original_document_identifier} "
143+
)
144+
raise Exception(
145+
f"Document not found for original source: {original_source} and document identifier: {original_document_identifier} "
146+
)
147+
148+
event = create_timestamped_log(
149+
f"Disseminating document original source: {original_source} and document identifier: {original_document_identifier} of type {dissemination_request.ingest_type} to target: {dissemination_request.dissemination_type}"
150+
)
151+
152+
"""
153+
self.accel_database_utils.log_document_event(
154+
str(),
155+
event,
156+
dissemination_request.ingest_type,
157+
dissemination_request.temp_collection,
158+
)
159+
"""
160+
161+
dissemination_payload = DisseminationPayload(dissemination_request)
162+
# self.report_individual_dissemination(dissemination_payload, document_id, doc)
163+
dissemination_payload.dissemination_successful = True
164+
return dissemination_payload
165+
97166
def disseminate_by_filter(
98167
self,
99168
filter: DisseminationFilter,

accelerator_core/services/dissemination.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,39 @@ def disseminate_by_id(
4343
"""
4444
pass
4545

46+
def disseminate_by_original_source_and_id(
47+
self,
48+
original_source: str,
49+
original_document_identifier,
50+
dissemination_request: DisseminationDescriptor,
51+
) -> DisseminationPayload:
52+
"""
53+
Apply a filter to create and disseminate a single document specified target.
54+
55+
This method is used for creating a dissemination payload by applying a filter
56+
that selects specific documents. The filtering mechanism is determined by the
57+
implementation and works on provided source and document identifiers. This is meant to return a single
58+
document based on its original source (e.g. "cedar") and the original identifier (e.g. the document DOI in cedar).
59+
60+
This is distinguised from the dissemination_by_id method which is meant to return a single document based on its
61+
accelerator database identifier.
62+
63+
Parameters:
64+
original_source: str
65+
The source where the original document resides. This represents the DAG that ingested the document
66+
originally
67+
original_document_identifier
68+
The identifier of the document in the original source. (e.g. the CEDAR generated document GUID).
69+
dissemination_request: DisseminationDescriptor
70+
A descriptor that details the dissemination type, version, and
71+
additional relevant information.
72+
73+
Returns:
74+
DisseminationPayload
75+
The payload containing the documents selected for dissemination.
76+
"""
77+
pass
78+
4679
def disseminate_by_filter(
4780
self,
4881
filter: DisseminationFilter,

accelerator_core/workflow/accel_data_models.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ def __init__(self):
1919
self.ingest_item_id = (
2020
None # unique id if this is an individual item, blank for a batch
2121
)
22-
self.ingest_link = None # link to the ingest source
23-
self.ingest_format = (
24-
None # data source of the ingest (e.g. CEDAR, CAFE, Data.Gov)
25-
)
22+
self.ingest_link = None # link to the ingest source, currently is a string value indicating the original
23+
# source of the data, e.g. "cedar"
2624
self.use_tempfiles = False # request temp files versus inline. System behavior for a task will consult this flag
2725

2826
def to_dict(self) -> dict:

integration_tests/test_dissemination_mongo.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,65 @@ def test_find_one_by_filter(self):
160160

161161
self.assertIsNotNone(actual)
162162

163+
def test_disseminate_by_original_source_and_id(self):
164+
ingest_source_descriptor = IngestSourceDescriptor()
165+
ingest_source_descriptor.ingest_type = "accelerator"
166+
ingest_source_descriptor.schema_version = "1.0.2"
167+
ingest_source_descriptor.ingest_identifier = "myrunid"
168+
ingest_source_descriptor.ingest_item_id = (
169+
"test_disseminate_by_original_source_and_id"
170+
)
171+
ingest_source_descriptor.ingest_link = "ingest_source"
172+
ingest_source_descriptor.submitter_name = "mysubmittername"
173+
ingest_source_descriptor.submitter_email = "mysubmitteremail"
174+
ingest_source_descriptor.use_tempfiles = False
175+
176+
ingest_result = IngestPayload(ingest_source_descriptor)
177+
178+
json_path = determine_test_resource_path("example1.json", "integration_tests")
179+
with open(json_path) as json_data:
180+
d = json.load(json_data)
181+
ingest_result.payload.append(d)
182+
ingest_result.payload_inline = True
183+
184+
xcom_props_resolver = DirectXcomPropsResolver(
185+
temp_files_supported=False, temp_files_location=""
186+
)
187+
188+
accession = AccessionMongo(
189+
self.__class__._accelerator_config,
190+
self.__class__._accel_db_context,
191+
xcom_props_resolver,
192+
)
193+
194+
id = accession.ingest(ingest_result, check_duplicates=False, temp_doc=False)
195+
self.assertIsNotNone(id)
196+
197+
# now get the dissemination for this item
198+
199+
dissemination_request = DisseminationDescriptor()
200+
dissemination_request.dissemination_type = "dataverse"
201+
dissemination_request.temp_collection = False
202+
dissemination_request.ingest_type = "accelerator"
203+
dissemination_request.schema_version = "1.0.2"
204+
dissemination_request.inline_results = True
205+
dissemination_request.dissemination_identifier = "test_dissemination"
206+
dissemination_request.dissemination_item_id = id
207+
208+
dissemination = DisseminationMongo(
209+
self.__class__._accelerator_config,
210+
xcom_props_resolver,
211+
self.__class__._accel_db_context,
212+
)
213+
214+
dissemination_payload = dissemination.disseminate_by_original_source_and_id(
215+
ingest_source_descriptor.ingest_link,
216+
ingest_source_descriptor.ingest_item_id,
217+
dissemination_request,
218+
)
219+
220+
self.assertIsNotNone(dissemination_payload)
221+
163222
def test_dissemination_not_found(self):
164223
ingest_source_descriptor = IngestSourceDescriptor()
165224
id = "95E43738404BEECDF66573B4"

integration_tests/test_resources/example1.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,9 @@
199199
"created": "",
200200
"modified": "",
201201
"verified": "",
202-
"original_source_type": "",
203-
"target_schema_type": "",
202+
"original_source_type": "cedar",
203+
"target_schema_type": "accelerator",
204+
"target_schema_version": "1.0.2",
204205
"original_source_identifier": "",
205206
"original_source_link": "",
206207
"history": [

0 commit comments

Comments
 (0)