Skip to content
This repository was archived by the owner on Jun 2, 2021. It is now read-only.

Commit d126d98

Browse files
committed
V3: introduce reoccurring_job to simplify async service create
1 parent 956b7b2 commit d126d98

14 files changed

Lines changed: 935 additions & 1150 deletions

app/actions/service_instance_create_managed.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
require 'repositories/service_instance_share_event_repository'
2-
require 'jobs/v3/services/create_service_instance_job'
2+
require 'jobs/v3/create_service_instance_job'
33
require 'actions/mixins/service_instance_create'
44

55
module VCAP::CloudController

app/jobs/enqueuer.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ def enqueue
1515
enqueue_job(@job)
1616
end
1717

18-
def enqueue_pollable
19-
wrapped_job = PollableJobWrapper.new(@job)
18+
def enqueue_pollable(existing_guid: nil)
19+
wrapped_job = PollableJobWrapper.new(@job, existing_guid: existing_guid)
2020

2121
if block_given?
2222
wrapped_job = yield wrapped_job

app/jobs/pollable_job_wrapper.rb

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,32 @@
33
module VCAP::CloudController
44
module Jobs
55
class PollableJobWrapper < WrappingJob
6+
attr_reader :existing_guid
7+
8+
def initialize(handler, existing_guid: nil)
9+
@existing_guid = existing_guid
10+
super(handler)
11+
end
12+
613
# use custom hook as Job does not have the guid field populated during the normal `enqueue` hook
714
def after_enqueue(job)
8-
PollableJobModel.create(
9-
delayed_job_guid: job.guid,
10-
state: PollableJobModel::PROCESSING_STATE,
11-
operation: @handler.display_name,
12-
resource_guid: @handler.resource_guid,
13-
resource_type: @handler.resource_type
14-
)
15+
if existing_guid && (existing = PollableJobModel.find(guid: existing_guid))
16+
existing.update(
17+
delayed_job_guid: job.guid,
18+
state: PollableJobModel::POLLING_STATE,
19+
operation: @handler.display_name,
20+
resource_guid: @handler.resource_guid,
21+
resource_type: @handler.resource_type
22+
)
23+
else
24+
PollableJobModel.create(
25+
delayed_job_guid: job.guid,
26+
state: PollableJobModel::PROCESSING_STATE,
27+
operation: @handler.display_name,
28+
resource_guid: @handler.resource_guid,
29+
resource_type: @handler.resource_type
30+
)
31+
end
1532
end
1633

1734
def success(job)

