File: request_response_subscription_set.h

package info (click to toggle)
aws-crt-python 0.28.4%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 78,428 kB
  • sloc: ansic: 437,955; python: 27,657; makefile: 5,855; sh: 4,289; ruby: 208; java: 82; perl: 73; cpp: 25; xml: 11
file content (140 lines) | stat: -rw-r--r-- 4,791 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H
#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H

/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/common/byte_buf.h>
#include <aws/common/hash_table.h>
#include <aws/common/linked_list.h>
#include <aws/mqtt/exports.h>

struct aws_mqtt_rr_incoming_publish_event;

/*
 * Handles subscriptions for request-response client.
 * Lifetime of this struct is bound to request-response client.
 *
 * The struct bundles three hash tables keyed by byte cursor: two for streaming operations and one for
 * request-response paths.  It is set up with aws_mqtt_request_response_client_subscriptions_init() and torn
 * down with aws_mqtt_request_response_client_subscriptions_clean_up().
 */
struct aws_request_response_subscriptions {
    /* NOTE(review): presumably the allocator handed to ..._subscriptions_init() - confirm in the .c file */
    struct aws_allocator *allocator;

    /*
     * Map from cursor (topic filter) -> list of streaming operations using that filter
     *
     * Values are struct aws_rr_operation_list_topic_filter_entry.
     * NOTE(review): presumably holds only filters without wildcards, given the separate wildcard table
     * below - confirm in the .c file.
     *
     * We don't garbage collect this table over the course of normal client operation.  We only clean it up
     * when the client is shutting down.
     */
    struct aws_hash_table streaming_operation_subscription_lists;

    /*
     * Map from cursor (topic filter with wildcards) -> list of streaming operations using that filter
     *
     * Values are struct aws_rr_operation_list_topic_filter_entry.
     *
     * We don't garbage collect this table over the course of normal client operation.  We only clean it up
     * when the client is shutting down.
     */
    struct aws_hash_table streaming_operation_wildcards_subscription_lists;

    /*
     * Map from cursor (topic) -> request response path (topic, correlation token json path)
     *
     * Values are struct aws_rr_response_path_entry.
     */
    struct aws_hash_table request_response_paths;
};

/*
 * This is the (key and) value in stream subscriptions tables.
 *
 * One entry exists per topic filter: adding a stream subscription for a filter that is already present
 * returns the previously created entry.  The entry carries the filter text and the list of streaming
 * operations using that filter.
 */
struct aws_rr_operation_list_topic_filter_entry {
    /* NOTE(review): presumably the allocator that owns this entry and its buffer - confirm in the .c file */
    struct aws_allocator *allocator;

    /* NOTE(review): presumably a cursor over topic_filter below, serving as the hash table key - confirm */
    struct aws_byte_cursor topic_filter_cursor;
    /* Topic filter text for this entry */
    struct aws_byte_buf topic_filter;

    /* Streaming operations using this topic filter */
    struct aws_linked_list operations;
};

/*
 * Value in request subscriptions table.
 *
 * Describes one request response path: a topic together with its correlation token json path.
 */
struct aws_rr_response_path_entry {
    /* NOTE(review): presumably the allocator that owns this entry and its buffers - confirm in the .c file */
    struct aws_allocator *allocator;

    /*
     * NOTE(review): presumably counts the request subscriptions currently sharing this path, incremented on
     * add and decremented on remove - confirm in the .c file.
     */
    size_t ref_count;

    /* NOTE(review): presumably a cursor over topic below, serving as the hash table key - confirm */
    struct aws_byte_cursor topic_cursor;
    /* Topic of this response path */
    struct aws_byte_buf topic;

    /* Correlation token json path associated with the topic */
    struct aws_byte_buf correlation_token_json_path;
};

/*
 * Callback type for matched stream subscriptions.
 *
 * Supplied as on_stream_operation_subscription_match to
 * aws_mqtt_request_response_client_subscriptions_handle_incoming_publish().
 *
 * operations    - list of operations belonging to the matched subscription (read-only)
 * topic_filter  - topic filter of the matched subscription
 * publish_event - incoming publish event being dispatched
 * user_data     - opaque pointer; NOTE(review): presumably the user_data given to
 *                 ..._handle_incoming_publish(), passed through unchanged - confirm
 */
