Google Cloud Pub/Sub Reference Guide
The Google Cloud Pub/Sub (GCPS) messaging service sends and receives messages over Google Cloud. Vantiq integrates directly with GCPS through sources, which can produce messages to Google Topics and consume messages from Google Subscriptions. For more information on GCPS, see the Google Cloud documentation.
The basic process for setting up and using a GCPS Source is as follows:
- An administrator defines a GCPS source by identifying the Google Cloud Project containing the desired set of topics/subscriptions, along with the Google Key that is used to authenticate. Optionally, the administrator can define a list of subscriptions from which the source will listen for messages. This is accomplished in the Vantiq IDE by using the Add button to select Source….
- Once the GCPS source has been defined, the server will begin listening for messages as they arrive on the specified subscriptions (assuming they were included in the source definition).
- When a message arrives on the source endpoint, an event is generated which will trigger the execution of any subscribed rules. The event may also be delivered to any clients with transient subscriptions on the event’s id.
- Source processing rules are encouraged to store any persistent state in the Vantiq automation model. This enables the rule itself to be stateless, making it easier to support various load balancing approaches such as executing the rule across a cluster or partitioning work between multiple Vantiq servers.
GCPS Source Representation
A source resource defines the integration with a set of GCPS topics/subscriptions and must contain the following properties:
- name: The name given to the source by the user.
- type: Must be the string GCPS indicating this is a GCPS source.
- config: A JSON object containing additional GCPS configuration parameters:
- projectID: The ID for the Google Cloud Project containing the topics and subscriptions to connect to. This value must be a string.
- googleKey: The Google Key (JSON) used to authenticate. The key can be entered directly here, or it can be stored as a Vantiq Secret, in which case the value is a secret reference (e.g. “/system.secrets/myGoogleKeySecret”).
- googleKeyType: The type of the Google Key above. This value must be either “secret” or “plain text”.
- subscriptionIDs: (Optional) A list of the GCPS Subscriptions that the source will consume messages from. This must be a list of strings.
- pollingInterval: (Optional) The frequency (in milliseconds) at which the source will poll for messages from GCPS Subscriptions.
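The property rules above can be checked before a source definition is submitted. The following is a minimal Python sketch, not part of Vantiq: validate_gcps_source is a hypothetical helper, the key spelling subscriptionIDs follows the REST example in this guide, and the requirement that pollingInterval be positive is an assumption.

```python
# Sketch only: validate_gcps_source is a hypothetical helper, not a Vantiq API.
VALID_KEY_TYPES = {"secret", "plain text"}


def validate_gcps_source(source):
    """Return a list of problems found in a GCPS source definition dict."""
    problems = []
    name = source.get("name")
    if not isinstance(name, str) or not name:
        problems.append("name must be a non-empty string")
    if source.get("type") != "GCPS":
        problems.append('type must be the string "GCPS"')
    config = source.get("config")
    if not isinstance(config, dict):
        problems.append("config must be a JSON object")
        return problems
    if not isinstance(config.get("projectID"), str):
        problems.append("config.projectID must be a string")
    if "googleKey" not in config:
        problems.append("config.googleKey is required")
    if config.get("googleKeyType") not in VALID_KEY_TYPES:
        problems.append('config.googleKeyType must be "secret" or "plain text"')
    # Optional: list of subscription names.
    subs = config.get("subscriptionIDs")
    if subs is not None and not (
        isinstance(subs, list) and all(isinstance(s, str) for s in subs)
    ):
        problems.append("config.subscriptionIDs must be a list of strings")
    # Optional: polling frequency in milliseconds (positivity is an assumption).
    interval = config.get("pollingInterval")
    if interval is not None:
        is_number = isinstance(interval, (int, float)) and not isinstance(interval, bool)
        if not is_number or interval <= 0:
            problems.append("config.pollingInterval must be a positive number")
    return problems
```

An empty list means the definition satisfies the rules listed above; it does not guarantee that the server will accept it.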
Create a GCPS Source
The following example illustrates how to create a GCPS source using the REST API. GCPS sources can also be defined in the Vantiq IDE by using the Add button to select Source….
POST https://dev.vantiq.com/api/v1/resources/sources
{
"name": "myGcpsSource",
"type": "GCPS",
"config": {
"projectID": "myGcpsProjectID",
"googleKey": "/system.secrets/myGoogleKeySecret",
"googleKeyType": "secret",
"subscriptionIDs": ["myFirstGcpsSubscription", "mySecondGcpsSubscription", ...],
"pollingInterval": 500
}
}
Delete a GCPS Source
The example GCPS source named myGcpsSource can be deleted using the REST API by issuing the following request:
DELETE https://dev.vantiq.com/api/v1/resources/sources/myGcpsSource
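The create and delete requests above could be assembled from a script roughly as follows. This is a sketch using only the Python standard library; the bearer-token Authorization header and the access_token parameter are assumptions that this guide does not confirm, and the functions only build the requests without sending them.

```python
import json
import urllib.parse
import urllib.request

# URL taken from the examples above; adjust for your own Vantiq server.
SOURCES_URL = "https://dev.vantiq.com/api/v1/resources/sources"


def _headers(access_token):
    # Assumption: the server accepts a bearer token; check your deployment.
    return {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + access_token,
    }


def build_create_request(source, access_token):
    """Build (but do not send) the POST request that creates a source."""
    body = json.dumps(source).encode("utf-8")
    return urllib.request.Request(
        SOURCES_URL, data=body, method="POST", headers=_headers(access_token)
    )


def build_delete_request(source_name, access_token):
    """Build (but do not send) the DELETE request for a named source."""
    url = SOURCES_URL + "/" + urllib.parse.quote(source_name, safe="")
    return urllib.request.Request(
        url, method="DELETE", headers=_headers(access_token)
    )
```

Passing either request to urllib.request.urlopen would send it; keeping construction separate makes the payload easy to inspect first.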
Produce Messages on a GCPS Topic
Messages are produced on GCPS topics in VAIL with the PUBLISH command. The PUBLISH request for GCPS sources takes a minimum of two parameters:
- topicID: The GCPS Topic to which the message will be published
- At least one of the following:
- message: Considered the body of the message, this value must be a String.
- attributes: A series of key-value pairs that must be Strings.
For example:
PUBLISH { topicID: "myGcpsTopic", message: "my message"} TO SOURCE myGcpsSource
OR
var attribs = {}
attribs.myKey = "myValue"
PUBLISH { topicID: "myGcpsTopic", attributes: attribs} TO SOURCE myGcpsSource
Both message and attributes can be specified in the same PUBLISH command, as follows:
var attribs = {}
attribs.myKey = "myValue"
PUBLISH { topicID: "myGcpsTopic", message: "my message", attributes: attribs} TO SOURCE myGcpsSource
Additionally, the user can specify an orderingKey, so long as the source configuration has set the enableMessageOrdering field to true. Typically, the orderingKey should be one of the keys listed in the attributes portion of the message. The orderingKey must be a String, as follows:
var attribs = {}
attribs.myKey = "myValue"
attribs.count = "1"
PUBLISH { topicID: "myGcpsTopic", message: "my message", attributes: attribs, orderingKey: "count"} TO SOURCE myGcpsSource
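The parameter rules for PUBLISH described in this section can be summarized as a small check. This is a Python sketch for illustration only: validate_publish_message is a hypothetical helper, it does not reflect how the Vantiq server validates requests, and treating topicID as a string is an assumption.

```python
def validate_publish_message(msg, ordering_enabled=False):
    """Return a list of problems found in a GCPS PUBLISH payload dict.

    ordering_enabled mirrors the source's enableMessageOrdering setting.
    """
    problems = []
    # Assumption: the topic is identified by a string.
    if not isinstance(msg.get("topicID"), str):
        problems.append("topicID must be a string")
    has_message = "message" in msg
    has_attributes = "attributes" in msg
    if not (has_message or has_attributes):
        problems.append("at least one of message or attributes is required")
    if has_message and not isinstance(msg["message"], str):
        problems.append("message must be a string")
    if has_attributes:
        attrs = msg["attributes"]
        all_strings = isinstance(attrs, dict) and all(
            isinstance(k, str) and isinstance(v, str) for k, v in attrs.items()
        )
        if not all_strings:
            problems.append("attributes must map string keys to string values")
    if "orderingKey" in msg:
        if not ordering_enabled:
            problems.append("orderingKey requires enableMessageOrdering to be true")
        if not isinstance(msg["orderingKey"], str):
            problems.append("orderingKey must be a string")
    return problems
```

A payload such as {"topicID": "myGcpsTopic", "attributes": {"count": 1}} is flagged because the attribute value is a number rather than a string.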
Consuming Messages from GCPS Subscriptions
The GCPS Source will only listen for messages arriving on subscriptions set in the source configuration. These messages will appear as messages arriving from the given source, and they will be formatted as follows:
{
"data": string,
"attributes": {
string: string,
...
},
"messageId": string,
"publishTime": string,
"subscriptionId": string
}
- The data field is considered the body of the message, and is a string.
- The attributes field is a series of key-value string pairs.
- The messageId field is set by Google Cloud, and is unique within the message’s topic.
- The publishTime field is set by Google Cloud, and designates the time at which the message was published.
- The subscriptionId field specifies the subscription from which the message arrived.
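For code outside Vantiq that receives these events (for example, in a test harness), the message shape above can be mapped onto a typed record. This Python sketch is illustrative: GcpsMessage and parse_gcps_message are hypothetical names, publishTime is kept as an opaque string because its format is not specified here, and defaulting a missing attributes field to an empty mapping is an assumption.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class GcpsMessage:
    """Typed view of a message delivered by a GCPS source."""
    data: str
    message_id: str
    publish_time: str  # kept as received; format not specified in this guide
    subscription_id: str
    attributes: Dict[str, str] = field(default_factory=dict)


def parse_gcps_message(event):
    """Convert the JSON-decoded event dict into a GcpsMessage.

    Raises KeyError if a required field is missing.
    """
    return GcpsMessage(
        data=event["data"],
        message_id=event["messageId"],
        publish_time=event["publishTime"],
        subscription_id=event["subscriptionId"],
        # Assumption: a missing attributes field is treated as empty.
        attributes=dict(event.get("attributes") or {}),
    )
```

The record gives downstream code attribute-style access (msg.subscription_id, msg.attributes["count"]) similar to what the VAIL rule in this section does with msg.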
To trigger a rule whenever a message is consumed from a GCPS source, one could use a rule similar to this:
RULE myGcpsRule
WHEN EVENT OCCURS ON "/sources/myGcpsSource" AS msg
// (User can handle msg here, this is just an example)
log.debug("Received a message from {} with publish time: {}, and ID: {}",
          [msg.subscriptionId, msg.publishTime, msg.messageId])
// This assumes that the message has attributes named "count" and "type"
INSERT MyVantiqType(messageBody: msg.data,
                    messageCount: msg.attributes.count,
                    messageType: msg.attributes.type)