1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
import os
import json
import uuid
import boto3
# inside docker use docker dns name localstack
# os.environ['LOCALSTACK_SQS_ENDPOINT_URL'] = 'http://localstack:4576'
# if your connecting to the localstack outside docker use host dns
# each aws service has its own endpoint url ensure boto3 client is configured accordingly
# you can change endpoint_url to point to any local aws stack e.g aws local dynamodb instance
os.environ["LOCALSTACK_SQS_ENDPOINT_URL"] = "http://localhost:4566"
os.environ["AWS_ACCESS_KEY_ID"] = "foo"
os.environ["AWS_SECRET_ACCESS_KEY"] = "bar"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
sqs = boto3.client("sqs", endpoint_url=os.environ["LOCALSTACK_SQS_ENDPOINT_URL"])
body = {
"time": {
"updated": "Jul 4, 2020 14:12:00 UTC",
"updatedISO": "2020-07-04T14:12:00+00:00",
"updateduk": "Jul 4, 2020 at 15:12 BST",
},
"disclaimer": "This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org",
"bpi": {
"USD": {
"code": "USD",
"rate": "9,083.8632",
"descriptio n": "United States Dollar",
"rate_float": 9083.8632,
},
"BTC": {
"code": "BTC",
"rate": "1.0000",
"description": "Bitcoin",
"rate_float": 1,
},
},
}
# Below is your typical message sending and receiving with long polling
print("Sending Message")
response = sqs.send_message(
QueueUrl="http://localhost:4566/00000000000/sample-queue",
MessageBody=json.dumps(body),
DelaySeconds=3,
MessageDeduplicationId=str(uuid.uuid4()),
MessageAttributes={
"contentType": {"StringValue": "application/json", "DataType": "String"}
},
)
print(response)
print("Waiting for message")
# WaitTimeSeconds=20 enables longer polling this means less read cycles to SQS reducing your costs if running in production
messages = sqs.receive_message(
QueueUrl="http://localhost:4566/00000000000/sample-queue",
AttributeNames=["All"],
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
VisibilityTimeout=30,
)
messages = messages.get("Messages", [])
print("Total messages = {}".format(len(messages)))
for message in messages:
message_body = json.loads(message.get("Body"))
print(message_body)
sqs.delete_message(
QueueUrl="http://localhost:4566/00000000000/sample-queue",
ReceiptHandle=message.get("ReceiptHandle"),
)
messages = sqs.receive_message(
QueueUrl="http://localhost:4566/00000000000/sample-queue",
AttributeNames=["All"],
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
VisibilityTimeout=30,
)
messages = messages.get("Messages", [])
print("Total messages remaining ={}".format(len(messages)))
|