
Down to the River

Reading Time: 10 minutes

Edge computing and AIoT form an exciting space because they offer an opportunity to use technology in new and innovative ways. Yet technology must deliver value to its stakeholders. Helping people and companies understand and unlock the value of the edge and AIoT, and helping to deliver solutions that realize this value, is what I spend most of my time doing these days.

In collaboration with

Hans van’t Hag
✉️ hans.vanthag@adlinktech.com
🔗 https://www.linkedin.com/in/hans-van-t-hag-b510092/

The combination of sensors (such as cameras for vision) and artificial intelligence has given rise to the term AIoT. Awareness is created by augmenting raw data (stemming from sensors) with derived values (created by inference engines that exploit machine-learning models). The awareness is created locally at the edge. It can generate actionable insights that are timely and accurate (thanks to real-time scoring on vast amounts of raw data). These insights present unique value in improving human/machine interaction, worker safety, and process efficiency, and even in exploiting this awareness for entirely new business insights and related value creation.

Taking a more technical view, there is:

  1. sensors — with their related I/O devices,
  2. derived-value services — that augment the raw data from the sensors and
  3. actuators — that exploit the augmented data either for presentation on a dashboard and/or real-time actions through related I/O devices.

Before moving on, let’s take a look at the architectural properties that are important when designing and deploying AIoT solutions. By being aware of these properties, there is a higher likelihood of a successful AIoT solution that delivers value to its stakeholders.

Simplicity

When building these complex AIoT solutions, it is desirable to have an architecture where we compose the solution from simple components. The components autonomously perform basic functions such as sensing, inferencing, and displaying. By combining the simple components, we get emergent behaviours that realize the complex solution. Many enterprise systems use centralized databases to share data between the simple components that make up the overall solution. In an industrial or operational system, however, we also need to distribute data in real time, something centralized databases were not designed for.

Interoperability

For simple components to collaborate, they need to be able to understand each other. And for that, the data they produce and consume must be defined and understood by their collaborators. Three things are needed to understand the data in the system: structure, kind, and quality attributes. The structure defines the contents and typing of the data. The kind defines semantic attributes of the data such as telemetry, state, event, and configuration. The quality attributes of the data (for example, priority, volume, rate, persistence) provide further insight into how to handle it. These properties of the data must be available and discoverable.

Connectivity

In addition to interoperability for the simple components to collaborate, they need a way to connect. One way to achieve connectivity is by having each component itself be responsible for connecting with its collaborators. However, it is much more desirable to have the component shielded from having to take care of the network/connectivity details needed for sharing data in real-time. Ideally, the consumer of data does not need to worry about the location of the producer of the data.

It is also desirable to have intelligent connectivity that includes data management, rather than the components having to do it themselves. Data management includes managing the lifecycle of the data, such as creation, updates, and deletion, but also the ability to filter the data for a consumer, much like a relational database makes possible through SQL.

Self-forming

There is one certainty in AIoT solutions: change is inevitable. Therefore, we need a self-forming architecture where the system’s components dynamically discover each other and can connect regardless of their location or implementation. One way to enable discovery is through meta-data, which is itself discoverable, describing which things of which kinds exist, where they are located, and what data they are producing and consuming. The meta-data can then be leveraged by both operational and supervisory applications to perform their role in the AIoT solution.

The value of the self-forming architecture is that it facilitates both multiple instances of a single generic solution and extensions (planned or unplanned) of a single solution. The self-forming nature assures that extensions don’t require knowledge of, or even worse changes to, the existing components of the solution.

With that said, it is now time to look at a use-case for a generic small vision-based AIoT solution and a specific instance of the solution for a given context. Through the description of the use-case, we shall see how the ADLINK Data River helps to achieve the desired architectural properties described above.

Generic Solution

The use-case looks at securing a company’s building access by exploiting ‘smart gates’ where objects in the camera’s field of view are labeled for further inspection and entry-allowance.

Specific Instantiation

The ABC Corporation has a campus, C1, with a set of buildings. In each building there are gates, each identified with a number, that workers must pass through. When a worker passes through a gate, a camera observes them in order to identify them and to determine whether they have adequate safety protection.

In this particular instantiation, we will focus on gate G1 of building B1 on the campus. In the hierarchical representation, the context of the instantiation can therefore be captured in the string c1.b1.g1.
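
As a purely illustrative aside (the helper below is hypothetical, not part of the ADLINK Edge SDK), the context is simply the dot-joined hierarchy, which also yields the contexts of Things attached to the gate, such as its camera:

# Illustrative only: composing hierarchical context strings.
# make_context is a hypothetical helper, not an ADLINK Edge SDK call.
def make_context(*levels):
    """Join hierarchy levels (campus, building, gate, ...) into a context string."""
    return ".".join(levels)

gate_context = make_context("c1", "b1", "g1")        # "c1.b1.g1"
camera_context = make_context(gate_context, "cam1")  # "c1.b1.g1.cam1"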

What You Will Learn

The solution consists of three separate applications that together make up an Edge AIoT machine-vision system. The three applications are independent of one another and share data through their flows on the ADLINK Data River. Each of the applications demonstrates different concepts necessary for building an Edge AIoT system.

Learn to build and run a solution with these properties:

  1. Simple,
  2. Interoperable,
  3. Connected, and
  4. Self-forming.

How it Works

In its most basic form, the solution consists of the following components (modeled as Things on the Data River™):

  1. An outside-world-observer: i.e. a Camera (that captures frames from a camera/video device and streams those frames as a VideoFrame ‘data-flow’ in the Data River)
    • As explained above, the ‘context’ of THIS frame-streamer Thing would be c1.b1.g1.cam1 and, by default, its published data-flows would inherit that same context for the raw data-flow from a dumb camera.
    • In this particular example, we use a Frame Streamer with a video file that simulates a camera.
  2. A derived-value-service: i.e. an Inference Engine (that detects objects in the frames of a flow, represented by so-called ‘detectionBoxes’, which it publishes as a related flow on the Data River)
    • This derived-value-service can be instructed to ‘digest’ specific flows (i.e. from specific cameras) and can re-use the flowIds of those input flows in its detectionBox output flows so as to facilitate ‘merging’ the raw and inferred data
  3. A user-interface: i.e. a Viewer (that displays the resulting awareness by overlaying the raw VideoFrames of the frame-streamer with the detectionBoxes of the inference-engine)
    • By exploiting the ‘raw’ and ‘inference’ flows’ logical identifications (i.e. their flowId), the viewer can overlay the two streams and present this ‘awareness’ to its users

So apart from the ‘Things’ as the active components that produce, transform and/or digest data-flows on the River, there’s the data itself, modeled as ‘TagGroups’:

  1. VideoFrame TagGroup: a definition of the structure and kind of data related to flows of captured video-frames on the River
    • kind of data: this is captured in the tagGroup’s QoSProfile-selection which in this case is ‘video‘ (so that the dataRiver understands how to ‘treat’ that high-volume and periodic kind of data)
    • structure of the data: this is the set of named ‘tags’ that make-up this tagGroup, each with a description, unit-specification, and optionally an ‘allowed-range’ of its values
      • for this TagGroup it includes a tag that holds the video-frame itself alongside meta-data related to that stream w.r.t. the resolution, frame rate, pixel-format/channels and compression
| name | kind | unit | description |
| --- | --- | --- | --- |
| frame_id | UINT32 | | A unique identifier for the frame within a specific flow |
| timestamp | INT64 | seconds | Time of image capture in seconds using epoch time |
| data | TYPE_BYTE_SEQ | | Video frame data |
| width | UINT32 | pixels | Frame width in pixels |
| height | UINT32 | pixels | Frame height in pixels |
| channels | UINT32 | channels | Number of color channels in the frame |
| size | UINT32 | bytes | Number of bytes in data |
| format | STRING | PixelFormat | Pixel format using OpenCV color spaces |
| compression | STRING | CompressionKind | Encoding using http://www.fourcc.org/codecs.php |
| framerate | FLOAT32 | fps | Frame transmission frequency |
  2. DetectionBox TagGroup: a definition of the structure and kind of data related to inferencing-results on the River
    • kind of data: this is captured in the TagGroup’s QoSProfile-selection, which in this case is ‘telemetry’ (so that the dataRiver understands how to ‘treat’ these noteworthy periodic observations)
      • for this tagGroup it includes references to the frame that the info is ‘inferred from’, and a set of detection-box coordinates, related labels, and confidence regarding the detected object (a purely illustrative sample is sketched after the table below)
| name | kind | unit | description |
| --- | --- | --- | --- |
| frame_id | UINT32 | | A unique identifier for the frame within a specific flow |
| class_id | INT32_SEQ | | Detected object’s classification type as a raw id. Each position in the sequence represents a separate detected object and corresponds to the values at the same position in each of the other sequences. |
| class_label | STRING_SEQ | | Detected object’s classification as a human-readable name. |
| x1 | FLOAT32_SEQ | % | Top left x coordinate as a % of frame width from (0,0) |
| y1 | FLOAT32_SEQ | % | Top left y coordinate as a % of frame height from (0,0) |
| x2 | FLOAT32_SEQ | % | Bottom right x coordinate as a % of frame width from (0,0) |
| y2 | FLOAT32_SEQ | % | Bottom right y coordinate as a % of frame height from (0,0) |
| probability | FLOAT32_SEQ | % | Confidence in the predicted class of object within the box |
| meta | STRING_SEQ | | Free form buffer of extra inference metadata |
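
To make the parallel-sequence layout of the DetectionBox data concrete, here is a purely illustrative sketch of what a VideoFrame sample and its matching DetectionBox sample could look like as plain Python dictionaries. All values are made up; only the tag names and their position-wise correspondence follow the definitions above.

# Illustrative only: example payloads shaped like the VideoFrame and DetectionBox TagGroups.
# All values are made up for the sake of the example.
video_frame = {
    "frame_id": 42,
    "timestamp": 1700000000,        # seconds, epoch time
    "data": b"...jpeg bytes...",    # compressed frame
    "width": 1280,
    "height": 720,
    "channels": 3,
    "size": 123456,
    "format": "BGR",
    "compression": "MJPG",
    "framerate": 30.0,
}

# Two objects detected in frame 42: index 0 describes the first object,
# index 1 the second, across every sequence.
detection_box = {
    "frame_id": 42,
    "class_id": [1, 3],
    "class_label": ["person", "car"],
    "x1": [10.0, 55.0],
    "y1": [20.0, 40.0],
    "x2": [30.0, 80.0],
    "y2": [90.0, 75.0],
    "probability": [97.0, 88.0],
    "meta": ["", ""],
}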

Outside World Observer : Frame Streamer

In this example, we simulate a camera by using a video file. The file is loaded on start-up of the FrameStreamer (using OpenCV), and each frame of the video is encoded as JPEG and streamed to the Data River at the specified frame rate. The FrameStreamer can be configured to loop the video until the application is exited; otherwise, it will run through all the frames in the video once.

The following is the Python implementation of the Frame Streamer Thing.

    def run(self):
        self.__enter__()
        delay = 1 / self._fps
        while True:
            # Open the (simulated camera) video file and read the first frame
            cap = cv2.VideoCapture(self._video_file)
            ret, frame = cap.read()

            # Derive the stream meta-data (resolution, channels, frame rate, ...)
            self._set_frame_metadata(cap, frame)

            # Encode each frame as JPEG and write it to the Data River at the configured rate
            while ret:
                _, frame_encode = cv2.imencode('.jpg', frame)
                frame_bytes = frame_encode.tobytes()
                self.write_frame(frame_bytes)
                ret, frame = cap.read()
                time.sleep(delay)

            cap.release()

            # Stop after a single pass unless configured to loop the video
            if not self._repeat:
                break
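
The _set_frame_metadata helper referenced above is not shown. A minimal sketch of what it might look like, assuming the Thing keeps the stream meta-data in attributes named after the VideoFrame tags (those attribute names are assumptions):

# Illustrative sketch only: deriving the stream meta-data with OpenCV.
# The attribute names are assumptions, not taken from the original implementation.
def _set_frame_metadata(self, cap, frame):
    height, width, channels = frame.shape
    self._width = width
    self._height = height
    self._channels = channels
    self._format = 'BGR'                       # OpenCV decodes frames as BGR
    self._compression = 'MJPG'                 # frames are re-encoded as JPEG before streaming
    self._framerate = cap.get(cv2.CAP_PROP_FPS) or self._fps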

Derived Value Service : Inference Engine

The Inference Engine uses TensorFlow’s Object Detection API with the SSD Inception v2 COCO pre-trained model to detect objects in a frame. In this example we attach a listener to the VideoFrameData input. The flow to be processed by the Inference Engine can be configured using a filter on the input.

(Figure: Inference Engine flow)

The following is the Python implementation of the Inference Engine Thing.

    def frame_available(self, flow_id, frame):
        # Decode the JPEG-compressed frame back into an OpenCV image
        frame_raw = np.frombuffer(bytes(frame.data), dtype=np.uint8)
        frame_decode = cv2.imdecode(frame_raw, 1)

        # The input needs to be a tensor, convert it using 'tf.convert_to_tensor'
        input_tensor = np.asanyarray(frame_decode)
        input_tensor = tf.convert_to_tensor(input_tensor)
        # The model expects a batch of images, so add an axis with 'tf.newaxis'
        input_tensor = input_tensor[tf.newaxis, ...]

        # Run inference
        output_dict = self._model(input_tensor)
        # All outputs are batched tensors.
        # Convert to numpy arrays, and take index [0] to remove the batch dimension.
        # We're only interested in the first num_detections.
        num_detections = int(output_dict.pop('num_detections'))

        output_dict = {key: value[0, :num_detections].numpy()
                       for key, value in output_dict.items()}

        output_dict['num_detections'] = num_detections

        # detection_classes should be ints
        output_dict['detection_classes'] = output_dict['detection_classes'].astype(np.int64)

        # Build a DetectionBox sample from the model output and publish it on the
        # same flow as the frame it was inferred from
        detection_box = self._get_detection_boxes(output_dict['detection_boxes'],
                                                  output_dict['detection_classes'],
                                                  output_dict['detection_scores'],
                                                  self._category_index)
        detection_box.frame_id = frame.frame_id
        eu.write_tag(self._thing, 'DetectionBoxData', flow_id, detection_box)
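
The method relies on self._model and self._category_index being prepared beforehand. A minimal sketch of how they could be set up, assuming a locally downloaded SSD Inception v2 COCO saved model and the Object Detection API's label_map_util (the paths are placeholders, not taken from the original code):

# Illustrative sketch only: loading the pre-trained model and COCO label map.
# MODEL_DIR and LABEL_MAP are placeholder paths.
import tensorflow as tf
from object_detection.utils import label_map_util

MODEL_DIR = 'ssd_inception_v2_coco_2018_01_28/saved_model'
LABEL_MAP = 'mscoco_label_map.pbtxt'

def _load_model(self):
    model = tf.saved_model.load(MODEL_DIR)
    # Use the default serving signature as the callable model
    self._model = model.signatures['serving_default']
    # Map numeric class ids to human-readable labels
    self._category_index = label_map_util.create_category_index_from_labelmap(
        LABEL_MAP, use_display_name=True)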

The flow filter is configured in the Properties file of the Inference Engine:

{
    ...,
    "inputSettings" : [
        {
            "name" : "VideoFrameData",
            "filters" : {
                "flowIdFilters" : ["c1.b1.g1.cam1"]
            }
        }
    ]
}

User-Interface : Viewer

The Viewer uses OpenCV’s Python API to display the frames it receives from the Data River on a specific flow. Before displaying the frame it overlays the latest Detection Boxes that it has received. The flow to be displayed by the Viewer can be configured through a command line parameter that then uses a selector.

(Figure: Viewer flow)

The following is the Python implementation of the Viewer Thing. To simplify the implementation, we have used the RxPY framework, which provides a set of operators that can be used to work with streams of data. In this particular implementation, we use the combine_latest operator to merge the streams of frames and detection boxes into a new stream that outputs a tuple containing the latest element of each stream whenever either stream produces an element. The tuples are passed to a _handle_frame function that displays the frame with overlays (if there are any) using OpenCV.

Note the implementation also allows for the Viewer to be run and display the original frames without the overlay. This can be done by setting the _overlay attribute to False.

def run(self):
    detection_thread = None
    # Thread that receives video frames and pushes them into the frame subject
    frame_thread = threading.Thread(target=self._run_frames)

    if self._overlay:
        # Thread that receives detection boxes and pushes them into the detection-box subject
        detection_thread = threading.Thread(target=self._run_detection_boxes)

    frame_thread.start()
    if self._overlay:
        detection_thread.start()

    # Emit the latest (frame, boxes) pair whenever either stream produces an element
    combined = self._frame_subject.combine_latest(self._detection_box_subject,
                                                  lambda frame, boxes: [frame, boxes])
    combined.subscribe(self._handle_frame)

    frame_thread.join()

    if self._overlay:
        detection_thread.join()

    cv2.destroyAllWindows()
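
The _handle_frame callback is not shown above. A minimal sketch of what it could look like, assuming the frame payload carries JPEG bytes as produced by the FrameStreamer and that the box coordinates are fractions of the frame size (scale by 0.01 first if they are delivered as 0–100 percentages):

# Illustrative sketch only: overlaying the latest detection boxes onto a frame.
# The names and the coordinate convention here are assumptions.
def _handle_frame(self, frame_and_boxes):
    frame_data, boxes = frame_and_boxes
    image = cv2.imdecode(np.frombuffer(bytes(frame_data.data), dtype=np.uint8), 1)
    height, width = image.shape[:2]

    if self._overlay and boxes is not None:
        for i, label in enumerate(boxes.class_label):
            # Convert relative coordinates into pixel positions
            p1 = (int(boxes.x1[i] * width), int(boxes.y1[i] * height))
            p2 = (int(boxes.x2[i] * width), int(boxes.y2[i] * height))
            cv2.rectangle(image, p1, p2, (0, 255, 0), 2)
            cv2.putText(image, label, (p1[0], p1[1] - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    cv2.imshow('Viewer', image)
    cv2.waitKey(1)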

Appendix A: Tag Group Definitions

Video Frames

./definitions/com.adlinktech.example/VideoFrameTagGroup.json

{
      "name":"VideoFrame",
     "context":"com.adlinktech.example",
     "qosProfile":"video",
     "version":"v1.0",
     "description":"Video frame sample",
     "tags":[
         {
             "name":"frame_id",
             "description":"Frame sample ID",
             "kind":"UINT32",
             "unit":"NUM"
         },
         {
             "name":"timestamp",
             "description":"Time of image capture event",
             "kind":"INT64",
             "unit":"time"
         },
         {
             "name":"data",
             "description":"Video frame data",
             "kind":"TYPE_BYTE_SEQ",
             "unit":"Frame data"
         },
         {
             "name":"width",
             "description":"Frame width",
             "kind":"UINT32",
             "unit":"Pixels"
         },
         {
             "name":"height",
             "description":"Frame height",
             "kind":"UINT32",
             "unit":"Pixels"
         },
         {
             "name":"channels",
             "description":"Channels",
             "kind":"UINT32",
             "unit":"Number of channels"
         },
         {
             "name":"size",
             "description":"Data size",
             "kind":"UINT32",
             "unit":"Size"
         },
         {
             "name":"format",
             "description":"Pixel format using OpenCV Definitions",
             "kind":"STRING",
             "unit":"PixelFormat"
         },
         {
             "name":"compression",
             "description":"Compression technology used for video frame",
             "kind":"STRING",
             "unit":"CompressionKind"
         },
         {
             "name":"framerate",
             "description":"Frame transmission frequency",
             "kind":"FLOAT32",
             "unit":"fps"
         }
     ]
 }

Detection Boxes

./definitions/TagGroup/com.adlinktech.example/DetectionBoxTagGroup.json

[
     {
         "name":"DetectionBox",
         "context":"com.adlinktech.example",
         "qosProfile":"telemetry",
         "version":"v1.0",
         "description":"Inference engine results for object detection model outputing bounding boxes",
         "tags":[
             {
                 "name":"frame_id",
                 "description":"ID of the input video frame fed to the inference engine",
                 "kind":"UINT32",
                 "unit":"NUM"
             },
             {
                 "name":"class_id",
                 "description":"Detected object's classification type as raw id",
                 "kind":"INT32_SEQ",
                 "unit":"UUID"
             },
             {
                 "name":"class_label",
                 "description":"Detected object's classification as proper name",
                 "kind":"STRING_SEQ",
                 "unit":"UUID"
             },
             {
                 "name":"x1",
                 "description":"Top Left X Coordinate (% from 0,0)",
                 "kind":"FLOAT32_SEQ",
                 "unit":"Percentage"
             },
             {
                 "name":"y1",
                 "description":"Top Left Y Coordinate (% from 0,0)",
                 "kind":"FLOAT32_SEQ",
                 "unit":"Percentage"
             },
             {
                 "name":"x2",
                 "description":"Bottom Right X Coordinate (% from 0,0)",
                 "kind":"FLOAT32_SEQ",
                 "unit":"Percentage"
             },
             {
                 "name":"y2",
                 "description":"Bottom Right Y Coordinate (% from 0,0)",
                 "kind":"FLOAT32_SEQ",
                 "unit":"Percentage"
             },
             {
                 "name":"probability",
                 "description":"Network confidence",
                 "kind":"FLOAT32_SEQ",
                 "unit":"Percentage"
             },
             {
                 "name":"meta",
                 "description":"Buffer for extra inference metadata",
                 "kind":"STRING_SEQ",
                 "unit":"N/A"
             }
         ]
     }
 ]

Appendix B: The Streamer

Parameters

| name | type | required | default | description |
| --- | --- | --- | --- | --- |
| video_file | string | true | | Mandatory file name of the video file to load and stream. |
| repeat | bool | false | True | If True it will loop the video. |
| fps | int | false | 30 | The frame rate to read and stream the video file. The value is the number of frames per second. |
| properties | str | false | ./config/FrameStreamer.json | The URI (without file://) to the properties file. |
| tag_group_dir | str | false | ./definitions/TagGroup | The directory containing the TagGroup definitions. |
| thing_class_dir | str | false | ./definitions/ThingClass | The directory containing the ThingClass definitions. |

Properties

./config/FrameStreamer.json

{
     "id": "79EC6787DA88",
     "classId": "FrameStreamer:com.adlinktech.example:v1.0",
     "contextId": "c1.b1.g1.cam1",
     "description": "Default Video frames on the data river Thing"
 }

Thing Class Definition

./definitions/ThingClass/com.adlinktech.example/FrameStreamerThingClass.json

{
     "name": "FrameStreamer",
     "context": "com.adlinktech.example",
     "version": "v1.0",
     "description": "Video frames on the data river",
     "outputs": [
         {
             "name": "VideoFrameData",
             "tagGroupId": "VideoFrame:com.adlinktech.example:v1.0"
         }
     ]
 }

How to Run

 $ python the_streamer.py -i /home/adlink/videos/bolt-detection.mp4

Appendix C: Inference Engine

Parameters

| name | type | required | default | description |
| --- | --- | --- | --- | --- |
| properties | str | false | ./config/InferenceEngine.json | The URI (without file://) to the properties file. |
| tag_group_dir | str | false | ./definitions/TagGroup | The directory containing the TagGroup definitions. |
| thing_class_dir | str | false | ./definitions/ThingClass | The directory containing the ThingClass definitions. |

Properties

./config/InferenceEngine.json

{
    "id": "_AUTO_",
    "classId": "InferenceEngine:com.adlinktech.example:v1.0",
    "contextId": "c1.inference_engine1",
    "description": "Default Video frames on the data river Thing"
}

Thing Class Definition

./definitions/ThingClass/com.adlinktech.example/InferenceEngineThingClass.json

{
    "name": "InferenceEngine",
    "context": "com.adlinktech.example",
    "version": "v1.0",
    "description": "An inference engine that transforms frames into a list of detection boxes",
    "inputs": [
        {
            "name": "VideoFrameData",
            "tagGroupId": "VideoFrame:com.adlinktech.example:v1.0"
        }
    ],
    "outputs": [
       {
            "name": "DetectionBoxData",
            "tagGroupId": "DetectionBox:com.adlinktech.example:v1.0"
       }
    ]
}

How to Run

Requires that the following be installed: TensorFlow, the TensorFlow Object Detection API, OpenCV, and NumPy (per the inference code and model described above).

And that PYTHONPATH is defined so that these packages, in particular the Object Detection API, can be imported.

$ python the_detector.py

Appendix D: Viewer

Parameters

| name | type | required | default | description |
| --- | --- | --- | --- | --- |
| video-flow_id | str | true | | Mandatory flow id to be viewed. |
| properties | str | false | ./config/Viewer.json | The URI (without file://) to the properties file. |
| tag_group_dir | str | false | ./definitions/TagGroup | The directory containing the TagGroup definitions. |
| thing_class_dir | str | false | ./definitions/ThingClass | The directory containing the ThingClass definitions. |

Properties

./config/Viewer.json

{
    "id": "_AUTO_",
    "classId": "Viewer:com.adlinktech.example:v1.0",
    "contextId": "c1.viewer1",
    "description": "Default Video frames on the data river Thing"
}

Thing Class Definition

./definitions/ThingClass/com.adlinktech.example/ViewerThingClass.json

{
    "name": "Viewer",
    "context": "com.adlinktech.example",
    "version": "v1.0",
    "description": "Video frames on the data river",
    "inputs": [
        {
            "name": "VideoFrameData",
            "tagGroupId": "VideoFrame:com.adlinktech.example:v1.0"
        },
      {
            "name": "DetectionBoxData",
            "tagGroupId": "DetectionBox:com.adlinktech.example:v1.0"
        }
    ]
}

How to Run

$ python the_viewer.py -fi c1.b1.g1.cam1
