跳到正文

消息队列

托管消息队列:发布与消费消息、批量发送、死信队列(DLQ)、重投递与推送订阅。

概览

基础路径: https://api.infrai.cc/v1/queue
鉴权头: Authorization: Bearer $INFRAI_API_KEY
bash
# Call any /v1/queue capability over raw HTTP — no SDK to install.
# curl:
curl https://api.infrai.cc/v1/queue/... \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json"

方法

queue.publish

POST /v1/queue/publish

向队列发布消息,可选延迟。

参数

名称类型必填说明
queuestring
必填
队列名称。
payloadobject
必填
消息负载。
delay_secondsnumber可选投递前的延迟(秒)。
idempotency_keystring可选可选去重 key;相同重试将返回同一结果。

返回

{ message_id }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/publish \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"queue": "orders", "payload": {"order_id": "o_123"}, "priority": 5}'

queue.create

POST /v1/queue/create

创建一个消息队列

参数

名称类型必填说明
namestring
必填
队列名称
type"standard" | "fifo"可选队列类型:standard 标准 / fifo 先进先出
dead_letter_queuestring可选接收失败消息的死信队列名(须已存在)
max_retriesnumber可选转入 DLQ 前允许的 nack 次数
message_retention_daysnumber可选消息保留天数
visibility_timeout_defaultnumber可选默认可见性超时(租约)秒数
enable_priorityboolean可选是否启用优先级
idempotency_keystring可选幂等键,避免重复创建

返回

QueueRecord { queue, type, dead_letter_queue?, max_retries, visibility_timeout_default }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/create \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "..."}'

queue.get

GET /v1/queue/get/{queue}

获取队列配置详情

参数

名称类型必填说明
queuestring
必填
队列名称

返回

QueueRecord { queue, type, dead_letter_queue?, max_retries, visibility_timeout_default }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X GET https://api.infrai.cc/v1/queue/get/QUEUE \
  -H "Authorization: Bearer $INFRAI_API_KEY"

queue.list

GET /v1/queue/list

分页列出所有队列

参数

名称类型必填说明
cursorstring可选分页游标,来自上一页 next_cursor
limitnumber可选每页返回数量

返回

{ items: QueueRecord[], next_cursor? }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X GET https://api.infrai.cc/v1/queue/list \
  -H "Authorization: Bearer $INFRAI_API_KEY"

queue.update

PATCH /v1/queue/update/{queue}

更新队列配置

参数

名称类型必填说明
queuestring
必填
队列名称
dead_letter_queuestring可选死信队列名
max_retriesnumber可选转入 DLQ 前允许的 nack 次数
message_retention_daysnumber可选消息保留天数
visibility_timeout_defaultnumber可选默认可见性超时秒数
enable_priorityboolean可选是否启用优先级
idempotency_keystring可选幂等键,避免重复执行

返回

QueueRecord { queue, max_retries, visibility_timeout_default }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X PATCH https://api.infrai.cc/v1/queue/update/QUEUE \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{}'

queue.delete

DELETE /v1/queue/delete/{queue}

删除一个队列

参数

名称类型必填说明
queuestring
必填
队列名称
idempotency_keystring可选幂等键,避免重复执行

返回

{ queue, deleted }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X DELETE https://api.infrai.cc/v1/queue/delete/QUEUE \
  -H "Authorization: Bearer $INFRAI_API_KEY"

queue.purge

POST /v1/queue/purge/{queue}

清空队列中所有消息

参数

名称类型必填说明
queuestring
必填
队列名称
idempotency_keystring可选幂等键,避免重复执行

返回

{ queue, purged }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/purge/QUEUE \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"queue": "..."}'

queue.stats

GET /v1/queue/stats/{queue}

获取队列统计指标

参数

名称类型必填说明
queuestring
必填
队列名称

返回

QueueStats { queue, available, in_flight, dlq, oldest_age_seconds? }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X GET https://api.infrai.cc/v1/queue/stats/QUEUE \
  -H "Authorization: Bearer $INFRAI_API_KEY"

