Datafeed#

:warning: The datafeed 1 service will be fully replaced by the datafeed 2 service in the future. Please consider using datafeed 2.

For more information on the timeline as well as on the benefits of datafeed 2, please reach out to your Technical Account Manager or to our developer documentation.

The datafeed service is a service used for handling the Real Time Events.

When a user makes an interaction within the IM, MIM or Room chat like sending a message, joining or leaving a room chat, when a connection request is sent, when a wall post is published or when a user replies to a Symphony element, an event will be sent to the datafeed.

The datafeed service is a core service built on top of the Datafeed API and provides a dedicated contract to bot developers to work with datafeed.

How to use#

The central component for the contract between bot developers and the Datafeed API is the DatafeedLoop. This service is accessible from the SymphonyBdk object by calling the datafeed() method. For instance:

from symphony.bdk.core.config.loader import BdkConfigLoader
from symphony.bdk.core.symphony_bdk import SymphonyBdk
from symphony.bdk.core.service.datafeed.real_time_event_listener import RealTimeEventListener

class RealTimeEventListenerImpl(RealTimeEventListener):

    async def on_message_sent(self, initiator, event):
        # message received, interact with it
        pass

async def run():
    async with SymphonyBdk(BdkConfigLoader.load_from_symphony_dir("config.yaml")) as bdk:
        datafeed_loop = bdk.datafeed()
        datafeed_loop.subscribe(RealTimeEventListenerImpl())

        await datafeed_loop.start()

The overridden RealTimeEventListener methods must be async in order for them to be properly executed.

If a listener throws an exception, it will be logged and datafeed loop will carry on. For more information about error handling, see here

Concurrency of datafeed event handling#

Please mind that the handling of events is done concurrently for a given chunk of events received from one read datafeed call. For a given set of events received from one datafeed call, the datafeed loop will create separate asyncio tasks for every event in the chunk and for every registered listener. The loop will wait for all listener tasks to complete before making another read datafeed call.

This means, for a given received chunk of events:

  • for a given event, the corresponding listener method will run concurrently across the different listener instances.

  • for a given listener instance, the listener methods will run concurrently across the different events received.

If you don’t want the listener calls to block the datafeed loop (i.e. having concurrency across listener calls and across datafeed events), all listener methods must create tasks and return immediately. For instance:

import asyncio
from symphony.bdk.core.activity.command import CommandActivity, CommandContext


class HelloCommandActivity(CommandActivity):

    def __init__(self):
        super().__init__()

    def matches(self, context: CommandContext) -> bool:
        return context.text_content.startswith("@" + context.bot_display_name + " /command")

    async def on_activity(self, context: CommandContext):
        # Create task to enable parallelism. Must be done for all listeners in order for the datafeed loop not to be blocked by listeners.
        asyncio.create_task(self.actual_logic(context))

    async def actual_logic(self, context):
      pass

Logging#

To ease the logging of event handling, a ContextVar is set with the following value: f"{current_task_name}/{event_id}/{listener_id}". It is accessible in symphony.bdk.core.service.datafeed.abstract_datafeed_loop.event_listener_context can be used in logging with a filter like:

import logging

from symphony.bdk.core.service.datafeed.abstract_datafeed_loop import event_listener_context

class EventListenerLoggingFilter(logging.Filter):
    def filter(self, record):
        record.context_id = event_listener_context.get("")
        return True

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "filters": {"contextFilter": {"()": "EventListenerLoggingFilter"}},
    "formatters": {
        "standard": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(context_id)s - %(message)s"
        },
    },
    "handlers": {
        "default": {
            "level": "DEBUG",
            "formatter": "standard",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",  # Default is stderr
            "filters": ["contextFilter"]
        },
    },
    "loggers": {
        "": {  # root logger
            "handlers": ["default"],
            "level": "DEBUG",
            "propagate": False
        }
    }
})

Datafeed Configuration#

The Datafeed Service can be configured by the datafeed field in the configuration file:

datafeed:
  version: 'v2' # specify datafeed version 'v1' or 'v2'
  retry:
    maxAttempts: 6 # maximum number of retry attempts
    initialIntervalMillis: 2000 # initial interval between two attempts
    multiplier: 1.5 # interval multiplier after each attempt
    maxIntervalMillis: 10000 # limit of the interval between two attempts

For now, not all the customers have the datafeed version 2 available on their systems, that’s why bot developers are able to choose the datafeed version that they wish to use on their bot. If you want to use datafeed version 2 in your bot, you need to set the version parameter as v2 as shown above. Otherwise, the datafeed version 1 will be used by default.

Bot developers can also configure a dedicated retry configuration which will be used only by the datafeed service as shown above.

Default retry configuration#

By default, the Datafeed retry is configured to have an infinite number of attempts. If no configuration is provided for Datafeed, it is equivalent to use:

datafeed:
  retry:
    maxAttempts: -1 # infinite number of attemps
    initialIntervalMillis: 500
    multiplier: 2
    maxIntervalMillis: 300000

Datahose#

Datahose is very similar to datafeed: it enables a bot to receive Real Time Events with the main difference that all events of the pod are received. The datahose loop is a core service built on top of the events API and provide a dedicated contract to bot developers to work with Datahose. This is compatible with Agent version 22.5 onwards.

The Activity API is not meant to be used with datahose.

How to use#

The central component for the contract between bot developers and the Datafeed API is the DatahoseLoop. This service is accessible from the SymphonyBdk object by calling the datahose() method. For instance:

