Search for answers or browse our knowledge base.
Google Cloud Pub/Sub Reference Guide
0 out of 5 stars
| 5 Stars | 0% | |
| 4 Stars | 0% | |
| 3 Stars | 0% | |
| 2 Stars | 0% | |
| 1 Stars | 0% |
The Google Cloud Pub/Sub (GCPS) messaging service provides the ability to send and receive messages over the Google Cloud. Vantiq offers direct integration with GCPS through sources, which can produce messages to Google Topics and consume messages from Google Subscriptions. For more information on GCPS, take a look at the Google Cloud Documentation.
The basic process for setting up and using a GCPS Source is as follows:
- An administrator defines a GCPS source by identifying the Google Cloud Project containing the desired set of topics/subscriptions, along with the Google Key that is used to authenticate. Optionally, the administrator can define a list of subscriptions from which the source will listen for messages. This is accomplished in the Vantiq IDE by using the Add button to select Source….
- Once the GCPS source has been defined, the server will begin listening for messages as they arrive on the specified subscriptions (assuming they were included in the source definition).
- When a message arrives on the source endpoint, an event is generated which will trigger the execution of any subscribed rules. The event may also be delivered to any clients with transient subscriptions on the event’s id.
- Source processing rules are encouraged to store any persistent state in the Vantiq automation model. This enables the rule itself to be stateless, making it easier to support various load balancing approaches such as executing the rule across a cluster or partitioning work between multiple Vantiq servers.
GCPS Source Representation
A source resource defines the integration with a set GCPS topics/subscriptions and must contain the following properties:
- name: The name of the stream given by the user
- type: Must be the string GCPS indicating this is a GCPS source.
- config: A JSON object containing additional GCPS configuration parameters:
- projectID: The ID for the Google Cloud Project containing the topics and subscriptions to connect to. This value must be a string.
- googleKey: The Google Key (JSON) used to authenticate. The key can be entered directly here, or it can be stored as a Vantiq Secret, in which case the value would be a secret reference (i.e. “/system.secrets/myGoogleKeySecret”).
- googleKeyType: The type for the aforementioned Google Key. This value must either be “secret”, or “plain text”.
- subscriptionsIDs: (Optional) A list of the GCPS Subscriptions that the source will consume messages from. This must be a list of strings.
- pollingInterval: (Optional) The frequency (in milliseconds) at which the source will poll for messages from GCPS Subscriptions.
Create a GCPS Source
The following example illustrates how to create a GCPS source using the REST API. GCPS sources can also be defined in the Vantiq IDE by using the Add button to select Source….
POST https://dev.vantiq.com/api/v1/resources/sources
{
"name": "myGcpsSource",
"type": "GCPS",
"config": {
"projectID": "myGcpsProjectID",
"googleKey": "/system.secrets/myGoogleKeySecret",
"googleKeyType": "secret",
"subscriptionIDs": ["myFirstGcpsSubscription", "mySecondGcpsSubscription", ...],
"pollingInterval": 500
}
}
Delete a GCPS Source
The example GCPS source named myGcpsSource can be deleted using the REST API by issuing the following request:
DELETE https://dev.vantiq.com/api/v1/resources/sources/myGcpsSource
Produce Messages on a GCPS Topic
Messages are produced on GCPS topics in VAIL with the PUBLISH command. The PUBLISH request for GCPS sources takes a minimum of two parameters:
- topicID: The GCPS Topic to which the message will be published
- At least one of the following:
- message: Considered the body of the message, this value must be a String.
- attributes: A series of key-value pairs that must be Strings.
For Example:
PUBLISH { topicID: "myGcpsTopic", message: "my message"} TO SOURCE myGcpsSource
OR
var attribs = {}
attribs.myKey = "myValue"
PUBLISH { topicID: "myGcpsTopic", attributes: attribs} TO SOURCE myGcpsSource
Of course, both attributes and messages can be specified in the same PUBLISH command, as follows:
var attribs = {}
attribs.myKey = "myValue"
PUBLISH { topicID: "myGcpsTopic", message: "my message", attributes: attribs} TO SOURCE myGcpsSource
Additionally, the user can specify an orderingKey, so long as the source configuration has set the enableMessageOrdering field to true. Typically, the orderingKey should be one of the keys listed in the attributes portion of the message. The orderingKey must be a String, as follows:
var attribs = {}
attribs.myKey = "myValue"
attribs.count = 1
PUBLISH { topicID: "myGcpsTopic", message: "my message", attributes: attribs, orderingKey: "count"} TO SOURCE myGcpsSource
Consuming Messages from GCPS Subscriptions
The GCPS Source will only listen for messages arriving on subscriptions set in the source configuration. These messages will appear as messages arriving from the given source, and they will be formatted as follows:
{
"data": string,
"attributes": {
string: string,
...
},
"messageId": string,
"publishTime": string,
"subscriptionId": string
}
- The
datafield is considered the body of the message, and is a string. - The
attributesfield is a series of key-value string pairs. - The
messageIdfield is set by the Google Cloud, and is unique within the message’s topic. - The
publishTimefield is set by the Google Cloud, and designates the time at which the message was published. - The
subscriptionIdfield specifies the subscription from which the message arrived.
To trigger a rule whenever a message is consumed from a GCPS source, one could use a rule similar to this:
RULE myGcpsRule
WHEN EVENT OCCURS ON "/sources/myGcpsSource" AS msg
// (User can handle msg here, this is just an example)
log.debug("Received a message from {} with publish time: {}, and ID: {}",
[msg.subscriptionId, msg.publishTime, msg.messageId])
// This assumes that the message has attributes named "count" and "type"
INSERT MyVantiqType(messageBody: msg.data,
messageCount: msg.attributes.count,
messageType: msg.attributes.type)
0 out of 5 stars
| 5 Stars | 0% | |
| 4 Stars | 0% | |
| 3 Stars | 0% | |
| 2 Stars | 0% | |
| 1 Stars | 0% |
-
Getting Started
-
- Advanced Collaboration Tutorial
- Analytics Tutorial
- App Component Tutorial
- Application Deployment Tutorial
- Assembly Tutorial
- Autopsy Debugger Tutorial
- Catalogs Tutorial
- Client Builder Tutorial
- Client Component Tutorial
- Collaboration Tutorial
- Conversation Widget Tutorial
- Floor Plan
- GenAI Builder Tutorial
- Source Tutorial
- Stateful Services
- System Modeler Tutorial
- Testing the Debugging Tutorial
- Testing the Introductory Tutorial
- Testing the Source Tutorial
- User and Namespace Administration Tutorial
- Show Remaining Articles ( 5 ) Collapse Articles
-
Product Documentation
-
-
-
- ‘On Assets Loaded’ Event
- ‘On Data Arrived’ Event
- ‘On End’ Event
- ‘On Network Status Changed’ Event
- ‘On Start’ Event
- abort()
- addEventHandler()
- adjustPopupSizeAndPosition()
- Basic Information
- cancelSpeaking()
- children
- clearInterval()
- clearTimeout()
- clearValidationErrors()
- clone()
- closePopup()
- confirmCustom()
- confirmCustomEx()
- copyMatchingData(obj:any):void
- createClientEventDataStream()
- createDataChangedDataStream()
- createOutboundServiceEventDataStream()
- createPagedQueryDataStream()
- createPublishEventDataStream()
- createResourceEventDataStream()
- createResponseObject()
- createSourceEventDataStream()
- createTimedQueryDataStream()
- data
- data
- DataObject
- DataStream
- deleteAll()
- deleteOne()
- deleteOne()
- errorDialog()
- execute()
- execute()
- executePublic()
- executePublic()
- executeStreamed()
- executeStreamed()
- executeStreamedPublic()
- executeStreamedPublic()
- formatMsg()
- generateUUID()
- getCollaborationContext()
- getCurrentPage()
- getCurrentPopup()
- getDataStreamByName()
- getDataStreamByUUID()
- getDeviceId()
- getDeviceName()
- getDocumentAssetLabelList()
- getDocumentAssetList()
- getDocumentUrl()
- getGroupNames()
- getLocation()
- getName()
- getProfileNames()
- getRequestParameters()
- getStateObject()
- getUsername()
- getUserRecord()
- getWidget()
- goToPage()
- Http
- infoDialog()
- initializePropertyToDefaultValue(propertyName:string):void
- initializeToDefaultValues():void
- insert()
- insert()
- instance
- isNetworkActive
- isPaused:boolean
- isPublic
- localeCountryCode
- localeLanguageCode
- localeVariantCode
- logout()
- markupImage()
- modifyClientEvent()
- modifyDataChanged()
- modifyPagedQuery()
- modifyPublishEvent()
- modifyResourceEvent()
- modifyServiceEvent()
- modifySourceEvent()
- modifyTimedQuery()
- name:string
- navBarBackgroundColor
- navBarForegroundColor
- navBarIcon
- navBarIconHeight
- navBarIconWidth
- navBarShowControls
- navBarTitle
- navBarTitleFontFamily
- navBarTitleFontSize
- navBarTitleFontWeight
- overrideLocale
- Page
- patch()
- playAudio()
- playVideo()
- popupPage()
- publish()
- publish()
- publishToServiceEvent()
- query()
- recordAudio()
- recordVideo()
- remove():void
- restart():void
- returnToCallingPage()
- scanBarcode()
- select()
- select()
- selectOne()
- selectOne()
- sendClientEvent()
- sendLocation()
- setInterval()
- setTimeout()
- setVantiqHeaders()
- setVantiqUrlForResource()
- setVantiqUrlForSystemResource()
- showDocument()
- showHttpErrors()
- showMap()
- speakText()
- startBLEScan()
- stopGeofencing()
- takePhoto()
- terminate()
- terminateWithDialog()
- update()
- update()
- uploadDataURL()
- uploadDocument()
- upsert()
- upsert()
- uuid:string
- validate()
- Show Remaining Articles ( 129 ) Collapse Articles
-
-
- Android Post-Installation Instructions
- Authentication Functions
- Database Functions
- Installation Instructions
- iOS Post-Installation Instructions
- Miscellaneous Functions
- Prerequisites
- Procedure Execution Functions
- Publishing Functions
- Sample iOS AppDelegate.m File
- User Creation Functions
- Vantiq Functionality for React Native Apps
-
-
- Accumulate State
- Analytics
- Answer Question
- App Activity Tasks
- App Builder Guide - Introduction
- App Builder Overview
- Assign
- Build and Predict Path
- Cached Enrich
- Chat
- Close Collaboration
- Collaborations in Apps
- Compute Statistics
- Convert Coordinates
- Creating an App
- DBScan
- Delay
- Dependency Management
- Dwell
- Enrich
- Error Handling
- Escalate
- EscalateState
- Establish Collaboration
- Event Redelivery
- Event Stream
- Filter
- GenAI Flow
- Get Collaboration
- Interpret Conversational Language
- Join
- K-Means Cluster
- Limit
- Linear Regression
- Log Stream
- Loop While
- Merge
- Notify
- Optional Imports
- Polynomial Fitter
- Predict Paths By Age
- Procedure
- Process Intent
- PublishToService
- PublishToSource
- PublishToTopic
- Rate
- Recommend
- RecordEvent
- Reliable Apps
- Run TensorFlow Model On Document
- Run TensorFlow Model On Image
- Run TensorFlow Model On Tensors
- Sample
- SaveToType
- Split By Group
- Submit Prompt
- Threshold
- Time Difference
- Track
- Track Motion
- Tracking Progress
- Transformation
- Unwind
- VAIL
- VisionScript
- Window
- Within Tracking Region
- YOLO From Images
- Show Remaining Articles ( 54 ) Collapse Articles
-
-
-
- Broker Service
- Catalog Operations
- Catalog Procedures
- Connect to Catalog
- Create Entry
- Create Entry
- Custom Operations
- Disconnect from Catalog
- Host Catalog
- Integrating Applications With the Catalog
- Managing Catalog
- Managing Event Types
- Publisher Service
- Register
- Remove Entry
- Repair Catalog
- Resolve
- Subscriber Service
- Unhost Catalog
- Unregister
- Utilities
- Show Remaining Articles ( 6 ) Collapse Articles
-
- AMQP Reference Guide
- CHATBOT Reference Guide
- Email Reference Guide
- Enterprise Connectors Reference Guide
- External Source Reference Guide
- Google Cloud Pub/Sub Reference Guide
- KAFKA Reference Guide
- MQTT Reference Guide
- Push Notification Reference Guide
- Remote Reference Guide
- SMS Reference Guide
- Video Reference Guide
-
-
- Advanced Use Cases
- Data Manipulation
- Defining Types
- Discovery from External Data Store
- Error Handling
- Installation and Use
- Native Language Implementation
- Restricting Capabilities
- Service Connectors
- Storage Manager Assembly Contents
- Storage Manager Service API
- Storage Manager Transactions
- Storage Managers - Introduction
- Transaction Support
-
-
-
- App Pane
- Autopsies
- Defining a Run Policy
- Defining a Test Suite - Properties
- Defining an Input
- Defining an Output
- Error Pane
- Integration Tests
- Populate Testing Namespace With Data
- Procedure Pane
- Rule Pane
- Running a Test in the IDE
- Running a Test through the REST Interface
- Source Mocking For Tests
- Unit Tests
- Vantiq Testing Reference Guide - Introduction
- Show Remaining Articles ( 1 ) Collapse Articles
-
-
-
-
Articles
-
- Build Your Own Tools
- Cache Services
- Camel Assemblies
- Client to Component Conversaion
- Discovering Current Session Information
- Dynamic Client Content
- Dynamic Map View Widget
- Filters
- GenAI Builder Tools
- Generative AI Functions (Tools)
- Generative AI with Collaborations
- How-To Video Shorts: Managing AI Conversations
- How-To Videos: AI Design Model Assistant
- How-To Videos: AI Documentation Search
- Managing AI Conversation
- Production Applications Best Practices
- Public Clients
- Security Secrets
- Service Event Handlers
- Sharing Resources
- Streaming AI Output
- Transformations
- Using the Video Sources
- Web-Based APIs
- Show Remaining Articles ( 9 ) Collapse Articles
-
- How To Video Shorts - LLM Playground
- How To Video Shorts: Client Layouts
- How To Video Shorts: AI Tools (Functions)
- How To Video Shorts: Analytics and ComputeStatistics
- How To Video Shorts: Calling Procedures by Properties
- How To Video Shorts: Client CSS
- How To Video Shorts: Invite Other Users to Your Namespace
- How To Video Shorts: Multi-Modal Example
- How To Video Shorts: SplitByGroup
- How To Video Shorts: The Vantiq API
- How To Video Shorts: The Vantiq IDE
- How To Video Shorts: The Vantiq Version Control System
- How To Video Shorts: Using Generative AI in Applications
- How To Videos - Maintaining AI Conversations in Collaborations