app/jobs/reoccurring_job.rb

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
require 'jobs/cc_job'
2+
3+
module VCAP::CloudController
4+
module Jobs
5+
class ReoccurringJob < VCAP::CloudController::Jobs::CCJob
6+
attr_reader :finished, :start_time
7+
8+
def success(current_delayed_job)
9+
pollable_job = PollableJobModel.find_by_delayed_job(current_delayed_job)
10+
11+
if finished
12+
pollable_job.update(state: PollableJobModel::COMPLETE_STATE)
13+
elsif next_enqueue_would_exceed_maximum_duration?
14+
expire!
15+
else
16+
enqueue_next_job(pollable_job)
17+
end
18+
end
19+
20+
def maximum_duration_seconds
21+
@maximum_duration || default_maximum_duration_seconds
22+
end
23+
24+
def maximum_duration_seconds=(duration)
25+
@maximum_duration = duration if duration < default_maximum_duration_seconds
26+
end
27+
28+
def polling_interval_seconds
29+
@polling_interval || default_polling_interval_seconds
30+
end
31+
32+
def polling_interval_seconds=(interval)
33+
@polling_interval = if interval < default_polling_interval_seconds
34+
default_polling_interval_seconds
35+
elsif interval > 24.hours
36+
24.hours
37+
else
38+
interval
39+
end
40+
end
41+
42+
private
43+
44+
def initialize
45+
@start_time = Time.now
46+
@finished = false
47+
end
48+
49+
def default_maximum_duration_seconds
50+
VCAP::CloudController::Config.config.get(:broker_client_max_async_poll_duration_minutes).minutes
51+
end
52+
53+
def default_polling_interval_seconds
54+
Config.config.get(:broker_client_default_async_poll_interval_seconds)
55+
end
56+
57+
def next_enqueue_would_exceed_maximum_duration?
58+
Time.now + polling_interval_seconds > start_time + maximum_duration_seconds
59+
end
60+
61+
def finish
62+
@finished = true
63+
end
64+
65+
def expire!
66+
handle_timeout if self.respond_to?(:handle_timeout)
67+
raise CloudController::Errors::ApiError.new_from_details('JobTimeout')
68+
end
69+
70+
def enqueue_next_job(pollable_job)
71+
opts = {
72+
queue: Jobs::Queues.generic,
73+
run_at: Delayed::Job.db_time_now + polling_interval_seconds
74+
}
75+
76+
Jobs::Enqueuer.new(self, opts).enqueue_pollable(existing_guid: pollable_job.guid)
77+
end
78+
end
79+
end
80+
end
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
require 'jobs/reoccurring_job'
2+
3+
module VCAP::CloudController
4+
module V3
5+
class CreateServiceInstanceJob < VCAP::CloudController::Jobs::ReoccurringJob
6+
attr_reader :warnings
7+
8+
def initialize(service_instance_guid, arbitrary_parameters: {}, user_audit_info:)
9+
super()
10+
@service_instance_guid = service_instance_guid
11+
@arbitrary_parameters = arbitrary_parameters
12+
@user_audit_info = user_audit_info
13+
@start_time = Time.now
14+
@first_time = true
15+
@warnings = []
16+
end
17+
18+
def perform
19+
client = VCAP::Services::ServiceClientProvider.provide({ instance: service_instance })
20+
21+
if first_time
22+
compute_maximum_duration
23+
send_provision_request(client)
24+
compatibility_checks
25+
@first_time = false
26+
end
27+
28+
gone! if service_instance.nil?
29+
30+
operation_in_progress = service_instance.last_operation.type
31+
aborted! if operation_in_progress != 'create'
32+
33+
if service_instance.operation_in_progress?
34+
fetch_last_operation(client)
35+
end
36+
37+
if service_instance.last_operation.state == 'succeeded'
38+
record_event(service_instance, @arbitrary_parameters)
39+
finish
40+
elsif service_instance.last_operation.state == 'failed'
41+
operation_failed!(service_instance.last_operation.description)
42+
end
43+
end
44+
45+
def handle_timeout
46+
service_instance.save_and_update_operation(
47+
last_operation: {
48+
state: 'failed',
49+
description: 'Service Broker failed to provision within the required time.',
50+
}
51+
)
52+
end
53+
54+
def job_name_in_configuration
55+
:service_instance_create
56+
end
57+
58+
def max_attempts
59+
1
60+
end
61+
62+
def resource_type
63+
'service_instances'
64+
end
65+
66+
def resource_guid
67+
service_instance_guid
68+
end
69+
70+
def display_name
71+
'service_instance.create'
72+
end
73+
74+
private
75+
76+
attr_reader :service_instance_guid, :arbitrary_parameters, :first_time
77+
78+
def compute_maximum_duration
79+
max_poll_duration_on_plan = service_instance.service_plan.try(:maximum_polling_duration)
80+
self.maximum_duration_seconds = max_poll_duration_on_plan if max_poll_duration_on_plan
81+
end
82+
83+
def send_provision_request(client)
84+
broker_response = client.provision(
85+
service_instance,
86+
accepts_incomplete: true,
87+
arbitrary_parameters: arbitrary_parameters,
88+
maintenance_info: service_instance.service_plan.maintenance_info
89+
)
90+
91+
service_instance.save_with_new_operation(broker_response[:instance], broker_response[:last_operation])
92+
rescue => e
93+
service_instance.save_with_new_operation({}, {
94+
type: 'create',
95+
state: 'failed',
96+
description: e.message,
97+
})
98+
raise e
99+
end
100+
101+
def fetch_last_operation(client)
102+
last_operation_result = client.fetch_service_instance_last_operation(service_instance)
103+
self.polling_interval_seconds = last_operation_result[:retry_after] if last_operation_result[:retry_after]
104+
105+
service_instance.save_and_update_operation(
106+
last_operation: last_operation_result[:last_operation].slice(:state, :description)
107+
)
108+
rescue HttpRequestError, HttpResponseError, Sequel::Error => e
109+
logger = Steno.logger('cc-background')
110+
logger.error("There was an error while fetching the service instance operation state: #{e}")
111+
end
112+
113+
def record_event(service_instance, request_attrs)
114+
Repositories::ServiceEventRepository.new(@user_audit_info).
115+
record_service_instance_event(:create, service_instance, request_attrs)
116+
end
117+
118+
def service_instance
119+
ManagedServiceInstance.first(guid: service_instance_guid)
120+
end
121+
122+
def compatibility_checks
123+
if service_instance.service_plan.service.volume_service? && volume_services_disabled?
124+
@warnings.push({ detail: ServiceInstance::VOLUME_SERVICE_WARNING })
125+
end
126+
127+
if service_instance.service_plan.service.route_service? && route_services_disabled?
128+
@warnings.push({ detail: ServiceInstance::ROUTE_SERVICE_WARNING })
129+
end
130+
end
131+
132+
def volume_services_disabled?
133+
!VCAP::CloudController::Config.config.get(:volume_services_enabled)
134+
end
135+
136+
def route_services_disabled?
137+
!VCAP::CloudController::Config.config.get(:route_services_enabled)
138+
end
139+
140+
def gone!
141+
raise CloudController::Errors::ApiError.new_from_details('ServiceInstanceNotFound', service_instance_guid)
142+
end
143+
144+
def aborted!
145+
raise CloudController::Errors::ApiError.new_from_details('UnableToPerform', 'Create', 'delete in progress')
146+
end
147+
148+
def operation_failed!(msg)
149+
raise CloudController::Errors::ApiError.new_from_details('ServiceInstanceProvisionFailed', msg)
150+
end
151+
end
152+
end
153+
end

0 commit comments

Comments
 (0)