queue.publish_batch

POST /v1/queue/publish_batch

批量发布多条消息到队列

参数

名称类型必填说明
queuestring
必填
队列名称
messagesQueueMessage[]
必填
待发布的消息数组
idempotency_keystring可选幂等键,避免重复发布

返回

{ message_ids }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/publish_batch \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"queue": "...", "messages": []}'

queue.consume

POST /v1/queue/consume

从队列拉取一批消息消费

参数

名称类型必填说明
queuestring
必填
队列名称
max_messagesnumber可选单次最多拉取的消息数
visibility_timeoutnumber可选本批消息的租约秒数;默认取队列配置

返回

{ messages: QueueMessage[] }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/consume \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"queue": "..."}'

queue.ack

POST /v1/queue/ack

确认一条已消费的消息

参数

名称类型必填说明
queuestring
必填
队列名称
message_idstring
必填
消息 ID
idempotency_keystring可选幂等键,避免重复执行

返回

{ message_id, acked }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/ack \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"queue": "...", "message_id": "..."}'

queue.nack

POST /v1/queue/nack

否认一条消息,可重新入队或转入死信

参数

名称类型必填说明
queuestring
必填
队列名称
message_idstring
必填
消息 ID
requeueboolean可选true 重新入队;false 直接转入死信
idempotency_keystring可选幂等键,避免重复执行

返回

{ message_id, nacked, requeued }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/nack \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"queue": "...", "message_id": "..."}'

queue.dlq.list

GET /v1/queue/dlq/list/{queue}

分页列出死信队列中的消息

参数

名称类型必填说明
queuestring
必填
队列名称
cursorstring可选分页游标,来自上一页 next_cursor
limitnumber可选每页返回数量

返回

{ items: QueueMessage[], next_cursor? }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X GET https://api.infrai.cc/v1/queue/dlq/list/QUEUE \
  -H "Authorization: Bearer $INFRAI_API_KEY"

queue.dlq.redrive

POST /v1/queue/dlq/redrive/{queue}

将死信消息重投回原队列

参数

名称类型必填说明
queuestring
必填
队列名称
message_idstring可选重投单条消息的 ID;省略则批量重投
sincestring可选批量重投:该时刻及之后进入死信的全部消息
idempotency_keystring可选幂等键,避免重复重投

返回

{ queue, redriven }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/dlq/redrive/QUEUE \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"queue": "orders", "max_messages": 100}'

queue.push_subscribe

POST /v1/queue/push_subscribe/{queue}

为队列配置推送订阅

参数

名称类型必填说明
queuestring
必填
队列名称
urlstring
必填
https 推送目标地址(经 SSRF 校验)
secretstring可选对推送请求做 HMAC 签名的密钥
max_retriesnumber可选推送失败重试次数
visibility_timeoutnumber可选可见性超时秒数
dead_letter_queuestring可选死信队列名
idempotency_keystring可选幂等键,避免重复订阅

返回

PushSubscription { subscription_id, queue, url }

示例

一次性前置(每个范例都假定已完成):

bash
# No SDK to install — every call is a plain HTTPS request.
# Get a project key by signing in at https://infrai.cc/login (Google/GitHub gives
# you $2 free credit; email sign-in starts at $0). On 402 INSUFFICIENT_CREDIT, add
# funds at https://infrai.cc/billing (or POST /v1/account/topup and open the
# returned checkout_url).
export INFRAI_API_KEY="ifr_..."
bash
curl -X POST https://api.infrai.cc/v1/queue/push_subscribe/QUEUE \
  -H "Authorization: Bearer $INFRAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"queue": "orders", "forward_to": "https://api.acme.com/order-handler"}'

全部能力

本模块全部已路由能力——完整的对外 REST 契约。上方方法是带讲解的入门示例,此表是完整参考。

