Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/io/cdap/plugin/http/common/http/OAuthUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public static AccessToken getAccessToken(BaseHttpConfig config) throws IOExcepti
// get accessToken from service account
return OAuthUtil.getAccessTokenByServiceAccount(config);
case OAUTH2:
if (config instanceof BaseHttpSourceConfig) {
try (CloseableHttpClient client = HttpClients.custom()
.setSSLSocketFactory(new SSLConnectionSocketFactoryCreator((BaseHttpSourceConfig) config).create())
.build()) {
return getAccessToken(client, config);
}
}
try (CloseableHttpClient client = HttpClients.createDefault()) {
return getAccessToken(client, config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.http.common.http.AuthType;
import io.cdap.plugin.http.common.http.HttpClient;
import io.cdap.plugin.http.common.http.KeyStoreType;
import io.cdap.plugin.http.common.http.OAuthUtil;
import io.cdap.plugin.http.common.http.SSLConnectionSocketFactoryCreator;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
Expand Down Expand Up @@ -70,7 +72,8 @@ private void validateOAuth2Credentials(FailureCollector collector) {
!containsMacro(PROPERTY_PROXY_PASSWORD) && !containsMacro(PROPERTY_PROXY_USERNAME) &&
!containsMacro(PROPERTY_PROXY_URL) && !containsMacro(PROPERTY_OAUTH2_CLIENT_AUTHENTICATION) &&
!containsMacro(PROPERTY_OAUTH2_GRANT_TYPE)) {
HttpClientBuilder httpclientBuilder = HttpClients.custom();
HttpClientBuilder httpclientBuilder = HttpClients.custom()
.setSSLSocketFactory(new SSLConnectionSocketFactoryCreator(this).create());
if (!Strings.isNullOrEmpty(getProxyUrl())) {
HttpHost proxyHost = HttpHost.create(getProxyUrl());
if (!Strings.isNullOrEmpty(getProxyUsername()) && !Strings.isNullOrEmpty(getProxyPassword())) {
Expand Down Expand Up @@ -140,6 +143,7 @@ private HttpBatchSourceConfig(HttpBatchSourceConfigBuilder builder) {
this.readTimeout = builder.readTimeout;
this.paginationType = builder.paginationType;
this.verifyHttps = builder.verifyHttps;
this.keystoreType = builder.keystoreType;
this.authType = builder.authType;
this.authUrl = builder.authUrl;
this.clientId = builder.clientId;
Expand Down Expand Up @@ -180,6 +184,7 @@ public static class HttpBatchSourceConfigBuilder {
private Integer readTimeout;
private String paginationType;
private String verifyHttps;
private String keystoreType;
private String authType;
private String authUrl;
private String tokenUrl;
Expand Down Expand Up @@ -345,6 +350,11 @@ public HttpBatchSourceConfigBuilder setAuthType(String authType) {
return this;
}

public HttpBatchSourceConfigBuilder setKeystoreType(KeyStoreType keystoreTypeObj) {
this.keystoreType = keystoreTypeObj.getValue();
return this;
}

public HttpBatchSourceConfig build() {
return new HttpBatchSourceConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public abstract class BaseHttpBatchSourceETLTest extends HydratorTestBase {
public TestName name = new TestName();

@Rule
public WireMockRule wireMockRule = new WireMockRule();
public WireMockRule wireMockRule = new WireMockRule(0);

private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class HttpSourceETLTest extends HydratorTestBase {
@Rule
public TestName testName = new TestName();
@Rule
public WireMockRule wireMockRule = new WireMockRule();
public WireMockRule wireMockRule = new WireMockRule(0);

@Test
public void testIncrementAnIndex() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class HttpStreamingSourceETLTest extends HttpSourceETLTest {
private static final Logger LOG = LoggerFactory.getLogger(HttpStreamingSourceETLTest.class);
private static final ArtifactId APP_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-streams", "1.0.0");
private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-streams", "1.0.0");
private static final int WAIT_FOR_RECORDS_TIMEOUT_SECONDS = 60;
private static final long WAIT_FOR_RECORDS_POLLING_INTERVAL_MS = 100;
private static final int WAIT_FOR_RECORDS_TIMEOUT_SECONDS = 300;
private static final long WAIT_FOR_RECORDS_POLLING_INTERVAL_MS = 200;

@BeforeClass
public static void setupTest() throws Exception {
Expand Down Expand Up @@ -100,7 +100,7 @@ private ProgramManager startPipeline(Map<String, String> properties) throws Exce

ProgramManager programManager =
deployETL(sourceConfig, sinkConfig, "HTTPStreaming_" + testName.getMethodName());
programManager.startAndWaitForRun(ProgramRunStatus.RUNNING, 30, TimeUnit.SECONDS);
programManager.startAndWaitForRun(ProgramRunStatus.RUNNING, 120, TimeUnit.SECONDS);

return programManager;
}
Expand All @@ -113,15 +113,17 @@ private List<StructuredRecord> waitForRecords(ProgramManager programManager,
.atMost(WAIT_FOR_RECORDS_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.pollInterval(WAIT_FOR_RECORDS_POLLING_INTERVAL_MS, TimeUnit.MILLISECONDS)
.untilAsserted((() -> {
outputManager.get();
int recordsCount = MockSink.readOutput(outputManager).size();
Assert.assertTrue(
String.format("At least %d records expected, but %d found", exceptedNumberOfRecords, recordsCount),
recordsCount >= exceptedNumberOfRecords);
}));

programManager.stop();
programManager.waitForStopped(10, TimeUnit.SECONDS);
programManager.waitForStopped(120, TimeUnit.SECONDS);

outputManager.get();
return MockSink.readOutput(outputManager);
}

Expand Down
Loading
Loading