aboutsummaryrefslogtreecommitdiff
path: root/sqs_sample.py
blob: d95f361ab983f223b81e2e2ff676b02ff037dd1f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import os
import json
import uuid
import boto3

# Connection settings for a LocalStack SQS endpoint.
#
# When this script itself runs inside docker, use the docker DNS name of the
# LocalStack container instead of localhost, e.g.:
#   os.environ['LOCALSTACK_SQS_ENDPOINT_URL'] = 'http://localstack:4576'
#
# When you're connecting to LocalStack from outside docker, use the host DNS
# name. Each AWS service has its own endpoint URL, so make sure the boto3
# client is configured accordingly. The endpoint_url can point at any local
# AWS stack, e.g. a local DynamoDB instance.
os.environ.update(
    {
        "LOCALSTACK_SQS_ENDPOINT_URL": "http://localhost:4566",
        # Placeholder credentials and region exported for the local stack.
        "AWS_ACCESS_KEY_ID": "foo",
        "AWS_SECRET_ACCESS_KEY": "bar",
        "AWS_DEFAULT_REGION": "us-east-1",
    }
)

# SQS client bound to the local endpoint configured above.
sqs = boto3.client(
    service_name="sqs",
    endpoint_url=os.environ["LOCALSTACK_SQS_ENDPOINT_URL"],
)


# Sample JSON payload used as the SQS message body: a Bitcoin price index
# snapshot with timestamps, a disclaimer and per-currency rates.
body = {
    "time": {
        "updated": "Jul 4, 2020 14:12:00 UTC",
        "updatedISO": "2020-07-04T14:12:00+00:00",
        "updateduk": "Jul 4, 2020 at 15:12 BST",
    },
    "disclaimer": "This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org",
    "bpi": {
        "USD": {
            "code": "USD",
            "rate": "9,083.8632",
            # Key must be "description" so both currency entries share the
            # same schema (it previously contained a stray space).
            "description": "United States Dollar",
            "rate_float": 9083.8632,
        },
        "BTC": {
            "code": "BTC",
            "rate": "1.0000",
            "description": "Bitcoin",
            "rate_float": 1,
        },
    },
}

# Below is your typical message sending and receiving with long polling.
print("Sending Message")
# The payload is serialised to JSON and delivery is delayed by 3 seconds.
# MessageDeduplicationId is intentionally not passed: it applies only to FIFO
# queues (whose names end in ".fifo"), and a per-message DelaySeconds is only
# supported on standard queues, so the two cannot be combined on this queue.
response = sqs.send_message(
    QueueUrl="http://localhost:4566/00000000000/sample-queue",
    MessageBody=json.dumps(body),
    DelaySeconds=3,
    MessageAttributes={
        "contentType": {"StringValue": "application/json", "DataType": "String"}
    },
)
print(response)


print("Waiting for message")
# NOTE(review): the account segment of this URL has 11 zeros; LocalStack's
# default account id is normally 12 zeros - confirm against the QueueUrl
# returned when the queue was created.
queue_url = "http://localhost:4566/00000000000/sample-queue"

# WaitTimeSeconds=20 enables long polling: the call waits up to 20 seconds for
# a message instead of returning immediately, which means fewer read cycles to
# SQS and lower cost when running in production.
received = sqs.receive_message(
    QueueUrl=queue_url,
    AttributeNames=["All"],
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
    VisibilityTimeout=30,
)

# "Messages" is absent from the response when nothing was received.
messages = received.get("Messages", [])
print("Total messages = {}".format(len(messages)))
for message in messages:
    message_body = json.loads(message["Body"])
    print(message_body)
    # Delete the message once handled so it is not redelivered after the
    # visibility timeout expires.
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message["ReceiptHandle"],
    )
    # Report the backlog from the queue attributes rather than by calling
    # receive_message again: a second receive would block on long polling and
    # hide further messages for the visibility timeout without ever
    # processing or deleting them.
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    ).get("Attributes", {})
    print(
        "Total messages remaining ={}".format(
            queue_attributes.get("ApproximateNumberOfMessages", "0")
        )
    )