Change Log 0.4.0
New Features
WldtEventObserver
A new class called WldtEventObserver
has been introduced to allow a simplified observation of target specific events generated by the Digital Twin and its components such as adapters and the model. Main mapped events and filters are:
- State Events: State Update and State Event Notifications
- Physical Asset Events: Physical Property Variation, Physical Event Notification, Physical Relationship Instance Creation and Deletion
- Physical Asset Action Events: Physical Action Trigger
- Digital Action Events: Digital Action Event
- Physical Asset Description Events: Physical Asset Description Available and Updated
- Life Cycle Events: Digital Twin Life Cycle Events
- Query Request Events: Storage Query Request Events (See next sections for additional information)
For each event type dedicated observation and un-observation methods (e.g., observePhysicalAssetEvents()
and unObservePhysicalAssetEvents()
) are available in order to create an instance of the observer and decide which events to receive.
To build a WldtEventObserver
a dedicated listener IWldtEventObserverListener
should be implemented by the developer to receive the callbacks related to the incoming events. All the events are of the generic type WldtEvent
and it is up to the developer the validate and check the received object and if it match with the expected one.
An example of usage for the event observer is the following:
WldtEventObserver eventObserver = new WldtEventObserver(
"DT_TEST_ID_1",
"test-observer",
myObserverListener);
// Start all the available observation
eventObserver.observePhysicalAssetEvents();
eventObserver.observePhysicalAssetActionEvents();
eventObserver.observeStateEvents();
eventObserver.observeDigitalActionEvents();
eventObserver.observePhysicalAssetDescriptionEvents();
eventObserver.observeLifeCycleEvents();
The WldtEventObserver
has been currently used internally within the library to simplify the implementation and usage of the Storage Layer and the associated Storage Query System as described in the dedicated sections.
Storage Layer
A new storage layer has been integrated into the core WLDT library, enabling Digital Twins (DTs) to store data related to the evolution of their state, generated events, and any variations involving properties, events, actions, relationships, and life cycle. The Storage Layer consists of two main components:
- Storage Manager: This is the central component of the storage system, facilitating the structured and modular storage and retrieval of information. It allows developers to create and utilize various storage systems (e.g., in-memory, file-based, or DBMS) simultaneously. The Storage Layer is accessible in both read and write modes internally by the DT’s Model, and in read-only mode via the Query System by Digital Adapters.
- Query System: To delegate and encapsulate the responsibility of data storage within the DT’s model, a query system has been integrated. This system enables Digital Adapters to retrieve stored data and expose it according to their specific logic and implementation.
The storage layer is designed for easy extension, allowing developers to create and share new storage layers (e.g., using Redis, MySQL, or MongoDB). The provided in-memory implementation serves only for basic development and testing purposes. Similarly, the Query Manager can be extended and customized by developers to implement additional query management features or to enhance the default functionalities provided by the library.
Storage Manager
The main module of the Storage Layer is the one associated to Storage Capabilities and it is composed by two main classes: StorageManager
and WldStorage
with the following characteristics and main methods:
StorageManager
: The StorageManager class is a class that represents the storage manager for a DigitalTwin. It is responsible for managing the storage of the data related to the DigitalTwin. It is an observer of theWldtEventBus
, and it is able to save the data in the available storages. The class extends aDigitalTwinWorker
, in order to allow the component to work in a structure and integrated way on a different thread that the core of a DT can coordinate starting and stopping it when required. The manager allow the usage of different storage systems at the same time in order to allow the developers to memorize the information accordingly to their need in the right storage system at the same time (e.g., REDIS for quick cached information and MongDB for historical data). Main associated methods are:putStorage(WldtStorage storage)
: Add a new WldtStorage to the StorageManagergetStorageIdList()
: Returns the list of id of the WldtStorage in the StorageManagerisStorageAvailable(String storageId)
: Checks if a target Storage Id is available in the Storage ManagergetStorage(String storageId)
: Get the target WldtStorage by id from the Storage ManagerremoveStorage(String storageId)
: Remove an existing WldtStorage by id from the StorageManager
WldtStorage
: Defines an abstract class allowing the Digital Twin developer to implement its internal storage system for the Digital Twin instance.- The class defines methods for the management of:
- Digital Twin State storage and retrieval with the associated change list;
- Generated State Digital Events;
- Life Cycle State storage and retrieval;
- Physical Asset Description storage and retrieval;
- Physical Asset Property Variation storage and retrieval;
- Physical Asset Relationship Instance storage and retrieval;
- Digital Action Request storage and retrieval;
- Physical Asset Action Request storage and retrieval;
- Physical Asset Event Notification storage and retrieval;
- Each WldtStorage instance can be configured (using the right constructor method) to:
- Observe all Wldt events (
stateEvents
,physicalAssetEvents
,physicalAssetActionEvents
,physicalAssetDescriptionEvents
,digitalActionEvents
,lifeCycleEvents
) - Filter only for specific class of events
- Once the WldtStorage has been properly configured to receive target events the
StorageManager
automatically save information of interest for that specific storage. For example we can have aStorageA
(e.g, REDIS) configured to receive all the generated events and aStorageB
(e.g., MongoDB) in charge of saving only DT’s state variation over time.
- Observe all Wldt events (
- The default implementation of the
WldtStorage
is the classDefaultWldtStorage
. This class provides a simple storage solution for digital twin states, digital twin state changes, physical asset events, and digital twin events. The class provides ONLY a memory based approach for storage using ArrayLists and HashMaps and more advanced solution should be implemented for production oriented Digital Twins for examples using external storage and memorization solutions. - Methods available and implemented by WldtStorage implementations are the following grouped by categories:
- Digital Twin State:
saveDigitalTwinState(DigitalTwinState digitalTwinState, List<DigitalTwinStateChange> digitalTwinStateChangeList)
: Save a new computed instance of the DT State in the Storage together with the list of the changes with respect to the previous stategetLastDigitalTwinState()
: Returns the latest computed Digital Twin State of the target Digital Twin instancegetDigitalTwinStateCount()
: Returns the number of computed and stored Digital Twin StatesgetDigitalTwinStateInTimeRange(long startTimestampMs, long endTimestampMs)
: Retrieves a list of DigitalTwinState objects within the specified time rangegetDigitalTwinStateInRange(int startIndex, int endIndex)
: Retrieves a list of Digital Twin states within the specified range of indices
- Digital Twin State Event Notification:
saveDigitalTwinStateEventNotification(DigitalTwinStateEventNotification<?> digitalTwinStateEventNotification)
: Save the Digital Twin State Event NotificationgetDigitalTwinStateEventNotificationCount()
: Get the number of Digital Twin State Event NotificationgetDigitalTwinStateEventNotificationInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the Digital Twin State Event Notification in the specified time rangegetDigitalTwinStateEventNotificationInRange(int startIndex, int endIndex)
: Get the Digital Twin State Event Notification in the specified range of indices
- Life Cycle State Variation:
saveLifeCycleState(LifeCycleStateVariation lifeCycleStateVariation)
: Save the LifeCycleState of the Digital TwingetLastLifeCycleState()
: Get the last LifeCycleState of the Digital TwingetLifeCycleStateCount()
: Get the number of LifeCycleState of the Digital TwingetLifeCycleStateInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the last LifeCycleState of the Digital TwingetLifeCycleStateInRange(int startIndex, int endIndex)
: Get the LifeCycleState of the Digital Twin in the specified range of indices
- Physical Asset Event Notification:
savePhysicalAssetEventNotification(PhysicalAssetEventNotification physicalAssetEventNotification)
: Save the Physical Asset Event NotificationgetPhysicalAssetEventNotificationCount()
: Get the number of Physical Asset Event NotificationgetPhysicalAssetEventNotificationInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the Physical Asset Event Notification in the specified time rangegetPhysicalAssetEventNotificationInRange(int startIndex, int endIndex)
: Get the Physical Asset Event Notification in the specified range of indices
- Physical Action Request:
savePhysicalAssetActionRequest(PhysicalAssetActionRequest physicalAssetActionRequest)
: Save Physical Asset Action RequestgetPhysicalAssetActionRequestCount()
: Get the number of Physical Asset Action RequestgetPhysicalAssetActionRequestInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the Physical Asset Action Request in the specified time rangegetPhysicalAssetActionRequestInRange(int startIndex, int endIndex)
: Get the Physical Asset Action Request in the specified range of indices
- Digital Action Request:
saveDigitalActionRequest(DigitalActionRequest digitalActionRequest)
: Save a Digital Action RequestgetDigitalActionRequestCount()
: Get the number of Digital Action Request StoredgetDigitalActionRequestInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the Digital Action Request in the specified time rangegetDigitalActionRequestInRange(int startIndex, int endIndex)
: Get the Digital Action Request in the specified range of indices
- Physical Asset Description (PAD) Notification
- New PAD Notification
saveNewPhysicalAssetDescriptionNotification(PhysicalAssetDescriptionNotification physicalAssetDescriptionNotification)
: Save a new Physical Asset Description AvailablegetNewPhysicalAssetDescriptionNotificationCount()
: Get the number of New Physical Asset Description Notifications availablegetNewPhysicalAssetDescriptionNotificationInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the New Physical Asset Description Available in the specified time rangegetNewPhysicalAssetDescriptionNotificationInRange(int startIndex, int endIndex)
: Get the New Physical Asset Description Available in the specified range of indices
- Updated PAD Notification
saveUpdatedPhysicalAssetDescriptionNotification(PhysicalAssetDescriptionNotification physicalAssetDescriptionNotification)
: Save the updated Physical Asset Description NotificationgetUpdatedPhysicalAssetDescriptionNotificationCount()
: Get the number of Updated Physical Asset DescriptiongetUpdatedPhysicalAssetDescriptionNotificationInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the Updated Physical Asset Description in the specified time rangegetUpdatedPhysicalAssetDescriptionNotificationInRange(int startIndex, int endIndex)
: Get the Updated Physical Asset Description in the specified range of indices
- New PAD Notification
- Physical Asset Property Variation:
savePhysicalAssetPropertyVariation(PhysicalAssetPropertyVariation physicalAssetPropertyVariation)
: Save the Physical Asset Property VariationgetPhysicalAssetPropertyVariationCount()
: Get the number of Physical Asset Property VariationgetPhysicalAssetPropertyVariationInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the Physical Asset Property Variation in the specified time rangegetPhysicalAssetPropertyVariationInRange(int startIndex, int endIndex)
: Get the Physical Asset Property Variation in the specified range of indices
- Physical Asset Relationship Instance Notification
- Created Relationship Instance
savePhysicalAssetRelationshipInstanceCreatedNotification(PhysicalRelationshipInstanceVariation physicalRelationshipInstanceVariation)
: Save the Physical Asset Relationship Instance Created EventgetPhysicalAssetRelationshipInstanceCreatedNotificationCount()
: Get the number of Physical Asset Relationship Instance Created EventgetPhysicalAssetRelationshipInstanceCreatedNotificationInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the Physical Asset Relationship Instance Created Event in the specified time rangegetPhysicalAssetRelationshipInstanceCreatedNotificationInRange(int startIndex, int endIndex)
: Get the Physical Asset Relationship Instance Created Event in the specified range of indices
- Deleted Relationship Instance
savePhysicalAssetRelationshipInstanceDeletedNotification(PhysicalRelationshipInstanceVariation physicalRelationshipInstanceVariation)
: Save the Physical Asset Relationship Instance Updated EventgetPhysicalAssetRelationshipInstanceDeletedNotificationCount()
: Get the number of Physical Asset Relationship Instance Updated EventgetPhysicalAssetRelationshipInstanceDeletedNotificationInTimeRange(long startTimestampMs, long endTimestampMs)
: Get the Physical Asset Relationship Instance Updated Event in the specified time rangegetPhysicalAssetRelationshipInstanceDeletedNotificationInRange(int startIndex, int endIndex)
: Get the Physical Asset Relationship Instance Updated Event in the specified range of indices
- Created Relationship Instance
- Digital Twin State:
- The class defines methods for the management of:
Some examples of usage for the Storage Layer are the following:
Lets’ create a new Digital Twin with a single Storage in charge of automatically observe and store all the event generated and going through the target DT instance
// Create the Digital Twin Engine
DigitalTwinEngine digitalTwinEngine = new DigitalTwinEngine();
// Create a new Digital Twin with a Demo Shadowing Function
DigitalTwin digitalTwin = new DigitalTwin(TEST_DIGITAL_TWIN_ID, new DemoShadowingFunction());
// Physical Adapter Configuration
DemoPhysicalAdapter physicalAdapter = new DemoPhysicalAdapter(...);
digitalTwin.addPhysicalAdapter(physicalAdapter);
// Digital Adapter Configuration
digitalAdapter = new DemoDigitalAdapter(...);
digitalTwin.addDigitalAdapter(digitalAdapter);
// Create a new WldtStorage instance using the default implementation and observing all the events
DefaultWldtStorage myStorage = new DefaultWldtStorage("test_storage", true)
// Add the new Default Storage Instance to the Digital Twin Storage Manager
digitalTwin.getStorageManager().putStorage(myStorage);
// Add the Twin to the Engine
digitalTwinEngine.addDigitalTwin(digitalTwin);
// Start the Digital Twin
digitalTwinEngine.startDigitalTwin(TEST_DIGITAL_TWIN_ID);
Now let’s suppose to have two additional implementation of the WldtStorage class supporting Redis and MongDB and called RedisWldtStorage
and MongoDbWldtStorage
.
We would like to use Redis to automatically observe all the events and MongoDb only to store DT’s state and life cycle variations.
[...]
// Create a new RedisWldtStorage instance using the default implementation and observing all the events
RedisWldtStorage myRedisStorage = new RedisWldtStorage("redis_storage", true);
myRedisStorage.setRedisConfiguration(myRedisConfiguration);
// Add the new Redis Storage Instance to the Digital Twin Storage Manager
digitalTwin.getStorageManager().putStorage(myRedisStorage);
// Create a new MongoDbWldtStorage instance using the default implementation and observing only State and LifeCycle Events
MongoDbWldtStorage myMongoDbStorage = new MongoDbWldtStorage("mongo_db_storage", true, false, false, false, false, true);
myMongoDbStorage.setMongoDbConfiguration(myMongoDbConfiguration);
// Add the new MongoDb Storage Instance to the Digital Twin Storage Manager
digitalTwin.getStorageManager().putStorage(myRedisStorage);
[...]
Within the ShadowingFunction
it is possible to have the reference to the StorageManager
in order to access available Storage in both reading and writing mode.
This is an example of how to retrieve an available WldtStorage through its id and the use it to read Properties values in a time range of the last 5 minutes:
String TARGET_STORAGE_ID = "test_storage";
if(this.storageManager.isStorageAvailable(TARGET_STORAGE_ID)){
// Access the Storage Manager to store the last value of the property
WldtStorage targetStorage = this.storageManager.getStorage(TARGET_STORAGE_ID);
// Get the current time in milliseconds
long endTime = System.currentTimeMillis();
// Get the Time in the last 5 minutes
long startTime = endTime - (5 * 60 * 1000);
// Get the last Physical Asset Action Request in the last 5 minutes
List<PhysicalAssetPropertyVariationRecord> propertyVariationRecords = targetStorage.getPhysicalAssetPropertyVariationInTimeRange(startTime, endTime);
for(PhysicalAssetPropertyVariationRecord propertyVariationRecord : propertyVariationRecords){
logger.info("Property Variation Record: {}", propertyVariationRecord);
[...]
}
}
Note: The StorageManager
, as previously described, can automatically store DT-related events based on the configuration and setup of each WldtStorage
instance added to the manager. However, since the ShadowingFunction
has direct access to the StorageManager
in both read and write modes, manual handling of data storage is also possible. To achieve this, you can disable automatic storage by setting it to false
for specific event types or for all event types. This allows you to manually manage the storage of information within the ShadowingFunction
.
Query System
Given the library’s goal of maximizing modularity and decoupling responsibilities among the available components, the Query System has been introduced. This system allows components external to the core responsibilities of the Digital Twin (e.g., Digital Adapters and Augmentation Functions) to retrieve stored data and use or expose it according to their specific logic and implementation. For instance, an HTTP Digital Adapter could expose stored information about a DT’s state variations over time, or a Monitoring Adapter could use available storage instances to retrieve events for a deeper understanding of the target DT instance’s behavior. The query system has been implemented entirely through dedicated events in order to maximize the decoupling of the solution and and supports at the same time both synchronous and asynchronous queries.
The main classes associated to the Query System are the following:
QueryManager
: This class represents the Query Manager responsible to handle the query request and manage the query execution and has been designed to be extended by the user to implement the desired query management logic (e.g., as with theDefaultQueryManager
).QueryRequest
: The class contains all the information needed to perform a query on the storage systemQueryRequestType
: This Enum represents the Query Request Type used to specify the type of query to be performed on the storage system supporting:- TIME_RANGE
- SAMPLE_RANGE
- LAST_VALUE
- COUNT
QueryResourceType
: This Enum represents the Query Resource Type used to specify the type of resource to be queried on the storage system supporting the following resource types mapping those available and managed by the storage manager:- PHYSICAL_ASSET_PROPERTY_VARIATION
- PHYSICAL_ASSET_EVENT_NOTIFICATION
- PHYSICAL_ACTION_REQUEST
- DIGITAL_ACTION_REQUEST
- DIGITAL_TWIN_STATE
- NEW_PAD_NOTIFICATION
- UPDATED_PAD_NOTIFICATION
- PHYSICAL_RELATIONSHIP_INSTANCE_CREATED_NOTIFICATION
- PHYSICAL_RELATIONSHIP_INSTANCE_DELETED_NOTIFICATION
- LIFE_CYCLE_EVENT
QueryExecutor
: This class represents the Query Executor used to execute queries on the storage system supporting both synchronous and asynchronous query execution. Internally is implemented through an event-based mechanism to handle the query request and responseQueryResult
: This class represents the Query Result returned by the Query Executor containing the query results and the query status (successful or not) and error message (if any) together with also the original requestIQueryResultListener
: This interface represents the Query Result Listener used to receive the query results
An example of Synchronous query is:
QueryExecutor queryExecutor = new QueryExecutor(TEST_DIGITAL_TWIN_ID, "query-executor");
// Create Query Request to the Storage Manager for the Last Digital Twin State
QueryRequest queryRequest = new QueryRequest();
queryRequest.setResourceType(QueryResourceType.DIGITAL_TWIN_STATE);
queryRequest.setRequestType(QueryRequestType.LAST_VALUE);
// Send the Query Request to the Storage Manager for the target DT
QueryResult<?> queryResult = queryExecutor.syncQueryExecute(queryRequest);
Following the same approach an Asynchrounouse query can be executed as follows:
QueryExecutor queryExecutor = new QueryExecutor(TEST_DIGITAL_TWIN_ID, "query-executor");
// Create Query Request to the Storage Manager for the Last Digital Twin State
QueryRequest queryRequest = new QueryRequest();
queryRequest.setResourceType(QueryResourceType.DIGITAL_TWIN_STATE);
queryRequest.setRequestType(QueryRequestType.LAST_VALUE);
// Send the Query Request to the Storage Manager for the target DT
queryExecutor.asyncQueryExecute(queryRequest, new IQueryResultListener() {
@Override
public void onQueryResult(QueryResult<?> queryResult) {
[...]
}
});
The class DigitalAdapter
has been updated adding also an internal reference to a QueryExecutor
in order to simplify the interaction with the query system directly from an adapter like in the following example where we use the query Executor of the Digital Adapter invokeAction
callback through its internal variable accessible through this.queryExecutor
without creating a new executor:
public <T> void invokeAction(String actionKey, T body){
try {
// Create Query Request to the Storage Manager for the Last Digital Twin State
QueryRequest queryRequest = new QueryRequest();
queryRequest.setResourceType(QueryResourceType.DIGITAL_TWIN_STATE);
queryRequest.setRequestType(QueryRequestType.LAST_VALUE);
// Send the Query Request to the Storage Manager for the target DT
QueryResult<?> queryResult = this.queryExecutor.syncQueryExecute(queryRequest);
// Do Something with the Query Result
for(Object result : queryResult.getResults()){
// Check the type of the Resulting class accordingly to the query
if(result instanceof DigitalTwinState)
logger.info("LAST DT STATE: {}", result);
else
logger.error("INVALID RESULT TYPE: {}", result.getClass().getName());
}
logger.info("INVOKING ACTION: {} BODY: {}", actionKey, body);
publishDigitalActionWldtEvent(actionKey, body);
} catch (EventBusException e) {
e.printStackTrace();
}
}
Migration Info: 0.3.0 - 0.4.0
- Now
PhysicalAssetRelationship
constructor has also thetype
in order to match theDigitalTwinStateRelationship
and simplify its management - The method
notifyDigitalTwinStateEvent
throws only the ExceptionWldtDigitalTwinStateEventNotificationException
whileEventBusException
has been removed
Additional Improvements & Fixed Bugs
- Synchronized the update of the current DT Life Cycle State in order to avoid wrong data
- The
WldtEventBus
now supports the use of topics Wildcard (at the moment only multi-level with the character*
). For example with this approach is possible to subscribe to all the events associated to property variations (topic:dt.physical.event.property.*
). New methods added toWldtEventBus
are:matchWildCardType(String eventType, String filterType)
: Check if the provided event type match the WildCard TypeisWildCardType(String filterEventType)
: Check if the provided event type is a WildCard Type
- The class
WldtEventTypes
has been introduced to contain all the event types in the WLDT Framework and support internal message exchange. Includes types for events associated and adopted by: i) Physical Adapters; ii) Model and Shadowing Function; and iii) Digital Adapters. - The
EventManager
class has been added to centralize and simplify the event management in the WLDT Framework providing a set of static methods to publish events associated to a target digital twin and publisher (e.g., the physical adapter of the twin). - Now
PhysicalAssetRelationship
class has also thetype
in order to match theDigitalTwinStateRelationship
and simplify its management - The internal class
ModelEngine
has been renamed intoDigitalTwinModel
as an initial update for further development of the next version 0.5.0 where the structure of the DT’s Model and the associated classes will be improved