async def run():
    
    # create bdk entry point
    async with SymphonyBdk(BdkConfigLoader.load_from_symphony_dir("config.yaml")) as bdk:
        
        # get datahose service
        datahose_loop = bdk.datahose()
        
        # subscribe a listener
        datahose_loop.subscribe(RealTimeEventListenerImpl())
        
        # start reading the datahose
        await datahose_loop.start()

An example of the usage of the Datahose service can be found here

Datahose Configuration#

Datahose Service can be configured by the datafeed field in the configuration file:

datahose:
    tag: fancyTag # optional tag that will be used when creating or reusing a datahose feed
    filters: # mandatory field, events you want to receive
        - SOCIALMESSAGE
        - CREATE_ROOM
    retry: # optional
        maxAttempts: 6 # maximum number of retry attempts
        initialIntervalMillis: 2000 # initial interval between two attempts
        multiplier: 1.5 # interval multiplier after each attempt
        maxIntervalMillis: 10000 # limit of the interval between two attempts

The minimal configuration for the datahose service is the filters field. It should contain at least one value chosen among the following:

  • SOCIALMESSAGE

  • CREATE_ROOM

  • UPDATE_ROOM

:warning: If you want to use SOCIALMESSAGE filter (i.e. consume message sent events), ceservice credentials must be configured in your Symphony agent.

The tag field is optional and is used when creating and reusing datahose feeds. If you have several instances of the same bot and want them to use the same datahose feed (so that events are spread over bot instances), all instances should have the same tag value (or no tag field).

Bot developers can also configure a dedicated retry mechanism which will be used only by the datahose service. Basically, the datahose service retry configuration has the field same as the global retry configuration with the fields for implementing the exponential backoff mechanism.

Infinite retries#

By default, like datafeed, datahose retry is configured to have an infinite number of attempts. This is equivalent to:

datahose:
    retry:
        maxAttempts: -1 # infinite number of attemps
        initialIntervalMillis: 2000
        multiplier: 1.5
        maxIntervalMillis: 10000

Subscribe/Unsubscribe RealTimeEventListener#

The datahose loop uses the RealTimeEventListener like in the datafeed loop. Due to technical limitations, datahose loop only receives a subset of all real time events:

  • Message Sent

  • Symphony Elements Action

  • IM/MIM Created

  • Room Created

  • Room Updated Message

  • Room Deactivated Message

  • Room Reactivated Message

The datahose Service can subscribe/unsubscribe one or many RealTimeEventListener by calling DatahoseLoop#subscribe or DatahoseLoop#unsubscribe. For instance:

# subscribe a listener
datahose_loop.subscribe(listener)

# unsubscribe a listener
datahose_loop.unsubscribe(listener)

Best practices#

Event handling#

It is recommended for bot developer to make their listeners idempotent if possible or to deal with duplicated events. When running multiple instances of a bot, this could happen if the event is slowly processed in one instance and gets re-queued and dispatched to another instance. It can also happen that the user types a bot command twice by mistake.

Symphony also does not provide any ordering guarantees. While most of the time the events will be received in order, if a user is very fast you might receive the second message first. In a busy room, events could be flowing in and out in a non-chronological order.

Received events should be processed quickly (in less than 30 seconds) to avoid them being re-queued in datafeed. If the business logic of the listener leads to long operations then it is recommended handle them in a separate asyncio task or thread to avoid blocking the datafeed/datahose loop. To help you detect this situation, warning logs will be printed if the event processing time exceeds 30 seconds.

Before shutting down a bot instance, you want to make sure that the datafeed/datahose loop is properly stopped and that the bot has stopped processing events. To do so, call datafeed.stop() which has optional arguments hard_kill and timeout (in seconds). datafeed.stop() will wait for all tasks to finish, whereas datafeed.stop(hard_kill=False, timeout) will wait for timeout seconds before killing the tasks. datafeed.stop(hark_kill=True) will stop all pending or running tasks immediately.

Stopping the datafeed and/or datahose loop might take a while (if the loop is currently waiting for new events, up to 30 seconds).

Error handling#

The datafeed/datahose loop once started will keep running until the bot is stopped. So it will catch all the exceptions raised by listeners or activities to prevent the loop from stopping. However, if the processing of an event failed, and if nothing specific is done by the listener to store it in a database or a queue, an event could be lost and never processed by the bot.

The BDK provides a way to re-queue events if needed through the EventError that can be raised from listeners. In that case the datafeed/datahose loop current execution for the currently received events will stop (other listeners will not be called), and the events will be re-queued in datafeed. The datafeed/datahose loop will resume its execution and will after some delays receive non-processed events (30s by default).

This feature is not available for datafeed v1. When the datafeed/datahose loop executes it can receive several events at once and will dispatch them to all the subscribed listeners. Therefore, you should be careful about no processing an event twice. This can be achieved by maintaining a short time lived cache of the already processed events.

Running multiple instances of a bot (DF v2 and Datahose only)#

An example is provided in the multiple_instances folder.

With datafeed v2, it is possible to run multiple instances of a bot. Each instance will receive events in turn. The examples also makes use of Hazelcast to keep a distributed cache of already processed events and avoid replying to a message twice.

The logic to avoid handling an event twice is tied to the bot and its logic so the BDK makes no assumption about it and lets you manage it freely.

The same applies to datahose. To enable this behavior, make sure you have the same datahose.tag value (or no tag field) in the configuration of all your bot instances.