能力端点说明
queue.ackPOST /v1/queue/ackAcknowledge (ack) a consumed message to remove it from the queue.
queue.consumePOST /v1/queue/consumePull a batch of messages from a queue for processing.
queue.createPOST /v1/queue/createCreate a message queue (standard or FIFO, with optional dead-letter queue).
queue.deleteDELETE /v1/queue/delete/{queue}Delete a message queue and discard all of its pending messages (use queue.purge to keep the queue); idempotent.
queue.dlq.listGET /v1/queue/dlq/list/{queue}List messages in a queue's dead-letter queue (DLQ) with pagination.
queue.dlq.redrivePOST /v1/queue/dlq/redrive/{queue}Redrive dead-letter messages back to the source queue, individually or in bulk.
queue.getGET /v1/queue/get/{queue}Retrieve a queue's configuration details.
queue.listGET /v1/queue/listList all message queues with pagination.
queue.nackPOST /v1/queue/nackNegatively acknowledge (nack) a message to requeue it or route it to the DLQ.
queue.publishPOST /v1/queue/publishPublish a single message to a queue (idempotent).
queue.publish_batchPOST /v1/queue/publish_batchPublish multiple messages to a queue in a single batch.
queue.purgePOST /v1/queue/purge/{queue}Purge all messages from a queue.
queue.push_subscribePOST /v1/queue/push_subscribe/{queue}Configure a push subscription to deliver queue messages to a callback URL.
queue.statsGET /v1/queue/stats/{queue}Retrieve queue metrics: available, in-flight, and dead-lettered message counts.
queue.updatePATCH /v1/queue/update/{queue}Update a queue's DLQ, retry, and retention settings.

完整示例

本模块的生产级端到端范例:先一次性配置,再运行业务流程,尽量覆盖本模块的多数 API。

单文件可运行 Python 程序(仅标准库、无 SDK):拷贝后填入 INFRAI_API_KEY 运行,即可按真实业务流逐步体验本模块核心 API——每一步都真实调用并计费,后续步骤复用前一步返回的真实字段。12 行 helper 就是全部集成代码。

python
#!/usr/bin/env python3
"""Infrai · queue — runnable real-app example (single file, zero deps).

Copy this file, set your key, run it: every step is a REAL call to
api.infrai.cc, billed at the real (tiny) per-call price, printing the
live JSON response. Get a key at https://infrai.cc/login (Google/
GitHub sign-in grants $2 free credit); add funds at
https://infrai.cc/billing. No SDK — the 12-line helper below is the
entire integration."""
import json
import os
from urllib import error, request

KEY = os.environ.get("INFRAI_API_KEY") or "ifr_..."  # <- your key
BASE = "https://api.infrai.cc"


# Same raw HTTPS POST/GET as every per-method example on this page —
# wrapped once for reuse. There is nothing else to it: no SDK.
def infrai(method, path, body=None):
    req = request.Request(
        BASE + path, method=method,
        data=json.dumps(body).encode() if body is not None else None,
        headers={"Authorization": f"Bearer {KEY}",
                 "Content-Type": "application/json"})
    try:
        with request.urlopen(req, timeout=60) as r:
            return json.loads(r.read())
    except error.HTTPError as e:
        return json.loads(e.read())


def show(label, resp):
    print(f"\n== {label} ==")
    print(json.dumps(resp, indent=2, ensure_ascii=False))
    return resp


# 1) queue.create — POST /v1/queue/create · Create a message queue (standard or FIFO, with optional dead-letter queue).
r1 = show("queue.create", infrai("POST", "/v1/queue/create", {"name":"jobs"}))

# 2) queue.publish — POST /v1/queue/publish · Publish a single message to a queue (idempotent).
r2 = show("queue.publish", infrai("POST", "/v1/queue/publish", {"queue":"jobs","payload":{"hello":"world"}}))

# 3) queue.consume — POST /v1/queue/consume · Pull a batch of messages from a queue for processing.
r3 = show("queue.consume", infrai("POST", "/v1/queue/consume", {"queue":"jobs","max_messages":1}))