typedef void(aws_mqtt_stream_operation_subscription_match_fn)(
    const struct aws_linked_list *operations,
    const struct aws_byte_cursor *topic_filter,
    const struct aws_mqtt_rr_incoming_publish_event *publish_event,
    void *user_data);

/*
 * Callback type for matched request subscriptions.
 *
 * Supplied as on_request_operation_subscription_match to
 * aws_mqtt_request_response_client_subscriptions_handle_incoming_publish().
 *
 * entry         - response path entry of the matched request subscription (non-const, so the callback is
 *                 allowed to modify it)
 * publish_event - incoming publish event being dispatched
 * user_data     - opaque pointer; NOTE(review): presumably the user_data given to
 *                 ..._handle_incoming_publish(), passed through unchanged - confirm
 */
typedef void(aws_mqtt_request_operation_subscription_match_fn)(
    struct aws_rr_response_path_entry *entry,
    const struct aws_mqtt_rr_incoming_publish_event *publish_event,
    void *user_data);

AWS_EXTERN_C_BEGIN

/*
 * Initialize internal state of a provided request-response subscriptions structure.
 *
 * subscriptions - caller-provided structure to initialize
 * allocator     - allocator to use for the structure's internals
 *
 * NOTE(review): the int result presumably follows the aws-c-common AWS_OP_SUCCESS / AWS_OP_ERR
 * convention - confirm in the .c file.
 */
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_init(
    struct aws_request_response_subscriptions *subscriptions,
    struct aws_allocator *allocator);

/*
 * Clean up internals of a provided request-response subscriptions structure.
 *
 * subscriptions - structure previously set up with aws_mqtt_request_response_client_subscriptions_init()
 *
 * Only the internals are cleaned up; the structure itself is provided, and kept, by the caller.
 */
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_clean_up(
    struct aws_request_response_subscriptions *subscriptions);

/*
 * Add a subscription for stream operations.
 * If subscription with the same topic filter is already added, previously created
 * aws_rr_operation_list_topic_filter_entry instance is returned.
 *
 * subscriptions - subscriptions structure to add to
 * topic_filter  - topic filter of the stream subscription
 *
 * Returns the entry for topic_filter.
 * NOTE(review): the returned entry presumably stays owned by the subscriptions structure, and the failure
 * value (if any) is not visible from this header - confirm both in the .c file.
 */
AWS_MQTT_API struct aws_rr_operation_list_topic_filter_entry *
    aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
        struct aws_request_response_subscriptions *subscriptions,
        const struct aws_byte_cursor *topic_filter);

/*
 * Add a subscription for request operation.
 *
 * subscriptions               - subscriptions structure to add to
 * topic_filter                - topic of the response path to subscribe to
 * correlation_token_json_path - correlation token json path to associate with that topic
 *
 * NOTE(review): the int result presumably follows the aws-c-common AWS_OP_SUCCESS / AWS_OP_ERR
 * convention, and adding an already-present topic presumably bumps the entry's ref_count rather than
 * creating a second entry - confirm both in the .c file.
 */
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
    struct aws_request_response_subscriptions *subscriptions,
    const struct aws_byte_cursor *topic_filter,
    const struct aws_byte_cursor *correlation_token_json_path);

/*
 * Remove a subscription for a given request operation.
 *
 * subscriptions - subscriptions structure to remove from
 * topic_filter  - topic the request subscription was added with
 *
 * NOTE(review): the int result presumably follows the aws-c-common AWS_OP_SUCCESS / AWS_OP_ERR
 * convention, and the underlying entry is presumably released only once its ref_count drops to zero -
 * confirm both in the .c file.
 */
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
    struct aws_request_response_subscriptions *subscriptions,
    const struct aws_byte_cursor *topic_filter);

/*
 * Call specified callbacks for all stream and request operations with subscriptions matching a provided publish event.
 *
 * subscriptions                           - subscriptions structure to search (taken as const)
 * publish_event                           - incoming publish event to match against the subscriptions
 * on_stream_operation_subscription_match  - callback for matching stream subscriptions
 * on_request_operation_subscription_match - callback for matching request subscriptions
 * user_data                               - opaque pointer; NOTE(review): presumably forwarded unchanged to
 *                                           both callbacks - confirm in the .c file
 */
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_handle_incoming_publish(
    const struct aws_request_response_subscriptions *subscriptions,
    const struct aws_mqtt_rr_incoming_publish_event *publish_event,
    aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match,
    aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match,
    void *user_data);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H */