-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue_wrapper.py
More file actions
162 lines (134 loc) · 5.05 KB
/
queue_wrapper.py
File metadata and controls
162 lines (134 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Purpose
Demonstrate basic queue operations in Amazon Simple Queue Service (Amazon SQS).
Learn how to create, get, and remove standard, FIFO, and dead-letter queues.
Usage is shown in the test/test_queue_wrapper.py file.
"""
# snippet-start:[python.example_code.sqs.queue_wrapper_imports]
import logging
import boto3
from botocore.exceptions import ClientError
from os import environ
logger = logging.getLogger(__name__)
sqs = boto3.resource('sqs', aws_access_key_id=environ['SQS_AWS_ACCESS_KEY_ID'],
aws_secret_access_key=environ['SQS_AWS_SECRET_ACCESS_KEY'],
region_name=environ['SQS_REGION'])
# snippet-end:[python.example_code.sqs.queue_wrapper_imports]
# snippet-start:[python.example_code.sqs.CreateQueue]
def create_queue(name, attributes=None):
"""
Creates an Amazon SQS queue.
:param name: The name of the queue. This is part of the URL assigned to the queue.
:param attributes: The attributes of the queue, such as maximum message size or
whether it's a FIFO queue.
:return: A Queue object that contains metadata about the queue and that can be used
to perform queue operations like sending and receiving messages.
"""
if not attributes:
attributes = {}
try:
queue = sqs.create_queue(
QueueName=name,
Attributes=attributes
)
logger.info("Created queue '%s' with URL=%s", name, queue.url)
except ClientError as error:
logger.exception("Couldn't create queue named '%s'.", name)
raise error
else:
return queue
# snippet-end:[python.example_code.sqs.CreateQueue]
# snippet-start:[python.example_code.sqs.GetQueueUrl]
def get_queue(name):
"""
Gets an SQS queue by name.
:param name: The name that was used to create the queue.
:return: A Queue object.
"""
try:
queue = sqs.get_queue_by_name(QueueName=name)
logger.info("Got queue '%s' with URL=%s", name, queue.url)
except ClientError as error:
logger.exception("Couldn't get queue named %s.", name)
raise error
else:
return queue
# snippet-end:[python.example_code.sqs.GetQueueUrl]
# snippet-start:[python.example_code.sqs.ListQueues]
def get_queues(prefix=None):
"""
Gets a list of SQS queues. When a prefix is specified, only queues with names
that start with the prefix are returned.
:param prefix: The prefix used to restrict the list of returned queues.
:return: A list of Queue objects.
"""
if prefix:
queue_iter = sqs.queues.filter(QueueNamePrefix=prefix)
else:
queue_iter = sqs.queues.all()
queues = list(queue_iter)
if queues:
logger.info("Got queues: %s", ', '.join([q.url for q in queues]))
else:
logger.warning("No queues found.")
return queues
# snippet-end:[python.example_code.sqs.ListQueues]
# snippet-start:[python.example_code.sqs.DeleteQueue]
def remove_queue(queue):
"""
Removes an SQS queue. When run against an AWS account, it can take up to
60 seconds before the queue is actually deleted.
:param queue: The queue to delete.
:return: None
"""
try:
queue.delete()
logger.info("Deleted queue with URL=%s.", queue.url)
except ClientError as error:
logger.exception("Couldn't delete queue with URL=%s!", queue.url)
raise error
# snippet-end:[python.example_code.sqs.DeleteQueue]
# snippet-start:[python.example_code.sqs.Scenario_ManageQueues]
def usage_demo():
"""Shows how to create, list, and delete queues."""
print('-'*88)
print("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!")
print('-'*88)
prefix = 'sqs-usage-demo-'
river_queue = create_queue(
prefix + 'peculiar-river',
{
'MaximumMessageSize': str(1024),
'ReceiveMessageWaitTimeSeconds': str(20)
}
)
print(f"Created queue with URL: {river_queue.url}.")
lake_queue = create_queue(
prefix + 'strange-lake.fifo',
{
'MaximumMessageSize': str(4096),
'ReceiveMessageWaitTimeSeconds': str(10),
'VisibilityTimeout': str(300),
'FifoQueue': str(True),
'ContentBasedDeduplication': str(True)
}
)
print(f"Created queue with URL: {lake_queue.url}.")
stream_queue = create_queue(prefix + 'boring-stream')
print(f"Created queue with URL: {stream_queue.url}.")
alias_queue = get_queue(prefix + 'peculiar-river')
print(f"Got queue with URL: {alias_queue.url}.")
remove_queue(stream_queue)
print(f"Removed queue with URL: {stream_queue.url}.", )
queues = get_queues(prefix=prefix)
print(f"Got {len(queues)} queues.")
for queue in queues:
remove_queue(queue)
print(f"Removed queue with URL: {queue.url}.")
print("Thanks for watching!")
print('-'*88)
# snippet-end:[python.example_code.sqs.Scenario_ManageQueues]
if __name__ == '__main__':
usage_demo()